28
28
IPADDR_SAFE as _IPADDR_SAFE )
29
29
30
30
from bson import DEFAULT_CODEC_OPTIONS
31
- from bson .py3compat import imap , itervalues , _unicode
31
+ from bson .py3compat import imap , itervalues , _unicode , PY3
32
32
from bson .son import SON
33
33
from pymongo import auth , helpers , thread_util , __version__
34
34
from pymongo .client_session import _validate_session_write_concern
35
35
from pymongo .common import (MAX_BSON_SIZE ,
36
+ MAX_CONNECTING ,
36
37
MAX_IDLE_TIME_SEC ,
37
38
MAX_MESSAGE_SIZE ,
38
39
MAX_POOL_SIZE ,
@@ -285,6 +286,18 @@ def _raise_connection_failure(address, error, msg_prefix=None):
285
286
else :
286
287
raise AutoReconnect (msg )
287
288
289
+ if PY3 :
290
+ def _cond_wait (condition , timeout , deadline ):
291
+ return condition .wait (timeout )
292
+ else :
293
+ def _cond_wait (condition , timeout , deadline ):
294
+ condition .wait (timeout )
295
+ # Python 2.7 always returns False for wait(),
296
+ # manually check for a timeout.
297
+ if timeout and _time () >= deadline :
298
+ return False
299
+ return True
300
+
288
301
289
302
class PoolOptions (object ):
290
303
@@ -294,7 +307,7 @@ class PoolOptions(object):
294
307
'__wait_queue_timeout' , '__wait_queue_multiple' ,
295
308
'__ssl_context' , '__ssl_match_hostname' , '__socket_keepalive' ,
296
309
'__event_listeners' , '__appname' , '__driver' , '__metadata' ,
297
- '__compression_settings' )
310
+ '__compression_settings' , '__max_connecting' )
298
311
299
312
def __init__ (self , max_pool_size = MAX_POOL_SIZE ,
300
313
min_pool_size = MIN_POOL_SIZE ,
@@ -303,7 +316,7 @@ def __init__(self, max_pool_size=MAX_POOL_SIZE,
303
316
wait_queue_multiple = None , ssl_context = None ,
304
317
ssl_match_hostname = True , socket_keepalive = True ,
305
318
event_listeners = None , appname = None , driver = None ,
306
- compression_settings = None ):
319
+ compression_settings = None , max_connecting = MAX_CONNECTING ):
307
320
308
321
self .__max_pool_size = max_pool_size
309
322
self .__min_pool_size = min_pool_size
@@ -319,6 +332,7 @@ def __init__(self, max_pool_size=MAX_POOL_SIZE,
319
332
self .__appname = appname
320
333
self .__driver = driver
321
334
self .__compression_settings = compression_settings
335
+ self .__max_connecting = max_connecting
322
336
self .__metadata = copy .deepcopy (_METADATA )
323
337
if appname :
324
338
self .__metadata ['application' ] = {'name' : appname }
@@ -357,6 +371,8 @@ def non_default_options(self):
357
371
opts ['maxIdleTimeMS' ] = self .__max_idle_time_seconds * 1000
358
372
if self .__wait_queue_timeout != WAIT_QUEUE_TIMEOUT :
359
373
opts ['waitQueueTimeoutMS' ] = self .__wait_queue_timeout * 1000
374
+ if self .__max_connecting != MAX_CONNECTING :
375
+ opts ['maxConnecting' ] = self .__max_connecting
360
376
return opts
361
377
362
378
@property
@@ -381,6 +397,13 @@ def min_pool_size(self):
381
397
"""
382
398
return self .__min_pool_size
383
399
400
+ @property
401
+ def max_connecting (self ):
402
+ """The maximum number of concurrent connection creation attempts per
403
+ pool. Defaults to 2.
404
+ """
405
+ return self .__max_connecting
406
+
384
407
@property
385
408
def max_idle_time_seconds (self ):
386
409
"""The maximum number of seconds that a connection can remain
@@ -1080,6 +1103,9 @@ def __init__(self, address, options, handshake=True):
1080
1103
1081
1104
self ._socket_semaphore = thread_util .create_semaphore (
1082
1105
self .opts .max_pool_size , max_waiters )
1106
+ self ._max_connecting_cond = threading .Condition (self .lock )
1107
+ self ._max_connecting = self .opts .max_connecting
1108
+ self ._pending = 0
1083
1109
if self .enabled_for_cmap :
1084
1110
self .opts .event_listeners .publish_pool_created (
1085
1111
self .address , self .opts .non_default_options )
@@ -1143,21 +1169,34 @@ def remove_stale_sockets(self, reference_generation, all_credentials):
1143
1169
if (len (self .sockets ) + self .active_sockets >=
1144
1170
self .opts .min_pool_size ):
1145
1171
# There are enough sockets in the pool.
1146
- break
1172
+ return
1147
1173
1148
1174
# We must acquire the semaphore to respect max_pool_size.
1149
1175
if not self ._socket_semaphore .acquire (False ):
1150
- break
1176
+ return
1177
+ incremented = False
1151
1178
try :
1179
+ with self ._max_connecting_cond :
1180
+ # If maxConnecting connections are already being created
1181
+ # by this pool then try again later instead of waiting.
1182
+ if self ._pending >= self ._max_connecting :
1183
+ return
1184
+ self ._pending += 1
1185
+ incremented = True
1152
1186
sock_info = self .connect (all_credentials )
1153
1187
with self .lock :
1154
1188
# Close connection and return if the pool was reset during
1155
1189
# socket creation or while acquiring the pool lock.
1156
1190
if self .generation != reference_generation :
1157
1191
sock_info .close_socket (ConnectionClosedReason .STALE )
1158
- break
1192
+ return
1159
1193
self .sockets .appendleft (sock_info )
1160
1194
finally :
1195
+ if incremented :
1196
+ # Notify after adding the socket to the pool.
1197
+ with self ._max_connecting_cond :
1198
+ self ._pending -= 1
1199
+ self ._max_connecting_cond .notify ()
1161
1200
self ._socket_semaphore .release ()
1162
1201
1163
1202
def connect (self , all_credentials = None ):
@@ -1260,28 +1299,56 @@ def _get_socket(self, all_credentials):
1260
1299
'pool' )
1261
1300
1262
1301
# Get a free socket or create one.
1302
+ if self .opts .wait_queue_timeout :
1303
+ deadline = _time () + self .opts .wait_queue_timeout
1304
+ else :
1305
+ deadline = None
1263
1306
if not self ._socket_semaphore .acquire (
1264
1307
True , self .opts .wait_queue_timeout ):
1265
1308
self ._raise_wait_queue_timeout ()
1266
1309
1267
1310
# We've now acquired the semaphore and must release it on error.
1268
1311
sock_info = None
1269
1312
incremented = False
1313
+ emitted_event = False
1270
1314
try :
1271
1315
with self .lock :
1272
1316
self .active_sockets += 1
1273
1317
incremented = True
1274
1318
1275
1319
while sock_info is None :
1276
- try :
1277
- with self .lock :
1320
+ # CMAP: we MUST wait for either maxConnecting OR for a socket
1321
+ # to be checked back into the pool.
1322
+ with self ._max_connecting_cond :
1323
+ while (self ._pending >= self ._max_connecting and
1324
+ not self .sockets ):
1325
+ if self .opts .wait_queue_timeout :
1326
+ # TODO: What if timeout is <= zero here?
1327
+ # timeout = max(deadline - _time(), .001)
1328
+ timeout = deadline - _time ()
1329
+ else :
1330
+ timeout = None
1331
+ if not _cond_wait (self ._max_connecting_cond ,
1332
+ timeout , deadline ):
1333
+ # timeout
1334
+ emitted_event = True
1335
+ self ._raise_wait_queue_timeout ()
1336
+
1337
+ try :
1278
1338
sock_info = self .sockets .popleft ()
1279
- except IndexError :
1280
- # Can raise ConnectionFailure or CertificateError.
1281
- sock_info = self .connect (all_credentials )
1282
- else :
1339
+ except IndexError :
1340
+ self ._pending += 1
1341
+ if sock_info : # We got a socket from the pool
1283
1342
if self ._perished (sock_info ):
1284
1343
sock_info = None
1344
+ continue
1345
+ else : # We need to create a new connection
1346
+ try :
1347
+ sock_info = self .connect (all_credentials )
1348
+ finally :
1349
+ with self ._max_connecting_cond :
1350
+ self ._pending -= 1
1351
+ self ._max_connecting_cond .notify ()
1285
1352
sock_info .check_auth (all_credentials )
1286
1353
except Exception :
1287
1354
if sock_info :
@@ -1293,7 +1360,7 @@ def _get_socket(self, all_credentials):
1293
1360
with self .lock :
1294
1361
self .active_sockets -= 1
1295
1362
1296
- if self .enabled_for_cmap :
1363
+ if self .enabled_for_cmap and not emitted_event :
1297
1364
self .opts .event_listeners .publish_connection_check_out_failed (
1298
1365
self .address , ConnectionCheckOutFailedReason .CONN_ERROR )
1299
1366
raise
@@ -1324,6 +1391,8 @@ def return_socket(self, sock_info):
1324
1391
sock_info .update_last_checkin_time ()
1325
1392
sock_info .update_is_writable (self .is_writable )
1326
1393
self .sockets .appendleft (sock_info )
1394
+ # Notify any threads waiting to create a connection.
1395
+ self ._max_connecting_cond .notify ()
1327
1396
1328
1397
self ._socket_semaphore .release ()
1329
1398
with self .lock :
0 commit comments