-
Notifications
You must be signed in to change notification settings - Fork 2.6k
BlockingConnectionPool simplification: use counter instead of connection list #2518
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1237,6 +1237,19 @@ class ConnectionPool: | |
``connection_class``. | ||
""" | ||
|
||
__slots__ = ( | ||
"connection_class", | ||
"connection_kwargs", | ||
"max_connections", | ||
"_fork_lock", | ||
"_lock", | ||
"_created_connections", | ||
"_available_connections", | ||
"_in_use_connections", | ||
"encoder_class", | ||
"pid", | ||
) | ||
|
||
@classmethod | ||
def from_url(cls: Type[_CP], url: str, **kwargs) -> _CP: | ||
""" | ||
|
@@ -1519,36 +1532,37 @@ class BlockingConnectionPool(ConnectionPool): | |
>>> pool = BlockingConnectionPool(timeout=5) | ||
""" | ||
|
||
__slots__ = ( | ||
"queue_class", | ||
"timeout", | ||
"pool", | ||
) | ||
|
||
def __init__( | ||
self, | ||
max_connections: int = 50, | ||
timeout: Optional[int] = 20, | ||
connection_class: Type[Connection] = Connection, | ||
queue_class: Type[asyncio.Queue] = asyncio.LifoQueue, | ||
queue_class: Type[asyncio.Queue] = asyncio.Queue, | ||
**connection_kwargs, | ||
): | ||
|
||
self.queue_class = queue_class | ||
self.timeout = timeout | ||
self._connections: List[Connection] | ||
self._in_use_connections: Set[Connection] | ||
|
||
super().__init__( | ||
connection_class=connection_class, | ||
max_connections=max_connections, | ||
**connection_kwargs, | ||
) | ||
|
||
def reset(self): | ||
# Create and fill up a thread safe queue with ``None`` values. | ||
# a queue of ready connections. populated lazily | ||
self.pool = self.queue_class(self.max_connections) | ||
while True: | ||
try: | ||
self.pool.put_nowait(None) | ||
except asyncio.QueueFull: | ||
break | ||
|
||
# Keep a list of actual connection instances so that we can | ||
# disconnect them later. | ||
self._connections = [] | ||
# used to decide wether we can allocate new connection or wait | ||
self._created_connections = 0 | ||
# keep track of connections that are outside queue to close them | ||
self._in_use_connections = set() | ||
|
||
# this must be the last operation in this method. while reset() is | ||
# called when holding _fork_lock, other threads in this process | ||
|
@@ -1562,42 +1576,40 @@ def reset(self): | |
self.pid = os.getpid() | ||
|
||
def make_connection(self): | ||
"""Make a fresh connection.""" | ||
connection = self.connection_class(**self.connection_kwargs) | ||
self._connections.append(connection) | ||
return connection | ||
"""Create a new connection""" | ||
self._created_connections += 1 | ||
return self.connection_class(**self.connection_kwargs) | ||
|
||
async def get_connection(self, command_name, *keys, **options): | ||
""" | ||
Get a connection, blocking for ``self.timeout`` until a connection | ||
is available from the pool. | ||
|
||
If the connection returned is ``None`` then creates a new connection. | ||
Because we use a last-in first-out queue, the existing connections | ||
(having been returned to the pool after the initial ``None`` values | ||
were added) will be returned before ``None`` values. This means we only | ||
create new connections when we need to, i.e.: the actual number of | ||
connections will only increase in response to demand. | ||
Checks internal connection counter to ensure connections are allocated lazily. | ||
""" | ||
# Make sure we haven't changed process. | ||
self._checkpid() | ||
|
||
# Try and get a connection from the pool. If one isn't available within | ||
# self.timeout then raise a ``ConnectionError``. | ||
connection = None | ||
try: | ||
async with async_timeout.timeout(self.timeout): | ||
connection = await self.pool.get() | ||
except (asyncio.QueueEmpty, asyncio.TimeoutError): | ||
# Note that this is not caught by the redis client and will be | ||
# raised unless handled by application code. If you want never to | ||
raise ConnectionError("No connection available.") | ||
|
||
# If the ``connection`` is actually ``None`` then that's a cue to make | ||
# a new connection to add to the pool. | ||
if connection is None: | ||
connection = self.make_connection() | ||
|
||
# if we are under max_connections, try getting one immediately. if it fails | ||
# it is ok to allocate new one | ||
if self._created_connections < self.max_connections: | ||
try: | ||
connection = self.pool.get_nowait() | ||
except asyncio.QueueEmpty: | ||
connection = self.make_connection() | ||
else: | ||
# wait for available connection | ||
try: | ||
async with async_timeout.timeout(self.timeout): | ||
connection = await self.pool.get() | ||
except asyncio.TimeoutError: | ||
# Note that this is not caught by the redis client and will be | ||
# raised unless handled by application code. | ||
raise ConnectionError("No connection available.") | ||
|
||
# add to set before try block to ensure release does not try to .remove missing | ||
# value | ||
self._in_use_connections.add(connection) | ||
try: | ||
# ensure this connection is connected to Redis | ||
await connection.connect() | ||
|
@@ -1624,15 +1636,15 @@ async def release(self, connection: Connection): | |
"""Releases the connection back to the pool.""" | ||
# Make sure we haven't changed process. | ||
self._checkpid() | ||
|
||
if not self.owns_connection(connection): | ||
# pool doesn't own this connection. do not add it back | ||
# to the pool. instead add a None value which is a placeholder | ||
# that will cause the pool to recreate the connection if | ||
# its needed. | ||
# to the pool | ||
await connection.disconnect() | ||
self.pool.put_nowait(None) | ||
return | ||
|
||
self._in_use_connections.remove(connection) | ||
|
||
# Put the connection back into the pool. | ||
try: | ||
self.pool.put_nowait(connection) | ||
|
@@ -1646,7 +1658,15 @@ async def disconnect(self, inuse_connections: bool = True): | |
self._checkpid() | ||
async with self._lock: | ||
resp = await asyncio.gather( | ||
*(connection.disconnect() for connection in self._connections), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This disconnects only those connections in the pool, not all connections as before. |
||
*( | ||
self.pool.get_nowait().disconnect() | ||
for _ in range(self.pool.qsize()) | ||
), | ||
*( | ||
connection.disconnect() | ||
for connection in self._in_use_connections | ||
if inuse_connections | ||
), | ||
return_exceptions=True, | ||
) | ||
exc = next((r for r in resp if isinstance(r, BaseException)), None) | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The LifoQueue ensures that hot connections are the ones re-used first, which is probably beneficial.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was no difference in my simple tests, maybe i did something wrong but Lifo had 0 impact
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, I´m not sure there will be a difference, or that this was the original intent of the LifoQueue. But LIFO is often used in caching, to maintain hot information. There is a degenerate case which can happen with Fifo, which is that if you pull always the oldest connections, you may end up with all the connections having timed out, or been disconnected due to idleness or other such things. But maybe that doesn't matter at all. Maybe the only intention of the LifoQueue was to keep those
None
objects at the end.