Skip to content

Commit 93c59c7

Browse files
authored
Merge branch 'master' into redis-pool
2 parents 1c80e16 + 8592cac commit 93c59c7

37 files changed

+663
-179
lines changed

CHANGES

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
* Fix string cleanse in Redis Graph
2+
* Make PythonParser resumable in case of error (#2510)
3+
* Add `timeout=None` in `SentinelConnectionManager.read_response`
14
* Documentation fix: password protected socket connection (#2374)
25
* Allow `timeout=None` in `PubSub.get_message()` to wait forever
36
* add `nowait` flag to `asyncio.Connection.disconnect()`
@@ -31,6 +34,7 @@
3134
* Fix Sentinel.execute_command doesn't execute across the entire sentinel cluster bug (#2458)
3235
* Added a replacement for the default cluster node in the event of failure (#2463)
3336
* Simplified connection allocation code for asyncio.connection.BlockingConnectionPool
37+
* Fix for Unhandled exception related to self.host with unix socket (#2496)
3438

3539
* 4.1.3 (Feb 8, 2022)
3640
* Fix flushdb and flushall (#1926)

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@ redis-py 4.3.x will be the last generation of redis-py to support python 3.6 as
2121

2222
## Installation
2323

24+
Start a redis via docker:
25+
26+
``` bash
27+
docker run -p 6379:6379 -it redis/redis-stack:latest
28+
```
29+
2430
To install redis-py, simply:
2531

2632
``` bash

redis/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,10 @@ def int_or_str(value):
5353
__version__ = "99.99.99"
5454

5555

56-
VERSION = tuple(map(int_or_str, __version__.split(".")))
56+
try:
57+
VERSION = tuple(map(int_or_str, __version__.split(".")))
58+
except AttributeError:
59+
VERSION = tuple(99, 99, 99)
5760

5861
__all__ = [
5962
"AuthenticationError",

redis/asyncio/client.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -453,7 +453,7 @@ def __del__(self, _warnings: Any = warnings) -> None:
453453
f"Unclosed client session {self!r}", ResourceWarning, source=self
454454
)
455455
context = {"client": self, "message": self._DEL_MESSAGE}
456-
asyncio.get_event_loop().call_exception_handler(context)
456+
asyncio.get_running_loop().call_exception_handler(context)
457457

458458
async def close(self, close_connection_pool: Optional[bool] = None) -> None:
459459
"""
@@ -483,8 +483,8 @@ async def _send_command_parse_response(self, conn, command_name, *args, **option
483483
async def _disconnect_raise(self, conn: Connection, error: Exception):
484484
"""
485485
Close the connection and raise an exception
486-
if retry_on_timeout is not set or the error
487-
is not a TimeoutError
486+
if retry_on_error is not set or the error
487+
is not one of the specified error types
488488
"""
489489
await conn.disconnect()
490490
if (
@@ -798,7 +798,7 @@ async def check_health(self):
798798

799799
if (
800800
conn.health_check_interval
801-
and asyncio.get_event_loop().time() > conn.next_health_check
801+
and asyncio.get_running_loop().time() > conn.next_health_check
802802
):
803803
await conn.send_command(
804804
"PING", self.HEALTH_CHECK_MESSAGE, check_health=False

redis/asyncio/connection.py

Lines changed: 52 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -208,11 +208,18 @@ async def read_response(
208208
class PythonParser(BaseParser):
209209
"""Plain Python parsing class"""
210210

211-
__slots__ = BaseParser.__slots__ + ("encoder",)
211+
__slots__ = BaseParser.__slots__ + ("encoder", "_buffer", "_pos", "_chunks")
212212

213213
def __init__(self, socket_read_size: int):
214214
super().__init__(socket_read_size)
215215
self.encoder: Optional[Encoder] = None
216+
self._buffer = b""
217+
self._chunks = []
218+
self._pos = 0
219+
220+
def _clear(self):
221+
self._buffer = b""
222+
self._chunks.clear()
216223

217224
def on_connect(self, connection: "Connection"):
218225
"""Called when the stream connects"""
@@ -227,8 +234,11 @@ def on_disconnect(self):
227234
if self._stream is not None:
228235
self._stream = None
229236
self.encoder = None
237+
self._clear()
230238

231239
async def can_read_destructive(self) -> bool:
240+
if self._buffer:
241+
return True
232242
if self._stream is None:
233243
raise RedisError("Buffer is closed.")
234244
try:
@@ -237,14 +247,23 @@ async def can_read_destructive(self) -> bool:
237247
except asyncio.TimeoutError:
238248
return False
239249

240-
async def read_response(
250+
async def read_response(self, disable_decoding: bool = False):
251+
if self._chunks:
252+
# augment parsing buffer with previously read data
253+
self._buffer += b"".join(self._chunks)
254+
self._chunks.clear()
255+
self._pos = 0
256+
response = await self._read_response(disable_decoding=disable_decoding)
257+
# Successfully parsing a response allows us to clear our parsing buffer
258+
self._clear()
259+
return response
260+
261+
async def _read_response(
241262
self, disable_decoding: bool = False
242263
) -> Union[EncodableT, ResponseError, None]:
243264
if not self._stream or not self.encoder:
244265
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
245266
raw = await self._readline()
246-
if not raw:
247-
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
248267
response: Any
249268
byte, response = raw[:1], raw[1:]
250269

@@ -258,6 +277,7 @@ async def read_response(
258277
# if the error is a ConnectionError, raise immediately so the user
259278
# is notified
260279
if isinstance(error, ConnectionError):
280+
self._clear() # Successful parse
261281
raise error
262282
# otherwise, we're dealing with a ResponseError that might belong
263283
# inside a pipeline response. the connection's read_response()
@@ -282,7 +302,7 @@ async def read_response(
282302
if length == -1:
283303
return None
284304
response = [
285-
(await self.read_response(disable_decoding)) for _ in range(length)
305+
(await self._read_response(disable_decoding)) for _ in range(length)
286306
]
287307
if isinstance(response, bytes) and disable_decoding is False:
288308
response = self.encoder.decode(response)
@@ -293,25 +313,38 @@ async def _read(self, length: int) -> bytes:
293313
Read `length` bytes of data. These are assumed to be followed
294314
by a '\r\n' terminator which is subsequently discarded.
295315
"""
296-
if self._stream is None:
297-
raise RedisError("Buffer is closed.")
298-
try:
299-
data = await self._stream.readexactly(length + 2)
300-
except asyncio.IncompleteReadError as error:
301-
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from error
302-
return data[:-2]
316+
want = length + 2
317+
end = self._pos + want
318+
if len(self._buffer) >= end:
319+
result = self._buffer[self._pos : end - 2]
320+
else:
321+
tail = self._buffer[self._pos :]
322+
try:
323+
data = await self._stream.readexactly(want - len(tail))
324+
except asyncio.IncompleteReadError as error:
325+
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from error
326+
result = (tail + data)[:-2]
327+
self._chunks.append(data)
328+
self._pos += want
329+
return result
303330

304331
async def _readline(self) -> bytes:
305332
"""
306333
read an unknown number of bytes up to the next '\r\n'
307334
line separator, which is discarded.
308335
"""
309-
if self._stream is None:
310-
raise RedisError("Buffer is closed.")
311-
data = await self._stream.readline()
312-
if not data.endswith(b"\r\n"):
313-
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
314-
return data[:-2]
336+
found = self._buffer.find(b"\r\n", self._pos)
337+
if found >= 0:
338+
result = self._buffer[self._pos : found]
339+
else:
340+
tail = self._buffer[self._pos :]
341+
data = await self._stream.readline()
342+
if not data.endswith(b"\r\n"):
343+
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
344+
result = (tail + data)[:-2]
345+
self._chunks.append(data)
346+
self._pos += len(result) + 2
347+
return result
315348

316349

317350
class HiredisParser(BaseParser):
@@ -532,7 +565,7 @@ def repr_pieces(self):
532565
def __del__(self):
533566
try:
534567
if self.is_connected:
535-
loop = asyncio.get_event_loop()
568+
loop = asyncio.get_running_loop()
536569
coro = self.disconnect()
537570
if loop.is_running():
538571
loop.create_task(coro)

redis/asyncio/lock.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,14 +201,14 @@ async def acquire(
201201
blocking_timeout = self.blocking_timeout
202202
stop_trying_at = None
203203
if blocking_timeout is not None:
204-
stop_trying_at = asyncio.get_event_loop().time() + blocking_timeout
204+
stop_trying_at = asyncio.get_running_loop().time() + blocking_timeout
205205
while True:
206206
if await self.do_acquire(token):
207207
self.local.token = token
208208
return True
209209
if not blocking:
210210
return False
211-
next_try_at = asyncio.get_event_loop().time() + sleep
211+
next_try_at = asyncio.get_running_loop().time() + sleep
212212
if stop_trying_at is not None and next_try_at > stop_trying_at:
213213
return False
214214
await asyncio.sleep(sleep)

redis/asyncio/sentinel.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import asyncio
22
import random
33
import weakref
4-
from typing import AsyncIterator, Iterable, Mapping, Sequence, Tuple, Type
4+
from typing import AsyncIterator, Iterable, Mapping, Optional, Sequence, Tuple, Type
55

66
from redis.asyncio.client import Redis
77
from redis.asyncio.connection import (
@@ -63,9 +63,16 @@ async def connect(self):
6363
lambda error: asyncio.sleep(0),
6464
)
6565

66-
async def read_response(self, disable_decoding: bool = False):
66+
async def read_response(
67+
self,
68+
disable_decoding: bool = False,
69+
timeout: Optional[float] = None,
70+
):
6771
try:
68-
return await super().read_response(disable_decoding=disable_decoding)
72+
return await super().read_response(
73+
disable_decoding=disable_decoding,
74+
timeout=timeout,
75+
)
6976
except ReadOnlyError:
7077
if self.connection_pool.is_master:
7178
# When talking to a master, a ReadOnlyError when likely

redis/cluster.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ def parse_cluster_shards(resp, **options):
121121
"charset",
122122
"connection_class",
123123
"connection_pool",
124+
"connection_pool_class",
124125
"client_name",
125126
"credential_provider",
126127
"db",
@@ -1267,6 +1268,7 @@ def __init__(
12671268
require_full_coverage=False,
12681269
lock=None,
12691270
dynamic_startup_nodes=True,
1271+
connection_pool_class=ConnectionPool,
12701272
**kwargs,
12711273
):
12721274
self.nodes_cache = {}
@@ -1277,6 +1279,7 @@ def __init__(
12771279
self.from_url = from_url
12781280
self._require_full_coverage = require_full_coverage
12791281
self._dynamic_startup_nodes = dynamic_startup_nodes
1282+
self.connection_pool_class = connection_pool_class
12801283
self._moved_exception = None
12811284
self.connection_kwargs = kwargs
12821285
self.read_load_balancer = LoadBalancer()
@@ -1420,7 +1423,7 @@ def create_redis_node(self, host, port, **kwargs):
14201423
# Create a redis node with a costumed connection pool
14211424
kwargs.update({"host": host})
14221425
kwargs.update({"port": port})
1423-
r = Redis(connection_pool=ConnectionPool(**kwargs))
1426+
r = Redis(connection_pool=self.connection_pool_class(**kwargs))
14241427
else:
14251428
r = Redis(host=host, port=port, **kwargs)
14261429
return r
@@ -1437,6 +1440,8 @@ def _get_or_create_cluster_node(self, host, port, role, tmp_nodes_cache):
14371440
if target_node is None or target_node.redis_connection is None:
14381441
# create new cluster node for this cluster
14391442
target_node = ClusterNode(host, port, role)
1443+
if target_node.server_type != role:
1444+
target_node.server_type = role
14401445

14411446
return target_node
14421447

redis/commands/bf/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ def __init__(self, client, **kwargs):
198198
# BF_MEXISTS: spaceHolder,
199199
# BF_SCANDUMP: spaceHolder,
200200
# BF_LOADCHUNK: spaceHolder,
201+
# BF_CARD: spaceHolder,
201202
BF_INFO: BFInfo,
202203
}
203204

redis/commands/bf/commands.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
BF_SCANDUMP = "BF.SCANDUMP"
1212
BF_LOADCHUNK = "BF.LOADCHUNK"
1313
BF_INFO = "BF.INFO"
14+
BF_CARD = "BF.CARD"
1415

1516
CF_RESERVE = "CF.RESERVE"
1617
CF_ADD = "CF.ADD"
@@ -165,6 +166,14 @@ def info(self, key):
165166
""" # noqa
166167
return self.execute_command(BF_INFO, key)
167168

169+
def card(self, key):
170+
"""
171+
Returns the cardinality of a Bloom filter - number of items that were added to a Bloom filter and detected as unique
172+
(items that caused at least one bit to be set in at least one sub-filter).
173+
For more information see `BF.CARD <https://redis.io/commands/bf.card>`_.
174+
""" # noqa
175+
return self.execute_command(BF_CARD, key)
176+
168177

169178
class CFCommands:
170179
"""Cuckoo Filter commands."""

redis/commands/cluster.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252
[
5353
"BITCOUNT",
5454
"BITPOS",
55+
"EVAL_RO",
56+
"EVALSHA_RO",
5557
"EXISTS",
5658
"GEODIST",
5759
"GEOHASH",

redis/commands/core.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ def acl_setuser(
193193
selectors: Optional[Iterable[Tuple[str, KeyT]]] = None,
194194
reset: bool = False,
195195
reset_keys: bool = False,
196+
reset_channels: bool = False,
196197
reset_passwords: bool = False,
197198
**kwargs,
198199
) -> ResponseT:
@@ -248,6 +249,12 @@ def acl_setuser(
248249
key permissions will be kept and any new specified key permissions
249250
will be applied on top.
250251
252+
``reset_channels`` is a boolean indicating whether the user's channel
253+
permissions should be reset prior to applying any new channel permissions
254+
specified in ``channels``.If this is False, the user's existing
255+
channel permissions will be kept and any new specified channel permissions
256+
will be applied on top.
257+
251258
``reset_passwords`` is a boolean indicating whether to remove all
252259
existing passwords and the 'nopass' flag from the user prior to
253260
applying any new passwords specified in 'passwords' or
@@ -266,6 +273,9 @@ def acl_setuser(
266273
if reset_keys:
267274
pieces.append(b"resetkeys")
268275

276+
if reset_channels:
277+
pieces.append(b"resetchannels")
278+
269279
if reset_passwords:
270280
pieces.append(b"resetpass")
271281

@@ -2255,6 +2265,8 @@ def set(
22552265
pieces.append(int(ex.total_seconds()))
22562266
elif isinstance(ex, int):
22572267
pieces.append(ex)
2268+
elif isinstance(ex, str) and ex.isdigit():
2269+
pieces.append(int(ex))
22582270
else:
22592271
raise DataError("ex must be datetime.timedelta or int")
22602272
if px is not None:
@@ -5139,7 +5151,7 @@ def eval_ro(
51395151
"""
51405152
The read-only variant of the EVAL command
51415153
5142-
Execute the read-only Lue ``script`` specifying the ``numkeys`` the script
5154+
Execute the read-only Lua ``script`` specifying the ``numkeys`` the script
51435155
will touch and the key names and argument values in ``keys_and_args``.
51445156
Returns the result of the script.
51455157

redis/commands/graph/edge.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ def __str__(self):
6161
return res
6262

6363
def __eq__(self, rhs):
64+
# Type checking
65+
if not isinstance(rhs, Edge):
66+
return False
67+
6468
# Quick positive check, if both IDs are set.
6569
if self.id is not None and rhs.id is not None and self.id == rhs.id:
6670
return True

0 commit comments

Comments
 (0)