-
Notifications
You must be signed in to change notification settings - Fork 1.1k
PYTHON-2363 Rate limit new connection creations via maxConnecting #511
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
2f9c780
65c98f1
d151589
68286c1
92d48b1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -58,17 +58,32 @@ to 100. If there are ``maxPoolSize`` connections to a server and all are in | |
use, the next request to that server will wait until one of the connections | ||
becomes available. | ||
|
||
The client instance opens one additional socket per server in your MongoDB | ||
The client instance opens two additional sockets per server in your MongoDB | ||
topology for monitoring the server's state. | ||
|
||
For example, a client connected to a 3-node replica set opens 3 monitoring | ||
For example, a client connected to a 3-node replica set opens 6 monitoring | ||
sockets. It also opens as many sockets as needed to support a multi-threaded | ||
application's concurrent operations on each server, up to ``maxPoolSize``. With | ||
a ``maxPoolSize`` of 100, if the application only uses the primary (the | ||
default), then only the primary connection pool grows and the total connections | ||
is at most 103. If the application uses a | ||
is at most 106. If the application uses a | ||
:class:`~pymongo.read_preferences.ReadPreference` to query the secondaries, | ||
their pools also grow and the total connections can reach 303. | ||
their pools also grow and the total connections can reach 306. | ||
|
||
Additionally, the pools are rate limited such that each connection pool can | ||
only create at most 2 connections in parallel at any time. The connection | ||
creation covers all the work required to set up a new connection | ||
including DNS, TCP, SSL/TLS, MongoDB handshake, and MongoDB authentication. | ||
For example, if three threads concurrently attempt to check out a connection | ||
from an empty pool, the first two threads will begin creating new connections | ||
while the third thread will wait. The third thread stops waiting when either: | ||
|
||
- one of the first two threads finishes creating a connection, or | ||
- an existing connection is checked back into the pool. | ||
|
||
Rate limiting concurrent connection creation reduces the likelihood of | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we also say that users who are concerned about any additional latency this might result in can increase their Also, should we add a note about the impact this has at application start time when a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd rather not suggest increasing minPoolSize. We actually want threads to wait a bit before creating connections because it yields better throughput and latency in most cases (as evidenced by
Pools shouldn't take longer because minPoolSize connection creation is (and always was) single threaded. MongoClient only creates a single connection for a single pool at once. |
||
connection storms and improves the driver's ability to reuse existing | ||
connections. | ||
|
||
It is possible to set the minimum number of concurrent connections to each | ||
server with ``minPoolSize``, which defaults to 0. The connection pool will be | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,11 +28,12 @@ | |
IPADDR_SAFE as _IPADDR_SAFE) | ||
|
||
from bson import DEFAULT_CODEC_OPTIONS | ||
from bson.py3compat import imap, itervalues, _unicode | ||
from bson.py3compat import imap, itervalues, _unicode, PY3 | ||
from bson.son import SON | ||
from pymongo import auth, helpers, thread_util, __version__ | ||
from pymongo.client_session import _validate_session_write_concern | ||
from pymongo.common import (MAX_BSON_SIZE, | ||
MAX_CONNECTING, | ||
MAX_IDLE_TIME_SEC, | ||
MAX_MESSAGE_SIZE, | ||
MAX_POOL_SIZE, | ||
|
@@ -285,6 +286,20 @@ def _raise_connection_failure(address, error, msg_prefix=None): | |
else: | ||
raise AutoReconnect(msg) | ||
|
||
if PY3: | ||
def _cond_wait(condition, deadline): | ||
timeout = deadline - _time() if deadline else None | ||
return condition.wait(timeout) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lets add a comment here that |
||
else: | ||
def _cond_wait(condition, deadline): | ||
timeout = deadline - _time() if deadline else None | ||
condition.wait(timeout) | ||
# Python 2.7 always returns False for wait(), | ||
# manually check for a timeout. | ||
if timeout and _time() >= deadline: | ||
return False | ||
return True | ||
|
||
|
||
class PoolOptions(object): | ||
|
||
|
@@ -294,7 +309,7 @@ class PoolOptions(object): | |
'__wait_queue_timeout', '__wait_queue_multiple', | ||
'__ssl_context', '__ssl_match_hostname', '__socket_keepalive', | ||
'__event_listeners', '__appname', '__driver', '__metadata', | ||
'__compression_settings') | ||
'__compression_settings', '__max_connecting') | ||
|
||
def __init__(self, max_pool_size=MAX_POOL_SIZE, | ||
min_pool_size=MIN_POOL_SIZE, | ||
|
@@ -303,7 +318,7 @@ def __init__(self, max_pool_size=MAX_POOL_SIZE, | |
wait_queue_multiple=None, ssl_context=None, | ||
ssl_match_hostname=True, socket_keepalive=True, | ||
event_listeners=None, appname=None, driver=None, | ||
compression_settings=None): | ||
compression_settings=None, max_connecting=MAX_CONNECTING): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This isn't configurable by users but I think it still made sense to add max_connecting to PoolOptions. |
||
|
||
self.__max_pool_size = max_pool_size | ||
self.__min_pool_size = min_pool_size | ||
|
@@ -319,6 +334,7 @@ def __init__(self, max_pool_size=MAX_POOL_SIZE, | |
self.__appname = appname | ||
self.__driver = driver | ||
self.__compression_settings = compression_settings | ||
self.__max_connecting = max_connecting | ||
self.__metadata = copy.deepcopy(_METADATA) | ||
if appname: | ||
self.__metadata['application'] = {'name': appname} | ||
|
@@ -357,6 +373,8 @@ def non_default_options(self): | |
opts['maxIdleTimeMS'] = self.__max_idle_time_seconds * 1000 | ||
if self.__wait_queue_timeout != WAIT_QUEUE_TIMEOUT: | ||
opts['waitQueueTimeoutMS'] = self.__wait_queue_timeout * 1000 | ||
if self.__max_connecting != MAX_CONNECTING: | ||
opts['maxConnecting'] = self.__max_connecting | ||
return opts | ||
|
||
@property | ||
|
@@ -381,6 +399,13 @@ def min_pool_size(self): | |
""" | ||
return self.__min_pool_size | ||
|
||
@property | ||
def max_connecting(self): | ||
"""The maximum number of concurrent connection creation attempts per | ||
pool. Defaults to 2. | ||
""" | ||
return self.__max_connecting | ||
|
||
@property | ||
def max_idle_time_seconds(self): | ||
"""The maximum number of seconds that a connection can remain | ||
|
@@ -1080,6 +1105,9 @@ def __init__(self, address, options, handshake=True): | |
|
||
self._socket_semaphore = thread_util.create_semaphore( | ||
self.opts.max_pool_size, max_waiters) | ||
self._max_connecting_cond = threading.Condition(self.lock) | ||
self._max_connecting = self.opts.max_connecting | ||
self._pending = 0 | ||
if self.enabled_for_cmap: | ||
self.opts.event_listeners.publish_pool_created( | ||
self.address, self.opts.non_default_options) | ||
|
@@ -1143,21 +1171,34 @@ def remove_stale_sockets(self, reference_generation, all_credentials): | |
if (len(self.sockets) + self.active_sockets >= | ||
self.opts.min_pool_size): | ||
# There are enough sockets in the pool. | ||
break | ||
return | ||
|
||
# We must acquire the semaphore to respect max_pool_size. | ||
if not self._socket_semaphore.acquire(False): | ||
break | ||
return | ||
incremented = False | ||
try: | ||
with self._max_connecting_cond: | ||
# If maxConnecting connections are already being created | ||
# by this pool then try again later instead of waiting. | ||
if self._pending >= self._max_connecting: | ||
return | ||
self._pending += 1 | ||
incremented = True | ||
sock_info = self.connect(all_credentials) | ||
with self.lock: | ||
# Close connection and return if the pool was reset during | ||
# socket creation or while acquiring the pool lock. | ||
if self.generation != reference_generation: | ||
sock_info.close_socket(ConnectionClosedReason.STALE) | ||
break | ||
return | ||
self.sockets.appendleft(sock_info) | ||
finally: | ||
if incremented: | ||
# Notify after adding the socket to the pool. | ||
with self._max_connecting_cond: | ||
self._pending -= 1 | ||
self._max_connecting_cond.notify() | ||
self._socket_semaphore.release() | ||
|
||
def connect(self, all_credentials=None): | ||
|
@@ -1260,28 +1301,49 @@ def _get_socket(self, all_credentials): | |
'pool') | ||
|
||
# Get a free socket or create one. | ||
if self.opts.wait_queue_timeout: | ||
deadline = _time() + self.opts.wait_queue_timeout | ||
else: | ||
deadline = None | ||
if not self._socket_semaphore.acquire( | ||
True, self.opts.wait_queue_timeout): | ||
self._raise_wait_queue_timeout() | ||
|
||
# We've now acquired the semaphore and must release it on error. | ||
sock_info = None | ||
incremented = False | ||
emitted_event = False | ||
try: | ||
with self.lock: | ||
self.active_sockets += 1 | ||
incremented = True | ||
|
||
while sock_info is None: | ||
try: | ||
with self.lock: | ||
# CMAP: we MUST wait for either maxConnecting OR for a socket | ||
# to be checked back into the pool. | ||
with self._max_connecting_cond: | ||
while (self._pending >= self._max_connecting and | ||
not self.sockets): | ||
if not _cond_wait(self._max_connecting_cond, deadline): | ||
# timeout | ||
emitted_event = True | ||
self._raise_wait_queue_timeout() | ||
|
||
try: | ||
sock_info = self.sockets.popleft() | ||
except IndexError: | ||
# Can raise ConnectionFailure or CertificateError. | ||
sock_info = self.connect(all_credentials) | ||
else: | ||
except IndexError: | ||
self._pending += 1 | ||
if sock_info: # We got a socket from the pool | ||
if self._perished(sock_info): | ||
sock_info = None | ||
continue | ||
else: # We need to create a new connection | ||
try: | ||
sock_info = self.connect(all_credentials) | ||
finally: | ||
with self._max_connecting_cond: | ||
self._pending -= 1 | ||
self._max_connecting_cond.notify() | ||
sock_info.check_auth(all_credentials) | ||
except Exception: | ||
if sock_info: | ||
|
@@ -1293,7 +1355,7 @@ def _get_socket(self, all_credentials): | |
with self.lock: | ||
self.active_sockets -= 1 | ||
|
||
if self.enabled_for_cmap: | ||
if self.enabled_for_cmap and not emitted_event: | ||
self.opts.event_listeners.publish_connection_check_out_failed( | ||
self.address, ConnectionCheckOutFailedReason.CONN_ERROR) | ||
raise | ||
|
@@ -1324,6 +1386,8 @@ def return_socket(self, sock_info): | |
sock_info.update_last_checkin_time() | ||
sock_info.update_is_writable(self.is_writable) | ||
self.sockets.appendleft(sock_info) | ||
# Notify any threads waiting to create a connection. | ||
self._max_connecting_cond.notify() | ||
|
||
self._socket_semaphore.release() | ||
with self.lock: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
{ | ||
"version": 1, | ||
"style": "integration", | ||
"description": "maxConnecting is enforced", | ||
"runOn": [ | ||
{ | ||
"minServerVersion": "4.4.0" | ||
} | ||
], | ||
"failPoint": { | ||
"configureFailPoint": "failCommand", | ||
"mode": { | ||
"times": 50 | ||
}, | ||
"data": { | ||
"failCommands": [ | ||
"isMaster" | ||
], | ||
"closeConnection": false, | ||
"blockConnection": true, | ||
"blockTimeMS": 750 | ||
} | ||
}, | ||
"poolOptions": { | ||
"maxPoolSize": 10, | ||
"waitQueueTimeoutMS": 5000 | ||
}, | ||
"operations": [ | ||
{ | ||
"name": "start", | ||
"target": "thread1" | ||
}, | ||
{ | ||
"name": "checkOut", | ||
"thread": "thread1" | ||
}, | ||
{ | ||
"name": "start", | ||
"target": "thread2" | ||
}, | ||
{ | ||
"name": "wait", | ||
"thread": "thread2", | ||
"ms": 100 | ||
}, | ||
{ | ||
"name": "checkOut", | ||
"thread": "thread2" | ||
}, | ||
{ | ||
"name": "start", | ||
"target": "thread3" | ||
}, | ||
{ | ||
"name": "wait", | ||
"thread": "thread3", | ||
"ms": 100 | ||
}, | ||
{ | ||
"name": "checkOut", | ||
"thread": "thread3" | ||
}, | ||
{ | ||
"name": "waitForEvent", | ||
"event": "ConnectionReady", | ||
"count": 3 | ||
} | ||
], | ||
"events": [ | ||
{ | ||
"type": "ConnectionCreated", | ||
"address": 42, | ||
"connectionId": 1 | ||
}, | ||
{ | ||
"type": "ConnectionCreated", | ||
"address": 42 | ||
}, | ||
{ | ||
"type": "ConnectionReady", | ||
"address": 42, | ||
"connectionId": 1 | ||
}, | ||
{ | ||
"type": "ConnectionCreated", | ||
"address": 42 | ||
}, | ||
{ | ||
"type": "ConnectionReady", | ||
"address": 42 | ||
}, | ||
{ | ||
"type": "ConnectionReady", | ||
"address": 42 | ||
} | ||
], | ||
"ignore": [ | ||
"ConnectionCheckOutStarted", | ||
"ConnectionCheckedIn", | ||
"ConnectionCheckedOut", | ||
"ConnectionClosed", | ||
"ConnectionPoolCreated" | ||
] | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change is because of the 4.4+ streaming SDAM behavior which requires 2 sockets per server.