Skip to content

Commit 7b7ab23

Browse files
committed
Simplify BlockingConnectionPool
1 parent 5bd622c commit 7b7ab23

File tree

2 files changed

+25
-93
lines changed

2 files changed

+25
-93
lines changed

redis/asyncio/connection.py

Lines changed: 22 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -1015,6 +1015,13 @@ def reset(self):
10151015
self._available_connections = []
10161016
self._in_use_connections = set()
10171017

1018+
def can_get_connection(self) -> bool:
1019+
"""Return True if a connection can be retrieved from the pool."""
1020+
return (
1021+
self._available_connections
1022+
or self._created_connections < self.max_connections
1023+
)
1024+
10181025
async def get_connection(self, command_name, *keys, **options):
10191026
"""Get a connection from the pool"""
10201027
try:
@@ -1102,19 +1109,19 @@ class BlockingConnectionPool(ConnectionPool):
11021109
"""
11031110
A blocking connection pool::
11041111
1105-
>>> from redis.client import Redis
1112+
>>> from redis.asyncio.client import Redis
11061113
>>> client = Redis(connection_pool=BlockingConnectionPool())
11071114
11081115
It performs the same function as the default
1109-
:py:class:`~redis.ConnectionPool` implementation, in that,
1116+
:py:class:`~redis.asyncio.ConnectionPool` implementation, in that,
11101117
it maintains a pool of reusable connections that can be shared by
11111118
multiple async redis clients.
11121119
11131120
The difference is that, in the event that a client tries to get a
11141121
connection from the pool when all of connections are in use, rather than
11151122
raising a :py:class:`~redis.ConnectionError` (as the default
1116-
:py:class:`~redis.ConnectionPool` implementation does), it
1117-
makes the client wait ("blocks") for a specified number of seconds until
1123+
:py:class:`~redis.asyncio.ConnectionPool` implementation does), it
1124+
makes blocks the current `Task` for a specified number of seconds until
11181125
a connection becomes available.
11191126
11201127
Use ``max_connections`` to increase / decrease the pool size::
@@ -1137,107 +1144,29 @@ def __init__(
11371144
max_connections: int = 50,
11381145
timeout: Optional[int] = 20,
11391146
connection_class: Type[AbstractConnection] = Connection,
1140-
queue_class: Type[asyncio.Queue] = asyncio.LifoQueue,
1147+
queue_class: Type[asyncio.Queue] = asyncio.LifoQueue, # deprecated
11411148
**connection_kwargs,
11421149
):
11431150

1144-
self.queue_class = queue_class
1145-
self.timeout = timeout
1146-
self._connections: List[AbstractConnection]
11471151
super().__init__(
11481152
connection_class=connection_class,
11491153
max_connections=max_connections,
11501154
**connection_kwargs,
11511155
)
1152-
self._lock = asyncio.Lock()
1153-
1154-
def reset(self):
1155-
# Create and fill up a queue with ``None`` values.
1156-
self.pool = self.queue_class(self.max_connections)
1157-
while True:
1158-
try:
1159-
self.pool.put_nowait(None)
1160-
except asyncio.QueueFull:
1161-
break
1162-
1163-
# Keep a list of actual connection instances so that we can
1164-
# disconnect them later.
1165-
self._connections = []
1166-
1167-
def make_connection(self):
1168-
"""Make a fresh connection."""
1169-
connection = self.connection_class(**self.connection_kwargs)
1170-
self._connections.append(connection)
1171-
return connection
1156+
self._condition = asyncio.Condition()
1157+
self.timeout = timeout
11721158

11731159
async def get_connection(self, command_name, *keys, **options):
1174-
"""
1175-
Get a connection, blocking for ``self.timeout`` until a connection
1176-
is available from the pool.
1177-
1178-
If the connection returned is ``None`` then creates a new connection.
1179-
Because we use a last-in first-out queue, the existing connections
1180-
(having been returned to the pool after the initial ``None`` values
1181-
were added) will be returned before ``None`` values. This means we only
1182-
create new connections when we need to, i.e.: the actual number of
1183-
connections will only increase in response to demand.
1184-
"""
1185-
1186-
# Try and get a connection from the pool. If one isn't available within
1187-
# self.timeout then raise a ``ConnectionError``.
1188-
connection = None
11891160
try:
11901161
async with async_timeout(self.timeout):
1191-
connection = await self.pool.get()
1192-
except (asyncio.QueueEmpty, asyncio.TimeoutError):
1193-
# Note that this is not caught by the redis client and will be
1194-
# raised unless handled by application code. If you want never to
1195-
raise ConnectionError("No connection available.")
1196-
1197-
# If the ``connection`` is actually ``None`` then that's a cue to make
1198-
# a new connection to add to the pool.
1199-
if connection is None:
1200-
connection = self.make_connection()
1201-
1202-
try:
1203-
# ensure this connection is connected to Redis
1204-
await connection.connect()
1205-
# connections that the pool provides should be ready to send
1206-
# a command. if not, the connection was either returned to the
1207-
# pool before all data has been read or the socket has been
1208-
# closed. either way, reconnect and verify everything is good.
1209-
try:
1210-
if await connection.can_read_destructive():
1211-
raise ConnectionError("Connection has data") from None
1212-
except (ConnectionError, OSError):
1213-
await connection.disconnect()
1214-
await connection.connect()
1215-
if await connection.can_read_destructive():
1216-
raise ConnectionError("Connection not ready") from None
1217-
except BaseException:
1218-
# release the connection back to the pool so that we don't leak it
1219-
await self.release(connection)
1220-
raise
1221-
1222-
return connection
1162+
async with self._condition:
1163+
await self._condition.wait_for(self.can_get_connection)
1164+
return await super().get_connection(command_name, *keys, **options)
1165+
except asyncio.TimeoutError as err:
1166+
raise ConnectionError("No connection available.") from err
12231167

12241168
async def release(self, connection: AbstractConnection):
12251169
"""Releases the connection back to the pool."""
1226-
# Put the connection back into the pool.
1227-
try:
1228-
self.pool.put_nowait(connection)
1229-
except asyncio.QueueFull:
1230-
# perhaps the pool has been reset() after a fork? regardless,
1231-
# we don't want this connection
1232-
pass
1233-
1234-
async def disconnect(self, inuse_connections: bool = True):
1235-
"""Disconnects all connections in the pool."""
1236-
async with self._lock:
1237-
resp = await asyncio.gather(
1238-
*(connection.disconnect() for connection in self._connections),
1239-
return_exceptions=True,
1240-
)
1241-
exc = next((r for r in resp if isinstance(r, BaseException)), None)
1242-
if exc:
1243-
raise exc
1170+
async with self._condition:
1171+
await super().release(connection)
1172+
self._condition.notify()

tests/test_asyncio/test_connection_pool.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@ class DummyConnection(Connection):
9494
def __init__(self, **kwargs):
9595
self.kwargs = kwargs
9696

97+
def repr_pieces(self):
98+
return [("id", id(self)), ("kwargs", self.kwargs)]
99+
97100
async def connect(self):
98101
pass
99102

0 commit comments

Comments
 (0)