@@ -1237,6 +1237,19 @@ class ConnectionPool:
1237
1237
``connection_class``.
1238
1238
"""
1239
1239
1240
+ __slots__ = (
1241
+ "connection_class" ,
1242
+ "connection_kwargs" ,
1243
+ "max_connections" ,
1244
+ "_fork_lock" ,
1245
+ "_lock" ,
1246
+ "_created_connections" ,
1247
+ "_available_connections" ,
1248
+ "_in_use_connections" ,
1249
+ "encoder_class" ,
1250
+ "pid" ,
1251
+ )
1252
+
1240
1253
@classmethod
1241
1254
def from_url (cls : Type [_CP ], url : str , ** kwargs ) -> _CP :
1242
1255
"""
@@ -1519,36 +1532,34 @@ class BlockingConnectionPool(ConnectionPool):
1519
1532
>>> pool = BlockingConnectionPool(timeout=5)
1520
1533
"""
1521
1534
1535
+ __slots__ = (
1536
+ "queue_class" ,
1537
+ "timeout" ,
1538
+ "pool" ,
1539
+ )
1540
+
1522
1541
def __init__ (
1523
1542
self ,
1524
1543
max_connections : int = 50 ,
1525
1544
timeout : Optional [int ] = 20 ,
1526
1545
connection_class : Type [Connection ] = Connection ,
1527
- queue_class : Type [asyncio .Queue ] = asyncio .LifoQueue ,
1546
+ queue_class : Type [asyncio .Queue ] = asyncio .Queue ,
1528
1547
** connection_kwargs ,
1529
1548
):
1530
1549
1531
1550
self .queue_class = queue_class
1532
1551
self .timeout = timeout
1533
- self ._connections : List [Connection ]
1534
1552
super ().__init__ (
1535
1553
connection_class = connection_class ,
1536
1554
max_connections = max_connections ,
1537
1555
** connection_kwargs ,
1538
1556
)
1539
1557
1540
1558
def reset (self ):
1541
- # Create and fill up a thread safe queue with ``None`` values.
1559
+ # a queue of ready connections. populated lazily
1542
1560
self .pool = self .queue_class (self .max_connections )
1543
- while True :
1544
- try :
1545
- self .pool .put_nowait (None )
1546
- except asyncio .QueueFull :
1547
- break
1548
-
1549
- # Keep a list of actual connection instances so that we can
1550
- # disconnect them later.
1551
- self ._connections = []
1561
+ # used to decide wether we can allocate new connection or wait
1562
+ self ._created_connections = 0
1552
1563
1553
1564
# this must be the last operation in this method. while reset() is
1554
1565
# called when holding _fork_lock, other threads in this process
@@ -1562,41 +1573,36 @@ def reset(self):
1562
1573
self .pid = os .getpid ()
1563
1574
1564
1575
def make_connection (self ):
1565
- """Make a fresh connection."""
1566
- connection = self .connection_class (** self .connection_kwargs )
1567
- self ._connections .append (connection )
1568
- return connection
1576
+ """Create a new connection"""
1577
+ self ._created_connections += 1
1578
+ return self .connection_class (** self .connection_kwargs )
1569
1579
1570
1580
async def get_connection (self , command_name , * keys , ** options ):
1571
1581
"""
1572
1582
Get a connection, blocking for ``self.timeout`` until a connection
1573
1583
is available from the pool.
1574
1584
1575
- If the connection returned is ``None`` then creates a new connection.
1576
- Because we use a last-in first-out queue, the existing connections
1577
- (having been returned to the pool after the initial ``None`` values
1578
- were added) will be returned before ``None`` values. This means we only
1579
- create new connections when we need to, i.e.: the actual number of
1580
- connections will only increase in response to demand.
1585
+ Checks internal connection counter to ensure connections are allocated lazily.
1581
1586
"""
1582
1587
# Make sure we haven't changed process.
1583
1588
self ._checkpid ()
1584
1589
1585
- # Try and get a connection from the pool. If one isn't available within
1586
- # self.timeout then raise a ``ConnectionError``.
1587
- connection = None
1588
- try :
1589
- async with async_timeout .timeout (self .timeout ):
1590
- connection = await self .pool .get ()
1591
- except (asyncio .QueueEmpty , asyncio .TimeoutError ):
1592
- # Note that this is not caught by the redis client and will be
1593
- # raised unless handled by application code. If you want never to
1594
- raise ConnectionError ("No connection available." )
1595
-
1596
- # If the ``connection`` is actually ``None`` then that's a cue to make
1597
- # a new connection to add to the pool.
1598
- if connection is None :
1599
- connection = self .make_connection ()
1590
+ # if we are under max_connections, try getting one immediately. if it fails
1591
+ # it is ok to allocate new one
1592
+ if self ._created_connections < self .max_connections :
1593
+ try :
1594
+ connection = self .pool .get_nowait ()
1595
+ except asyncio .QueueEmpty :
1596
+ connection = self .make_connection ()
1597
+ else :
1598
+ # wait for available connection
1599
+ try :
1600
+ async with async_timeout .timeout (self .timeout ):
1601
+ connection = await self .pool .get ()
1602
+ except asyncio .TimeoutError :
1603
+ # Note that this is not caught by the redis client and will be
1604
+ # raised unless handled by application code.
1605
+ raise ConnectionError ("No connection available." )
1600
1606
1601
1607
try :
1602
1608
# ensure this connection is connected to Redis
@@ -1646,7 +1652,10 @@ async def disconnect(self, inuse_connections: bool = True):
1646
1652
self ._checkpid ()
1647
1653
async with self ._lock :
1648
1654
resp = await asyncio .gather (
1649
- * (connection .disconnect () for connection in self ._connections ),
1655
+ * (
1656
+ self .pool .get_nowait ().disconnect ()
1657
+ for _ in range (self .pool .qsize ())
1658
+ ),
1650
1659
return_exceptions = True ,
1651
1660
)
1652
1661
exc = next ((r for r in resp if isinstance (r , BaseException )), None )
0 commit comments