Skip to content

Commit 5b457b2

Browse files
committed
PYTHON-2462 Implement prose test "Connection Pool Management"
1 parent d1e6435 commit 5b457b2

File tree

4 files changed

+70
-18
lines changed

4 files changed

+70
-18
lines changed

test/test_discovery_and_monitoring.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,14 @@
3939
from test import unittest, IntegrationTest
4040
from test.utils import (assertion_context,
4141
cdecimal_patched,
42+
CMAPListener,
4243
client_context,
4344
Barrier,
4445
get_pool,
46+
HeartbeatEventListener,
4547
server_name_to_type,
4648
rs_or_single_client,
49+
single_client,
4750
TestCreator,
4851
wait_until)
4952
from test.utils_spec_runner import SpecRunner, SpecRunnerThread
@@ -299,6 +302,46 @@ def insert_command(i):
299302
client.admin.command('ping')
300303

301304

305+
class CMAPHeartbeatListener(HeartbeatEventListener, CMAPListener):
306+
pass
307+
308+
309+
class TestPoolManagement(IntegrationTest):
310+
@client_context.require_failCommand_appName()
311+
def test_pool_unpause(self):
312+
# This test implements the prose test "Connection Pool Management"
313+
listener = CMAPHeartbeatListener()
314+
client = single_client(appName="SDAMPoolManagementTest",
315+
heartbeatFrequencyMS=500,
316+
event_listeners=[listener])
317+
self.addCleanup(client.close)
318+
# Assert that ConnectionPoolReadyEvent occurs after the first
319+
# ServerHeartbeatSucceededEvent.
320+
listener.wait_for_event(monitoring.PoolReadyEvent, 1)
321+
pool_ready = listener.events_by_type(monitoring.PoolReadyEvent)[0]
322+
hb_succeeded = listener.events_by_type(
323+
monitoring.ServerHeartbeatSucceededEvent)[0]
324+
self.assertGreater(
325+
listener.events.index(pool_ready),
326+
listener.events.index(hb_succeeded))
327+
328+
listener.reset()
329+
fail_ismaster = {
330+
'mode': {'times': 1},
331+
'data': {
332+
'failCommands': ['isMaster'],
333+
'errorCode': 1234,
334+
'appName': 'SDAMPoolManagementTest',
335+
},
336+
}
337+
with self.fail_point(fail_ismaster):
338+
listener.wait_for_event(monitoring.ServerHeartbeatFailedEvent, 1)
339+
listener.wait_for_event(monitoring.PoolClearedEvent, 1)
340+
listener.wait_for_event(
341+
monitoring.ServerHeartbeatSucceededEvent, 1)
342+
listener.wait_for_event(monitoring.PoolReadyEvent, 1)
343+
344+
302345
class TestIntegration(SpecRunner):
303346
# Location of JSON test specifications.
304347
TEST_PATH = os.path.join(

test/test_heartbeat_monitoring.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,12 @@ def _check_with_socket(self, *args, **kwargs):
5151
# monitor thread may run multiple times during the execution
5252
# of this test.
5353
wait_until(
54-
lambda: len(listener.results) >= expected_len,
54+
lambda: len(listener.events) >= expected_len,
5555
"publish all events")
5656

5757
try:
5858
# zip gives us len(expected_results) pairs.
59-
for expected, actual in zip(expected_results, listener.results):
59+
for expected, actual in zip(expected_results, listener.events):
6060
self.assertEqual(expected,
6161
actual.__class__.__name__)
6262
self.assertEqual(actual.connection_id,

test/test_streaming_protocol.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ def hb_failed(event):
203203
self.assertTrue(hb_failed_events[0].awaited)
204204
# Depending on thread scheduling, the failed heartbeat could occur on
205205
# the second or third check.
206-
events = [type(e) for e in hb_listener.results[:4]]
206+
events = [type(e) for e in hb_listener.events[:4]]
207207
if events == [monitoring.ServerHeartbeatStartedEvent,
208208
monitoring.ServerHeartbeatSucceededEvent,
209209
monitoring.ServerHeartbeatStartedEvent,

test/utils.py

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
IMPOSSIBLE_WRITE_CONCERN = WriteConcern(w=50)
6161

6262

63-
class CMAPListener(ConnectionPoolListener):
63+
class BaseListener(ConnectionPoolListener):
6464
def __init__(self):
6565
self.events = []
6666

@@ -71,9 +71,26 @@ def add_event(self, event):
7171
self.events.append(event)
7272

7373
def event_count(self, event_type):
74-
return len([event for event in self.events[:]
75-
if isinstance(event, event_type)])
74+
return len(self.events_by_type(event_type))
7675

76+
def events_by_type(self, event_type):
77+
"""Return the matching events by event class.
78+
79+
event_type can be a single class or a tuple of classes.
80+
"""
81+
return self.matching(lambda e: isinstance(e, event_type))
82+
83+
def matching(self, matcher):
84+
"""Return the matching events."""
85+
return [event for event in self.events[:] if matcher(event)]
86+
87+
def wait_for_event(self, event, count):
88+
"""Wait for a number of events to be published, or fail."""
89+
wait_until(lambda: self.event_count(event) >= count,
90+
'find %s %s event(s)' % (count, event))
91+
92+
93+
class CMAPListener(BaseListener, monitoring.ConnectionPoolListener):
7794
def connection_created(self, event):
7895
self.add_event(event)
7996

@@ -199,25 +216,17 @@ class ServerAndTopologyEventListener(ServerEventListener,
199216
"""Listens to Server and Topology events."""
200217

201218

202-
class HeartbeatEventListener(monitoring.ServerHeartbeatListener):
219+
class HeartbeatEventListener(BaseListener, monitoring.ServerHeartbeatListener):
203220
"""Listens to only server heartbeat events."""
204221

205-
def __init__(self):
206-
self.results = []
207-
208222
def started(self, event):
209-
self.results.append(event)
223+
self.add_event(event)
210224

211225
def succeeded(self, event):
212-
self.results.append(event)
226+
self.add_event(event)
213227

214228
def failed(self, event):
215-
self.results.append(event)
216-
217-
def matching(self, matcher):
218-
"""Return the matching events."""
219-
results = self.results[:]
220-
return [event for event in results if matcher(event)]
229+
self.add_event(event)
221230

222231

223232
class MockSocketInfo(object):

0 commit comments

Comments
 (0)