Skip to content

Commit b7adeb5

Browse files
committed
Add support for redis asyncio
1 parent ad3724c commit b7adeb5

File tree

3 files changed

+218
-57
lines changed

3 files changed

+218
-57
lines changed

sentry_sdk/integrations/redis.py

Lines changed: 146 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
if MYPY:
1111
from typing import Any, Sequence
12+
from sentry_sdk.tracing import Span
1213

1314
_SINGLE_KEY_COMMANDS = frozenset(
1415
["decr", "decrby", "get", "incr", "incrby", "pttl", "set", "setex", "setnx", "ttl"]
@@ -19,6 +20,31 @@
1920
_MAX_NUM_ARGS = 10
2021

2122

23+
def _set_pipeline_data(
24+
span, is_cluster, get_command_args_fn, is_transaction, command_stack
25+
):
26+
# type: (Span, bool, Any, bool, Sequence[Any]) -> None
27+
span.set_tag("redis.is_cluster", is_cluster)
28+
transaction = is_transaction if not is_cluster else False
29+
span.set_tag("redis.transaction", transaction)
30+
31+
commands = []
32+
for i, arg in enumerate(command_stack):
33+
if i > _MAX_NUM_ARGS:
34+
break
35+
command_args = []
36+
for j, command_arg in enumerate(get_command_args_fn(arg)):
37+
if j > 0:
38+
command_arg = repr(command_arg)
39+
command_args.append(command_arg)
40+
commands.append(" ".join(command_args))
41+
42+
span.set_data(
43+
"redis.commands",
44+
{"count": len(command_stack), "first_ten": commands},
45+
)
46+
47+
2248
def patch_redis_pipeline(pipeline_cls, is_cluster, get_command_args_fn):
2349
# type: (Any, bool, Any) -> None
2450
old_execute = pipeline_cls.execute
@@ -34,31 +60,47 @@ def sentry_patched_execute(self, *args, **kwargs):
3460
op=OP.DB_REDIS, description="redis.pipeline.execute"
3561
) as span:
3662
with capture_internal_exceptions():
37-
span.set_tag("redis.is_cluster", is_cluster)
38-
transaction = self.transaction if not is_cluster else False
39-
span.set_tag("redis.transaction", transaction)
40-
41-
commands = []
42-
for i, arg in enumerate(self.command_stack):
43-
if i > _MAX_NUM_ARGS:
44-
break
45-
command_args = []
46-
for j, command_arg in enumerate(get_command_args_fn(arg)):
47-
if j > 0:
48-
command_arg = repr(command_arg)
49-
command_args.append(command_arg)
50-
commands.append(" ".join(command_args))
51-
52-
span.set_data(
53-
"redis.commands",
54-
{"count": len(self.command_stack), "first_ten": commands},
63+
_set_pipeline_data(
64+
span,
65+
is_cluster,
66+
get_command_args_fn,
67+
self.transaction,
68+
self.command_stack,
5569
)
5670

5771
return old_execute(self, *args, **kwargs)
5872

5973
pipeline_cls.execute = sentry_patched_execute
6074

6175

76+
def patch_redis_async_pipeline(pipeline_cls):
77+
# type: (Any) -> None
78+
old_execute = pipeline_cls.execute
79+
80+
async def _sentry_execute(self, *args, **kwargs):
81+
# type: (Any, *Any, **Any) -> Any
82+
hub = Hub.current
83+
84+
if hub.get_integration(RedisIntegration) is None:
85+
return await old_execute(self, *args, **kwargs)
86+
87+
with hub.start_span(
88+
op=OP.DB_REDIS, description="redis.pipeline.execute"
89+
) as span:
90+
with capture_internal_exceptions():
91+
_set_pipeline_data(
92+
span,
93+
False,
94+
_get_redis_command_args,
95+
self.is_transaction,
96+
self.command_stack,
97+
)
98+
99+
return await old_execute(self, *args, **kwargs)
100+
101+
pipeline_cls.execute = _sentry_execute
102+
103+
62104
def _get_redis_command_args(command):
63105
# type: (Any) -> Sequence[Any]
64106
return command[0]
@@ -69,6 +111,37 @@ def _parse_rediscluster_command(command):
69111
return command.args
70112

71113

114+
def _patch_redis(redis):
115+
# type: (Any) -> None
116+
patch_redis_client(redis.StrictRedis, is_cluster=False)
117+
patch_redis_pipeline(redis.client.Pipeline, False, _get_redis_command_args)
118+
try:
119+
strict_pipeline = redis.client.StrictPipeline
120+
except AttributeError:
121+
pass
122+
else:
123+
patch_redis_pipeline(strict_pipeline, False, _get_redis_command_args)
124+
try:
125+
import redis.asyncio # type: ignore
126+
except ImportError:
127+
pass
128+
else:
129+
patch_redis_async_client(redis.asyncio.client.StrictRedis)
130+
patch_redis_async_pipeline(redis.asyncio.client.Pipeline)
131+
132+
133+
def _patch_rb():
134+
# type: () -> None
135+
try:
136+
import rb.clients # type: ignore
137+
except ImportError:
138+
pass
139+
else:
140+
patch_redis_client(rb.clients.FanoutClient, is_cluster=False)
141+
patch_redis_client(rb.clients.MappingClient, is_cluster=False)
142+
patch_redis_client(rb.clients.RoutingClient, is_cluster=False)
143+
144+
72145
def _patch_rediscluster():
73146
# type: () -> None
74147
try:
@@ -104,30 +177,46 @@ def setup_once():
104177
except ImportError:
105178
raise DidNotEnable("Redis client not installed")
106179

107-
patch_redis_client(redis.StrictRedis, is_cluster=False)
108-
patch_redis_pipeline(redis.client.Pipeline, False, _get_redis_command_args)
109-
try:
110-
strict_pipeline = redis.client.StrictPipeline # type: ignore
111-
except AttributeError:
112-
pass
113-
else:
114-
patch_redis_pipeline(strict_pipeline, False, _get_redis_command_args)
115-
116-
try:
117-
import rb.clients # type: ignore
118-
except ImportError:
119-
pass
120-
else:
121-
patch_redis_client(rb.clients.FanoutClient, is_cluster=False)
122-
patch_redis_client(rb.clients.MappingClient, is_cluster=False)
123-
patch_redis_client(rb.clients.RoutingClient, is_cluster=False)
180+
_patch_redis(redis)
181+
_patch_rb()
124182

125183
try:
126184
_patch_rediscluster()
127185
except Exception:
128186
logger.exception("Error occurred while patching `rediscluster` library")
129187

130188

189+
def _get_span_description(name, *args):
190+
# type: (str, *Any) -> str
191+
description = name
192+
193+
with capture_internal_exceptions():
194+
description_parts = [name]
195+
for i, arg in enumerate(args):
196+
if i > _MAX_NUM_ARGS:
197+
break
198+
199+
description_parts.append(repr(arg))
200+
201+
description = " ".join(description_parts)
202+
203+
return description
204+
205+
206+
def _set_client_data(span, is_cluster, name, *args):
207+
# type: (Span, bool, str, *Any) -> None
208+
span.set_tag("redis.is_cluster", is_cluster)
209+
if name:
210+
span.set_tag("redis.command", name)
211+
212+
if name and args:
213+
name_low = name.lower()
214+
if (name_low in _SINGLE_KEY_COMMANDS) or (
215+
name_low in _MULTI_KEY_COMMANDS and len(args) == 1
216+
):
217+
span.set_tag("redis.key", args[0])
218+
219+
131220
def patch_redis_client(cls, is_cluster):
132221
# type: (Any, bool) -> None
133222
"""
@@ -143,30 +232,32 @@ def sentry_patched_execute_command(self, name, *args, **kwargs):
143232
if hub.get_integration(RedisIntegration) is None:
144233
return old_execute_command(self, name, *args, **kwargs)
145234

146-
description = name
235+
description = _get_span_description(name, *args)
147236

148-
with capture_internal_exceptions():
149-
description_parts = [name]
150-
for i, arg in enumerate(args):
151-
if i > _MAX_NUM_ARGS:
152-
break
237+
with hub.start_span(op=OP.DB_REDIS, description=description) as span:
238+
_set_client_data(span, is_cluster, name, *args)
153239

154-
description_parts.append(repr(arg))
240+
return old_execute_command(self, name, *args, **kwargs)
155241

156-
description = " ".join(description_parts)
242+
cls.execute_command = sentry_patched_execute_command
157243

158-
with hub.start_span(op=OP.DB_REDIS, description=description) as span:
159-
span.set_tag("redis.is_cluster", is_cluster)
160-
if name:
161-
span.set_tag("redis.command", name)
162244

163-
if name and args:
164-
name_low = name.lower()
165-
if (name_low in _SINGLE_KEY_COMMANDS) or (
166-
name_low in _MULTI_KEY_COMMANDS and len(args) == 1
167-
):
168-
span.set_tag("redis.key", args[0])
245+
def patch_redis_async_client(cls):
246+
# type: (Any) -> None
247+
old_execute_command = cls.execute_command
169248

170-
return old_execute_command(self, name, *args, **kwargs)
249+
async def _sentry_execute_command(self, name, *args, **kwargs):
250+
# type: (Any, str, *Any, **Any) -> Any
251+
hub = Hub.current
171252

172-
cls.execute_command = sentry_patched_execute_command
253+
if hub.get_integration(RedisIntegration) is None:
254+
return await old_execute_command(self, name, *args, **kwargs)
255+
256+
description = _get_span_description(name, *args)
257+
258+
with hub.start_span(op=OP.DB_REDIS, description=description) as span:
259+
_set_client_data(span, False, name, *args)
260+
261+
return await old_execute_command(self, name, *args, **kwargs)
262+
263+
cls.execute_command = _sentry_execute_command

tests/integrations/redis/test_redis.py

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
1+
import pytest
2+
13
from sentry_sdk import capture_message, start_transaction
24
from sentry_sdk.integrations.redis import RedisIntegration
35

46
from fakeredis import FakeStrictRedis
5-
import pytest
7+
8+
try:
9+
from fakeredis.aioredis import FakeRedis as AsyncFakeRedis
10+
except ImportError:
11+
AsyncFakeRedis = None
612

713

814
def test_basic(sentry_init, capture_events):
@@ -58,3 +64,66 @@ def test_redis_pipeline(sentry_init, capture_events, is_transaction):
5864
"redis.transaction": is_transaction,
5965
"redis.is_cluster": False,
6066
}
67+
68+
69+
@pytest.mark.asyncio
70+
@pytest.mark.skipif(
71+
AsyncFakeRedis is None, reason="fakeredis.asyncio is not available."
72+
)
73+
async def test_async_basic(sentry_init, capture_events):
74+
sentry_init(integrations=[RedisIntegration()])
75+
events = capture_events()
76+
77+
connection = AsyncFakeRedis()
78+
79+
await connection.get("foobar")
80+
capture_message("hi")
81+
82+
(event,) = events
83+
(crumb,) = event["breadcrumbs"]["values"]
84+
85+
assert crumb == {
86+
"category": "redis",
87+
"message": "GET 'foobar'",
88+
"data": {
89+
"redis.key": "foobar",
90+
"redis.command": "GET",
91+
"redis.is_cluster": False,
92+
},
93+
"timestamp": crumb["timestamp"],
94+
"type": "redis",
95+
}
96+
97+
98+
@pytest.mark.parametrize("is_transaction", [False, True])
99+
@pytest.mark.asyncio
100+
@pytest.mark.skipif(
101+
AsyncFakeRedis is None, reason="fakeredis.asyncio is not available."
102+
)
103+
async def test_async_redis_pipeline(sentry_init, capture_events, is_transaction):
104+
sentry_init(integrations=[RedisIntegration()], traces_sample_rate=1.0)
105+
events = capture_events()
106+
107+
connection = FakeStrictRedis()
108+
with start_transaction():
109+
110+
pipeline = connection.pipeline(transaction=is_transaction)
111+
pipeline.get("foo")
112+
pipeline.set("bar", 1)
113+
pipeline.set("baz", 2)
114+
pipeline.execute()
115+
116+
(event,) = events
117+
(span,) = event["spans"]
118+
assert span["op"] == "db.redis"
119+
assert span["description"] == "redis.pipeline.execute"
120+
assert span["data"] == {
121+
"redis.commands": {
122+
"count": 3,
123+
"first_ten": ["GET 'foo'", "SET 'bar' 1", "SET 'baz' 2"],
124+
}
125+
}
126+
assert span["tags"] == {
127+
"redis.transaction": is_transaction,
128+
"redis.is_cluster": False,
129+
}

tox.ini

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,8 @@ deps =
333333
requests: requests>=2.0
334334

335335
# Redis
336-
redis: fakeredis<1.7.4
336+
redis: fakeredis>=1.7.5
337+
redis: pytest-asyncio
337338

338339
# Redis Cluster
339340
rediscluster-v1: redis-py-cluster>=1.0.0,<2.0.0

0 commit comments

Comments
 (0)