Commit c6f5f11

Jerjou Cheng committed
Draft 1: indefinitely-long streaming transcription
1 parent bf1e0d7 commit c6f5f11

File tree

1 file changed: 250 additions, 0 deletions

@@ -0,0 +1,250 @@
#!/usr/bin/env python

# Copyright 2017 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Google Cloud Speech API sample application using the streaming API.

NOTE: This module requires the additional dependency `pyaudio`. To install
using pip:

    pip install pyaudio

Example usage:
    python transcribe_streaming_mic.py
"""

# [START import_libraries]
from __future__ import division

import collections
import itertools
import re
import sys

from google.cloud import speech
from google.cloud.speech import enums
from google.cloud.speech import types
from google import gax
import grpc
import pyaudio
from six.moves import queue
# [END import_libraries]

# Audio recording parameters
RATE = 16000
CHUNK = int(RATE / 10)  # 100ms


# [START audio_stream]
class MicrophoneStream(object):
    """Opens a recording stream as a generator yielding the audio chunks."""
    def __init__(self, rate, chunk_size, max_replay_secs=5):
        self._rate = rate
        self._chunk_size = chunk_size
        self._max_replay_secs = max_replay_secs

        # Create a thread-safe buffer of audio data
        self._buff = queue.Queue()
        self.closed = True

    def __enter__(self):
        num_channels = 1
        self._audio_interface = pyaudio.PyAudio()
        self._audio_stream = self._audio_interface.open(
            format=pyaudio.paInt16,
            # The API currently only supports 1-channel (mono) audio
            # https://goo.gl/z757pE
            channels=num_channels, rate=self._rate,
            input=True, frames_per_buffer=self._chunk_size,
            # Run the audio stream asynchronously to fill the buffer object.
            # This is necessary so that the input device's buffer doesn't
            # overflow while the calling thread makes network requests, etc.
            stream_callback=self._fill_buffer,
        )

        self.closed = False

        bytes_per_sample = 2 * num_channels  # 2 bytes in 16 bit samples
        self._bytes_per_second = self._rate * bytes_per_sample

        # Replay buffer of (audio chunk, end time in seconds) pairs: roughly
        # the last `max_replay_secs` of audio that has been sent to the API
        # but not yet covered by a final transcript.
        bytes_per_chunk = (self._chunk_size * bytes_per_sample)
        chunks_per_second = self._bytes_per_second / bytes_per_chunk
        self._untranscribed = collections.deque(
            # deque requires an integer maxlen; chunks_per_second is a float
            # under `from __future__ import division`.
            maxlen=int(self._max_replay_secs * chunks_per_second))

        return self

    def __exit__(self, type, value, traceback):
        self._audio_stream.stop_stream()
        self._audio_stream.close()
        self.closed = True
        # Signal the generator to terminate so that the client's
        # streaming_recognize method will not block the process termination.
        self._buff.put(None)
        self._audio_interface.terminate()

    def _fill_buffer(self, in_data, frame_count, time_info, status_flags):
        """Continuously collect data from the audio stream, into the buffer."""
        self._buff.put(in_data)
        return None, pyaudio.paContinue

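    # `end_time` is the end timestamp, in seconds, of the last word of a
    # finalized result (see listen_print_loop). Chunks in the replay buffer
    # whose end time falls before it have already been transcribed, so they
    # are dropped and will not be replayed on resume.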
    def on_transcribe(self, end_time):
        while self._untranscribed and end_time > self._untranscribed[0][1]:
            self._untranscribed.popleft()

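    # With resume=True, the buffered chunks that have not yet been covered by
    # a final transcript are yielded again before any new microphone data, so
    # a restarted recognize stream picks up where the previous one left off.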
    def generator(self, resume=False):
        total_bytes_sent = 0
        if resume:
            # Yield all the untranscribed chunks first
            for chunk, _ in self._untranscribed:
                yield chunk
        while not self.closed:
            # Use a blocking get() to ensure there's at least one chunk of
            # data, and stop iteration if the chunk is None, indicating the
            # end of the audio stream.
            chunk = self._buff.get()
            if chunk is None:
                return
            data = [chunk]

            # Now consume whatever other data's still buffered.
            while True:
                try:
                    chunk = self._buff.get(block=False)
                    if chunk is None:
                        return
                    data.append(chunk)
                except queue.Empty:
                    break

            byte_data = b''.join(data)

            # Populate the replay buffer of untranscribed audio bytes
            total_bytes_sent += len(byte_data)
            chunk_end_time = total_bytes_sent / self._bytes_per_second
            self._untranscribed.append((byte_data, chunk_end_time))

            yield byte_data
# [END audio_stream]


def duration_to_secs(duration):
    """Converts a duration protobuf (`seconds` + `nanos`) to float seconds."""
    return duration.seconds + (duration.nanos / float(1e9))


def listen_print_loop(responses, stream):
    """Iterates through server responses and prints them.

    The `responses` argument is a generator that will block until a response
    is provided by the server.

    Each response may contain multiple results, and each result may contain
    multiple alternatives; for details, see https://goo.gl/tjCPAU. Here we
    print only the transcription for the top alternative of the top result.

    In this case, responses are provided for interim results as well. If the
    response is an interim one, print a carriage return at the end of it, so
    the next result can overwrite it, until the response is a final one. For
    the final one, print a newline to preserve the finalized transcription.
    """
    num_chars_printed = 0
    for response in responses:
        if not response.results:
            continue

        # The `results` list is consecutive. For streaming, we only care about
        # the first result being considered, since once it's `is_final`, it
        # moves on to considering the next utterance.
        result = response.results[0]
        if not result.alternatives:
            continue

        top_alternative = result.alternatives[0]
        # Display the transcription of the top alternative.
        transcript = top_alternative.transcript

        # Display interim results, but with a carriage return at the end of the
        # line, so subsequent lines will overwrite them.
        #
        # If the previous result was longer than this one, we need to print
        # some extra spaces to overwrite the previous result.
        overwrite_chars = ' ' * (num_chars_printed - len(transcript))

        if not result.is_final:
            sys.stdout.write(transcript + overwrite_chars + '\r')
            sys.stdout.flush()

            num_chars_printed = len(transcript)
        else:
            print(transcript + overwrite_chars)

            # Exit recognition if any of the transcribed phrases could be
            # one of our keywords.
            if re.search(r'\b(exit|quit)\b', transcript, re.I):
                print('Exiting..')
                break

            num_chars_printed = 0

            # Keep track of what transcripts we've received, so we can resume
            # intelligently when we hit the deadline.
            stream.on_transcribe(duration_to_secs(
                top_alternative.words[-1].end_time))


def main():
    # See http://g.co/cloud/speech/docs/languages
    # for a list of supported languages.
    language_code = 'en-US'  # a BCP-47 language tag

    client = speech.SpeechClient()
    config = types.RecognitionConfig(
        encoding=enums.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=RATE,
        language_code=language_code,
        max_alternatives=1,
        enable_word_time_offsets=True)
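    # The word time offsets requested above are consumed in listen_print_loop,
    # which passes the end time of the last finalized word to
    # stream.on_transcribe to trim the replay buffer.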
    streaming_config = types.StreamingRecognitionConfig(
        config=config,
        interim_results=True)

    with MicrophoneStream(RATE, CHUNK) as stream:
        resume = False
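        # Each streaming_recognize call is made with a bounded deadline (the
        # CallOptions timeout below). When a call is cut short by that
        # deadline, restart it with resume=True so the buffered,
        # still-untranscribed audio is replayed before fresh microphone data.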
        while True:
            audio_generator = stream.generator(resume=resume)
            requests = (types.StreamingRecognizeRequest(audio_content=content)
                        for content in audio_generator)

            responses = client.streaming_recognize(
                streaming_config, requests,
                options=gax.CallOptions(timeout=(60 * 4)))

            try:
                # Now, put the transcription responses to use.
                listen_print_loop(responses, stream)
                break
            except grpc.RpcError as e:  # TODO: wrong exception
                if e.code() != grpc.StatusCode.INVALID_ARGUMENT:
                    raise

                details = e.details()
                if 'deadline too short' not in details:
                    raise

                print('Resuming..')
                resume = True


if __name__ == '__main__':
    main()
