|
21 | 21 | import threading
|
22 | 22 | import time
|
23 | 23 |
|
24 |
| -from pymongo import MongoClient |
| 24 | +from bson.son import SON |
| 25 | +from bson.codec_options import DEFAULT_CODEC_OPTIONS |
| 26 | + |
| 27 | +from pymongo import MongoClient, message |
25 | 28 | from pymongo.errors import (AutoReconnect,
|
26 | 29 | ConnectionFailure,
|
27 | 30 | DuplicateKeyError,
|
@@ -259,6 +262,37 @@ def test_socket_closed(self):
|
259 | 262 | s.close()
|
260 | 263 | self.assertTrue(socket_checker.socket_closed(s))
|
261 | 264 |
|
| 265 | + def test_socket_checker(self): |
| 266 | + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 267 | + s.connect((client_context.host, client_context.port)) |
| 268 | + socket_checker = SocketChecker() |
| 269 | + # Socket has nothing to read. |
| 270 | + self.assertFalse(socket_checker.select(s, read=True)) |
| 271 | + self.assertFalse(socket_checker.select(s, read=True, timeout=0)) |
| 272 | + self.assertFalse(socket_checker.select(s, read=True, timeout=.05)) |
| 273 | + # Socket is writable. |
| 274 | + self.assertTrue(socket_checker.select(s, write=True, timeout=None)) |
| 275 | + self.assertTrue(socket_checker.select(s, write=True)) |
| 276 | + self.assertTrue(socket_checker.select(s, write=True, timeout=0)) |
| 277 | + self.assertTrue(socket_checker.select(s, write=True, timeout=.05)) |
| 278 | + # Make the socket readable |
| 279 | + _, msg, _ = message.query( |
| 280 | + 0, 'admin.$cmd', 0, -1, SON([('isMaster', 1)]), None, |
| 281 | + DEFAULT_CODEC_OPTIONS) |
| 282 | + s.sendall(msg) |
| 283 | + # Block until the socket is readable. |
| 284 | + self.assertTrue(socket_checker.select(s, read=True, timeout=None)) |
| 285 | + self.assertTrue(socket_checker.select(s, read=True)) |
| 286 | + self.assertTrue(socket_checker.select(s, read=True, timeout=0)) |
| 287 | + self.assertTrue(socket_checker.select(s, read=True, timeout=.05)) |
| 288 | + # Socket is still writable. |
| 289 | + self.assertTrue(socket_checker.select(s, write=True, timeout=None)) |
| 290 | + self.assertTrue(socket_checker.select(s, write=True)) |
| 291 | + self.assertTrue(socket_checker.select(s, write=True, timeout=0)) |
| 292 | + self.assertTrue(socket_checker.select(s, write=True, timeout=.05)) |
| 293 | + s.close() |
| 294 | + self.assertTrue(socket_checker.socket_closed(s)) |
| 295 | + |
262 | 296 | def test_return_socket_after_reset(self):
|
263 | 297 | pool = self.create_pool()
|
264 | 298 | with pool.get_socket({}) as sock:
|
|
0 commit comments