Skip to content

Commit fa56c6c

Browse files
authored
Instrument redis.asyncio clients (#1076)
1 parent 2671ff5 commit fa56c6c

File tree

4 files changed

+161
-1
lines changed

4 files changed

+161
-1
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1919
### Added
2020
- `opentelemetry-instrument` and `opentelemetry-bootstrap` now include a `--version` flag
2121
([#1065](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1065))
22+
- `opentelemetry-instrumentation-redis` now instruments asynchronous Redis clients, if the installed redis-py includes async support (>=4.2.0).
23+
([#1076](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1076))
2224

2325
## [1.11.1-0.30b1](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.11.1-0.30b1) - 2022-04-21
2426

instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,22 @@
3838
client = redis.StrictRedis(host="localhost", port=6379)
3939
client.get("my-key")
4040
41+
Async Redis clients (i.e. redis.asyncio.Redis) are also instrumented in the same way:
42+
43+
.. code:: python
44+
45+
from opentelemetry.instrumentation.redis import RedisInstrumentor
46+
import redis.asyncio
47+
48+
49+
# Instrument redis
50+
RedisInstrumentor().instrument()
51+
52+
# This will report a span with the default settings
53+
async def redis_get():
54+
client = redis.asyncio.Redis(host="localhost", port=6379)
55+
await client.get("my-key")
56+
4157
The `instrument` method accepts the following keyword args:
4258
4359
tracer_provider (TracerProvider) - an optional tracer provider
@@ -102,6 +118,10 @@ def response_hook(span, instance, response):
102118
typing.Callable[[Span, redis.connection.Connection, Any], None]
103119
]
104120

121+
_REDIS_ASYNCIO_VERSION = (4, 2, 0)
122+
if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
123+
import redis.asyncio
124+
105125

106126
def _set_connection_attributes(span, conn):
107127
if not span.is_recording():
@@ -176,6 +196,22 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
176196
f"{pipeline_class}.immediate_execute_command",
177197
_traced_execute_command,
178198
)
199+
if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
200+
wrap_function_wrapper(
201+
"redis.asyncio",
202+
f"{redis_class}.execute_command",
203+
_traced_execute_command,
204+
)
205+
wrap_function_wrapper(
206+
"redis.asyncio.client",
207+
f"{pipeline_class}.execute",
208+
_traced_execute_pipeline,
209+
)
210+
wrap_function_wrapper(
211+
"redis.asyncio.client",
212+
f"{pipeline_class}.immediate_execute_command",
213+
_traced_execute_command,
214+
)
179215

180216

181217
class RedisInstrumentor(BaseInstrumentor):
@@ -222,3 +258,8 @@ def _uninstrument(self, **kwargs):
222258
unwrap(redis.Redis, "pipeline")
223259
unwrap(redis.client.Pipeline, "execute")
224260
unwrap(redis.client.Pipeline, "immediate_execute_command")
261+
if redis.VERSION >= _REDIS_ASYNCIO_VERSION:
262+
unwrap(redis.asyncio.Redis, "execute_command")
263+
unwrap(redis.asyncio.Redis, "pipeline")
264+
unwrap(redis.asyncio.client.Pipeline, "execute")
265+
unwrap(redis.asyncio.client.Pipeline, "immediate_execute_command")

tests/opentelemetry-docker-tests/tests/redis/test_redis_functional.py

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
import asyncio
16+
1517
import redis
18+
import redis.asyncio
1619

1720
from opentelemetry import trace
1821
from opentelemetry.instrumentation.redis import RedisInstrumentor
@@ -121,6 +124,120 @@ def test_parent(self):
121124
self.assertEqual(child_span.name, "GET")
122125

123126

127+
def async_call(coro):
128+
loop = asyncio.get_event_loop()
129+
return loop.run_until_complete(coro)
130+
131+
132+
class TestAsyncRedisInstrument(TestBase):
133+
def setUp(self):
134+
super().setUp()
135+
self.redis_client = redis.asyncio.Redis(port=6379)
136+
async_call(self.redis_client.flushall())
137+
RedisInstrumentor().instrument(tracer_provider=self.tracer_provider)
138+
139+
def tearDown(self):
140+
super().tearDown()
141+
RedisInstrumentor().uninstrument()
142+
143+
def _check_span(self, span, name):
144+
self.assertEqual(span.name, name)
145+
self.assertIs(span.status.status_code, trace.StatusCode.UNSET)
146+
self.assertEqual(span.attributes.get(SpanAttributes.DB_NAME), 0)
147+
self.assertEqual(
148+
span.attributes[SpanAttributes.NET_PEER_NAME], "localhost"
149+
)
150+
self.assertEqual(span.attributes[SpanAttributes.NET_PEER_PORT], 6379)
151+
152+
def test_long_command(self):
153+
async_call(self.redis_client.mget(*range(1000)))
154+
155+
spans = self.memory_exporter.get_finished_spans()
156+
self.assertEqual(len(spans), 1)
157+
span = spans[0]
158+
self._check_span(span, "MGET")
159+
self.assertTrue(
160+
span.attributes.get(SpanAttributes.DB_STATEMENT).startswith(
161+
"MGET 0 1 2 3"
162+
)
163+
)
164+
self.assertTrue(
165+
span.attributes.get(SpanAttributes.DB_STATEMENT).endswith("...")
166+
)
167+
168+
def test_basics(self):
169+
self.assertIsNone(async_call(self.redis_client.get("cheese")))
170+
spans = self.memory_exporter.get_finished_spans()
171+
self.assertEqual(len(spans), 1)
172+
span = spans[0]
173+
self._check_span(span, "GET")
174+
self.assertEqual(
175+
span.attributes.get(SpanAttributes.DB_STATEMENT), "GET cheese"
176+
)
177+
self.assertEqual(span.attributes.get("db.redis.args_length"), 2)
178+
179+
def test_pipeline_traced(self):
180+
async def pipeline_simple():
181+
async with self.redis_client.pipeline(
182+
transaction=False
183+
) as pipeline:
184+
pipeline.set("blah", 32)
185+
pipeline.rpush("foo", "éé")
186+
pipeline.hgetall("xxx")
187+
await pipeline.execute()
188+
189+
async_call(pipeline_simple())
190+
191+
spans = self.memory_exporter.get_finished_spans()
192+
self.assertEqual(len(spans), 1)
193+
span = spans[0]
194+
self._check_span(span, "SET RPUSH HGETALL")
195+
self.assertEqual(
196+
span.attributes.get(SpanAttributes.DB_STATEMENT),
197+
"SET blah 32\nRPUSH foo éé\nHGETALL xxx",
198+
)
199+
self.assertEqual(span.attributes.get("db.redis.pipeline_length"), 3)
200+
201+
def test_pipeline_immediate(self):
202+
async def pipeline_immediate():
203+
async with self.redis_client.pipeline() as pipeline:
204+
pipeline.set("a", 1)
205+
await pipeline.immediate_execute_command("SET", "b", 2)
206+
await pipeline.execute()
207+
208+
async_call(pipeline_immediate())
209+
210+
spans = self.memory_exporter.get_finished_spans()
211+
# expecting two separate spans here, rather than a
212+
# single span for the whole pipeline
213+
self.assertEqual(len(spans), 2)
214+
span = spans[0]
215+
self._check_span(span, "SET")
216+
self.assertEqual(
217+
span.attributes.get(SpanAttributes.DB_STATEMENT), "SET b 2"
218+
)
219+
220+
def test_parent(self):
221+
"""Ensure OpenTelemetry works with redis."""
222+
ot_tracer = trace.get_tracer("redis_svc")
223+
224+
with ot_tracer.start_as_current_span("redis_get"):
225+
self.assertIsNone(async_call(self.redis_client.get("cheese")))
226+
227+
spans = self.memory_exporter.get_finished_spans()
228+
self.assertEqual(len(spans), 2)
229+
child_span, parent_span = spans[0], spans[1]
230+
231+
# confirm the parenting
232+
self.assertIsNone(parent_span.parent)
233+
self.assertIs(child_span.parent, parent_span.get_span_context())
234+
235+
self.assertEqual(parent_span.name, "redis_get")
236+
self.assertEqual(parent_span.instrumentation_info.name, "redis_svc")
237+
238+
self.assertEqual(child_span.name, "GET")
239+
240+
124241
class TestRedisDBIndexInstrument(TestBase):
125242
def setUp(self):
126243
super().setUp()

tox.ini

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -491,7 +491,7 @@ deps =
491491
psycopg2 ~= 2.8.4
492492
aiopg >= 0.13.0, < 1.3.0
493493
sqlalchemy ~= 1.4
494-
redis ~= 3.5
494+
redis ~= 4.2
495495
celery[pytest] >= 4.0, < 6.0
496496
protobuf>=3.13.0
497497
requests==2.25.0

0 commit comments

Comments
 (0)