PYTHON-2462 Avoid connection storms: implement pool PAUSED state #531
@@ -30,7 +30,7 @@
 from bson import DEFAULT_CODEC_OPTIONS
 from bson.py3compat import imap, itervalues, _unicode, PY3
 from bson.son import SON
-from pymongo import auth, helpers, thread_util, __version__
+from pymongo import auth, helpers, __version__
 from pymongo.client_session import _validate_session_write_concern
 from pymongo.common import (MAX_BSON_SIZE,
                             MAX_CONNECTING,
@@ -46,6 +46,7 @@ | |
CertificateError, | ||
ConnectionFailure, | ||
ConfigurationError, | ||
ExceededMaxWaiters, | ||
InvalidOperation, | ||
DocumentTooLarge, | ||
NetworkTimeout, | ||
|
@@ -309,7 +310,8 @@ class PoolOptions(object):
                  '__wait_queue_timeout', '__wait_queue_multiple',
                  '__ssl_context', '__ssl_match_hostname', '__socket_keepalive',
                  '__event_listeners', '__appname', '__driver', '__metadata',
-                 '__compression_settings', '__max_connecting')
+                 '__compression_settings', '__max_connecting',
+                 '__pause_enabled')

     def __init__(self, max_pool_size=MAX_POOL_SIZE,
                  min_pool_size=MIN_POOL_SIZE,
@@ -318,7 +320,8 @@ def __init__(self, max_pool_size=MAX_POOL_SIZE,
                  wait_queue_multiple=None, ssl_context=None,
                  ssl_match_hostname=True, socket_keepalive=True,
                  event_listeners=None, appname=None, driver=None,
-                 compression_settings=None, max_connecting=MAX_CONNECTING):
+                 compression_settings=None, max_connecting=MAX_CONNECTING,
+                 pause_enabled=True):

         self.__max_pool_size = max_pool_size
         self.__min_pool_size = min_pool_size
@@ -335,6 +338,7 @@ def __init__(self, max_pool_size=MAX_POOL_SIZE,
         self.__driver = driver
         self.__compression_settings = compression_settings
         self.__max_connecting = max_connecting
+        self.__pause_enabled = pause_enabled
         self.__metadata = copy.deepcopy(_METADATA)
         if appname:
             self.__metadata['application'] = {'name': appname}
@@ -406,6 +410,10 @@ def max_connecting(self):
         """
         return self.__max_connecting

+    @property
+    def pause_enabled(self):
+        return self.__pause_enabled
+
     @property
     def max_idle_time_seconds(self):
         """The maximum number of seconds that a connection can remain
@@ -1058,6 +1066,8 @@ class _PoolClosedError(PyMongoError):
     pass


+PAUSED, READY, CLOSED = range(3)
+
 # Do *not* explicitly inherit from object or Jython won't call __del__
 # http://bugs.jython.org/issue1057
 class Pool:
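For readers skimming the diff, here is a minimal sketch (hypothetical names, not code from this PR) of the lifecycle these three constants encode: pools begin PAUSED, the topology monitor marks them READY, and server errors pause them again so waiting threads fail fast instead of stampeding the server with new connections.

```python
PAUSED, READY, CLOSED = range(3)

class PoolLifecycleSketch:
    """Illustrative only: the state machine implied by this PR."""

    def __init__(self, pause_enabled=True):
        # New pools start PAUSED (when pauseEnabled) and reject checkouts
        # until the server monitor confirms the server is usable.
        self.state = PAUSED if pause_enabled else READY

    def ready(self):
        # Called when the server is (re)discovered.
        self.state = READY

    def reset(self):
        # Called on a server error: pause instead of letting every waiting
        # thread attempt a fresh connection (the "connection storm").
        if self.state != CLOSED:
            self.state = PAUSED

    def close(self):
        # Terminal state; the pool is never reused.
        self.state = CLOSED
```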
@@ -1068,6 +1078,9 @@ def __init__(self, address, options, handshake=True):
           - `options`: a PoolOptions instance
           - `handshake`: whether to call ismaster for each new SocketInfo
         """
+        self.state = READY
+        if options.pause_enabled:
+            self.state = PAUSED
         # Check a socket's health with socket_closed() every once in a while.
         # Can override for testing: 0 to always check, None to never check.
         self._check_interval_seconds = 1
@@ -1079,7 +1092,6 @@ def __init__(self, address, options, handshake=True):
         self.active_sockets = 0
         # Monotonically increasing connection ID required for CMAP Events.
         self.next_connection_id = 1
-        self.closed = False
         # Track whether the sockets in this pool are writeable or not.
         self.is_writable = None
@@ -1098,13 +1110,23 @@ def __init__(self, address, options, handshake=True):

         if (self.opts.wait_queue_multiple is None or
                 self.opts.max_pool_size is None):
-            max_waiters = None
+            max_waiters = float('inf')
         else:
             max_waiters = (
                 self.opts.max_pool_size * self.opts.wait_queue_multiple)

-        self._socket_semaphore = thread_util.create_semaphore(
-            self.opts.max_pool_size, max_waiters)
+        # The first portion of the wait queue.
+        # Enforces: maxPoolSize and waitQueueMultiple
+        # Also used for: clearing the wait queue
+        self.size_cond = threading.Condition(self.lock)
+        self.requests = 0
+        self.max_pool_size = self.opts.max_pool_size
+        if self.max_pool_size is None:
+            self.max_pool_size = float('inf')
+        self.waiters = 0
+        self.max_waiters = max_waiters
+        # The second portion of the wait queue.
+        # Enforces: maxConnecting
+        # Also used for: clearing the wait queue
+        self._max_connecting_cond = threading.Condition(self.lock)
         self._max_connecting = self.opts.max_connecting
         self._pending = 0
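The semaphore is replaced by two condition variables that share the pool's single lock, so the whole wait queue can later be cleared with a pair of notify_all() calls. A condensed, runnable sketch of the idea (names simplified from the diff; the real code also tracks waiters, deadlines, and the pool state):

```python
import threading

class WaitQueueSketch:
    """Condensed model of the diff's two-stage wait queue."""

    def __init__(self, max_pool_size=100, max_connecting=2):
        self.lock = threading.Lock()
        # Stage 1: bounds concurrent checkouts (maxPoolSize).
        self.size_cond = threading.Condition(self.lock)
        self.requests = 0
        self.max_pool_size = max_pool_size
        # Stage 2: bounds concurrent handshakes (maxConnecting).
        self._max_connecting_cond = threading.Condition(self.lock)
        self._pending = 0
        self._max_connecting = max_connecting

    def check_out(self, timeout=None):
        with self.size_cond:
            while self.requests >= self.max_pool_size:
                if not self.size_cond.wait(timeout):
                    raise TimeoutError('wait queue timeout')
            self.requests += 1
        # ... stage 2 (a handshake slot) and socket creation go here ...

    def check_in(self):
        with self.size_cond:
            self.requests -= 1
            self.size_cond.notify()  # wake exactly one waiter
```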
@@ -1114,10 +1136,23 @@ def __init__(self, address, options, handshake=True):
         # Similar to active_sockets but includes threads in the wait queue.
         self.operation_count = 0

-    def _reset(self, close):
-        with self.lock:
+    def ready(self):
+        old_state, self.state = self.state, READY
+        if old_state != READY:
+            if self.enabled_for_cmap:
+                self.opts.event_listeners.publish_pool_ready(self.address)
+
+    @property
+    def closed(self):
+        return self.state == CLOSED
+
+    def _reset(self, close, pause=True):
+        old_state = self.state
+        with self.size_cond:
             if self.closed:
                 return
+            if self.opts.pause_enabled and pause:
+                old_state, self.state = self.state, PAUSED
             self.generation += 1
             newpid = os.getpid()
             if self.pid != newpid:
@@ -1126,7 +1161,10 @@ def _reset(self, close):
             self.operation_count = 0
             sockets, self.sockets = self.sockets, collections.deque()
             if close:
-                self.closed = True
+                self.state = CLOSED
+            # Clear the wait queue
+            self._max_connecting_cond.notify_all()
+            self.size_cond.notify_all()

[Review comment] So when this happens and there are threads waiting to create a connection, they will get notified (via either size_cond or max_connecting_cond) and end up raising an exception since the pool is not ready?

[Reply] Yes, exactly that.

         listeners = self.opts.event_listeners
         # CMAP spec says that close() MUST close sockets before publishing the
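To make the exchange above concrete, here is a sketch (hypothetical helpers, simplified from the diff) of both sides of the clearing protocol: the resetting thread flips the state and wakes everyone in bulk; each woken waiter re-checks the state itself and fails fast rather than opening a connection:

```python
from pymongo.errors import AutoReconnect

PAUSED = 0  # stand-in for the module-level constant

def clear_side(pool):
    # Both conditions share pool.lock, so holding size_cond's lock also
    # permits notifying _max_connecting_cond.
    with pool.size_cond:
        pool.state = PAUSED
        pool._max_connecting_cond.notify_all()
        pool.size_cond.notify_all()

def waiter_side(pool):
    with pool.size_cond:
        while pool.requests >= pool.max_pool_size:
            pool.size_cond.wait()
            if pool.state == PAUSED:
                # Woken by a clear: raise instead of connecting.
                raise AutoReconnect('connection pool paused')
        pool.requests += 1
```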
@@ -1138,7 +1176,7 @@ def _reset(self, close):
             if self.enabled_for_cmap:
                 listeners.publish_pool_closed(self.address)
         else:
-            if self.enabled_for_cmap:
+            if old_state != PAUSED and self.enabled_for_cmap:
                 listeners.publish_pool_cleared(self.address)
         for sock_info in sockets:
             sock_info.close_socket(ConnectionClosedReason.STALE)
@@ -1155,6 +1193,9 @@ def update_is_writable(self, is_writable):
     def reset(self):
         self._reset(close=False)

+    def reset_without_pause(self):
+        self._reset(close=False, pause=False)
+
     def close(self):
         self._reset(close=True)
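A quick gloss (mine, not the PR's) of the three entry points, assuming pause_enabled=True:

```python
# Effect of each entry point on pool state (sketch):
pool.reset()                 # state -> PAUSED, sockets closed, generation += 1
pool.reset_without_pause()   # state unchanged, sockets closed, generation += 1
pool.close()                 # state -> CLOSED, sockets closed; terminal
```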
@@ -1164,6 +1205,9 @@ def remove_stale_sockets(self, reference_generation, all_credentials):
         `generation` at the point in time this operation was requested on the
         pool.
         """
+        if self.state != READY:
+            return
+
         if self.opts.max_idle_time_seconds is not None:
             with self.lock:
                 while (self.sockets and
@@ -1172,15 +1216,14 @@ def remove_stale_sockets(self, reference_generation, all_credentials):
                     sock_info.close_socket(ConnectionClosedReason.IDLE)

         while True:
-            with self.lock:
+            with self.size_cond:
+                # There are enough sockets in the pool.
                 if (len(self.sockets) + self.active_sockets >=
                         self.opts.min_pool_size):
-                    # There are enough sockets in the pool.
                     return
-
-            # We must acquire the semaphore to respect max_pool_size.
-            if not self._socket_semaphore.acquire(False):
-                return
+                if self.requests >= self.opts.min_pool_size:
+                    return
+                self.requests += 1
             incremented = False
             try:
                 with self._max_connecting_cond:
@@ -1204,7 +1247,10 @@ def remove_stale_sockets(self, reference_generation, all_credentials):
                 with self._max_connecting_cond:
                     self._pending -= 1
                     self._max_connecting_cond.notify()
-                self._socket_semaphore.release()
+
+                with self.size_cond:
+                    self.requests -= 1
+                    self.size_cond.notify()

     def connect(self, all_credentials=None):
         """Connect to Mongo and return a new SocketInfo.
@@ -1289,6 +1335,15 @@ def get_socket(self, all_credentials, checkout=False):
             if not checkout:
                 self.return_socket(sock_info)

+    def _raise_if_not_ready(self):
+        if self.opts.pause_enabled and self.state == PAUSED:
+            if self.enabled_for_cmap:
+                self.opts.event_listeners.publish_connection_check_out_failed(
+                    self.address, ConnectionCheckOutFailedReason.CONN_ERROR)
+            # TODO: ensure this error is retryable
+            _raise_connection_failure(
+                self.address, AutoReconnect('connection pool paused'))
+
     def _get_socket(self, all_credentials):
         """Get or create a SocketInfo. Can raise ConnectionFailure."""
         # We use the pid here to avoid issues with fork / multiprocessing.

[Review comment] Any reason we don't simply check `self.state == PAUSED` here?

[Reply] Fixed.

[Review comment] Do you intend to address this TODO in this PR? Does it need a test?

[Reply] @patrickfreed, have you thought about this question yet? How will we test that this error is retryable in all drivers?

[Reply] I left a comment discussing a few of the options on the SPEC PR: mongodb/specifications#878 (comment). As of right now, I'd like to wait until retryable reads gets the unified test format before personally writing any of these tests for the spec, so we can get this project closed out, though I could add a prose test if you think we should. Something like: [...]

[Reply] Can you open a new DRIVERS ticket to add this test?

[Reply] Opened https://jira.mongodb.org/browse/DRIVERS-1483, though it made me realize that errors during connection establishment may or may not be retryable; the spec is mostly silent on it. I think we definitely want this error to be retryable, since no attempt at the command is actually made. Given this confusion, we'll need to sort out this test before we close out DRIVERS-781.

[Reply] It seems to hinge on https://jira.mongodb.org/browse/DRIVERS-746, which may be a larger effort than we'd want to drag into DRIVERS-781. We could consider not testing this behavior since we're technically not making anything worse than it already is (the threads in the WaitQueue would likely fail to make their connections anyway). Ideally we would complete DRIVERS-746 and then this could be retried.

[Reply] I added a test for this in Python. PyMongo already retries after (retryable) connection handshake errors, so the test passes as expected.
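Why the retryability question matters: a checkout that fails because the pool is paused never sends the command to the server, so a single retry cannot double-execute anything. A minimal sketch of the behavior the thread describes (this is not PyMongo's actual retry machinery, which also re-runs server selection):

```python
from pymongo.errors import AutoReconnect

def run_with_one_retry(operation):
    """Sketch only: retry once on a retryable connection error."""
    try:
        return operation()
    except AutoReconnect:
        # 'connection pool paused' is raised before any bytes are sent,
        # so retrying is always safe. In the real driver, server
        # selection runs again and waits for the pool to become READY.
        return operation()
```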
def _get_socket(self, all_credentials): | ||
"""Get or create a SocketInfo. Can raise ConnectionFailure.""" | ||
# We use the pid here to avoid issues with fork / multiprocessing. | ||
|
@@ -1313,9 +1368,26 @@ def _get_socket(self, all_credentials): | |
deadline = _time() + self.opts.wait_queue_timeout | ||
else: | ||
deadline = None | ||
if not self._socket_semaphore.acquire( | ||
True, self.opts.wait_queue_timeout): | ||
self._raise_wait_queue_timeout() | ||
|
||
with self.size_cond: | ||
self._raise_if_not_ready() | ||
if self.waiters >= self.max_waiters: | ||
prashantmital marked this conversation as resolved.
Show resolved
Hide resolved
|
||
raise ExceededMaxWaiters( | ||
'exceeded max waiters: %s threads already waiting' % ( | ||
self.waiters)) | ||
self.waiters += 1 | ||
try: | ||
while not (self.requests < self.max_pool_size): | ||
if not _cond_wait(self.size_cond, deadline): | ||
# Timed out, notify the next thread to ensure a | ||
# timeout doesn't consume the condition. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure I follow the logic here. Under what circumstance would we enter this next There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The comment attempts to explain it. This extra logic fixes the following race:
This change fixes the bug by notifying the next thread in the wait queue before raising a timeout. |
||
if self.requests < self.max_pool_size: | ||
self.size_cond.notify() | ||
self._raise_wait_queue_timeout() | ||
self._raise_if_not_ready() | ||
finally: | ||
self.waiters -= 1 | ||
self.requests += 1 | ||
|
||
# We've now acquired the semaphore and must release it on error. | ||
sock_info = None | ||
|
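The race in the elided steps above is presumably the classic lost-wakeup interleaving: thread A is blocked in wait() with a timeout; thread B checks a socket in and calls notify(); the notify lands on A just as A's wait expires; A raises the timeout anyway, so B's single wakeup is consumed and thread C sleeps until its own timeout even though capacity is free. Re-notifying before raising hands the wakeup on. A runnable sketch of the pattern under that assumption:

```python
import threading

cond = threading.Condition()
capacity_free = 0

def acquire(timeout):
    global capacity_free
    with cond:
        while capacity_free == 0:
            if not cond.wait(timeout):  # timed out
                if capacity_free > 0:
                    # Our wakeup raced with the timeout: pass it to the
                    # next waiter instead of silently consuming it.
                    cond.notify()
                raise TimeoutError('wait queue timeout')
        capacity_free -= 1
```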
@@ -1330,6 +1402,9 @@ def _get_socket(self, all_credentials):
                 # CMAP: we MUST wait for either maxConnecting OR for a socket
                 # to be checked back into the pool.
                 with self._max_connecting_cond:
+                    emitted_event = True
+                    self._raise_if_not_ready()
+                    emitted_event = False
                     while not (self.sockets or
                                self._pending < self._max_connecting):
                         if not _cond_wait(self._max_connecting_cond, deadline):

@@ -1340,6 +1415,9 @@ def _get_socket(self, all_credentials):
                             self._max_connecting_cond.notify()
                         emitted_event = True
                         self._raise_wait_queue_timeout()
+                    emitted_event = True
+                    self._raise_if_not_ready()
+                    emitted_event = False

                 try:
                     sock_info = self.sockets.popleft()
@@ -1361,11 +1439,11 @@ def _get_socket(self, all_credentials):
             if sock_info:
                 # We checked out a socket but authentication failed.
                 sock_info.close_socket(ConnectionClosedReason.ERROR)
-            self._socket_semaphore.release()
-
-            if incremented:
-                with self.lock:
+            with self.size_cond:
+                self.requests -= 1
+                if incremented:
                     self.active_sockets -= 1
+                self.size_cond.notify()

             if self.enabled_for_cmap and not emitted_event:
                 self.opts.event_listeners.publish_connection_check_out_failed(
@@ -1401,10 +1479,11 @@ def return_socket(self, sock_info):
             # Notify any threads waiting to create a connection.
             self._max_connecting_cond.notify()

-        self._socket_semaphore.release()
-        with self.lock:
+        with self.size_cond:
+            self.requests -= 1
             self.active_sockets -= 1
             self.operation_count -= 1
+            self.size_cond.notify()

     def _perished(self, sock_info):
         """Return True and close the connection if it is "perished".
[Review comment] Can we use a class enum for this like we do elsewhere (e.g. UuidRepresentation)?

[Reply] Done.
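For readers unfamiliar with the pattern: PyMongo still supports Python 2 at this point, so the "class enum" convention (as in UuidRepresentation) is a plain class of integer constants rather than enum.Enum. Presumably something along these lines (hypothetical names and values; see the merged commits for the actual code):

```python
class PoolState:
    """Class-style enum, following the UuidRepresentation pattern."""
    PAUSED = 1
    READY = 2
    CLOSED = 3

# Usage: comparisons stay cheap int checks but read clearly, e.g.
# self.state = PoolState.PAUSED
```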