"""
This is the main file where Pepper will communicate with a server.
First, Pepper will send an audio file it's recorded of someone speaking. The server will then return that audio
after it has been turned to text, and given to an AI model instructed to be Pepper.
Pepper will then say the response, and the cycle will repeat, until the AI believes the conversation has ended.
Each AI response will be tied to an emotion and depending on this emotion, Pepper will randomly select from a list,
an animation to perform based on this emotion.
The hardest part of this is the way Pepper determines when someone has finished talking.
To do this, Pepper will keep recording until the volume of the audio it's picking up is below a certain level for
around 3 - 5 seconds.
Lachlan Paul, 2024
"""
import os
import random
import sys
import time
import naoqi
import requests
from naoqi import ALProxy, ALBroker
from dotenv import load_dotenv


class PepperGPT(naoqi.ALModule):
    def __init__(self, ip, port, whisper_server, server_has_passcode, param):
        self.ip = ip
        self.port = port
        self.recording_location = "/home/nao/PepperGPT/temp_recording/temp.wav"

        # This should be the link to your server where the audio is transcribed and then fed to GPT.
        self.whisper_server = whisper_server
        self.server_has_passcode = server_has_passcode
        if self.server_has_passcode:
            load_dotenv()
            self.SERVER_PASSCODE = os.getenv("PASSCODE")

        self.broker = ALBroker("broker", "0.0.0.0", 0, self.ip, int(self.port))
        naoqi.ALModule.__init__(self, param)

        self.memory = ALProxy("ALMemory")
        self.speech_recognition = ALProxy("ALSpeechRecognition", self.ip, self.port)
        self.audio_recorder = ALProxy("ALAudioRecorder", self.ip, self.port)
        # Text-to-speech proxy, used by say_response below.
        self.tts = ALProxy("ALTextToSpeech", self.ip, self.port)

        # TODO: Fill the Big-Fat-List-Of-Animation-Names(tm) with animation names (what else?)
        # Big-Fat-List-Of-Animation-Names(tm)
        self.EMOTIONS = {
            "HAPPY": ["placeholder"],
            "SAD": ["placeholder"],
            "CONFUSED": ["placeholder"],
            "SORRY": ["placeholder"],
            "ANGRY": ["placeholder"],
            "GREETING": ["placeholder"],
            "END": ["placeholder"]
        }

    def start(self):
        # A current limitation is that Pepper only begins recording after it hears speech,
        # so it will wait for the current speech to stop, then start recording.
        # I have not yet figured out a workaround for this.
        self.speech_recognition.subscribe("Test_ASR")
        self.memory.subscribeToEvent("WordRecognized", self.getName(), "processRemote")
        print("---Started!---")

    def stop(self):
        self.speech_recognition.unsubscribe("Test_ASR")
        self.memory.unsubscribeToEvent("WordRecognized", self.getName())
        self.broker.shutdown()
        print("---Stopped!---")

    def run(self):
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("Interrupted by user, shutting down")
            self.stop()
            sys.exit(0)

    def processRemote(self, signalName, message):
        # Called by ALMemory when the "WordRecognized" event fires.
        self.record_audio()
        response = self.upload_audio()
        self.say_response(response)

    def record_audio(self):
        # TODO: Make this record for as long as it can detect sound above a certain level.
        print("---Recording audio---")
        # Records a 16 kHz wav for a fixed 10 seconds; the tuple selects which microphones are captured.
        self.audio_recorder.startMicrophonesRecording(self.recording_location, "wav", 16000, (0, 0, 1, 0))
        time.sleep(10)
        print("---Finished recording audio---")
        self.audio_recorder.stopMicrophonesRecording()
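
    # The module docstring describes stopping the recording once the audio level has stayed low for a
    # few seconds. The method below is a hedged sketch of that idea, not part of the original flow: it
    # assumes NAOqi's ALAudioDevice energy-computation calls are available on the robot, and the
    # threshold / timing values are placeholders that would need tuning.
    def record_audio_until_silence(self, silence_threshold=300, silence_seconds=4, max_seconds=30):
        audio_device = ALProxy("ALAudioDevice", self.ip, self.port)
        audio_device.enableEnergyComputation()
        print("---Recording audio (silence-based stop)---")
        self.audio_recorder.startMicrophonesRecording(self.recording_location, "wav", 16000, (0, 0, 1, 0))
        quiet_since = None
        start = time.time()
        while time.time() - start < max_seconds:
            time.sleep(0.2)
            energy = audio_device.getFrontMicEnergy()
            if energy < silence_threshold:
                # Low energy: start (or continue) the silence timer, and stop once it has run long enough.
                if quiet_since is None:
                    quiet_since = time.time()
                elif time.time() - quiet_since >= silence_seconds:
                    break
            else:
                # Sound detected again, reset the silence timer.
                quiet_since = None
        self.audio_recorder.stopMicrophonesRecording()
        audio_device.disableEnergyComputation()
        print("---Finished recording audio---")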

    def upload_audio(self):
        # Uploads the audio file to the server URL set in the init phase
        with open(self.recording_location, 'rb') as audio_file:
            files = {"audio_file": audio_file}

            # My server has a passcode, and yours should too, to avoid people leeching off of your GPT API.
            # Make sure to hide the passcode in a .env file.
            if self.server_has_passcode:
                headers = {"Passcode": self.SERVER_PASSCODE}
            else:
                headers = {}

            # Gets the text response from the server
            return requests.post(self.whisper_server, files=files, headers=headers).text

    def say_response(self, response):
        """
        Says the text, and if an emotion is found, plays an animation related to that emotion.
        :param response: the text to say. should have an emotion tied to the end, eg; "I am happy! | HAPPY"
        """
        try:
            response, emotion = response.split("|")
            emotion = emotion.strip()
            if emotion in self.EMOTIONS:
                animation_to_play = random.choice(self.EMOTIONS[emotion])
                # TODO: Make this the correct path
                # self.ANIMATION.run(animation_to_play)
            self.tts.say(response)
        except RuntimeError:
            # TODO: Place animation here. Thinking or shrugging
            self.tts.say("Hmm, it seems I'm not connected to the internet, check my wifi connection, or let my "
                         "programmer know.")
if __name__ == "__main__":
ip = "10.174.154.14"
port = 9559
whisper_server = "http://127.0.0.1:5000/upload"
server_has_passcode = True
pepper_gpt = PepperGPT(ip, port, whisper_server, server_has_passcode, "pepper_gpt")
pepper_gpt.start()
pepper_gpt.run()