Skip to content

feat(redis): Add support for redis.asyncio #1933

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
b7adeb5
Add support for redis asyncio
Zhenay Mar 1, 2023
4d5c009
python 2.7
Zhenay Mar 1, 2023
24a420a
Clean garbage
Zhenay Mar 1, 2023
7539af7
Change version of fakeredis
Zhenay Mar 1, 2023
27d605e
Install pytest-asyncio only for python3
Zhenay Mar 1, 2023
9ca3ae2
Merge branch 'master' into Zhenay/931-redis-asyncio
Zhenay Mar 17, 2023
582c832
Add py3.10 and py3.11 in test matrix
Zhenay Mar 17, 2023
4190091
Run split-tox-gh-actions.py
Zhenay Mar 17, 2023
db8ada2
Merge branch 'master' into Zhenay/931-redis-asyncio
antonpirker Apr 12, 2023
906525e
Merge branch 'master' into Zhenay/931-redis-asyncio
Zhenay Apr 19, 2023
f47427c
Merge branch 'master' into Zhenay/931-redis-asyncio
Zhenay Apr 26, 2023
48f2a99
Merge branch 'master' into Zhenay/931-redis-asyncio
Zhenay May 6, 2023
9621eff
Fix lint error
Zhenay May 6, 2023
30cba84
Merge branch 'master' into pr/Zhenay/1933
antonpirker Jul 10, 2023
3a45e86
Fixed merge conflict
antonpirker Jul 10, 2023
52ad879
Updated test matrix
antonpirker Jul 10, 2023
113a2e4
linting
antonpirker Jul 10, 2023
e439587
Formatting
antonpirker Jul 10, 2023
c262b3b
Added PII scrubbing
antonpirker Jul 10, 2023
e30e921
Fixed tests
antonpirker Jul 11, 2023
feca3e7
Fixed tests
antonpirker Jul 11, 2023
d1bad28
Updated tests
antonpirker Jul 11, 2023
9776c26
More test updates
antonpirker Jul 11, 2023
e8b240c
Typing
antonpirker Jul 11, 2023
0d06afd
Revert accidental commit
antonpirker Jul 11, 2023
4b34a6f
Merge branch 'master' into Zhenay/931-redis-asyncio
antonpirker Jul 11, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test-integration-redis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.7","3.8","3.9"]
python-version: ["3.7","3.8","3.9","3.10","3.11"]
# python3.6 reached EOL and is no longer being supported on
# new versions of hosted runners on Github Actions
# ubuntu-20.04 is the last version that supported python3.6
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

if TYPE_CHECKING:
from typing import Any, Sequence
from sentry_sdk.tracing import Span

_SINGLE_KEY_COMMANDS = frozenset(
["decr", "decrby", "get", "incr", "incrby", "pttl", "set", "setex", "setnx", "ttl"]
Expand All @@ -25,10 +26,64 @@
]

_MAX_NUM_ARGS = 10 # Trim argument lists to this many values
_MAX_NUM_COMMANDS = 10 # Trim command lists to this many values

_DEFAULT_MAX_DATA_SIZE = 1024


def _get_safe_command(name, args):
# type: (str, Sequence[Any]) -> str
command_parts = [name]

for i, arg in enumerate(args):
if i > _MAX_NUM_ARGS:
break

name_low = name.lower()

if name_low in _COMMANDS_INCLUDING_SENSITIVE_DATA:
command_parts.append(SENSITIVE_DATA_SUBSTITUTE)
continue

arg_is_the_key = i == 0
if arg_is_the_key:
command_parts.append(repr(arg))

else:
if _should_send_default_pii():
command_parts.append(repr(arg))
else:
command_parts.append(SENSITIVE_DATA_SUBSTITUTE)

command = " ".join(command_parts)
return command


def _set_pipeline_data(
span, is_cluster, get_command_args_fn, is_transaction, command_stack
):
# type: (Span, bool, Any, bool, Sequence[Any]) -> None
span.set_tag("redis.is_cluster", is_cluster)
transaction = is_transaction if not is_cluster else False
span.set_tag("redis.transaction", transaction)

commands = []
for i, arg in enumerate(command_stack):
if i >= _MAX_NUM_COMMANDS:
break

command = get_command_args_fn(arg)
commands.append(_get_safe_command(command[0], command[1:]))

span.set_data(
"redis.commands",
{
"count": len(command_stack),
"first_ten": commands,
},
)


def patch_redis_pipeline(pipeline_cls, is_cluster, get_command_args_fn):
# type: (Any, bool, Any) -> None
old_execute = pipeline_cls.execute
Expand All @@ -44,24 +99,12 @@ def sentry_patched_execute(self, *args, **kwargs):
op=OP.DB_REDIS, description="redis.pipeline.execute"
) as span:
with capture_internal_exceptions():
span.set_tag("redis.is_cluster", is_cluster)
transaction = self.transaction if not is_cluster else False
span.set_tag("redis.transaction", transaction)

commands = []
for i, arg in enumerate(self.command_stack):
if i > _MAX_NUM_ARGS:
break
command_args = []
for j, command_arg in enumerate(get_command_args_fn(arg)):
if j > 0:
command_arg = repr(command_arg)
command_args.append(command_arg)
commands.append(" ".join(command_args))

span.set_data(
"redis.commands",
{"count": len(self.command_stack), "first_ten": commands},
_set_pipeline_data(
span,
is_cluster,
get_command_args_fn,
self.transaction,
self.command_stack,
)
span.set_data(SPANDATA.DB_SYSTEM, "redis")

Expand All @@ -80,6 +123,43 @@ def _parse_rediscluster_command(command):
return command.args


def _patch_redis(StrictRedis, client): # noqa: N803
# type: (Any, Any) -> None
patch_redis_client(StrictRedis, is_cluster=False)
patch_redis_pipeline(client.Pipeline, False, _get_redis_command_args)
try:
strict_pipeline = client.StrictPipeline
except AttributeError:
pass
else:
patch_redis_pipeline(strict_pipeline, False, _get_redis_command_args)

try:
import redis.asyncio
except ImportError:
pass
else:
from sentry_sdk.integrations.redis.asyncio import (
patch_redis_async_client,
patch_redis_async_pipeline,
)

patch_redis_async_client(redis.asyncio.client.StrictRedis)
patch_redis_async_pipeline(redis.asyncio.client.Pipeline)


def _patch_rb():
# type: () -> None
try:
import rb.clients # type: ignore
except ImportError:
pass
else:
patch_redis_client(rb.clients.FanoutClient, is_cluster=False)
patch_redis_client(rb.clients.MappingClient, is_cluster=False)
patch_redis_client(rb.clients.RoutingClient, is_cluster=False)


def _patch_rediscluster():
# type: () -> None
try:
Expand Down Expand Up @@ -119,30 +199,40 @@ def setup_once():
except ImportError:
raise DidNotEnable("Redis client not installed")

patch_redis_client(StrictRedis, is_cluster=False)
patch_redis_pipeline(client.Pipeline, False, _get_redis_command_args)
try:
strict_pipeline = client.StrictPipeline # type: ignore
except AttributeError:
pass
else:
patch_redis_pipeline(strict_pipeline, False, _get_redis_command_args)

try:
import rb.clients # type: ignore
except ImportError:
pass
else:
patch_redis_client(rb.clients.FanoutClient, is_cluster=False)
patch_redis_client(rb.clients.MappingClient, is_cluster=False)
patch_redis_client(rb.clients.RoutingClient, is_cluster=False)
_patch_redis(StrictRedis, client)
_patch_rb()

try:
_patch_rediscluster()
except Exception:
logger.exception("Error occurred while patching `rediscluster` library")


def _get_span_description(name, *args):
# type: (str, *Any) -> str
description = name

with capture_internal_exceptions():
description = _get_safe_command(name, args)

return description


def _set_client_data(span, is_cluster, name, *args):
# type: (Span, bool, str, *Any) -> None
span.set_tag("redis.is_cluster", is_cluster)
if name:
span.set_tag("redis.command", name)
span.set_tag(SPANDATA.DB_OPERATION, name)

if name and args:
name_low = name.lower()
if (name_low in _SINGLE_KEY_COMMANDS) or (
name_low in _MULTI_KEY_COMMANDS and len(args) == 1
):
span.set_tag("redis.key", args[0])


def patch_redis_client(cls, is_cluster):
# type: (Any, bool) -> None
"""
Expand All @@ -159,31 +249,7 @@ def sentry_patched_execute_command(self, name, *args, **kwargs):
if integration is None:
return old_execute_command(self, name, *args, **kwargs)

description = name

with capture_internal_exceptions():
description_parts = [name]
for i, arg in enumerate(args):
if i > _MAX_NUM_ARGS:
break

name_low = name.lower()

if name_low in _COMMANDS_INCLUDING_SENSITIVE_DATA:
description_parts.append(SENSITIVE_DATA_SUBSTITUTE)
continue

arg_is_the_key = i == 0
if arg_is_the_key:
description_parts.append(repr(arg))

else:
if _should_send_default_pii():
description_parts.append(repr(arg))
else:
description_parts.append(SENSITIVE_DATA_SUBSTITUTE)

description = " ".join(description_parts)
description = _get_span_description(name, *args)

data_should_be_truncated = (
integration.max_data_size and len(description) > integration.max_data_size
Expand All @@ -192,18 +258,7 @@ def sentry_patched_execute_command(self, name, *args, **kwargs):
description = description[: integration.max_data_size - len("...")] + "..."

with hub.start_span(op=OP.DB_REDIS, description=description) as span:
span.set_tag("redis.is_cluster", is_cluster)

if name:
span.set_tag("redis.command", name)
span.set_tag(SPANDATA.DB_OPERATION, name)

if name and args:
name_low = name.lower()
if (name_low in _SINGLE_KEY_COMMANDS) or (
name_low in _MULTI_KEY_COMMANDS and len(args) == 1
):
span.set_tag("redis.key", args[0])
_set_client_data(span, is_cluster, name, *args)

return old_execute_command(self, name, *args, **kwargs)

Expand Down
67 changes: 67 additions & 0 deletions sentry_sdk/integrations/redis/asyncio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from __future__ import absolute_import

from sentry_sdk import Hub
from sentry_sdk.consts import OP
from sentry_sdk.utils import capture_internal_exceptions
from sentry_sdk.integrations.redis import (
RedisIntegration,
_get_redis_command_args,
_get_span_description,
_set_client_data,
_set_pipeline_data,
)


from sentry_sdk._types import MYPY

if MYPY:
from typing import Any


def patch_redis_async_pipeline(pipeline_cls):
# type: (Any) -> None
old_execute = pipeline_cls.execute

async def _sentry_execute(self, *args, **kwargs):
# type: (Any, *Any, **Any) -> Any
hub = Hub.current

if hub.get_integration(RedisIntegration) is None:
return await old_execute(self, *args, **kwargs)

with hub.start_span(
op=OP.DB_REDIS, description="redis.pipeline.execute"
) as span:
with capture_internal_exceptions():
_set_pipeline_data(
span,
False,
_get_redis_command_args,
self.is_transaction,
self.command_stack,
)

return await old_execute(self, *args, **kwargs)

pipeline_cls.execute = _sentry_execute


def patch_redis_async_client(cls):
# type: (Any) -> None
old_execute_command = cls.execute_command

async def _sentry_execute_command(self, name, *args, **kwargs):
# type: (Any, str, *Any, **Any) -> Any
hub = Hub.current

if hub.get_integration(RedisIntegration) is None:
return await old_execute_command(self, name, *args, **kwargs)

description = _get_span_description(name, *args)

with hub.start_span(op=OP.DB_REDIS, description=description) as span:
_set_client_data(span, False, name, *args)

return await old_execute_command(self, name, *args, **kwargs)

cls.execute_command = _sentry_execute_command
3 changes: 3 additions & 0 deletions tests/integrations/redis/asyncio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import pytest

pytest.importorskip("fakeredis.aioredis")
Loading