Skip to content

BlockingConnectionPool simplification: use counter instead of connection list #2518

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
* Fix Sentinel.execute_command doesn't execute across the entire sentinel cluster bug (#2458)
* Added a replacement for the default cluster node in the event of failure (#2463)
* Fix for Unhandled exception related to self.host with unix socket (#2496)
* Simplified connection allocation code for asyncio.connection.BlockingConnectionPool

* 4.1.3 (Feb 8, 2022)
* Fix flushdb and flushall (#1926)
Expand Down
108 changes: 64 additions & 44 deletions redis/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1237,6 +1237,19 @@ class ConnectionPool:
``connection_class``.
"""

# Fixed attribute set for pool instances: with __slots__ declared, each
# instance stores these directly instead of in a per-instance __dict__,
# reducing memory overhead and rejecting typo'd attribute assignments.
__slots__ = (
"connection_class",
"connection_kwargs",
"max_connections",
"_fork_lock",
"_lock",
"_created_connections",
"_available_connections",
"_in_use_connections",
"encoder_class",
"pid",
)

@classmethod
def from_url(cls: Type[_CP], url: str, **kwargs) -> _CP:
"""
Expand Down Expand Up @@ -1519,36 +1532,37 @@ class BlockingConnectionPool(ConnectionPool):
>>> pool = BlockingConnectionPool(timeout=5)
"""

# Extends ConnectionPool.__slots__ with the attributes specific to the
# blocking pool (its queue class, acquisition timeout, and the queue of
# ready connections itself).
__slots__ = (
"queue_class",
"timeout",
"pool",
)

def __init__(
    self,
    max_connections: int = 50,
    timeout: Optional[int] = 20,
    connection_class: Type[Connection] = Connection,
    # NOTE(review): the default here changed from asyncio.LifoQueue to
    # asyncio.Queue. LIFO keeps "hot" connections in rotation and avoids
    # a degenerate FIFO case where every pooled connection has idled out
    # or been disconnected; benchmarks in the review showed no measurable
    # difference, so the plain Queue was kept — confirm if idle-timeout
    # behaviour matters for your deployment.
    queue_class: Type[asyncio.Queue] = asyncio.Queue,
    **connection_kwargs,
):
    """Initialize a blocking connection pool.

    :param max_connections: hard cap on simultaneously allocated
        connections; further ``get_connection`` calls block.
    :param timeout: seconds to wait for a free connection before
        ``get_connection`` raises ``ConnectionError``.
    :param connection_class: class used to create new connections.
    :param queue_class: queue type holding ready (released) connections.
    :param connection_kwargs: forwarded to ``connection_class``.
    """
    self.queue_class = queue_class
    self.timeout = timeout
    # Counter/queue state (_created_connections, _in_use_connections,
    # self.pool) is initialized in reset(), invoked by the base __init__.
    super().__init__(
        connection_class=connection_class,
        max_connections=max_connections,
        **connection_kwargs,
    )

def reset(self):
# Create and fill up a thread safe queue with ``None`` values.
# a queue of ready connections. populated lazily
self.pool = self.queue_class(self.max_connections)
while True:
try:
self.pool.put_nowait(None)
except asyncio.QueueFull:
break

# Keep a list of actual connection instances so that we can
# disconnect them later.
self._connections = []
# used to decide whether we can allocate a new connection or must wait
self._created_connections = 0
# keep track of connections that are outside queue to close them
self._in_use_connections = set()

# this must be the last operation in this method. while reset() is
# called when holding _fork_lock, other threads in this process
Expand All @@ -1562,42 +1576,40 @@ def reset(self):
self.pid = os.getpid()

def make_connection(self):
    """Create a new connection and count it against ``max_connections``.

    The counter replaces the old ``_connections`` list: the pool only
    needs to know *how many* connections exist to decide whether another
    may be allocated. The caller (``get_connection``) checks the counter
    before invoking this and tracks the returned connection in
    ``_in_use_connections``.
    """
    self._created_connections += 1
    return self.connection_class(**self.connection_kwargs)

async def get_connection(self, command_name, *keys, **options):
"""
Get a connection, blocking for ``self.timeout`` until a connection
is available from the pool.

If the connection returned is ``None`` then creates a new connection.
Because we use a last-in first-out queue, the existing connections
(having been returned to the pool after the initial ``None`` values
were added) will be returned before ``None`` values. This means we only
create new connections when we need to, i.e.: the actual number of
connections will only increase in response to demand.
Checks internal connection counter to ensure connections are allocated lazily.
"""
# Make sure we haven't changed process.
self._checkpid()

# Try and get a connection from the pool. If one isn't available within
# self.timeout then raise a ``ConnectionError``.
connection = None
try:
async with async_timeout.timeout(self.timeout):
connection = await self.pool.get()
except (asyncio.QueueEmpty, asyncio.TimeoutError):
# Note that this is not caught by the redis client and will be
# raised unless handled by application code. If you want never to
raise ConnectionError("No connection available.")

# If the ``connection`` is actually ``None`` then that's a cue to make
# a new connection to add to the pool.
if connection is None:
connection = self.make_connection()

# if we are under max_connections, try getting one immediately. if it fails
# it is ok to allocate new one
if self._created_connections < self.max_connections:
try:
connection = self.pool.get_nowait()
except asyncio.QueueEmpty:
connection = self.make_connection()
else:
# wait for available connection
try:
async with async_timeout.timeout(self.timeout):
connection = await self.pool.get()
except asyncio.TimeoutError:
# Note that this is not caught by the redis client and will be
# raised unless handled by application code.
raise ConnectionError("No connection available.")

# add to set before try block to ensure release does not try to .remove missing
# value
self._in_use_connections.add(connection)
try:
# ensure this connection is connected to Redis
await connection.connect()
Expand All @@ -1624,15 +1636,15 @@ async def release(self, connection: Connection):
"""Releases the connection back to the pool."""
# Make sure we haven't changed process.
self._checkpid()

if not self.owns_connection(connection):
# pool doesn't own this connection. do not add it back
# to the pool. instead add a None value which is a placeholder
# that will cause the pool to recreate the connection if
# its needed.
# to the pool
await connection.disconnect()
self.pool.put_nowait(None)
return

self._in_use_connections.remove(connection)

# Put the connection back into the pool.
try:
self.pool.put_nowait(connection)
Expand All @@ -1646,7 +1658,15 @@ async def disconnect(self, inuse_connections: bool = True):
self._checkpid()
async with self._lock:
resp = await asyncio.gather(
*(connection.disconnect() for connection in self._connections),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This disconnects only those connections in the pool, not all connections as before.

*(
self.pool.get_nowait().disconnect()
for _ in range(self.pool.qsize())
),
*(
connection.disconnect()
for connection in self._in_use_connections
if inuse_connections
),
return_exceptions=True,
)
exc = next((r for r in resp if isinstance(r, BaseException)), None)
Expand Down
6 changes: 3 additions & 3 deletions tests/test_asyncio/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,16 @@ async def call_with_retry(self, _, __):
mock_conn = mock.MagicMock()
mock_conn.retry = Retry_()

async def get_conn(_):
async def get_conn(*_):
# Validate only one client is created in single-client mode when
# concurrent requests are made
nonlocal init_call_count
await asyncio.sleep(0.01)
init_call_count += 1
return mock_conn

with mock.patch.object(r.connection_pool, "get_connection", get_conn):
with mock.patch.object(r.connection_pool, "release"):
with mock.patch.object(type(r.connection_pool), "get_connection", get_conn):
with mock.patch.object(type(r.connection_pool), "release"):
await asyncio.gather(r.set("a", "b"), r.set("c", "d"))

assert init_call_count == 1
Expand Down