Skip to content

Commit 872c62c

Browse files
authored
Merging samples code (#2452)
* Added codelab.py * Fixed copyright year in codelab.py * Added tags to infinite streaming sample * refactored and added tags to infinite speech streaming sample
1 parent 2a35d64 commit 872c62c

File tree

2 files changed

+142
-29
lines changed

2 files changed

+142
-29
lines changed

composer/workflows/codelab.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
# Copyright 2019 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Example Airflow DAG that checks if a local file exists, creates a Cloud Dataproc cluster, runs the Hadoop
16+
wordcount example, and deletes the cluster.
17+
18+
This DAG relies on three Airflow variables
19+
https://airflow.apache.org/concepts.html#variables
20+
* gcp_project - Google Cloud Project to use for the Cloud Dataproc cluster.
21+
* gce_zone - Google Compute Engine zone where Cloud Dataproc cluster should be
22+
created.
23+
* gcs_bucket - Google Cloud Storage bucket to use for result of Hadoop job.
24+
See https://cloud.google.com/storage/docs/creating-buckets for creating a
25+
bucket.
26+
"""
27+
28+
import datetime
29+
import os
30+
31+
from airflow import models
32+
from airflow.contrib.operators import dataproc_operator
33+
from airflow.operators import BashOperator
34+
from airflow.utils import trigger_rule
35+
36+
# Output file for Cloud Dataproc job.
# Combines the GCS bucket (from an Airflow Variable), a fixed "wordcount"
# prefix, and a run timestamp so repeated runs do not overwrite each other.
# The trailing separator marks the path as a directory.
output_file = os.path.join(
    models.Variable.get('gcs_bucket'), 'wordcount',
    datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep

# Path to Hadoop wordcount example available on every Dataproc cluster.
WORDCOUNT_JAR = (
    'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar'
)

# Path to input file for Hadoop job.
# Fix: the original line was missing the closing quote (syntax error).
input_file = '/home/airflow/gcs/data/rose.txt'

# Arguments to pass to Cloud Dataproc job.
wordcount_args = ['wordcount', input_file, output_file]
50+
51+
# Midnight (00:00) of the previous calendar day. A start_date in the past
# makes the DAG runnable as soon as Airflow discovers it.
_one_day_ago = datetime.datetime.today() - datetime.timedelta(days=1)
yesterday = datetime.datetime.combine(_one_day_ago,
                                      datetime.datetime.min.time())

# Arguments applied to every task in the DAG unless overridden per-task.
default_dag_args = {
    # Starting from midnight yesterday means the first scheduled run is
    # eligible immediately after the DAG file is detected in the bucket.
    'start_date': yesterday,
    # Notifications are off by default; set the 'email' arg and flip these
    # to True to be emailed on failure or retry.
    'email_on_failure': False,
    'email_on_retry': False,
    # Retry a failed task once, waiting at least five minutes in between.
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    # Google Cloud project used by the Dataproc operators, read from an
    # Airflow Variable.
    'project_id': models.Variable.get('gcp_project')
}
68+
69+
with models.DAG(
        'Composer_sample_quickstart',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    # Check if the input file exists; exit 1 (task failure) if it does not,
    # so the downstream cluster tasks never run on missing input.
    # Fix: restored the quotes around task_id and bash_command, which were
    # garbled into smart quotes / dropped entirely (syntax error).
    check_file_existence = BashOperator(
        task_id='check_file_existence',
        bash_command='if [ ! -f "{}" ]; then exit 1; fi'.format(input_file))

    # Create a Cloud Dataproc cluster.
    create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc_cluster',
        # Give the cluster a unique name by appending the date scheduled.
        # See https://airflow.apache.org/code.html#default-variables
        cluster_name='quickstart-cluster-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('gce_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1')

    # Run the Hadoop wordcount example installed on the Cloud Dataproc
    # cluster master node.
    run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator(
        task_id='run_dataproc_hadoop',
        main_jar=WORDCOUNT_JAR,
        cluster_name='quickstart-cluster-{{ ds_nodash }}',
        arguments=wordcount_args)

    # Delete Cloud Dataproc cluster.
    delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc_cluster',
        cluster_name='quickstart-cluster-{{ ds_nodash }}',
        # Setting trigger_rule to ALL_DONE causes the cluster to be deleted
        # even if the Dataproc job fails.
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE)

    # Define DAG dependencies.
    check_file_existence >> create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster
110+

speech/microphone/transcribe_streaming_infinite.py

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -22,36 +22,35 @@
2222
pip install pyaudio
2323
pip install termcolor
2424
25+
pyaudio also requires installing "portaudio"
26+
2527
Example usage:
2628
python transcribe_streaming_infinite.py
2729
"""
2830

29-
# [START speech_transcribe_infinite_streaming]
31+
# [START speech_transcribe_infinite_streaming_imports]
3032

3133
import time
3234
import re
33-
import sys
3435

3536
# uses result_end_time currently only available in v1p1beta, will be in v1 soon
3637
from google.cloud import speech_v1p1beta1 as speech
3738
import pyaudio
3839
from six.moves import queue
39-
40-
# Audio recording parameters
41-
STREAMING_LIMIT = 10000
42-
SAMPLE_RATE = 16000
43-
CHUNK_SIZE = int(SAMPLE_RATE / 10) # 100ms
44-
45-
RED = '\033[0;31m'
46-
GREEN = '\033[0;32m'
47-
YELLOW = '\033[0;33m'
48-
40+
# [END speech_transcribe_infinite_streaming_imports]
4941

5042
def get_current_time():
5143
"""Return Current Time in MS."""
5244

5345
return int(round(time.time() * 1000))
5446

47+
# [START speech_transcribe_infinite_streaming_globals]
48+
49+
# Audio recording parameters
50+
STREAMING_LIMIT = 10000
51+
SAMPLE_RATE = 16000
52+
CHUNK_SIZE = int(SAMPLE_RATE / 10) # 100ms
53+
5554

5655
class ResumableMicrophoneStream:
5756
"""Opens a recording stream as a generator yielding the audio chunks."""
@@ -85,6 +84,8 @@ def __init__(self, rate, chunk_size):
8584
stream_callback=self._fill_buffer,
8685
)
8786

87+
# [END speech_transcribe_infinite_streaming_globals]
88+
8889
def __enter__(self):
8990

9091
self.closed = False
@@ -105,6 +106,10 @@ def _fill_buffer(self, in_data, *args, **kwargs):
105106

106107
self._buff.put(in_data)
107108
return None, pyaudio.paContinue
109+
110+
# [END speech_transcribe_infinite_streaming_init]
111+
112+
# [START speech_transcribe_infinite_streaming_generator]
108113

109114
def generator(self):
110115
"""Stream Audio from microphone to API and to local buffer"""
@@ -160,6 +165,9 @@ def generator(self):
160165

161166
yield b''.join(data)
162167

168+
# [END speech_transcribe_infinite_streaming_generator]
169+
170+
# [START speech_transcribe_infinite_streaming_output]
163171

164172
def listen_print_loop(responses, stream):
165173
"""Iterates through server responses and prints them.
@@ -212,28 +220,26 @@ def listen_print_loop(responses, stream):
212220

213221
if result.is_final:
214222

215-
sys.stdout.write(GREEN)
216-
sys.stdout.write('\033[K')
217-
sys.stdout.write(str(corrected_time) + ': ' + transcript + '\n')
223+
print(str(corrected_time) + ': ' + transcript + '\n')
218224

219225
stream.is_final_end_time = stream.result_end_time
220226
stream.last_transcript_was_final = True
221227

222228
# Exit recognition if any of the transcribed phrases could be
223229
# one of our keywords.
224230
if re.search(r'\b(exit|quit)\b', transcript, re.I):
225-
sys.stdout.write(YELLOW)
226-
sys.stdout.write('Exiting...\n')
231+
print('Exiting...\n')
227232
stream.closed = True
228233
break
229234

230235
else:
231-
sys.stdout.write(RED)
232-
sys.stdout.write('\033[K')
233-
sys.stdout.write(str(corrected_time) + ': ' + transcript + '\r')
236+
print(str(corrected_time) + ': ' + "PROCESSING:" + transcript + '\r')
234237

235238
stream.last_transcript_was_final = False
236239

240+
# [END speech_transcribe_infinite_streaming_output]
241+
242+
# [START speech_transcribe_infinite_streaming_main]
237243

238244
def main():
239245
"""start bidirectional streaming from microphone input to speech API"""
@@ -250,17 +256,14 @@ def main():
250256

251257
mic_manager = ResumableMicrophoneStream(SAMPLE_RATE, CHUNK_SIZE)
252258
print(mic_manager.chunk_size)
253-
sys.stdout.write(YELLOW)
254-
sys.stdout.write('\nListening, say "Quit" or "Exit" to stop.\n\n')
255-
sys.stdout.write('End (ms) Transcript Results/Status\n')
256-
sys.stdout.write('=====================================================\n')
259+
print('\nListening, say "Quit" or "Exit" to stop.\n\n')
260+
print('End (ms) Transcript Results/Status\n')
261+
print('=====================================================\n')
257262

258263
with mic_manager as stream:
259264

260265
while not stream.closed:
261-
sys.stdout.write(YELLOW)
262-
sys.stdout.write('\n' + str(
263-
STREAMING_LIMIT * stream.restart_counter) + ': NEW REQUEST\n')
266+
print('\n' + str(STREAMING_LIMIT * stream.restart_counter) + ': NEW REQUEST\n')
264267

265268
stream.audio_input = []
266269
audio_generator = stream.generator()
@@ -283,12 +286,12 @@ def main():
283286
stream.restart_counter = stream.restart_counter + 1
284287

285288
if not stream.last_transcript_was_final:
286-
sys.stdout.write('\n')
289+
print('\n')
287290
stream.new_stream = True
288291

289292

290293
if __name__ == '__main__':
291294

292295
main()
293296

294-
# [END speech_transcribe_infinite_streaming]
297+
# [END speech_transcribe_infinite_streaming_main]

0 commit comments

Comments
 (0)