30
30
from bson import DEFAULT_CODEC_OPTIONS
31
31
from bson .py3compat import imap , itervalues , _unicode , PY3
32
32
from bson .son import SON
33
- from pymongo import auth , helpers , thread_util , __version__
33
+ from pymongo import auth , helpers , __version__
34
34
from pymongo .client_session import _validate_session_write_concern
35
35
from pymongo .common import (MAX_BSON_SIZE ,
36
36
MAX_CONNECTING ,
46
46
CertificateError ,
47
47
ConnectionFailure ,
48
48
ConfigurationError ,
49
+ ExceededMaxWaiters ,
49
50
InvalidOperation ,
50
51
DocumentTooLarge ,
51
52
NetworkTimeout ,
@@ -309,7 +310,8 @@ class PoolOptions(object):
309
310
'__wait_queue_timeout' , '__wait_queue_multiple' ,
310
311
'__ssl_context' , '__ssl_match_hostname' , '__socket_keepalive' ,
311
312
'__event_listeners' , '__appname' , '__driver' , '__metadata' ,
312
- '__compression_settings' , '__max_connecting' )
313
+ '__compression_settings' , '__max_connecting' ,
314
+ '__pause_enabled' )
313
315
314
316
def __init__ (self , max_pool_size = MAX_POOL_SIZE ,
315
317
min_pool_size = MIN_POOL_SIZE ,
@@ -318,7 +320,8 @@ def __init__(self, max_pool_size=MAX_POOL_SIZE,
318
320
wait_queue_multiple = None , ssl_context = None ,
319
321
ssl_match_hostname = True , socket_keepalive = True ,
320
322
event_listeners = None , appname = None , driver = None ,
321
- compression_settings = None , max_connecting = MAX_CONNECTING ):
323
+ compression_settings = None , max_connecting = MAX_CONNECTING ,
324
+ pause_enabled = True ):
322
325
323
326
self .__max_pool_size = max_pool_size
324
327
self .__min_pool_size = min_pool_size
@@ -335,6 +338,7 @@ def __init__(self, max_pool_size=MAX_POOL_SIZE,
335
338
self .__driver = driver
336
339
self .__compression_settings = compression_settings
337
340
self .__max_connecting = max_connecting
341
+ self .__pause_enabled = pause_enabled
338
342
self .__metadata = copy .deepcopy (_METADATA )
339
343
if appname :
340
344
self .__metadata ['application' ] = {'name' : appname }
@@ -406,6 +410,10 @@ def max_connecting(self):
406
410
"""
407
411
return self .__max_connecting
408
412
413
+ @property
414
+ def pause_enabled (self ):
415
+ return self .__pause_enabled
416
+
409
417
@property
410
418
def max_idle_time_seconds (self ):
411
419
"""The maximum number of seconds that a connection can remain
@@ -1058,6 +1066,8 @@ class _PoolClosedError(PyMongoError):
1058
1066
pass
1059
1067
1060
1068
1069
+ PAUSED , READY , CLOSED = range (3 )
1070
+
1061
1071
# Do *not* explicitly inherit from object or Jython won't call __del__
1062
1072
# http://bugs.jython.org/issue1057
1063
1073
class Pool :
@@ -1068,6 +1078,9 @@ def __init__(self, address, options, handshake=True):
1068
1078
- `options`: a PoolOptions instance
1069
1079
- `handshake`: whether to call ismaster for each new SocketInfo
1070
1080
"""
1081
+ self .state = READY
1082
+ if options .pause_enabled :
1083
+ self .state = PAUSED
1071
1084
# Check a socket's health with socket_closed() every once in a while.
1072
1085
# Can override for testing: 0 to always check, None to never check.
1073
1086
self ._check_interval_seconds = 1
@@ -1079,7 +1092,6 @@ def __init__(self, address, options, handshake=True):
1079
1092
self .active_sockets = 0
1080
1093
# Monotonically increasing connection ID required for CMAP Events.
1081
1094
self .next_connection_id = 1
1082
- self .closed = False
1083
1095
# Track whether the sockets in this pool are writeable or not.
1084
1096
self .is_writable = None
1085
1097
@@ -1098,13 +1110,23 @@ def __init__(self, address, options, handshake=True):
1098
1110
1099
1111
if (self .opts .wait_queue_multiple is None or
1100
1112
self .opts .max_pool_size is None ):
1101
- max_waiters = None
1113
+ max_waiters = float ( 'inf' )
1102
1114
else :
1103
1115
max_waiters = (
1104
1116
self .opts .max_pool_size * self .opts .wait_queue_multiple )
1105
-
1106
- self ._socket_semaphore = thread_util .create_semaphore (
1107
- self .opts .max_pool_size , max_waiters )
1117
+ # The first portion of the wait queue.
1118
+ # Enforces: maxPoolSize and waitQueueMultiple
1119
+ # Also used for: clearing the wait queue
1120
+ self .size_cond = threading .Condition (self .lock )
1121
+ self .requests = 0
1122
+ self .max_pool_size = self .opts .max_pool_size
1123
+ if self .max_pool_size is None :
1124
+ self .max_pool_size = float ('inf' )
1125
+ self .waiters = 0
1126
+ self .max_waiters = max_waiters
1127
+ # The second portion of the wait queue.
1128
+ # Enforces: maxConnecting
1129
+ # Also used for: clearing the wait queue
1108
1130
self ._max_connecting_cond = threading .Condition (self .lock )
1109
1131
self ._max_connecting = self .opts .max_connecting
1110
1132
self ._pending = 0
@@ -1114,10 +1136,22 @@ def __init__(self, address, options, handshake=True):
1114
1136
# Similar to active_sockets but includes threads in the wait queue.
1115
1137
self .operation_count = 0
1116
1138
1139
+ def ready (self ):
1140
+ old_state , self .state = self .state , READY
1141
+ if old_state != READY :
1142
+ if self .enabled_for_cmap :
1143
+ self .opts .event_listeners .publish_pool_ready (self .address )
1144
+
1145
+ @property
1146
+ def closed (self ):
1147
+ return self .state == CLOSED
1148
+
1117
1149
def _reset (self , close ):
1118
- with self .lock :
1150
+ with self .size_cond :
1119
1151
if self .closed :
1120
1152
return
1153
+ if self .opts .pause_enabled :
1154
+ self .state = PAUSED
1121
1155
self .generation += 1
1122
1156
newpid = os .getpid ()
1123
1157
if self .pid != newpid :
@@ -1126,7 +1160,10 @@ def _reset(self, close):
1126
1160
self .operation_count = 0
1127
1161
sockets , self .sockets = self .sockets , collections .deque ()
1128
1162
if close :
1129
- self .closed = True
1163
+ self .state = CLOSED
1164
+ # Clear the wait queue
1165
+ self ._max_connecting_cond .notify_all ()
1166
+ self .size_cond .notify_all ()
1130
1167
1131
1168
listeners = self .opts .event_listeners
1132
1169
# CMAP spec says that close() MUST close sockets before publishing the
@@ -1164,6 +1201,9 @@ def remove_stale_sockets(self, reference_generation, all_credentials):
1164
1201
`generation` at the point in time this operation was requested on the
1165
1202
pool.
1166
1203
"""
1204
+ if self .state != READY :
1205
+ return
1206
+
1167
1207
if self .opts .max_idle_time_seconds is not None :
1168
1208
with self .lock :
1169
1209
while (self .sockets and
@@ -1172,15 +1212,14 @@ def remove_stale_sockets(self, reference_generation, all_credentials):
1172
1212
sock_info .close_socket (ConnectionClosedReason .IDLE )
1173
1213
1174
1214
while True :
1175
- with self .lock :
1215
+ with self .size_cond :
1216
+ # There are enough sockets in the pool.
1176
1217
if (len (self .sockets ) + self .active_sockets >=
1177
1218
self .opts .min_pool_size ):
1178
- # There are enough sockets in the pool.
1179
1219
return
1180
-
1181
- # We must acquire the semaphore to respect max_pool_size.
1182
- if not self ._socket_semaphore .acquire (False ):
1183
- return
1220
+ if self .requests >= self .opts .min_pool_size :
1221
+ return
1222
+ self .requests += 1
1184
1223
incremented = False
1185
1224
try :
1186
1225
with self ._max_connecting_cond :
@@ -1204,7 +1243,10 @@ def remove_stale_sockets(self, reference_generation, all_credentials):
1204
1243
with self ._max_connecting_cond :
1205
1244
self ._pending -= 1
1206
1245
self ._max_connecting_cond .notify ()
1207
- self ._socket_semaphore .release ()
1246
+
1247
+ with self .size_cond :
1248
+ self .requests -= 1
1249
+ self .size_cond .notify ()
1208
1250
1209
1251
def connect (self , all_credentials = None ):
1210
1252
"""Connect to Mongo and return a new SocketInfo.
@@ -1289,6 +1331,15 @@ def get_socket(self, all_credentials, checkout=False):
1289
1331
if not checkout :
1290
1332
self .return_socket (sock_info )
1291
1333
1334
+ def _raise_if_not_ready (self ):
1335
+ if self .opts .pause_enabled and self .state == PAUSED :
1336
+ if self .enabled_for_cmap :
1337
+ self .opts .event_listeners .publish_connection_check_out_failed (
1338
+ self .address , ConnectionCheckOutFailedReason .CONN_ERROR )
1339
+ # TODO: ensure this error is retryable
1340
+ _raise_connection_failure (
1341
+ self .address , AutoReconnect ('connection pool paused' ))
1342
+
1292
1343
def _get_socket (self , all_credentials ):
1293
1344
"""Get or create a SocketInfo. Can raise ConnectionFailure."""
1294
1345
# We use the pid here to avoid issues with fork / multiprocessing.
@@ -1313,9 +1364,26 @@ def _get_socket(self, all_credentials):
1313
1364
deadline = _time () + self .opts .wait_queue_timeout
1314
1365
else :
1315
1366
deadline = None
1316
- if not self ._socket_semaphore .acquire (
1317
- True , self .opts .wait_queue_timeout ):
1318
- self ._raise_wait_queue_timeout ()
1367
+
1368
+ with self .size_cond :
1369
+ self ._raise_if_not_ready ()
1370
+ if self .waiters >= self .max_waiters :
1371
+ raise ExceededMaxWaiters (
1372
+ 'exceeded max waiters: %s threads already waiting' % (
1373
+ self .waiters ))
1374
+ self .waiters += 1
1375
+ try :
1376
+ while not (self .requests < self .max_pool_size ):
1377
+ if not _cond_wait (self .size_cond , deadline ):
1378
+ # Timed out, notify the next thread to ensure a
1379
+ # timeout doesn't consume the condition.
1380
+ if self .requests < self .max_pool_size :
1381
+ self .size_cond .notify ()
1382
+ self ._raise_wait_queue_timeout ()
1383
+ self ._raise_if_not_ready ()
1384
+ finally :
1385
+ self .waiters -= 1
1386
+ self .requests += 1
1319
1387
1320
1388
# We've now acquired the semaphore and must release it on error.
1321
1389
sock_info = None
@@ -1330,6 +1398,9 @@ def _get_socket(self, all_credentials):
1330
1398
# CMAP: we MUST wait for either maxConnecting OR for a socket
1331
1399
# to be checked back into the pool.
1332
1400
with self ._max_connecting_cond :
1401
+ emitted_event = True
1402
+ self ._raise_if_not_ready ()
1403
+ emitted_event = False
1333
1404
while not (self .sockets or
1334
1405
self ._pending < self ._max_connecting ):
1335
1406
if not _cond_wait (self ._max_connecting_cond , deadline ):
@@ -1340,6 +1411,9 @@ def _get_socket(self, all_credentials):
1340
1411
self ._max_connecting_cond .notify ()
1341
1412
emitted_event = True
1342
1413
self ._raise_wait_queue_timeout ()
1414
+ emitted_event = True
1415
+ self ._raise_if_not_ready ()
1416
+ emitted_event = False
1343
1417
1344
1418
try :
1345
1419
sock_info = self .sockets .popleft ()
@@ -1361,11 +1435,11 @@ def _get_socket(self, all_credentials):
1361
1435
if sock_info :
1362
1436
# We checked out a socket but authentication failed.
1363
1437
sock_info .close_socket (ConnectionClosedReason .ERROR )
1364
- self ._socket_semaphore .release ()
1365
-
1366
- if incremented :
1367
- with self .lock :
1438
+ with self .size_cond :
1439
+ self .requests -= 1
1440
+ if incremented :
1368
1441
self .active_sockets -= 1
1442
+ self .size_cond .notify ()
1369
1443
1370
1444
if self .enabled_for_cmap and not emitted_event :
1371
1445
self .opts .event_listeners .publish_connection_check_out_failed (
@@ -1401,10 +1475,11 @@ def return_socket(self, sock_info):
1401
1475
# Notify any threads waiting to create a connection.
1402
1476
self ._max_connecting_cond .notify ()
1403
1477
1404
- self ._socket_semaphore . release ()
1405
- with self .lock :
1478
+ with self .size_cond :
1479
+ self .requests -= 1
1406
1480
self .active_sockets -= 1
1407
1481
self .operation_count -= 1
1482
+ self .size_cond .notify ()
1408
1483
1409
1484
def _perished (self , sock_info ):
1410
1485
"""Return True and close the connection if it is "perished".
0 commit comments