Commit a83a307

Pub/Sub to GCS using Dataflow Tutorial (#2385)
* Add README
* Add requirements.txt
* Add sample
* Update readme
* Working example
* Submit for review
* lint
* Remove Java links
* David's suggestions
* nits: license header, docstring, indentation
* Add a note, comments, input args, op name
* change message to str, json dumps batch
* update requirement
* Add test, should pass lint & py36
* OK to not delete topic
* use storage client
* update storage client to gcs client
1 parent c8a2ce2 commit a83a307

File tree

4 files changed: +432 -0 lines changed
Lines changed: 120 additions & 0 deletions (PubSubToGCS.py, the streaming pipeline sample)
# Copyright 2019 Google LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START pubsub_to_gcs]
import argparse
import datetime
import json
import logging

import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.options.pipeline_options import PipelineOptions


class GroupWindowsIntoBatches(beam.PTransform):
    """A composite transform that groups Pub/Sub messages based on publish
    time and outputs a list of dictionaries, where each contains one message
    and its publish timestamp.
    """

    def __init__(self, window_size):
        # Convert minutes into seconds.
        self.window_size = int(window_size * 60)

    def expand(self, pcoll):
        return (pcoll
                # Assigns window info to each Pub/Sub message based on its
                # publish timestamp.
                | 'Window into Fixed Intervals' >> beam.WindowInto(
                    window.FixedWindows(self.window_size))
                | 'Add timestamps to messages' >> beam.ParDo(AddTimestamps())
                # Use a dummy key to group the elements in the same window.
                # Note that all the elements in one window must fit into
                # memory for this. If the windowed elements do not fit into
                # memory, please consider using `beam.util.BatchElements`.
                # https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
                | 'Add Dummy Key' >> beam.Map(lambda elem: (None, elem))
                | 'Groupby' >> beam.GroupByKey()
                | 'Abandon Dummy Key' >> beam.MapTuple(lambda _, val: val))


class AddTimestamps(beam.DoFn):

    def process(self, element, publish_time=beam.DoFn.TimestampParam):
        """Processes each incoming windowed element by extracting the Pub/Sub
        message and its publish timestamp into a dictionary. `publish_time`
        defaults to the publish timestamp returned by the Pub/Sub server. It
        is bound to each element by Beam at runtime.
        """

        yield {
            'message_body': element.decode('utf-8'),
            'publish_time': datetime.datetime.utcfromtimestamp(
                float(publish_time)).strftime("%Y-%m-%d %H:%M:%S.%f"),
        }


class WriteBatchesToGCS(beam.DoFn):

    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, batch, window=beam.DoFn.WindowParam):
        """Write one batch per file to a Google Cloud Storage bucket."""

        ts_format = '%H:%M'
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        filename = '-'.join([self.output_path, window_start, window_end])

        with beam.io.gcp.gcsio.GcsIO().open(filename=filename, mode='w') as f:
            for element in batch:
                f.write('{}\n'.format(json.dumps(element)).encode('utf-8'))


def run(input_topic, output_path, window_size=1.0, pipeline_args=None):
    # `save_main_session` is set to true because some DoFn's rely on
    # globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        (pipeline
         | 'Read PubSub Messages' >> beam.io.ReadFromPubSub(topic=input_topic)
         | 'Window into' >> GroupWindowsIntoBatches(window_size)
         | 'Write to GCS' >> beam.ParDo(WriteBatchesToGCS(output_path)))


if __name__ == '__main__':  # noqa
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input_topic',
        help='The Cloud Pub/Sub topic to read from.\n'
             '"projects/<PROJECT_NAME>/topics/<TOPIC_NAME>".')
    parser.add_argument(
        '--window_size',
        type=float,
        default=1.0,
        help='Output file\'s window size in number of minutes.')
    parser.add_argument(
        '--output_path',
        help='GCS Path of the output file including filename prefix.')
    known_args, pipeline_args = parser.parse_known_args()

    run(known_args.input_topic, known_args.output_path, known_args.window_size,
        pipeline_args)
# [END pubsub_to_gcs]
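
The comment in `GroupWindowsIntoBatches` notes that the dummy-key `GroupByKey` only works when every element of a window fits into memory, and points to `BatchElements` as the alternative. The sketch below is not part of this commit; the transform name `BatchWindowedMessages` and the batch sizes are illustrative assumptions, and it reuses the module-level imports (`beam`, `window`, `AddTimestamps`) from PubSubToGCS.py above.

# Sketch only (not part of this commit): a hedged variant of
# GroupWindowsIntoBatches that uses BatchElements instead of a dummy-key
# GroupByKey, so an entire window never has to be held in memory at once.
from apache_beam.transforms.util import BatchElements


class BatchWindowedMessages(beam.PTransform):

    def __init__(self, window_size):
        # Convert minutes into seconds.
        self.window_size = int(window_size * 60)

    def expand(self, pcoll):
        return (pcoll
                | 'Window into Fixed Intervals' >> beam.WindowInto(
                    window.FixedWindows(self.window_size))
                | 'Add timestamps to messages' >> beam.ParDo(AddTimestamps())
                # Batch sizes are illustrative assumptions; BatchElements
                # groups elements into lists, targeting sizes in this range.
                | 'Batch Elements' >> BatchElements(
                    min_batch_size=10, max_batch_size=500))

Because `BatchElements` batches elements as they arrive rather than collecting a whole window behind one key, a single window may yield more than one batch, so the window-based file naming in `WriteBatchesToGCS` would need adjusting to avoid overwriting files.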
Lines changed: 100 additions & 0 deletions (the integration test for PubSubToGCS.py)
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import multiprocessing as mp
import os
import subprocess as sp
import tempfile
import time
import uuid

import apache_beam as beam
from google.cloud import pubsub_v1
import pytest


PROJECT = os.environ['GCLOUD_PROJECT']
BUCKET = os.environ['CLOUD_STORAGE_BUCKET']
TOPIC = 'test-topic'
UUID = uuid.uuid4().hex


@pytest.fixture
def publisher_client():
    yield pubsub_v1.PublisherClient()


@pytest.fixture
def topic_path(publisher_client):
    topic_path = publisher_client.topic_path(PROJECT, TOPIC)

    try:
        publisher_client.delete_topic(topic_path)
    except Exception:
        pass

    response = publisher_client.create_topic(topic_path)
    yield response.name


def _infinite_publish_job(publisher_client, topic_path):
    while True:
        future = publisher_client.publish(
            topic_path, data='Hello World!'.encode('utf-8'))
        future.result()
        time.sleep(10)


def test_run(publisher_client, topic_path):
    """This is an integration test that runs `PubSubToGCS.py` in its entirety.
    It checks for output files on GCS.
    """

    # Use one process to publish messages to a topic.
    publish_process = mp.Process(
        target=lambda: _infinite_publish_job(publisher_client, topic_path))

    # Use another process to run the streaming pipeline that should write one
    # file to GCS every minute (according to the default window size).
    pipeline_process = mp.Process(
        target=lambda: sp.call([
            'python', 'PubSubToGCS.py',
            '--project', PROJECT,
            '--runner', 'DirectRunner',
            '--temp_location', tempfile.mkdtemp(),
            '--input_topic', topic_path,
            '--output_path', 'gs://{}/pubsub/{}/output'.format(BUCKET, UUID),
        ])
    )

    publish_process.start()
    pipeline_process.start()

    # Time out the streaming pipeline after 90 seconds.
    pipeline_process.join(timeout=90)
    # Immediately kill the publish process after the pipeline shuts down.
    publish_process.join(timeout=0)

    pipeline_process.terminate()
    publish_process.terminate()

    # Check for output files on GCS.
    gcs_client = beam.io.gcp.gcsio.GcsIO()
    # `list_prefix()` returns a dictionary that maps file names to sizes.
    files = gcs_client.list_prefix('gs://{}/pubsub/{}'.format(BUCKET, UUID))
    assert len(files) > 0

    # Clean up: delete the topic and the output files.
    publisher_client.delete_topic(topic_path)
    gcs_client.delete_batch(list(files))
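
The test above drives the sample with the DirectRunner and a publisher process it controls. Outside the test, the sample is launched with the flags defined in its argument parser plus standard Beam pipeline options. A hedged example invocation (the project, topic, and bucket names are placeholders, not values from this commit) might look like:

python PubSubToGCS.py \
    --project=PROJECT_ID \
    --input_topic=projects/PROJECT_ID/topics/TOPIC_NAME \
    --output_path=gs://BUCKET_NAME/samples/output \
    --window_size=2 \
    --runner=DataflowRunner \
    --temp_location=gs://BUCKET_NAME/temp

Any flag not consumed by the parser (`--project`, `--runner`, `--temp_location`, and so on) is passed through `parse_known_args()` to `PipelineOptions` as a pipeline argument.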
