@@ -998,7 +998,6 @@ def __init__(
998
998
self .connection_kwargs = connection_kwargs
999
999
self .max_connections = max_connections
1000
1000
1001
- self ._lock = asyncio .Lock ()
1002
1001
self ._created_connections : int
1003
1002
self ._available_connections : List [AbstractConnection ]
1004
1003
self ._in_use_connections : Set [AbstractConnection ]
@@ -1012,19 +1011,17 @@ def __repr__(self):
1012
1011
)
1013
1012
1014
1013
def reset (self ):
1015
- self ._lock = asyncio .Lock ()
1016
1014
self ._created_connections = 0
1017
1015
self ._available_connections = []
1018
1016
self ._in_use_connections = set ()
1019
1017
1020
1018
async def get_connection (self , command_name , * keys , ** options ):
1021
1019
"""Get a connection from the pool"""
1022
- async with self ._lock :
1023
- try :
1024
- connection = self ._available_connections .pop ()
1025
- except IndexError :
1026
- connection = self .make_connection ()
1027
- self ._in_use_connections .add (connection )
1020
+ try :
1021
+ connection = self ._available_connections .pop ()
1022
+ except IndexError :
1023
+ connection = self .make_connection ()
1024
+ self ._in_use_connections .add (connection )
1028
1025
1029
1026
try :
1030
1027
# ensure this connection is connected to Redis
@@ -1067,11 +1064,10 @@ def make_connection(self):
1067
1064
1068
1065
async def release (self , connection : AbstractConnection ):
1069
1066
"""Releases the connection back to the pool"""
1070
- async with self ._lock :
1071
- # Connections should always be returned to the correct pool,
1072
- # not doing so is an error that will cause an exception here.
1073
- self ._in_use_connections .remove (connection )
1074
- self ._available_connections .append (connection )
1067
+ # Connections should always be returned to the correct pool,
1068
+ # not doing so is an error that will cause an exception here.
1069
+ self ._in_use_connections .remove (connection )
1070
+ self ._available_connections .append (connection )
1075
1071
1076
1072
async def disconnect (self , inuse_connections : bool = True ):
1077
1073
"""
@@ -1081,20 +1077,19 @@ async def disconnect(self, inuse_connections: bool = True):
1081
1077
current in use, potentially by other tasks. Otherwise only disconnect
1082
1078
connections that are idle in the pool.
1083
1079
"""
1084
- async with self ._lock :
1085
- if inuse_connections :
1086
- connections : Iterable [AbstractConnection ] = chain (
1087
- self ._available_connections , self ._in_use_connections
1088
- )
1089
- else :
1090
- connections = self ._available_connections
1091
- resp = await asyncio .gather (
1092
- * (connection .disconnect () for connection in connections ),
1093
- return_exceptions = True ,
1080
+ if inuse_connections :
1081
+ connections : Iterable [AbstractConnection ] = chain (
1082
+ self ._available_connections , self ._in_use_connections
1094
1083
)
1095
- exc = next ((r for r in resp if isinstance (r , BaseException )), None )
1096
- if exc :
1097
- raise exc
1084
+ else :
1085
+ connections = self ._available_connections
1086
+ resp = await asyncio .gather (
1087
+ * (connection .disconnect () for connection in connections ),
1088
+ return_exceptions = True ,
1089
+ )
1090
+ exc = next ((r for r in resp if isinstance (r , BaseException )), None )
1091
+ if exc :
1092
+ raise exc
1098
1093
1099
1094
def set_retry (self , retry : "Retry" ) -> None :
1100
1095
for conn in self ._available_connections :
@@ -1154,6 +1149,7 @@ def __init__(
1154
1149
max_connections = max_connections ,
1155
1150
** connection_kwargs ,
1156
1151
)
1152
+ self ._lock = asyncio .Lock ()
1157
1153
1158
1154
def reset (self ):
1159
1155
# Create and fill up a queue with ``None`` values.
0 commit comments