-
Notifications
You must be signed in to change notification settings - Fork 0
/
my_live_asr.py
163 lines (135 loc) · 6.15 KB
/
my_live_asr.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# Ein komplexeres Beispiel zur automatischen Spracherkennung und Textvisualisierung
# mit der Google Cloud Speech Recognition.
# Das Skript läuft im Terminal.
from __future__ import division
from google.cloud import speech
import re
import sys
import pyaudio
import queue
#Aufzeichnungsparameter
RATE = 16000 #16 khz
CHUNK = int(RATE / 10) #100ms
language_code = "de-DE" #Erkennung folgt der deutschen Sprache
class MicrophoneStream(object):
# Öffnet einen Aufnahmestream als Generator, der die Audioblöcke erzeugt.
def __init__(self, rate, chunk):
self._rate = rate
self._chunk = chunk
# Puffer für die Audio-Daten erstellen.
self._buff = queue.Queue()
self.closed = True
def __enter__(self):
self._audio_interface = pyaudio.PyAudio()
self._audio_stream = self._audio_interface.open(
format=pyaudio.paInt16,
# Die API unterstützt derzeit nur 1-Kanal-Audio (Mono)
# https://goo.gl/z757pE
channels=1,
rate=self._rate,
input=True,
frames_per_buffer=self._chunk,
# Der audio stream wird asynchron ausgeführt, um den Puffer zu füllen.
# Das ist notwendig, damit der Puffer des Eingabegeräts nicht
# überläuft, während der aufrufende Thread bspw. Netzwerkanfragen stellt.
stream_callback=self._fill_buffer,
)
self.closed = False
return self
def __exit__(self, type, value, traceback):
# Schließt den audio stream
self._audio_stream.stop_stream()
self._audio_stream.close()
self.closed = True
self._buff.put(None)
self._audio_interface.terminate()
def _fill_buffer(self, in_data, frame_count, time_info, status_flags):
# Sammelt fortlaufend Daten aus dem audio stream im Puffer.
self._buff.put(in_data)
return None, pyaudio.paContinue
def generator(self):
# So lange der audio stream offen ist werden gepufferte Daten verarbeitet.
while not self.closed:
# Blockierendes get(), um sicherzustellen, dass mindestens einen DatenChunk
# vorhanden ist, und stoppen der Iteration, wenn der Chunk None ist.
# Ist der Chunk None, wird das Ende des audio streams angezeigt.
chunk = self._buff.get()
if chunk is None:
return
data = [chunk]
# Jetzt werden alle anderen Daten verarbeitet, die noch gepuffert sind.
while True:
try:
chunk = self._buff.get(block=False)
if chunk is None:
return
data.append(chunk)
except queue.Empty:
break
yield b"".join(data)
def listen_print_loop(responses):
# Iteriert durch die Serverantworten und gibt sie aus.
# Die übermittelte Serverantwort ist ein Generator, der solange blockiert,
# bis eine Antwort vom Server geliefert wird. Jede Antwort kann mehrere Ergebnisse enthalten.
# Jedes Ergebnis kann mehrere Alternativen enthalten (mehr unter https://goo.gl/tjCPAU).
# In diesem Beispiel wird nur die Transkription für die oberste Alternative des
# obersten Ergebnisses zurückgegeben. Es werden auch Antworten für Zwischenergebnisse geliefert.
# Wenn die Antwort eine Zwischenantwort ist, wird am Ende ein Zeilenvorschub gedruckt,
# damit damit das nächste Ergebnis es überschreiben kann, bis die Antwort eine finale ist.
# Für die finale Antwort wird ein Zeilenumbruch gedruckt,
# damit die endgültige Transkription erhalten bleibt.
num_chars_printed = 0
for response in responses:
if not response.results:
continue
# Die result-Liste ist fortlaufend. Für den Stream interessiert uns nur
# das erste Ergebnis, das berücksichtigt wird, denn sobald es `is_final` ist,
# wird die nächste Äußerung verarbeitet.
result = response.results[0]
if not result.alternatives:
continue
# Zeigt die erste Alternative an
transcript = result.alternatives[0].transcript
# Zwischenergebnisse anzeigen, aber mit einem Carriage Return ('\r')
# am Ende der Zeile, so dass nachfolgende Zeilen diese überschreiben.
# Wenn das vorherige Ergebnis länger war als dieses, müssen
# einige zusätzliche Leerzeichen gesetzt werden,
# um das vorherige Ergebnis zu überschreiben.
overwrite_chars = " " * (num_chars_printed - len(transcript))
if not result.is_final:
sys.stdout.write(transcript + overwrite_chars + "\r")
sys.stdout.flush()
num_chars_printed = len(transcript)
else:
print(transcript + overwrite_chars)
# Wird das Wort exit oder tschüss erkannt, bricht das Skript ab
if re.search(r"\b(quit|tschüss)\b", transcript, re.I):
print("Bis bald!...")
break
num_chars_printed = 0
# Main Loop, hier wird das Skript gestartet.
# Der Main Loop wird verwendet, um auf Ereignisse zu warten und auf sie zu reagieren.
# Dies können Benutzereingaben, Systemereignisse oder andere Arten von Ereignissen sein,
# die in der Anwendung auftreten.
# Der Main Loop überprüft regelmäßig auf neue Ereignisse und führt entsprechende Aktionen
# oder Funktionen aus, um auf diese Ereignisse zu reagieren.
def main():
client = speech.SpeechClient()
config = speech.RecognitionConfig(
encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
sample_rate_hertz=RATE,
language_code=language_code,
)
streaming_config = speech.StreamingRecognitionConfig(
config=config, interim_results=True
)
with MicrophoneStream(RATE, CHUNK) as stream:
audio_generator = stream.generator()
requests = (
speech.StreamingRecognizeRequest(audio_content=content)
for content in audio_generator
)
responses = client.streaming_recognize(streaming_config, requests)
listen_print_loop(responses)
if __name__ == "__main__":
main()