PYTHON-2363 Rate limit new connection creations via maxConnecting #511

Merged
merged 5 commits into from
Nov 23, 2020
23 changes: 19 additions & 4 deletions doc/faq.rst
@@ -58,17 +58,32 @@
to 100. If there are ``maxPoolSize`` connections to a server and all are in
use, the next request to that server will wait until one of the connections
becomes available.

The client instance opens one additional socket per server in your MongoDB
The client instance opens two additional sockets per server in your MongoDB
Member Author:

This change is because of the 4.4+ streaming SDAM behavior which requires 2 sockets per server.

topology for monitoring the server's state.

For example, a client connected to a 3-node replica set opens 3 monitoring
For example, a client connected to a 3-node replica set opens 6 monitoring
sockets. It also opens as many sockets as needed to support a multi-threaded
application's concurrent operations on each server, up to ``maxPoolSize``. With
a ``maxPoolSize`` of 100, if the application only uses the primary (the
default), then only the primary connection pool grows and the total connections
is at most 103. If the application uses a
is at most 106. If the application uses a
:class:`~pymongo.read_preferences.ReadPreference` to query the secondaries,
their pools also grow and the total connections can reach 303.
their pools also grow and the total connections can reach 306.
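The updated socket arithmetic above can be restated as a quick sketch (a hypothetical helper, not part of PyMongo; it just encodes the FAQ's numbers, with 2 monitoring sockets per server for the 4.4+ streaming SDAM behavior):

```python
# Worst-case sockets opened by one MongoClient, per the FAQ example.
MONITOR_SOCKETS_PER_SERVER = 2  # 4.4+ streaming SDAM uses 2 per server

def total_connections(servers, max_pool_size, pools_in_use):
    """Monitoring sockets plus every fully grown in-use pool."""
    monitoring = servers * MONITOR_SOCKETS_PER_SERVER
    return monitoring + pools_in_use * max_pool_size

# Primary-only workload against a 3-node replica set:
total_connections(3, 100, 1)  # 106
# Secondary reads grow every pool:
total_connections(3, 100, 3)  # 306
```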

Additionally, the pools are rate limited such that each connection pool can
only create at most 2 connections in parallel at any time. Connection
creation covers all the work required to set up a new connection,
including DNS, TCP, SSL/TLS, the MongoDB handshake, and MongoDB authentication.
For example, if three threads concurrently attempt to check out a connection
from an empty pool, the first two threads will begin creating new connections
while the third thread will wait. The third thread stops waiting when either:

- one of the first two threads finishes creating a connection, or
- an existing connection is checked back into the pool.
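The wait-or-create decision described above can be sketched with a `threading.Condition`, much as the PR's pool does. This is an illustrative standalone class, not PyMongo's actual API; all names are invented:

```python
import collections
import threading

class MaxConnectingGate:
    """Sketch of the maxConnecting gate (illustrative, not PyMongo's API)."""

    def __init__(self, max_connecting=2):
        self._cond = threading.Condition()
        self._pending = 0          # connections currently being created
        self._max = max_connecting
        self._idle = collections.deque()

    def check_out(self, timeout=None):
        with self._cond:
            # Wait until an idle connection exists OR a creation slot frees up.
            if not self._cond.wait_for(
                    lambda: self._idle or self._pending < self._max, timeout):
                return None                  # timed out (cf. waitQueueTimeoutMS)
            if self._idle:
                return self._idle.popleft()  # reuse an existing connection
            self._pending += 1               # reserve one of the 2 slots
            return "create"

    def created(self):
        # Called once setup (DNS/TCP/TLS/handshake/auth) finishes.
        with self._cond:
            self._pending -= 1
            self._cond.notify()

    def check_in(self, conn):
        with self._cond:
            self._idle.append(conn)
            self._cond.notify()

gate = MaxConnectingGate()
gate.check_out()              # "create": first thread builds a connection
gate.check_out()              # "create": second thread builds a connection
gate.check_out(timeout=0.05)  # None: third thread waits, then times out
gate.check_in("conn-1")       # a connection is checked back in
gate.check_out()              # "conn-1": third thread reuses it
```

This mirrors the two wake-up cases in the FAQ text: `created()` corresponds to a creator finishing, and `check_in()` to a connection returning to the pool.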

Rate limiting concurrent connection creation reduces the likelihood of
Contributor:

Should we also say that users who are concerned about any additional latency this might introduce can increase their minPoolSize to reduce the likelihood of having threads wait to create new connections?

Also, should we add a note about the impact this has at application start time when a minPoolSize is set? Presumably pools take longer to initialize in this case, though I am not sure what impact this might have on applications.

Member Author:

I'd rather not suggest increasing minPoolSize. We actually want threads to wait a bit before creating connections because it yields better throughput and latency in most cases (as evidenced by test_maxConnecting).

> Also, should we add a note about the impact this has at application start time when a minPoolSize is set? Presumably pools take longer to initialize in this case, though I am not sure what impact this might have on applications.

Pools shouldn't take longer because minPoolSize connection creation is (and always was) single-threaded. MongoClient only creates one connection at a time for any given pool.

connection storms and improves the driver's ability to reuse existing
connections.

It is possible to set the minimum number of concurrent connections to each
server with ``minPoolSize``, which defaults to 0. The connection pool will be
3 changes: 3 additions & 0 deletions pymongo/common.py
@@ -89,6 +89,9 @@
# Default value for minPoolSize.
MIN_POOL_SIZE = 0

# The maximum number of concurrent connection creation attempts per pool.
MAX_CONNECTING = 2

# Default value for maxIdleTimeMS.
MAX_IDLE_TIME_MS = None

94 changes: 81 additions & 13 deletions pymongo/pool.py
@@ -28,11 +28,12 @@
IPADDR_SAFE as _IPADDR_SAFE)

from bson import DEFAULT_CODEC_OPTIONS
from bson.py3compat import imap, itervalues, _unicode
from bson.py3compat import imap, itervalues, _unicode, PY3
from bson.son import SON
from pymongo import auth, helpers, thread_util, __version__
from pymongo.client_session import _validate_session_write_concern
from pymongo.common import (MAX_BSON_SIZE,
MAX_CONNECTING,
MAX_IDLE_TIME_SEC,
MAX_MESSAGE_SIZE,
MAX_POOL_SIZE,
@@ -285,6 +286,20 @@ def _raise_connection_failure(address, error, msg_prefix=None):
else:
raise AutoReconnect(msg)

if PY3:
    def _cond_wait(condition, deadline):
        timeout = deadline - _time() if deadline else None
        return condition.wait(timeout)
Contributor:


Let's add a comment here that wait() treats negative timeouts the same as timeout=0?

else:
    def _cond_wait(condition, deadline):
        timeout = deadline - _time() if deadline else None
        condition.wait(timeout)
        # Python 2.7 always returns False for wait(),
        # manually check for a timeout.
        if timeout and _time() >= deadline:
            return False
        return True
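The reviewer's point about negative timeouts can be seen in a standalone sketch of the PY3 branch (with the module's `_time` swapped for `time.time`): `Condition.wait()` treats a timeout less than or equal to zero as a non-blocking check, so an already-expired deadline returns `False` immediately.

```python
import threading
import time

def _cond_wait(condition, deadline):
    # Mirrors the PY3 branch: a None deadline means wait indefinitely.
    timeout = deadline - time.time() if deadline else None
    return condition.wait(timeout)

cond = threading.Condition()
with cond:  # wait() must be called with the lock held
    # Deadline already in the past: wait() receives a negative timeout,
    # which Condition.wait() treats like timeout=0, returning False at once.
    expired = _cond_wait(cond, time.time() - 1.0)
    # A short future deadline with no notifier also times out with False.
    timed_out = _cond_wait(cond, time.time() + 0.05)
```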


class PoolOptions(object):

@@ -294,7 +309,7 @@ class PoolOptions(object):
'__wait_queue_timeout', '__wait_queue_multiple',
'__ssl_context', '__ssl_match_hostname', '__socket_keepalive',
'__event_listeners', '__appname', '__driver', '__metadata',
'__compression_settings')
'__compression_settings', '__max_connecting')

def __init__(self, max_pool_size=MAX_POOL_SIZE,
min_pool_size=MIN_POOL_SIZE,
@@ -303,7 +318,7 @@ def __init__(self, max_pool_size=MAX_POOL_SIZE,
wait_queue_multiple=None, ssl_context=None,
ssl_match_hostname=True, socket_keepalive=True,
event_listeners=None, appname=None, driver=None,
compression_settings=None):
compression_settings=None, max_connecting=MAX_CONNECTING):
Member Author:

This isn't configurable by users but I think it still made sense to add max_connecting to PoolOptions.


self.__max_pool_size = max_pool_size
self.__min_pool_size = min_pool_size
@@ -319,6 +334,7 @@ def __init__(self, max_pool_size=MAX_POOL_SIZE,
self.__appname = appname
self.__driver = driver
self.__compression_settings = compression_settings
self.__max_connecting = max_connecting
self.__metadata = copy.deepcopy(_METADATA)
if appname:
self.__metadata['application'] = {'name': appname}
@@ -357,6 +373,8 @@ def non_default_options(self):
opts['maxIdleTimeMS'] = self.__max_idle_time_seconds * 1000
if self.__wait_queue_timeout != WAIT_QUEUE_TIMEOUT:
opts['waitQueueTimeoutMS'] = self.__wait_queue_timeout * 1000
if self.__max_connecting != MAX_CONNECTING:
opts['maxConnecting'] = self.__max_connecting
return opts

@property
@@ -381,6 +399,13 @@ def min_pool_size(self):
"""
return self.__min_pool_size

@property
def max_connecting(self):
"""The maximum number of concurrent connection creation attempts per
pool. Defaults to 2.
"""
return self.__max_connecting

@property
def max_idle_time_seconds(self):
"""The maximum number of seconds that a connection can remain
@@ -1080,6 +1105,9 @@ def __init__(self, address, options, handshake=True):

self._socket_semaphore = thread_util.create_semaphore(
self.opts.max_pool_size, max_waiters)
self._max_connecting_cond = threading.Condition(self.lock)
self._max_connecting = self.opts.max_connecting
self._pending = 0
if self.enabled_for_cmap:
self.opts.event_listeners.publish_pool_created(
self.address, self.opts.non_default_options)
@@ -1143,21 +1171,34 @@ def remove_stale_sockets(self, reference_generation, all_credentials):
if (len(self.sockets) + self.active_sockets >=
self.opts.min_pool_size):
# There are enough sockets in the pool.
break
return

# We must acquire the semaphore to respect max_pool_size.
if not self._socket_semaphore.acquire(False):
break
return
incremented = False
try:
with self._max_connecting_cond:
# If maxConnecting connections are already being created
# by this pool then try again later instead of waiting.
if self._pending >= self._max_connecting:
return
self._pending += 1
incremented = True
sock_info = self.connect(all_credentials)
with self.lock:
# Close connection and return if the pool was reset during
# socket creation or while acquiring the pool lock.
if self.generation != reference_generation:
sock_info.close_socket(ConnectionClosedReason.STALE)
break
return
self.sockets.appendleft(sock_info)
finally:
if incremented:
# Notify after adding the socket to the pool.
with self._max_connecting_cond:
self._pending -= 1
self._max_connecting_cond.notify()
self._socket_semaphore.release()

def connect(self, all_credentials=None):
@@ -1260,28 +1301,53 @@ def _get_socket(self, all_credentials):
'pool')

# Get a free socket or create one.
if self.opts.wait_queue_timeout:
deadline = _time() + self.opts.wait_queue_timeout
else:
deadline = None
if not self._socket_semaphore.acquire(
True, self.opts.wait_queue_timeout):
self._raise_wait_queue_timeout()

# We've now acquired the semaphore and must release it on error.
sock_info = None
incremented = False
emitted_event = False
try:
with self.lock:
self.active_sockets += 1
incremented = True

while sock_info is None:
try:
with self.lock:
# CMAP: we MUST wait for either maxConnecting OR for a socket
# to be checked back into the pool.
with self._max_connecting_cond:
while not (self.sockets or
self._pending < self._max_connecting):
if not _cond_wait(self._max_connecting_cond, deadline):
# Timed out, notify the next thread to ensure a
# timeout doesn't consume the condition.
if (self.sockets or
self._pending < self._max_connecting):
self._max_connecting_cond.notify()
emitted_event = True
self._raise_wait_queue_timeout()

try:
sock_info = self.sockets.popleft()
except IndexError:
# Can raise ConnectionFailure or CertificateError.
sock_info = self.connect(all_credentials)
else:
except IndexError:
self._pending += 1
if sock_info: # We got a socket from the pool
if self._perished(sock_info):
sock_info = None
continue
else: # We need to create a new connection
try:
sock_info = self.connect(all_credentials)
finally:
with self._max_connecting_cond:
self._pending -= 1
self._max_connecting_cond.notify()
sock_info.check_auth(all_credentials)
except Exception:
if sock_info:
@@ -1293,7 +1359,7 @@
with self.lock:
self.active_sockets -= 1

if self.enabled_for_cmap:
if self.enabled_for_cmap and not emitted_event:
self.opts.event_listeners.publish_connection_check_out_failed(
self.address, ConnectionCheckOutFailedReason.CONN_ERROR)
raise
@@ -1324,6 +1390,8 @@ def return_socket(self, sock_info):
sock_info.update_last_checkin_time()
sock_info.update_is_writable(self.is_writable)
self.sockets.appendleft(sock_info)
# Notify any threads waiting to create a connection.
self._max_connecting_cond.notify()

self._socket_semaphore.release()
with self.lock:
104 changes: 104 additions & 0 deletions test/cmap/pool-checkout-maxConnecting-is-enforced.json
@@ -0,0 +1,104 @@
{
"version": 1,
"style": "integration",
"description": "maxConnecting is enforced",
"runOn": [
{
"minServerVersion": "4.4.0"
}
],
"failPoint": {
"configureFailPoint": "failCommand",
"mode": {
"times": 50
},
"data": {
"failCommands": [
"isMaster"
],
"closeConnection": false,
"blockConnection": true,
"blockTimeMS": 750
}
},
"poolOptions": {
"maxPoolSize": 10,
"waitQueueTimeoutMS": 5000
},
"operations": [
{
"name": "start",
"target": "thread1"
},
{
"name": "checkOut",
"thread": "thread1"
},
{
"name": "start",
"target": "thread2"
},
{
"name": "wait",
"thread": "thread2",
"ms": 100
},
{
"name": "checkOut",
"thread": "thread2"
},
{
"name": "start",
"target": "thread3"
},
{
"name": "wait",
"thread": "thread3",
"ms": 100
},
{
"name": "checkOut",
"thread": "thread3"
},
{
"name": "waitForEvent",
"event": "ConnectionReady",
"count": 3
}
],
"events": [
{
"type": "ConnectionCreated",
"address": 42,
"connectionId": 1
},
{
"type": "ConnectionCreated",
"address": 42
},
{
"type": "ConnectionReady",
"address": 42,
"connectionId": 1
},
{
"type": "ConnectionCreated",
"address": 42
},
{
"type": "ConnectionReady",
"address": 42
},
{
"type": "ConnectionReady",
"address": 42
}
],
"ignore": [
"ConnectionCheckOutStarted",
"ConnectionCheckedIn",
"ConnectionCheckedOut",
"ConnectionClosed",
"ConnectionPoolCreated"
]
}