Skip to content

Cherry-pick for 4.3.5 #2468

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Nov 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
cc07235
fix is_connected (#2278)
dvora-h Jul 21, 2022
a4ff8a6
fix: workaround asyncio bug on connection reset by peer (#2259)
sileht Jul 24, 2022
de903f2
Fix crash: key expire while search (#2270)
dvora-h Jul 24, 2022
7098ea4
async_cluster: fix concurrent pipeline (#2280)
utkarshgupta137 Jul 24, 2022
2c13bb4
Add support for TIMESERIES 1.8 (#2296)
dvora-h Jul 24, 2022
60a9a5a
Graph - add counters for removed labels and properties (#2292)
DvirDukhan Jul 26, 2022
272e463
Add support for async GRAPH module (#2273)
dvora-h Jul 28, 2022
f9ea68f
Add support for `TDIGEST.QUANTILE` extensions (#2317)
dvora-h Aug 2, 2022
3bd062f
Fix async SEARCH pipeline (#2316)
dvora-h Aug 2, 2022
800dc0d
Support TDIGEST.MERGESTORE and make compression optional on TDIGEST.C…
dvora-h Aug 2, 2022
ebe48eb
Search test - Ignore order of the items in the response (#2322)
dvora-h Aug 4, 2022
c255630
Add TDIGEST.TRIMMED_MEAN (#2300)
bodevone Aug 4, 2022
fb9e133
Add support for WITHSUFFIXTRIE to FT.CREATE (#2324)
dvora-h Aug 4, 2022
3a68ff5
Fix GRAPH.LIST & TDIGEST.QUANTILE tests (#2335)
dvora-h Aug 14, 2022
f17b9e8
fix test (#2358)
dvora-h Aug 25, 2022
7325ccd
Adding reserve as an alias for create, so that we have BF.RESERVE and…
chayim Aug 29, 2022
3db5e5e
Mark `TOPK.COUNT` as deprecated (#2363)
dvora-h Sep 4, 2022
2c4b386
Fix KeyError in async cluster - initialize before execute multi key c…
dvora-h Oct 30, 2022
738d0c7
update modules
dvora-h Nov 21, 2022
d088178
Remove `deprecated` dependency (#2386)
akx Oct 30, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
* Added dynaminc_startup_nodes configuration to RedisCluster
* Fix reusing the old nodes' connections when cluster topology refresh is being done
* Fix RedisCluster to immediately raise AuthenticationError without a retry
* ClusterPipeline Doesn't Handle ConnectionError for Dead Hosts (#2225)
* Remove compatibility code for old versions of Hiredis, drop Packaging dependency
* The `deprecated` library is no longer a dependency

* 4.1.3 (Feb 8, 2022)
* Fix flushdb and flushall (#1926)
* Add redis5 and redis4 dockers (#1871)
Expand Down
18 changes: 9 additions & 9 deletions redis/asyncio/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,6 @@ class ClusterNode:
"""

__slots__ = (
"_command_stack",
"_connections",
"_free",
"connection_class",
Expand Down Expand Up @@ -796,7 +795,6 @@ def __init__(

self._connections: List[Connection] = []
self._free: Deque[Connection] = collections.deque(maxlen=self.max_connections)
self._command_stack: List["PipelineCommand"] = []

def __repr__(self) -> str:
return (
Expand Down Expand Up @@ -887,18 +885,18 @@ async def execute_command(self, *args: Any, **kwargs: Any) -> Any:
# Release connection
self._free.append(connection)

async def execute_pipeline(self) -> bool:
async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool:
# Acquire connection
connection = self.acquire_connection()

# Execute command
await connection.send_packed_command(
connection.pack_commands(cmd.args for cmd in self._command_stack), False
connection.pack_commands(cmd.args for cmd in commands), False
)

# Read responses
ret = False
for cmd in self._command_stack:
for cmd in commands:
try:
cmd.result = await self.parse_response(
connection, cmd.args[0], **cmd.kwargs
Expand Down Expand Up @@ -1365,12 +1363,14 @@ async def _execute(

node = target_nodes[0]
if node.name not in nodes:
nodes[node.name] = node
node._command_stack = []
node._command_stack.append(cmd)
nodes[node.name] = (node, [])
nodes[node.name][1].append(cmd)

errors = await asyncio.gather(
*(asyncio.ensure_future(node.execute_pipeline()) for node in nodes.values())
*(
asyncio.ensure_future(node[0].execute_pipeline(node[1]))
for node in nodes.values()
)
)

if any(errors):
Expand Down
13 changes: 11 additions & 2 deletions redis/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ def __del__(self):

@property
def is_connected(self):
return self._reader and self._writer
return self._reader is not None and self._writer is not None

def register_connect_callback(self, callback):
self._connect_callbacks.append(weakref.WeakMethod(callback))
Expand Down Expand Up @@ -767,7 +767,16 @@ async def _connect(self):
def _error_message(self, exception):
# args for socket.error can either be (errno, "message")
# or just "message"
if len(exception.args) == 1:
if not exception.args:
# asyncio has a bug where on Connection reset by peer, the
# exception is not instanciated, so args is empty. This is the
# workaround.
# See: https://github.com/redis/redis-py/issues/2237
# See: https://github.com/python/cpython/issues/94061
return (
f"Error connecting to {self.host}:{self.port}. Connection reset by peer"
)
elif len(exception.args) == 1:
return f"Error connecting to {self.host}:{self.port}. {exception.args[0]}."
else:
return (
Expand Down
9 changes: 7 additions & 2 deletions redis/commands/bf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,16 @@ def __init__(self, client, **kwargs):
# TDIGEST_RESET: bool_ok,
# TDIGEST_ADD: spaceHolder,
# TDIGEST_MERGE: spaceHolder,
TDIGEST_CDF: float,
TDIGEST_QUANTILE: float,
TDIGEST_CDF: parse_to_list,
TDIGEST_QUANTILE: parse_to_list,
TDIGEST_MIN: float,
TDIGEST_MAX: float,
TDIGEST_TRIMMED_MEAN: float,
TDIGEST_INFO: TDigestInfo,
TDIGEST_RANK: parse_to_list,
TDIGEST_REVRANK: parse_to_list,
TDIGEST_BYRANK: parse_to_list,
TDIGEST_BYREVRANK: parse_to_list,
}

self.client = client
Expand Down
103 changes: 81 additions & 22 deletions redis/commands/bf/commands.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from redis.client import NEVER_DECODE
from redis.exceptions import ModuleError
from redis.utils import HIREDIS_AVAILABLE
from redis.utils import HIREDIS_AVAILABLE, deprecated_function

BF_RESERVE = "BF.RESERVE"
BF_ADD = "BF.ADD"
Expand Down Expand Up @@ -49,6 +49,11 @@
TDIGEST_MIN = "TDIGEST.MIN"
TDIGEST_MAX = "TDIGEST.MAX"
TDIGEST_INFO = "TDIGEST.INFO"
TDIGEST_TRIMMED_MEAN = "TDIGEST.TRIMMED_MEAN"
TDIGEST_RANK = "TDIGEST.RANK"
TDIGEST_REVRANK = "TDIGEST.REVRANK"
TDIGEST_BYRANK = "TDIGEST.BYRANK"
TDIGEST_BYREVRANK = "TDIGEST.BYREVRANK"


class BFCommands:
Expand All @@ -67,6 +72,8 @@ def create(self, key, errorRate, capacity, expansion=None, noScale=None):
self.append_no_scale(params, noScale)
return self.execute_command(BF_RESERVE, *params)

reserve = create

def add(self, key, item):
"""
Add to a Bloom Filter `key` an `item`.
Expand Down Expand Up @@ -176,6 +183,8 @@ def create(
self.append_max_iterations(params, max_iterations)
return self.execute_command(CF_RESERVE, *params)

reserve = create

def add(self, key, item):
"""
Add an `item` to a Cuckoo Filter `key`.
Expand Down Expand Up @@ -316,6 +325,7 @@ def query(self, key, *items):
""" # noqa
return self.execute_command(TOPK_QUERY, key, *items)

@deprecated_function(version="4.4.0", reason="deprecated since redisbloom 2.4.0")
def count(self, key, *items):
"""
Return count for one `item` or more from `key`.
Expand Down Expand Up @@ -344,12 +354,12 @@ def info(self, key):


class TDigestCommands:
def create(self, key, compression):
def create(self, key, compression=100):
"""
Allocate the memory and initialize the t-digest.
For more information see `TDIGEST.CREATE <https://redis.io/commands/tdigest.create>`_.
""" # noqa
return self.execute_command(TDIGEST_CREATE, key, compression)
return self.execute_command(TDIGEST_CREATE, key, "COMPRESSION", compression)

def reset(self, key):
"""
Expand All @@ -358,26 +368,30 @@ def reset(self, key):
""" # noqa
return self.execute_command(TDIGEST_RESET, key)

def add(self, key, values, weights):
def add(self, key, values):
"""
Add one or more samples (value with weight) to a sketch `key`.
Both `values` and `weights` are lists.
For more information see `TDIGEST.ADD <https://redis.io/commands/tdigest.add>`_.
Adds one or more observations to a t-digest sketch `key`.

Example:

>>> tdigestadd('A', [1500.0], [1.0])
For more information see `TDIGEST.ADD <https://redis.io/commands/tdigest.add>`_.
""" # noqa
params = [key]
self.append_values_and_weights(params, values, weights)
return self.execute_command(TDIGEST_ADD, *params)
return self.execute_command(TDIGEST_ADD, key, *values)

def merge(self, toKey, fromKey):
def merge(self, destination_key, num_keys, *keys, compression=None, override=False):
"""
Merge all of the values from 'fromKey' to 'toKey' sketch.
Merges all of the values from `keys` to 'destination-key' sketch.
It is mandatory to provide the `num_keys` before passing the input keys and
the other (optional) arguments.
If `destination_key` already exists its values are merged with the input keys.
If you wish to override the destination key contents use the `OVERRIDE` parameter.

For more information see `TDIGEST.MERGE <https://redis.io/commands/tdigest.merge>`_.
""" # noqa
return self.execute_command(TDIGEST_MERGE, toKey, fromKey)
params = [destination_key, num_keys, *keys]
if compression is not None:
params.extend(["COMPRESSION", compression])
if override:
params.append("OVERRIDE")
return self.execute_command(TDIGEST_MERGE, *params)

def min(self, key):
"""
Expand All @@ -393,20 +407,21 @@ def max(self, key):
""" # noqa
return self.execute_command(TDIGEST_MAX, key)

def quantile(self, key, quantile):
def quantile(self, key, quantile, *quantiles):
"""
Return double value estimate of the cutoff such that a specified fraction of the data
added to this TDigest would be less than or equal to the cutoff.
Returns estimates of one or more cutoffs such that a specified fraction of the
observations added to this t-digest would be less than or equal to each of the
specified cutoffs. (Multiple quantiles can be returned with one call)
For more information see `TDIGEST.QUANTILE <https://redis.io/commands/tdigest.quantile>`_.
""" # noqa
return self.execute_command(TDIGEST_QUANTILE, key, quantile)
return self.execute_command(TDIGEST_QUANTILE, key, quantile, *quantiles)

def cdf(self, key, value):
def cdf(self, key, value, *values):
"""
Return double fraction of all points added which are <= value.
For more information see `TDIGEST.CDF <https://redis.io/commands/tdigest.cdf>`_.
""" # noqa
return self.execute_command(TDIGEST_CDF, key, value)
return self.execute_command(TDIGEST_CDF, key, value, *values)

def info(self, key):
"""
Expand All @@ -416,6 +431,50 @@ def info(self, key):
""" # noqa
return self.execute_command(TDIGEST_INFO, key)

def trimmed_mean(self, key, low_cut_quantile, high_cut_quantile):
"""
Return mean value from the sketch, excluding observation values outside
the low and high cutoff quantiles.
For more information see `TDIGEST.TRIMMED_MEAN <https://redis.io/commands/tdigest.trimmed_mean>`_.
""" # noqa
return self.execute_command(
TDIGEST_TRIMMED_MEAN, key, low_cut_quantile, high_cut_quantile
)

def rank(self, key, value, *values):
"""
Retrieve the estimated rank of value (the number of observations in the sketch
that are smaller than value + half the number of observations that are equal to value).

For more information see `TDIGEST.RANK <https://redis.io/commands/tdigest.rank>`_.
""" # noqa
return self.execute_command(TDIGEST_RANK, key, value, *values)

def revrank(self, key, value, *values):
"""
Retrieve the estimated rank of value (the number of observations in the sketch
that are larger than value + half the number of observations that are equal to value).

For more information see `TDIGEST.REVRANK <https://redis.io/commands/tdigest.revrank>`_.
""" # noqa
return self.execute_command(TDIGEST_REVRANK, key, value, *values)

def byrank(self, key, rank, *ranks):
"""
Retrieve an estimation of the value with the given rank.

For more information see `TDIGEST.BY_RANK <https://redis.io/commands/tdigest.by_rank>`_.
""" # noqa
return self.execute_command(TDIGEST_BYRANK, key, rank, *ranks)

def byrevrank(self, key, rank, *ranks):
"""
Retrieve an estimation of the value with the given reverse rank.

For more information see `TDIGEST.BY_REVRANK <https://redis.io/commands/tdigest.by_revrank>`_.
""" # noqa
return self.execute_command(TDIGEST_BYREVRANK, key, rank, *ranks)


class CMSCommands:
"""Count-Min Sketch Commands"""
Expand Down
22 changes: 12 additions & 10 deletions redis/commands/bf/info.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,20 @@ def __init__(self, args):
class TDigestInfo(object):
compression = None
capacity = None
mergedNodes = None
unmergedNodes = None
mergedWeight = None
unmergedWeight = None
totalCompressions = None
merged_nodes = None
unmerged_nodes = None
merged_weight = None
unmerged_weight = None
total_compressions = None
memory_usage = None

def __init__(self, args):
response = dict(zip(map(nativestr, args[::2]), args[1::2]))
self.compression = response["Compression"]
self.capacity = response["Capacity"]
self.mergedNodes = response["Merged nodes"]
self.unmergedNodes = response["Unmerged nodes"]
self.mergedWeight = response["Merged weight"]
self.unmergedWeight = response["Unmerged weight"]
self.totalCompressions = response["Total compressions"]
self.merged_nodes = response["Merged nodes"]
self.unmerged_nodes = response["Unmerged nodes"]
self.merged_weight = response["Merged weight"]
self.unmerged_weight = response["Unmerged weight"]
self.total_compressions = response["Total compressions"]
self.memory_usage = response["Memory usage"]
3 changes: 3 additions & 0 deletions redis/commands/bf/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
def parse_tdigest_quantile(response):
"""Parse TDIGEST.QUANTILE response."""
return [float(x) for x in response]
19 changes: 19 additions & 0 deletions redis/commands/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,25 @@ async def _split_command_across_slots(self, command: str, *keys: KeyT) -> int:
# Sum up the reply from each command
return sum(await self._execute_pipeline_by_slot(command, slots_to_keys))

async def _execute_pipeline_by_slot(
self, command: str, slots_to_args: Mapping[int, Iterable[EncodableT]]
) -> List[Any]:
if self._initialize:
await self.initialize()
read_from_replicas = self.read_from_replicas and command in READ_COMMANDS
pipe = self.pipeline()
[
pipe.execute_command(
command,
*slot_args,
target_nodes=[
self.nodes_manager.get_node_from_slot(slot, read_from_replicas)
],
)
for slot, slot_args in slots_to_args.items()
]
return await pipe.execute()


class ClusterManagementCommands(ManagementCommands):
"""
Expand Down
Loading