Skip to content

Commit ab34b5d

Browse files
committed
Ensure connection creation can be subclassed via make_connection()
1 parent 7b7ab23 commit ab34b5d

File tree

1 file changed

+23
-26
lines changed

1 file changed

+23
-26
lines changed

redis/asyncio/connection.py

Lines changed: 23 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -998,10 +998,8 @@ def __init__(
998998
self.connection_kwargs = connection_kwargs
999999
self.max_connections = max_connections
10001000

1001-
self._created_connections: int
1002-
self._available_connections: List[AbstractConnection]
1003-
self._in_use_connections: Set[AbstractConnection]
1004-
self.reset() # lgtm [py/init-calls-subclass]
1001+
self._available_connections: List[AbstractConnection] = []
1002+
self._in_use_connections: Set[AbstractConnection] = set()
10051003
self.encoder_class = self.connection_kwargs.get("encoder_class", Encoder)
10061004

10071005
def __repr__(self):
@@ -1011,43 +1009,29 @@ def __repr__(self):
10111009
)
10121010

10131011
def reset(self):
1014-
self._created_connections = 0
10151012
self._available_connections = []
10161013
self._in_use_connections = set()
10171014

10181015
def can_get_connection(self) -> bool:
10191016
"""Return True if a connection can be retrieved from the pool."""
10201017
return (
10211018
self._available_connections
1022-
or self._created_connections < self.max_connections
1019+
or len(self._in_use_connections) < self.max_connections
10231020
)
10241021

10251022
async def get_connection(self, command_name, *keys, **options):
10261023
"""Get a connection from the pool"""
10271024
try:
10281025
connection = self._available_connections.pop()
10291026
except IndexError:
1027+
if len(self._in_use_connections) >= self.max_connections:
1028+
raise ConnectionError("Too many connections") from None
10301029
connection = self.make_connection()
10311030
self._in_use_connections.add(connection)
10321031

10331032
try:
1034-
# ensure this connection is connected to Redis
1035-
await connection.connect()
1036-
# connections that the pool provides should be ready to send
1037-
# a command. if not, the connection was either returned to the
1038-
# pool before all data has been read or the socket has been
1039-
# closed. either way, reconnect and verify everything is good.
1040-
try:
1041-
if await connection.can_read_destructive():
1042-
raise ConnectionError("Connection has data") from None
1043-
except (ConnectionError, OSError):
1044-
await connection.disconnect()
1045-
await connection.connect()
1046-
if await connection.can_read_destructive():
1047-
raise ConnectionError("Connection not ready") from None
1033+
await self.ensure_connection(connection)
10481034
except BaseException:
1049-
# release the connection back to the pool so that we don't
1050-
# leak it
10511035
await self.release(connection)
10521036
raise
10531037

@@ -1063,12 +1047,25 @@ def get_encoder(self):
10631047
)
10641048

10651049
def make_connection(self):
1066-
"""Create a new connection"""
1067-
if self._created_connections >= self.max_connections:
1068-
raise ConnectionError("Too many connections")
1069-
self._created_connections += 1
1050+
"""Create a new connection. Can be overridden by child classes."""
10701051
return self.connection_class(**self.connection_kwargs)
10711052

1053+
async def ensure_connection(self, connection: AbstractConnection):
1054+
"""Ensure that the connection object is connected and valid"""
1055+
await connection.connect()
1056+
# connections that the pool provides should be ready to send
1057+
# a command. if not, the connection was either returned to the
1058+
# pool before all data has been read or the socket has been
1059+
# closed. either way, reconnect and verify everything is good.
1060+
try:
1061+
if await connection.can_read_destructive():
1062+
raise ConnectionError("Connection has data") from None
1063+
except (ConnectionError, OSError):
1064+
await connection.disconnect()
1065+
await connection.connect()
1066+
if await connection.can_read_destructive():
1067+
raise ConnectionError("Connection not ready") from None
1068+
10721069
async def release(self, connection: AbstractConnection):
10731070
"""Releases the connection back to the pool"""
10741071
# Connections should always be returned to the correct pool,

0 commit comments

Comments
 (0)