|
50 | 50 |
|
51 | 51 | from test import client_context, unittest, IntegrationTest
|
52 | 52 | from test.utils import (
|
53 |
| - camel_to_snake, rs_or_single_client, |
| 53 | + camel_to_snake, rs_or_single_client, single_client, |
54 | 54 | snake_to_camel, ScenarioDict)
|
55 | 55 |
|
56 | 56 | from test.version import Version
|
@@ -193,22 +193,18 @@ def _create_entity(self, entity_spec):
|
193 | 193 |
|
194 | 194 | entity_type, spec = next(iteritems(entity_spec))
|
195 | 195 | if entity_type == 'client':
|
196 |
| - # TODO |
197 |
| - # Add logic to respect the following fields |
198 |
| - # - uriOptions |
199 |
| - # - useMultipleMongoses |
200 |
| - uri_options = spec.get('uriOptions', {}) |
| 196 | + kwargs = {} |
201 | 197 | observe_events = spec.get('observeEvents', [])
|
202 | 198 | ignore_commands = spec.get('ignoreCommandMonitoringEvents', [])
|
203 | 199 | if len(observe_events) or len(ignore_commands):
|
204 | 200 | listener = EventListenerUtil(observe_events, ignore_commands)
|
205 |
| - client = rs_or_single_client( |
206 |
| - event_listeners=[listener], **uri_options) |
207 |
| - else: |
208 |
| - listener = None |
209 |
| - client = rs_or_single_client() |
| 201 | + self._listeners[spec['id']] = listener |
| 202 | + kwargs['event_listeners'] = [listener] |
| 203 | + if client_context.is_mongos and spec.get('useMultipleMongoses'): |
| 204 | + kwargs['host'] = client_context.mongos_seeds() |
| 205 | + kwargs.update(spec.get('uriOptions', {})) |
| 206 | + client = rs_or_single_client(**kwargs) |
210 | 207 | self[spec['id']] = client
|
211 |
| - self._listeners[spec['id']] = listener |
212 | 208 | self._test_class.addCleanup(client.close)
|
213 | 209 | return
|
214 | 210 | elif entity_type == 'database':
|
@@ -269,7 +265,7 @@ def get_listener_for_client(self, client_name):
|
269 | 265 | 'Expected entity %s to be of type MongoClient, got %s' % (
|
270 | 266 | client_name, type(client)))
|
271 | 267 |
|
272 |
| - listener = self._listeners[client_name] |
| 268 | + listener = self._listeners.get(client_name) |
273 | 269 | if not listener:
|
274 | 270 | self._test_class.fail(
|
275 | 271 | 'No listeners configured for client %s' % (client_name,))
|
@@ -726,18 +722,29 @@ def run_entity_operation(self, spec):
|
726 | 722 | if save_as_entity:
|
727 | 723 | self.entity_map[save_as_entity] = result
|
728 | 724 |
|
729 |
| - def _testOperation_failPoint(self, spec): |
730 |
| - client = self.entity_map[spec['client']] |
731 |
| - command_args = spec['failPoint'] |
| 725 | + def __set_fail_point(self, client, command_args): |
732 | 726 | cmd_on = SON([('configureFailPoint', 'failCommand')])
|
733 | 727 | cmd_on.update(command_args)
|
734 | 728 | client.admin.command(cmd_on)
|
735 | 729 | self.addCleanup(
|
736 | 730 | client.admin.command,
|
737 | 731 | 'configureFailPoint', cmd_on['configureFailPoint'], mode='off')
|
738 | 732 |
|
| 733 | + def _testOperation_failPoint(self, spec): |
| 734 | + self.__set_fail_point( |
| 735 | + client=self.entity_map[spec['client']], |
| 736 | + command_args=spec['failPoint']) |
| 737 | + |
739 | 738 | def _testOperation_targetedFailPoint(self, spec):
|
740 |
| - raise NotImplementedError |
| 739 | + session = self.entity_map[spec['session']] |
| 740 | + if not session._pinned_address: |
| 741 | + self.fail("Cannot use targetedFailPoint operation with unpinned " |
| 742 | + "session %s" % (spec['session'],)) |
| 743 | + |
| 744 | + client = single_client('%s:%s' % session._pinned_address) |
| 745 | + self.__set_fail_point( |
| 746 | + client=client, command_args=spec['failPoint']) |
| 747 | + self.addCleanup(client.close) |
741 | 748 |
|
742 | 749 | def _testOperation_assertSessionTransactionState(self, spec):
|
743 | 750 | session = self.entity_map[spec['session']]
|
|
0 commit comments