@@ -998,10 +998,8 @@ def __init__(
998
998
self .connection_kwargs = connection_kwargs
999
999
self .max_connections = max_connections
1000
1000
1001
- self ._created_connections : int
1002
- self ._available_connections : List [AbstractConnection ]
1003
- self ._in_use_connections : Set [AbstractConnection ]
1004
- self .reset () # lgtm [py/init-calls-subclass]
1001
+ self ._available_connections : List [AbstractConnection ] = []
1002
+ self ._in_use_connections : Set [AbstractConnection ] = set ()
1005
1003
self .encoder_class = self .connection_kwargs .get ("encoder_class" , Encoder )
1006
1004
1007
1005
def __repr__ (self ):
@@ -1011,43 +1009,29 @@ def __repr__(self):
1011
1009
)
1012
1010
1013
1011
def reset (self ):
1014
- self ._created_connections = 0
1015
1012
self ._available_connections = []
1016
1013
self ._in_use_connections = set ()
1017
1014
1018
1015
def can_get_connection (self ) -> bool :
1019
1016
"""Return True if a connection can be retrieved from the pool."""
1020
1017
return (
1021
1018
self ._available_connections
1022
- or self ._created_connections < self .max_connections
1019
+ or len ( self ._in_use_connections ) < self .max_connections
1023
1020
)
1024
1021
1025
1022
async def get_connection (self , command_name , * keys , ** options ):
1026
1023
"""Get a connection from the pool"""
1027
1024
try :
1028
1025
connection = self ._available_connections .pop ()
1029
1026
except IndexError :
1027
+ if len (self ._in_use_connections ) >= self .max_connections :
1028
+ raise ConnectionError ("Too many connections" ) from None
1030
1029
connection = self .make_connection ()
1031
1030
self ._in_use_connections .add (connection )
1032
1031
1033
1032
try :
1034
- # ensure this connection is connected to Redis
1035
- await connection .connect ()
1036
- # connections that the pool provides should be ready to send
1037
- # a command. if not, the connection was either returned to the
1038
- # pool before all data has been read or the socket has been
1039
- # closed. either way, reconnect and verify everything is good.
1040
- try :
1041
- if await connection .can_read_destructive ():
1042
- raise ConnectionError ("Connection has data" ) from None
1043
- except (ConnectionError , OSError ):
1044
- await connection .disconnect ()
1045
- await connection .connect ()
1046
- if await connection .can_read_destructive ():
1047
- raise ConnectionError ("Connection not ready" ) from None
1033
+ await self .ensure_connection (connection )
1048
1034
except BaseException :
1049
- # release the connection back to the pool so that we don't
1050
- # leak it
1051
1035
await self .release (connection )
1052
1036
raise
1053
1037
@@ -1063,12 +1047,25 @@ def get_encoder(self):
1063
1047
)
1064
1048
1065
1049
def make_connection (self ):
1066
- """Create a new connection"""
1067
- if self ._created_connections >= self .max_connections :
1068
- raise ConnectionError ("Too many connections" )
1069
- self ._created_connections += 1
1050
+ """Create a new connection. Can be overridden by child classes."""
1070
1051
return self .connection_class (** self .connection_kwargs )
1071
1052
1053
+ async def ensure_connection (self , connection : AbstractConnection ):
1054
+ """Ensure that the connection object is connected and valid"""
1055
+ await connection .connect ()
1056
+ # connections that the pool provides should be ready to send
1057
+ # a command. if not, the connection was either returned to the
1058
+ # pool before all data has been read or the socket has been
1059
+ # closed. either way, reconnect and verify everything is good.
1060
+ try :
1061
+ if await connection .can_read_destructive ():
1062
+ raise ConnectionError ("Connection has data" ) from None
1063
+ except (ConnectionError , OSError ):
1064
+ await connection .disconnect ()
1065
+ await connection .connect ()
1066
+ if await connection .can_read_destructive ():
1067
+ raise ConnectionError ("Connection not ready" ) from None
1068
+
1072
1069
async def release (self , connection : AbstractConnection ):
1073
1070
"""Releases the connection back to the pool"""
1074
1071
# Connections should always be returned to the correct pool,
0 commit comments