Skip to content

Commit b58d0d8

Browse files
authored
feat: add context manager capability to subscriber (#32)
* feat: add context manager capability to subscriber * Do not use shared subscriber in socket leak test
1 parent 1feaa24 commit b58d0d8

File tree

4 files changed

+86
-2
lines changed

4 files changed

+86
-2
lines changed

google/cloud/pubsub_v1/subscriber/client.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,3 +228,19 @@ def callback(message):
228228
manager.open(callback=callback, on_callback_error=future.set_exception)
229229

230230
return future
231+
232+
def close(self):
233+
"""Close the underlying channel to release socket resources.
234+
235+
After a channel has been closed, the client instance cannot be used
236+
anymore.
237+
238+
This method is idempotent.
239+
"""
240+
self.api.transport.channel.close()
241+
242+
def __enter__(self):
243+
return self
244+
245+
def __exit__(self, exc_type, exc_val, exc_tb):
246+
self.close()

noxfile.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ def system(session):
110110

111111
# Install all test dependencies, then install this package into the
112112
# virtualenv's dist-packages.
113-
session.install("mock", "pytest")
113+
session.install("mock", "pytest", "psutil")
114114

115115
session.install("-e", "test_utils")
116116
session.install("-e", ".")

tests/system.py

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import itertools
1919
import operator as op
2020
import os
21+
import psutil
2122
import threading
2223
import time
2324

@@ -46,7 +47,7 @@ def publisher():
4647
yield pubsub_v1.PublisherClient()
4748

4849

49-
@pytest.fixture(scope=u"module")
50+
@pytest.fixture(scope="module")
5051
def subscriber():
5152
yield pubsub_v1.SubscriberClient()
5253

@@ -383,6 +384,54 @@ def test_managing_subscription_iam_policy(
383384
assert bindings[1].members == ["group:[email protected]"]
384385

385386

387+
def test_subscriber_not_leaking_open_sockets(
388+
publisher, topic_path, subscription_path, cleanup
389+
):
390+
# Make sure the topic and the supscription get deleted.
391+
# NOTE: Since subscriber client will be closed in the test, we should not
392+
# use the shared `subscriber` fixture, but instead construct a new client
393+
# in this test.
394+
# Also, since the client will get closed, we need another subscriber client
395+
# to clean up the subscription. We also need to make sure that auxiliary
396+
# subscriber releases the sockets, too.
397+
subscriber = pubsub_v1.SubscriberClient()
398+
subscriber_2 = pubsub_v1.SubscriberClient()
399+
cleanup.append((subscriber_2.delete_subscription, subscription_path))
400+
401+
def one_arg_close(subscriber): # the cleanup helper expects exactly one argument
402+
subscriber.close()
403+
404+
cleanup.append((one_arg_close, subscriber_2))
405+
cleanup.append((publisher.delete_topic, topic_path))
406+
407+
# Create topic before starting to track connection count (any sockets opened
408+
# by the publisher client are not counted by this test).
409+
publisher.create_topic(topic_path)
410+
411+
current_process = psutil.Process()
412+
conn_count_start = len(current_process.connections())
413+
414+
# Publish a few messages, then synchronously pull them and check that
415+
# no sockets are leaked.
416+
with subscriber:
417+
subscriber.create_subscription(name=subscription_path, topic=topic_path)
418+
419+
# Publish a few messages, wait for the publish to succeed.
420+
publish_futures = [
421+
publisher.publish(topic_path, u"message {}".format(i).encode())
422+
for i in range(1, 4)
423+
]
424+
for future in publish_futures:
425+
future.result()
426+
427+
# Synchronously pull messages.
428+
response = subscriber.pull(subscription_path, max_messages=3)
429+
assert len(response.received_messages) == 3
430+
431+
conn_count_end = len(current_process.connections())
432+
assert conn_count_end == conn_count_start
433+
434+
386435
class TestStreamingPull(object):
387436
def test_streaming_pull_callback_error_propagation(
388437
self, publisher, topic_path, subscriber, subscription_path, cleanup

tests/unit/pubsub_v1/subscriber/test_subscriber_client.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,3 +106,22 @@ def test_subscribe_options(manager_open):
106106
callback=mock.sentinel.callback,
107107
on_callback_error=future.set_exception,
108108
)
109+
110+
111+
def test_close():
112+
mock_transport = mock.NonCallableMock()
113+
client = subscriber.Client(transport=mock_transport)
114+
115+
client.close()
116+
117+
mock_transport.channel.close.assert_called()
118+
119+
120+
def test_closes_channel_as_context_manager():
121+
mock_transport = mock.NonCallableMock()
122+
client = subscriber.Client(transport=mock_transport)
123+
124+
with client:
125+
pass
126+
127+
mock_transport.channel.close.assert_called()

0 commit comments

Comments
 (0)