Commit 1c71119

Author: Jerjou Cheng (committed)
Clean up & refactor of indefinite speech transcrib
1 parent c6f5f11 commit 1c71119

File tree: 2 files changed (+143, -154 lines)


speech/cloud-client/transcribe_streaming_indefinite.py

Lines changed: 119 additions & 133 deletions
@@ -28,10 +28,13 @@
 # [START import_libraries]
 from __future__ import division
 
+import argparse
 import collections
 import itertools
 import re
 import sys
+import threading
+import time
 
 from google.cloud import speech
 from google.cloud.speech import enums
@@ -40,64 +43,32 @@
 import grpc
 import pyaudio
 from six.moves import queue
+import six
+
+import transcribe_streaming_mic
 # [END import_libraries]
 
-# Audio recording parameters
-RATE = 16000
-CHUNK = int(RATE / 10)  # 100ms
+
+def duration_to_secs(duration):
+    return duration.seconds + (duration.nanos / float(1e9))
 
 
-class MicrophoneStream(object):
+class ResumableMicrophoneStream(transcribe_streaming_mic.MicrophoneStream):
     """Opens a recording stream as a generator yielding the audio chunks."""
     def __init__(self, rate, chunk_size, max_replay_secs=5):
-        self._rate = rate
-        self._chunk_size = chunk_size
+        super(ResumableMicrophoneStream, self).__init__(rate, chunk_size)
         self._max_replay_secs = max_replay_secs
 
-        # Create a thread-safe buffer of audio data
-        self._buff = queue.Queue()
-        self.closed = True
-
-    def __enter__(self):
-        num_channels = 1
-        self._audio_interface = pyaudio.PyAudio()
-        self._audio_stream = self._audio_interface.open(
-            format=pyaudio.paInt16,
-            # The API currently only supports 1-channel (mono) audio
-            # https://goo.gl/z757pE
-            channels=num_channels, rate=self._rate,
-            input=True, frames_per_buffer=self._chunk_size,
-            # Run the audio stream asynchronously to fill the buffer object.
-            # This is necessary so that the input device's buffer doesn't
-            # overflow while the calling thread makes network requests, etc.
-            stream_callback=self._fill_buffer,
-        )
-
-        self.closed = False
+        # Some useful numbers
+        # 2 bytes in 16 bit samples
+        self._bytes_per_sample = 2 * self._num_channels
+        self._bytes_per_second = self._rate * self._bytes_per_sample
 
-        bytes_per_sample = 2 * num_channels  # 2 bytes in 16 bit samples
-        self._bytes_per_second = self._rate * bytes_per_sample
-
-        bytes_per_chunk = (self._chunk_size * bytes_per_sample)
-        chunks_per_second = self._bytes_per_second / bytes_per_chunk
+        self._bytes_per_chunk = (self._chunk_size * self._bytes_per_sample)
+        self._chunks_per_second = (
+            self._bytes_per_second / self._bytes_per_chunk)
         self._untranscribed = collections.deque(
-            maxlen=self._max_replay_secs * chunks_per_second)
-
-        return self
-
-    def __exit__(self, type, value, traceback):
-        self._audio_stream.stop_stream()
-        self._audio_stream.close()
-        self.closed = True
-        # Signal the generator to terminate so that the client's
-        # streaming_recognize method will not block the process termination.
-        self._buff.put(None)
-        self._audio_interface.terminate()
-
-    def _fill_buffer(self, in_data, frame_count, time_info, status_flags):
-        """Continuously collect data from the audio stream, into the buffer."""
-        self._buff.put(in_data)
-        return None, pyaudio.paContinue
+            maxlen=self._max_replay_secs * self._chunks_per_second)
 
     def on_transcribe(self, end_time):
         while self._untranscribed and end_time > self._untranscribed[0][1]:
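The body of on_transcribe is cut off at the hunk boundary above. As a rough sketch of the bookkeeping it performs (only the while condition is visible in this diff; the popleft() body is an assumption), the method trims replay-buffer entries whose audio the server has already acknowledged:

    def on_transcribe(self, end_time):
        # self._untranscribed holds (byte_data, chunk_end_time) pairs appended
        # in generator(); drop everything transcribed up to end_time so that a
        # resumed stream replays only unacknowledged audio.
        while self._untranscribed and end_time > self._untranscribed[0][1]:
            self._untranscribed.popleft()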
@@ -106,145 +77,160 @@ def on_transcribe(self, end_time):
     def generator(self, resume=False):
         total_bytes_sent = 0
         if resume:
+            # Make a copy, in case on_transcribe is called while yielding them
+            catchup = list(self._untranscribed)
             # Yield all the untranscribed chunks first
-            for chunk, _ in self._untranscribed:
+            for chunk, _ in catchup:
                 yield chunk
-        while not self.closed:
-            # Use a blocking get() to ensure there's at least one chunk of
-            # data, and stop iteration if the chunk is None, indicating the
-            # end of the audio stream.
-            chunk = self._buff.get()
-            if chunk is None:
-                return
-            data = [chunk]
-
-            # Now consume whatever other data's still buffered.
-            while True:
-                try:
-                    chunk = self._buff.get(block=False)
-                    if chunk is None:
-                        return
-                    data.append(chunk)
-                except queue.Empty:
-                    break
-
-            byte_data = b''.join(data)
 
+        for byte_data in super(ResumableMicrophoneStream, self).generator():
             # Populate the replay buffer of untranscribed audio bytes
             total_bytes_sent += len(byte_data)
             chunk_end_time = total_bytes_sent / self._bytes_per_second
             self._untranscribed.append((byte_data, chunk_end_time))
 
             yield byte_data
-# [END audio_stream]
 
 
-def duration_to_secs(duration):
-    return duration.seconds + (duration.nanos / float(1e9))
+class SimulatedMicrophoneStream(ResumableMicrophoneStream):
+    def __init__(self, audio_src, *args, **kwargs):
+        super(SimulatedMicrophoneStream, self).__init__(*args, **kwargs)
+        self._audio_src = audio_src
 
+    def _delayed(self, get_data):
+        total_bytes_read = 0
+        start_time = time.time()
 
-def listen_print_loop(responses, stream):
-    """Iterates through server responses and prints them.
+        chunk = get_data(self._bytes_per_chunk)
 
-    The responses passed is a generator that will block until a response
-    is provided by the server.
+        while chunk and not self.closed:
+            total_bytes_read += len(chunk)
+            expected_yield_time = start_time + (
+                total_bytes_read / self._bytes_per_second)
+            now = time.time()
+            if expected_yield_time > now:
+                time.sleep(expected_yield_time - now)
 
-    Each response may contain multiple results, and each result may contain
-    multiple alternatives; for details, see https://goo.gl/tjCPAU. Here we
-    print only the transcription for the top alternative of the top result.
+            yield chunk
 
-    In this case, responses are provided for interim results as well. If the
-    response is an interim one, print a line feed at the end of it, to allow
-    the next result to overwrite it, until the response is a final one. For the
-    final one, print a newline to preserve the finalized transcription.
-    """
-    num_chars_printed = 0
-    for response in responses:
-        if not response.results:
-            continue
-
-        # The `results` list is consecutive. For streaming, we only care about
-        # the first result being considered, since once it's `is_final`, it
-        # moves on to considering the next utterance.
-        result = response.results[0]
-        if not result.alternatives:
-            continue
-
-        top_alternative = result.alternatives[0]
-        # Display the transcription of the top alternative.
-        transcript = top_alternative.transcript
-
-        # Display interim results, but with a carriage return at the end of the
-        # line, so subsequent lines will overwrite them.
-        #
-        # If the previous result was longer than this one, we need to print
-        # some extra spaces to overwrite the previous result
-        overwrite_chars = ' ' * (num_chars_printed - len(transcript))
-
-        if not result.is_final:
-            sys.stdout.write(transcript + overwrite_chars + '\r')
-            sys.stdout.flush()
-
-            num_chars_printed = len(transcript)
-        else:
-            print(transcript + overwrite_chars)
-
-            # Exit recognition if any of the transcribed phrases could be
-            # one of our keywords.
-            if re.search(r'\b(exit|quit)\b', transcript, re.I):
-                print('Exiting..')
-                break
+            chunk = get_data(self._bytes_per_chunk)
+
+    def _stream_from_file(self, audio_src):
+        with open(audio_src, 'rb') as f:
+            for chunk in self._delayed(
+                    lambda b_per_chunk: f.read(b_per_chunk)):
+                yield chunk
+
+        # Continue sending silence - 10s worth
+        trailing_silence = six.StringIO(
+            b'\0' * self._bytes_per_second * 10)
+        for chunk in self._delayed(trailing_silence.read):
+            yield chunk
+
+    def _thread(self):
+        for chunk in self._stream_from_file(self._audio_src):
+            self._fill_buffer(chunk)
+        self._fill_buffer(None)
+
+    def __enter__(self):
+        self.closed = False
+
+        threading.Thread(target=self._thread).start()
+
+        return self
+
+    def __exit__(self, type, value, traceback):
+        self.closed = True
 
-            num_chars_printed = 0
 
+def _record_keeper(responses, stream):
+    """Calls the stream's on_transcribe callback for each final response.
+
+    Args:
+        responses - a generator of responses. The responses must already be
+            filtered for ones with results and alternatives.
+        stream - a ResumableMicrophoneStream.
+    """
+    for r in responses:
+        result = r.results[0]
+        if result.is_final:
+            top_alternative = result.alternatives[0]
             # Keep track of what transcripts we've received, so we can resume
             # intelligently when we hit the deadline
             stream.on_transcribe(duration_to_secs(
                 top_alternative.words[-1].end_time))
+        yield r
+
 
+def listen_print_loop(responses, stream):
+    """Iterates through server responses and prints them.
 
-def main():
+    Same as in transcribe_streaming_mic, but keeps track of when a sent
+    audio_chunk has been transcribed.
+    """
+    with_results = (r for r in responses if (
+        r.results and r.results[0].alternatives))
+    transcribe_streaming_mic.listen_print_loop(
+        _record_keeper(with_results, stream))
+
+
+def main(sample_rate, audio_src):
     # See http://g.co/cloud/speech/docs/languages
     # for a list of supported languages.
     language_code = 'en-US'  # a BCP-47 language tag
 
     client = speech.SpeechClient()
     config = types.RecognitionConfig(
         encoding=enums.RecognitionConfig.AudioEncoding.LINEAR16,
-        sample_rate_hertz=RATE,
+        sample_rate_hertz=sample_rate,
         language_code=language_code,
         max_alternatives=1,
         enable_word_time_offsets=True)
     streaming_config = types.StreamingRecognitionConfig(
         config=config,
         interim_results=True)
 
-    with MicrophoneStream(RATE, CHUNK) as stream:
+    if audio_src:
+        mic_manager = SimulatedMicrophoneStream(
+            audio_src, sample_rate, int(sample_rate / 10))
+    else:
+        mic_manager = ResumableMicrophoneStream(
+            sample_rate, int(sample_rate / 10))
+
+    with mic_manager as stream:
         resume = False
         while True:
             audio_generator = stream.generator(resume=resume)
             requests = (types.StreamingRecognizeRequest(audio_content=content)
                         for content in audio_generator)
 
-            responses = client.streaming_recognize(
-                streaming_config, requests,
-                options=gax.CallOptions(timeout=(60 * 4)))
+            responses = client.streaming_recognize(streaming_config, requests)
 
             try:
                 # Now, put the transcription responses to use.
                 listen_print_loop(responses, stream)
                 break
-            except grpc.RpcError, e:  # TODO: wrong exception
-                if e.code() != grpc.StatusCode.INVALID_ARGUMENT:
+            except grpc.RpcError, e:
+                if e.code() not in (grpc.StatusCode.INVALID_ARGUMENT,
+                                    grpc.StatusCode.OUT_OF_RANGE):
                     raise
-
                 details = e.details()
-                if 'deadline too short' not in details:
-                    raise
+                if e.code() == grpc.StatusCode.INVALID_ARGUMENT:
+                    if 'deadline too short' not in details:
+                        raise
+                else:
+                    if 'maximum allowed stream duration' not in details:
+                        raise
 
             print('Resuming..')
             resume = True
 
 
 if __name__ == '__main__':
-    main()
+    parser = argparse.ArgumentParser(
+        description=__doc__,
+        formatter_class=argparse.RawDescriptionHelpFormatter)
+    parser.add_argument('--rate', default=16000, help='Sample rate.', type=int)
+    parser.add_argument('--audio_src', help='File to simulate streaming of.')
+    args = parser.parse_args()
+    main(args.rate, args.audio_src)
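Based on the argparse flags added at the end of the file, the refactored sample would presumably be driven along these lines; the audio path is a hypothetical placeholder, the --audio_src file is expected to be raw LINEAR16 audio matching --rate, and transcribe_streaming_mic.py must be importable from the same directory:

    # Hypothetical usage sketch (not part of this commit)
    #
    #   python transcribe_streaming_indefinite.py                 # live mic, 16000 Hz default
    #   python transcribe_streaming_indefinite.py --rate 8000 --audio_src /path/to/audio.raw
    #
    # or programmatically, reusing main() from this module:
    import transcribe_streaming_indefinite

    # audio_src=None selects ResumableMicrophoneStream (live microphone);
    # a file path selects SimulatedMicrophoneStream instead.
    transcribe_streaming_indefinite.main(16000, None)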
