-
Notifications
You must be signed in to change notification settings - Fork 6.6k
Cloud Pub/Sub Quickstart V2 #2004
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
494d194
d6f6ea4
5828619
92929c3
c3768c4
1cdfb31
233d188
a1c303b
cb03cd1
b39898d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
#!/usr/bin/env python | ||
|
||
# Copyright 2019 Google LLC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
# [START pubsub_quickstart_pub_all] | ||
import argparse | ||
import time | ||
# [START pubsub_quickstart_pub_deps] | ||
from google.cloud import pubsub_v1 | ||
# [END pubsub_quickstart_pub_deps] | ||
|
||
|
||
def get_callback(api_future, data): | ||
"""Wraps message data in the context of the callback function.""" | ||
def callback(api_future): | ||
if api_future.exception(): | ||
print("There was a problem with message {}".format(data)) | ||
else: | ||
print("Published message {} now has message ID {}".format( | ||
data, api_future.result())) | ||
return callback | ||
|
||
|
||
def pub(project_id, topic_name): | ||
"""Publishes a message to a Pub/Sub topic.""" | ||
# [START pubsub_quickstart_pub_client] | ||
# Initializes the Publisher client | ||
anguillanneuf marked this conversation as resolved.
Show resolved
Hide resolved
|
||
client = pubsub_v1.PublisherClient() | ||
# [END pubsub_quickstart_pub_client] | ||
# Creates a fully qualified identifier in the form of | ||
# `projects/{project_id}/topics/{topic_name}` | ||
topic_path = client.topic_path(project_id, topic_name) | ||
|
||
# Data sent to Cloud Pub/Sub must be a bytestring | ||
data = "Hello, World!" | ||
anguillanneuf marked this conversation as resolved.
Show resolved
Hide resolved
|
||
data = data.encode('utf-8') | ||
|
||
# When you publish a message, the client returns a future. | ||
api_future = client.publish(topic_path, data=data) | ||
api_future.add_done_callback(get_callback(api_future, data)) | ||
|
||
# Keeps the main thread from exiting to handle message processing | ||
anguillanneuf marked this conversation as resolved.
Show resolved
Hide resolved
|
||
# in the background. | ||
while True: | ||
time.sleep(60) | ||
anguillanneuf marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
if __name__ == '__main__': | ||
parser = argparse.ArgumentParser( | ||
description=__doc__, | ||
formatter_class=argparse.RawDescriptionHelpFormatter | ||
) | ||
parser.add_argument('project_id', help='Google Cloud project ID') | ||
parser.add_argument('topic_name', help='Pub/Sub topic name') | ||
|
||
args = parser.parse_args() | ||
|
||
pub(args.project_id, args.topic_name) | ||
# [END pubsub_quickstart_pub_all] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
#!/usr/bin/env python | ||
|
||
# Copyright 2019 Google LLC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import mock | ||
import os | ||
import pytest | ||
import time | ||
|
||
from google.cloud import pubsub_v1 | ||
|
||
import pub | ||
|
||
PROJECT = os.environ['GCLOUD_PROJECT'] | ||
TOPIC = 'quickstart-pub-test-topic' | ||
|
||
|
||
@pytest.fixture(scope='module') | ||
def publisher_client(): | ||
yield pubsub_v1.PublisherClient() | ||
|
||
|
||
@pytest.fixture(scope='module') | ||
def topic(publisher_client): | ||
topic_path = publisher_client.topic_path(PROJECT, TOPIC) | ||
|
||
try: | ||
anguillanneuf marked this conversation as resolved.
Show resolved
Hide resolved
|
||
publisher_client.delete_topic(topic_path) | ||
except Exception: | ||
pass | ||
|
||
publisher_client.create_topic(topic_path) | ||
|
||
yield TOPIC | ||
|
||
|
||
def _make_sleep_patch(): | ||
real_sleep = time.sleep | ||
|
||
def new_sleep(period): | ||
if period == 60: | ||
real_sleep(10) | ||
raise RuntimeError('sigil') | ||
else: | ||
real_sleep(period) | ||
|
||
return mock.patch('time.sleep', new=new_sleep) | ||
|
||
|
||
def test_pub(topic, capsys): | ||
with _make_sleep_patch(): | ||
with pytest.raises(RuntimeError, match='sigil'): | ||
pub.pub(PROJECT, topic) | ||
|
||
out, _ = capsys.readouterr() | ||
|
||
assert "Published message b'Hello, World!'" in out |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
#!/usr/bin/env python | ||
|
||
# Copyright 2019 Google LLC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
# [START pubsub_quickstart_sub_all] | ||
import argparse | ||
import time | ||
# [START pubsub_quickstart_sub_deps] | ||
from google.cloud import pubsub_v1 | ||
# [END pubsub_quickstart_sub_deps] | ||
|
||
|
||
def sub(project_id, subscription_name): | ||
"""Receives messages from a Pub/Sub subscription.""" | ||
# [START pubsub_quickstart_sub_client] | ||
# Initializes the Subscriber client | ||
client = pubsub_v1.SubscriberClient() | ||
# [END pubsub_quickstart_sub_client] | ||
# Creates a fully qualified identifier in the form of | ||
# `projects/{project_id}/subscriptions/{subscription_name}` | ||
subscription_path = client.subscription_path( | ||
project_id, subscription_name) | ||
|
||
def callback(message): | ||
print('Received message {} of message ID {}'.format( | ||
anguillanneuf marked this conversation as resolved.
Show resolved
Hide resolved
|
||
message, message.message_id)) | ||
# Acknowledges the message. Unack'ed messages will be redelivered. | ||
message.ack() | ||
|
||
client.subscribe(subscription_path, callback=callback) | ||
print('Listening for messages on {}'.format(subscription_path)) | ||
|
||
# The subscriber is non-blocking. We must keep the main thread from | ||
# exiting so it can process messages asynchronously in the background. | ||
while True: | ||
anguillanneuf marked this conversation as resolved.
Show resolved
Hide resolved
|
||
time.sleep(60) | ||
|
||
|
||
if __name__ == '__main__': | ||
parser = argparse.ArgumentParser( | ||
description=__doc__, | ||
formatter_class=argparse.RawDescriptionHelpFormatter | ||
) | ||
parser.add_argument('project_id', help='Google Cloud project ID') | ||
parser.add_argument('subscription_name', help='Pub/Sub subscription name') | ||
|
||
args = parser.parse_args() | ||
|
||
sub(args.project_id, args.subscription_name) | ||
# [END pubsub_quickstart_sub_all] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
#!/usr/bin/env python | ||
|
||
# Copyright 2019 Google LLC | ||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
|
||
import mock | ||
import os | ||
import pytest | ||
import time | ||
|
||
from google.cloud import pubsub_v1 | ||
|
||
import sub | ||
|
||
|
||
PROJECT = os.environ['GCLOUD_PROJECT'] | ||
TOPIC = 'quickstart-sub-test-topic' | ||
SUBSCRIPTION = 'quickstart-sub-test-topic-sub' | ||
|
||
|
||
@pytest.fixture(scope='module') | ||
def publisher_client(): | ||
yield pubsub_v1.PublisherClient() | ||
|
||
|
||
@pytest.fixture(scope='module') | ||
def topic(publisher_client): | ||
topic_path = publisher_client.topic_path(PROJECT, TOPIC) | ||
|
||
try: | ||
anguillanneuf marked this conversation as resolved.
Show resolved
Hide resolved
|
||
publisher_client.delete_topic(topic_path) | ||
except Exception: | ||
pass | ||
|
||
publisher_client.create_topic(topic_path) | ||
|
||
yield topic_path | ||
|
||
|
||
@pytest.fixture(scope='module') | ||
def subscriber_client(): | ||
yield pubsub_v1.SubscriberClient() | ||
|
||
|
||
@pytest.fixture(scope='module') | ||
def subscription(subscriber_client, topic): | ||
subscription_path = subscriber_client.subscription_path( | ||
PROJECT, SUBSCRIPTION) | ||
|
||
try: | ||
subscriber_client.delete_subscription(subscription_path) | ||
except Exception: | ||
pass | ||
|
||
subscriber_client.create_subscription(subscription_path, topic=topic) | ||
|
||
yield SUBSCRIPTION | ||
|
||
|
||
def _publish_messages(publisher_client, topic): | ||
data = u'Hello, World!'.encode('utf-8') | ||
publisher_client.publish(topic, data=data) | ||
|
||
|
||
def _make_sleep_patch(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Prefer using the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I need some help with this. In the meanwhile, since other tests are also written using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the pattern I'm describing: https://stackoverflow.com/a/29110609/101923, Now that I have a closer look, There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I tried def mock_sleep(sleep):
time.sleep(10)
raise RuntimeError('sigil')
monkeypatch.setattr(time, 'sleep', mock_sleep)
|
||
real_sleep = time.sleep | ||
|
||
def new_sleep(period): | ||
if period == 60: | ||
real_sleep(10) | ||
raise RuntimeError('sigil') | ||
else: | ||
real_sleep(period) | ||
|
||
return mock.patch('time.sleep', new=new_sleep) | ||
|
||
|
||
def test_sub(publisher_client, topic, subscription, capsys): | ||
_publish_messages(publisher_client, topic) | ||
|
||
with _make_sleep_patch(): | ||
with pytest.raises(RuntimeError, match='sigil'): | ||
sub.sub(PROJECT, subscription) | ||
|
||
out, _ = capsys.readouterr() | ||
|
||
assert "Received message" in out |
Uh oh!
There was an error while loading. Please reload this page.