28
28
IPADDR_SAFE as _IPADDR_SAFE )
29
29
30
30
from bson import DEFAULT_CODEC_OPTIONS
31
- from bson .py3compat import imap , itervalues , _unicode
31
+ from bson .py3compat import imap , itervalues , _unicode , PY3
32
32
from bson .son import SON
33
33
from pymongo import auth , helpers , thread_util , __version__
34
34
from pymongo .client_session import _validate_session_write_concern
35
35
from pymongo .common import (MAX_BSON_SIZE ,
36
+ MAX_CONNECTING ,
36
37
MAX_IDLE_TIME_SEC ,
37
38
MAX_MESSAGE_SIZE ,
38
39
MAX_POOL_SIZE ,
@@ -285,6 +286,20 @@ def _raise_connection_failure(address, error, msg_prefix=None):
285
286
else :
286
287
raise AutoReconnect (msg )
287
288
289
+ if PY3 :
290
+ def _cond_wait (condition , deadline ):
291
+ timeout = deadline - _time () if deadline else None
292
+ return condition .wait (timeout )
293
+ else :
294
+ def _cond_wait (condition , deadline ):
295
+ timeout = deadline - _time () if deadline else None
296
+ condition .wait (timeout )
297
+ # Python 2.7 always returns False for wait(),
298
+ # manually check for a timeout.
299
+ if timeout and _time () >= deadline :
300
+ return False
301
+ return True
302
+
288
303
289
304
class PoolOptions (object ):
290
305
@@ -294,7 +309,7 @@ class PoolOptions(object):
294
309
'__wait_queue_timeout' , '__wait_queue_multiple' ,
295
310
'__ssl_context' , '__ssl_match_hostname' , '__socket_keepalive' ,
296
311
'__event_listeners' , '__appname' , '__driver' , '__metadata' ,
297
- '__compression_settings' )
312
+ '__compression_settings' , '__max_connecting' )
298
313
299
314
def __init__ (self , max_pool_size = MAX_POOL_SIZE ,
300
315
min_pool_size = MIN_POOL_SIZE ,
@@ -303,7 +318,7 @@ def __init__(self, max_pool_size=MAX_POOL_SIZE,
303
318
wait_queue_multiple = None , ssl_context = None ,
304
319
ssl_match_hostname = True , socket_keepalive = True ,
305
320
event_listeners = None , appname = None , driver = None ,
306
- compression_settings = None ):
321
+ compression_settings = None , max_connecting = MAX_CONNECTING ):
307
322
308
323
self .__max_pool_size = max_pool_size
309
324
self .__min_pool_size = min_pool_size
@@ -319,6 +334,7 @@ def __init__(self, max_pool_size=MAX_POOL_SIZE,
319
334
self .__appname = appname
320
335
self .__driver = driver
321
336
self .__compression_settings = compression_settings
337
+ self .__max_connecting = max_connecting
322
338
self .__metadata = copy .deepcopy (_METADATA )
323
339
if appname :
324
340
self .__metadata ['application' ] = {'name' : appname }
@@ -357,6 +373,8 @@ def non_default_options(self):
357
373
opts ['maxIdleTimeMS' ] = self .__max_idle_time_seconds * 1000
358
374
if self .__wait_queue_timeout != WAIT_QUEUE_TIMEOUT :
359
375
opts ['waitQueueTimeoutMS' ] = self .__wait_queue_timeout * 1000
376
+ if self .__max_connecting != MAX_CONNECTING :
377
+ opts ['maxConnecting' ] = self .__max_connecting
360
378
return opts
361
379
362
380
@property
@@ -381,6 +399,13 @@ def min_pool_size(self):
381
399
"""
382
400
return self .__min_pool_size
383
401
402
+ @property
403
+ def max_connecting (self ):
404
+ """The maximum number of concurrent connection creation attempts per
405
+ pool. Defaults to 2.
406
+ """
407
+ return self .__max_connecting
408
+
384
409
@property
385
410
def max_idle_time_seconds (self ):
386
411
"""The maximum number of seconds that a connection can remain
@@ -1080,6 +1105,9 @@ def __init__(self, address, options, handshake=True):
1080
1105
1081
1106
self ._socket_semaphore = thread_util .create_semaphore (
1082
1107
self .opts .max_pool_size , max_waiters )
1108
+ self ._max_connecting_cond = threading .Condition (self .lock )
1109
+ self ._max_connecting = self .opts .max_connecting
1110
+ self ._pending = 0
1083
1111
if self .enabled_for_cmap :
1084
1112
self .opts .event_listeners .publish_pool_created (
1085
1113
self .address , self .opts .non_default_options )
@@ -1143,21 +1171,34 @@ def remove_stale_sockets(self, reference_generation, all_credentials):
1143
1171
if (len (self .sockets ) + self .active_sockets >=
1144
1172
self .opts .min_pool_size ):
1145
1173
# There are enough sockets in the pool.
1146
- break
1174
+ return
1147
1175
1148
1176
# We must acquire the semaphore to respect max_pool_size.
1149
1177
if not self ._socket_semaphore .acquire (False ):
1150
- break
1178
+ return
1179
+ incremented = False
1151
1180
try :
1181
+ with self ._max_connecting_cond :
1182
+ # If maxConnecting connections are already being created
1183
+ # by this pool then try again later instead of waiting.
1184
+ if self ._pending >= self ._max_connecting :
1185
+ return
1186
+ self ._pending += 1
1187
+ incremented = True
1152
1188
sock_info = self .connect (all_credentials )
1153
1189
with self .lock :
1154
1190
# Close connection and return if the pool was reset during
1155
1191
# socket creation or while acquiring the pool lock.
1156
1192
if self .generation != reference_generation :
1157
1193
sock_info .close_socket (ConnectionClosedReason .STALE )
1158
- break
1194
+ return
1159
1195
self .sockets .appendleft (sock_info )
1160
1196
finally :
1197
+ if incremented :
1198
+ # Notify after adding the socket to the pool.
1199
+ with self ._max_connecting_cond :
1200
+ self ._pending -= 1
1201
+ self ._max_connecting_cond .notify ()
1161
1202
self ._socket_semaphore .release ()
1162
1203
1163
1204
def connect (self , all_credentials = None ):
@@ -1260,28 +1301,53 @@ def _get_socket(self, all_credentials):
1260
1301
'pool' )
1261
1302
1262
1303
# Get a free socket or create one.
1304
+ if self .opts .wait_queue_timeout :
1305
+ deadline = _time () + self .opts .wait_queue_timeout
1306
+ else :
1307
+ deadline = None
1263
1308
if not self ._socket_semaphore .acquire (
1264
1309
True , self .opts .wait_queue_timeout ):
1265
1310
self ._raise_wait_queue_timeout ()
1266
1311
1267
1312
# We've now acquired the semaphore and must release it on error.
1268
1313
sock_info = None
1269
1314
incremented = False
1315
+ emitted_event = False
1270
1316
try :
1271
1317
with self .lock :
1272
1318
self .active_sockets += 1
1273
1319
incremented = True
1274
1320
1275
1321
while sock_info is None :
1276
- try :
1277
- with self .lock :
1322
+ # CMAP: we MUST wait for either maxConnecting OR for a socket
1323
+ # to be checked back into the pool.
1324
+ with self ._max_connecting_cond :
1325
+ while not (self .sockets or
1326
+ self ._pending < self ._max_connecting ):
1327
+ if not _cond_wait (self ._max_connecting_cond , deadline ):
1328
+ # Timed out, notify the next thread to ensure a
1329
+ # timeout doesn't consume the condition.
1330
+ if (self .sockets or
1331
+ self ._pending < self ._max_connecting ):
1332
+ self ._max_connecting_cond .notify ()
1333
+ emitted_event = True
1334
+ self ._raise_wait_queue_timeout ()
1335
+
1336
+ try :
1278
1337
sock_info = self .sockets .popleft ()
1279
- except IndexError :
1280
- # Can raise ConnectionFailure or CertificateError.
1281
- sock_info = self .connect (all_credentials )
1282
- else :
1338
+ except IndexError :
1339
+ self ._pending += 1
1340
+ if sock_info : # We got a socket from the pool
1283
1341
if self ._perished (sock_info ):
1284
1342
sock_info = None
1343
+ continue
1344
+ else : # We need to create a new connection
1345
+ try :
1346
+ sock_info = self .connect (all_credentials )
1347
+ finally :
1348
+ with self ._max_connecting_cond :
1349
+ self ._pending -= 1
1350
+ self ._max_connecting_cond .notify ()
1285
1351
sock_info .check_auth (all_credentials )
1286
1352
except Exception :
1287
1353
if sock_info :
@@ -1293,7 +1359,7 @@ def _get_socket(self, all_credentials):
1293
1359
with self .lock :
1294
1360
self .active_sockets -= 1
1295
1361
1296
- if self .enabled_for_cmap :
1362
+ if self .enabled_for_cmap and not emitted_event :
1297
1363
self .opts .event_listeners .publish_connection_check_out_failed (
1298
1364
self .address , ConnectionCheckOutFailedReason .CONN_ERROR )
1299
1365
raise
@@ -1324,6 +1390,8 @@ def return_socket(self, sock_info):
1324
1390
sock_info .update_last_checkin_time ()
1325
1391
sock_info .update_is_writable (self .is_writable )
1326
1392
self .sockets .appendleft (sock_info )
1393
+ # Notify any threads waiting to create a connection.
1394
+ self ._max_connecting_cond .notify ()
1327
1395
1328
1396
self ._socket_semaphore .release ()
1329
1397
with self .lock :
0 commit comments