import logging
import threading
from speech_recognition import Recognizer, Microphone
import speech_recognition as sr
import openai
import rospy
# 로깅 설정
# logging.basicConfig(level=rospy.loginfo, format='%(asctime)s - %(levelname)-7s : %(message)s\n')
class Google_STT:
def __init__(self, lang="ko-KR"):
self.r = Recognizer()
self.mic = Microphone()
self.trigger_words = ["이루", "루머", "헬로", "Hello", "안녕"]
self.lang = lang
self.qna_flag = True
def set_qna_flag(self, qna_flag):
self.qna_flag = qna_flag
def setup_mic(self):
with self.mic as source:
rospy.loginfo("주변 소음을 측정합니다.")
self.r.adjust_for_ambient_noise(source)
rospy.loginfo("소음 측정 완료.")
def listen(self, stop_listening_event):
while not stop_listening_event.is_set() and self.qna_flag:
try:
with self.mic as source:
rospy.loginfo("호출 명령어를 탐지하는 중...")
audio = self.r.listen(source, timeout=4, phrase_time_limit=2)
self.audio_queue.append(audio)
except sr.WaitTimeoutError:
pass # 타임아웃 발생 시 무시하고 계속 진행
def recognize(self, stop_listening_event):
while not stop_listening_event.is_set() and self.qna_flag :
if self.audio_queue:
audio = self.audio_queue.pop(0)
try:
text = self.r.recognize_google(audio, language=self.lang)
rospy.loginfo("[수집된 음성]: {}".format(text))
if any(trigger_word in text for trigger_word in self.trigger_words):
stop_listening_event.set()
self.triggered = text
except sr.UnknownValueError:
rospy.loginfo("호출 명령어가 없습니다.")
except Exception as e:
logging.error("Error: {}".format(e))
def listen_for_trigger(self):
self.audio_queue = []
self.triggered = None
stop_listening_event = threading.Event()
listen_thread = threading.Thread(target=self.listen, args=(stop_listening_event,))
recognize_thread = threading.Thread(target=self.recognize, args=(stop_listening_event,))
listen_thread.start()
recognize_thread.start()
listen_thread.join()
recognize_thread.join()
return self.triggered is not None and self.qna_flag
def listen_for_task(self):
try:
with self.mic as source:
rospy.loginfo("Task을 듣는 중...")
audio = self.r.listen(source, timeout=6, phrase_time_limit=5)
task_text = self.r.recognize_google(audio, language=self.lang)
rospy.loginfo("[수집된 Task 내용]: {}".format(task_text))
return task_text
except sr.UnknownValueError:
rospy.loginfo("음성 결과를 얻지 못했습니다.")
return None
except Exception as e:
logging.error("Error: {}".format(e))
return False
class STT_Agent(Google_STT):
def __init__(self, lang="ko-KR", guide=False):
self.guide = guide
super().__init__(lang)
self.destination_list = ['건설공학관','시대융합관', '시융관', '창의공학관', '제2공학관', '제이공학관']
self.PROMPT_FOR_SYSTEM = f"한국어로 대화합니다. 당신의 역할은 사용자의 질문에 대해 대답을 생성하는 것이 아닌 사용자의 질문을 3가지 경우로 필터링하는 것입니다.\
1. 사용자가 빨리 가라고 요청하면 'SF'라고만 대답해주세요.\
2. 사용자가 천천히 가라고 요청하면 'SL'라고만 대답해주세요.\
3. 그 외의 모든 경우, 사용자가 물어본 내용을 앵무새처럼 'Q:사용자의 질문' 형태로 똑같이 대답해주세요.\
'Q:사용자의 질문'은 서울시립대학교 캠퍼스 홍보 LLM 모델에 질문으로 입력되어 대답을 생성하는데 사용됩니다.\
만약 학교와 연관성이 전혀 없는 질문이거나 질문이 없거나 도덕적으로 부적절한 질문이라면, 'No'라고 대답해주세요. "
self.PROMPT_FOR_GUIDE = f"한국어로 대화합니다. 당신의 사용자가 가고 싶어하는 목적지를 똑같이 대답해야 한다.\
너가 알고 있는 목적지는 '건설공학관','시대융합관', '창의공학관', '제2공학관', '제이공학관' 뿐이다. \
만약 사용자가 가고 싶어하는 목적지가 너가 아는 곳이면 해당 목적지를 대답. \
그러나 사용자가 너가 모르는 목적지를 가고 싶어하면 너는 무조건 '죄송합니다'이라고만 대답."
def set_guide(self, guide):
self.guide = guide
def process_user_input(self, messages, model="gpt-3.5-turbo", temp=0):
response = openai.ChatCompletion.create(
model=model,
temperature=temp,
messages=messages
)['choices'][0]['message']['content']
return response
def filtering_task(self, predicted_text):
if predicted_text == None:
predicted_text = '없음'
if self.guide == True:
print("I am guide now~~~~~~~~")
messages = [
{"role": "system", "content": self.PROMPT_FOR_GUIDE},
{"role": "user", "content": "시대융합관이 어디야?"},
{"role": "assistant", "content": "시대융합관"},
{"role": "user", "content": "건공관에 데려다줘"},
{"role": "assistant", "content": "건공관"},
{"role": "user", "content": "창공관으로 가"},
{"role": "assistant", "content": "창공관"},
{"role": "user", "content": "정문으로 가줘"},
{"role": "assistant", "content": "없음"},
{"role": "user", "content": predicted_text},
]
filtered_task = self.process_user_input(messages)
return filtered_task
else:
print("I am tour now~~~~~~~~")
messages = [
{"role": "system", "content": self.PROMPT_FOR_SYSTEM},
{"role": "user", "content": "동아리는 뭐가 있어?"},
{"role": "assistant", "content": "Q:동아리는 뭐가 있어?"},
{"role": "user", "content": "속도 줄여줘"},
{"role": "assistant", "content": "SL"},
{"role": "user", "content": predicted_text},
]
filtered_task = self.process_user_input(messages)
return filtered_task
if __name__ == "__main__":
stt_agent = STT_Agent()
stt_agent.setup_mic()
if stt_agent.listen_for_trigger():
task = stt_agent.listen_for_task()
if task:
rospy.loginfo(f"Task recognized: {task}")
else:
rospy.loginfo("No task was recognized after trigger.")
else:
rospy.loginfo("No trigger word detected.")
Bash
복사