Skip to content

Commit 87d582d

Browse files
authored
feat(integrations): Add integration for asyncpg (#2314)
So far this records every statement that is directly issued, as well as the SQL statements that are used for cursors and prepared statements.
1 parent a6e1cbe commit 87d582d

File tree

9 files changed

+788
-1
lines changed

9 files changed

+788
-1
lines changed
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
name: Test asyncpg
2+
3+
on:
4+
push:
5+
branches:
6+
- master
7+
- release/**
8+
9+
pull_request:
10+
11+
# Cancel in progress workflows on pull_requests.
12+
# https://docs.github.com/en/actions/using-jobs/using-concurrency#example-using-a-fallback-value
13+
concurrency:
14+
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
15+
cancel-in-progress: true
16+
17+
permissions:
18+
contents: read
19+
20+
env:
21+
BUILD_CACHE_KEY: ${{ github.sha }}
22+
CACHED_BUILD_PATHS: |
23+
${{ github.workspace }}/dist-serverless
24+
25+
jobs:
26+
test:
27+
name: asyncpg, python ${{ matrix.python-version }}, ${{ matrix.os }}
28+
runs-on: ${{ matrix.os }}
29+
timeout-minutes: 30
30+
31+
strategy:
32+
fail-fast: false
33+
matrix:
34+
python-version: ["3.7","3.8","3.9","3.10","3.11"]
35+
# python3.6 reached EOL and is no longer being supported on
36+
# new versions of hosted runners on Github Actions
37+
# ubuntu-20.04 is the last version that supported python3.6
38+
# see https://github.com/actions/setup-python/issues/544#issuecomment-1332535877
39+
os: [ubuntu-20.04]
40+
services:
41+
postgres:
42+
image: postgres
43+
env:
44+
POSTGRES_PASSWORD: sentry
45+
# Set health checks to wait until postgres has started
46+
options: >-
47+
--health-cmd pg_isready
48+
--health-interval 10s
49+
--health-timeout 5s
50+
--health-retries 5
51+
# Maps tcp port 5432 on service container to the host
52+
ports:
53+
- 5432:5432
54+
env:
55+
SENTRY_PYTHON_TEST_POSTGRES_USER: postgres
56+
SENTRY_PYTHON_TEST_POSTGRES_PASSWORD: sentry
57+
SENTRY_PYTHON_TEST_POSTGRES_NAME: ci_test
58+
SENTRY_PYTHON_TEST_POSTGRES_HOST: localhost
59+
60+
steps:
61+
- uses: actions/checkout@v3
62+
- uses: actions/setup-python@v4
63+
with:
64+
python-version: ${{ matrix.python-version }}
65+
66+
- name: Setup Test Env
67+
run: |
68+
pip install coverage "tox>=3,<4"
69+
70+
- name: Test asyncpg
71+
uses: nick-fields/retry@v2
72+
with:
73+
timeout_minutes: 15
74+
max_attempts: 2
75+
retry_wait_seconds: 5
76+
shell: bash
77+
command: |
78+
set -x # print commands that are executed
79+
coverage erase
80+
81+
# Run tests
82+
./scripts/runtox.sh "py${{ matrix.python-version }}-asyncpg" --cov=tests --cov=sentry_sdk --cov-report= --cov-branch &&
83+
coverage combine .coverage* &&
84+
coverage xml -i
85+
86+
- uses: codecov/codecov-action@v3
87+
with:
88+
token: ${{ secrets.CODECOV_TOKEN }}
89+
files: coverage.xml
90+
91+
92+
check_required_tests:
93+
name: All asyncpg tests passed or skipped
94+
needs: test
95+
# Always run this, even if a dependent job failed
96+
if: always()
97+
runs-on: ubuntu-20.04
98+
steps:
99+
- name: Check for failures
100+
if: contains(needs.test.result, 'failure')
101+
run: |
102+
echo "One of the dependent jobs has failed. You may need to re-run it." && exit 1

scripts/split-tox-gh-actions/split-tox-gh-actions.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,10 @@
3030
TEMPLATE_SNIPPET_TEST = TEMPLATE_DIR / "ci-yaml-test-snippet.txt"
3131
TEMPLATE_SNIPPET_TEST_PY27 = TEMPLATE_DIR / "ci-yaml-test-py27-snippet.txt"
3232

33-
FRAMEWORKS_NEEDING_POSTGRES = ["django"]
33+
FRAMEWORKS_NEEDING_POSTGRES = [
34+
"django",
35+
"asyncpg",
36+
]
3437

3538
MATRIX_DEFINITION = """
3639
strategy:

sentry_sdk/consts.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,13 @@ class SPANDATA:
7474
Example: myDatabase
7575
"""
7676

77+
DB_USER = "db.user"
78+
"""
79+
The name of the database user used for connecting to the database.
80+
See: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/database.md
81+
Example: my_user
82+
"""
83+
7784
DB_OPERATION = "db.operation"
7885
"""
7986
The name of the operation being executed, e.g. the MongoDB command name such as findAndModify, or the SQL keyword.

sentry_sdk/integrations/asyncpg.py

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
from __future__ import annotations
2+
import contextlib
3+
from typing import Any, TypeVar, Callable, Awaitable, Iterator
4+
5+
from asyncpg.cursor import BaseCursor # type: ignore
6+
7+
from sentry_sdk import Hub
8+
from sentry_sdk.consts import OP, SPANDATA
9+
from sentry_sdk.integrations import Integration, DidNotEnable
10+
from sentry_sdk.tracing import Span
11+
from sentry_sdk.tracing_utils import record_sql_queries
12+
from sentry_sdk.utils import parse_version, capture_internal_exceptions
13+
14+
try:
15+
import asyncpg # type: ignore[import]
16+
17+
except ImportError:
18+
raise DidNotEnable("asyncpg not installed.")
19+
20+
# asyncpg.__version__ is a string containing the semantic version in the form of "<major>.<minor>.<patch>"
21+
asyncpg_version = parse_version(asyncpg.__version__)
22+
23+
if asyncpg_version is not None and asyncpg_version < (0, 23, 0):
24+
raise DidNotEnable("asyncpg >= 0.23.0 required")
25+
26+
27+
class AsyncPGIntegration(Integration):
28+
identifier = "asyncpg"
29+
_record_params = False
30+
31+
def __init__(self, *, record_params: bool = False):
32+
AsyncPGIntegration._record_params = record_params
33+
34+
@staticmethod
35+
def setup_once() -> None:
36+
asyncpg.Connection.execute = _wrap_execute(
37+
asyncpg.Connection.execute,
38+
)
39+
40+
asyncpg.Connection._execute = _wrap_connection_method(
41+
asyncpg.Connection._execute
42+
)
43+
asyncpg.Connection._executemany = _wrap_connection_method(
44+
asyncpg.Connection._executemany, executemany=True
45+
)
46+
asyncpg.Connection.cursor = _wrap_cursor_creation(asyncpg.Connection.cursor)
47+
asyncpg.Connection.prepare = _wrap_connection_method(asyncpg.Connection.prepare)
48+
asyncpg.connect_utils._connect_addr = _wrap_connect_addr(
49+
asyncpg.connect_utils._connect_addr
50+
)
51+
52+
53+
T = TypeVar("T")
54+
55+
56+
def _wrap_execute(f: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
57+
async def _inner(*args: Any, **kwargs: Any) -> T:
58+
hub = Hub.current
59+
integration = hub.get_integration(AsyncPGIntegration)
60+
61+
# Avoid recording calls to _execute twice.
62+
# Calls to Connection.execute with args also call
63+
# Connection._execute, which is recorded separately
64+
# args[0] = the connection object, args[1] is the query
65+
if integration is None or len(args) > 2:
66+
return await f(*args, **kwargs)
67+
68+
query = args[1]
69+
with record_sql_queries(hub, None, query, None, None, executemany=False):
70+
res = await f(*args, **kwargs)
71+
return res
72+
73+
return _inner
74+
75+
76+
SubCursor = TypeVar("SubCursor", bound=BaseCursor)
77+
78+
79+
@contextlib.contextmanager
80+
def _record(
81+
hub: Hub,
82+
cursor: SubCursor | None,
83+
query: str,
84+
params_list: tuple[Any, ...] | None,
85+
*,
86+
executemany: bool = False,
87+
) -> Iterator[Span]:
88+
integration = hub.get_integration(AsyncPGIntegration)
89+
if not integration._record_params:
90+
params_list = None
91+
92+
param_style = "pyformat" if params_list else None
93+
94+
with record_sql_queries(
95+
hub,
96+
cursor,
97+
query,
98+
params_list,
99+
param_style,
100+
executemany=executemany,
101+
record_cursor_repr=cursor is not None,
102+
) as span:
103+
yield span
104+
105+
106+
def _wrap_connection_method(
107+
f: Callable[..., Awaitable[T]], *, executemany: bool = False
108+
) -> Callable[..., Awaitable[T]]:
109+
async def _inner(*args: Any, **kwargs: Any) -> T:
110+
hub = Hub.current
111+
integration = hub.get_integration(AsyncPGIntegration)
112+
113+
if integration is None:
114+
return await f(*args, **kwargs)
115+
116+
query = args[1]
117+
params_list = args[2] if len(args) > 2 else None
118+
with _record(hub, None, query, params_list, executemany=executemany) as span:
119+
_set_db_data(span, args[0])
120+
res = await f(*args, **kwargs)
121+
return res
122+
123+
return _inner
124+
125+
126+
def _wrap_cursor_creation(f: Callable[..., T]) -> Callable[..., T]:
127+
def _inner(*args: Any, **kwargs: Any) -> T: # noqa: N807
128+
hub = Hub.current
129+
integration = hub.get_integration(AsyncPGIntegration)
130+
131+
if integration is None:
132+
return f(*args, **kwargs)
133+
134+
query = args[1]
135+
params_list = args[2] if len(args) > 2 else None
136+
137+
with _record(
138+
hub,
139+
None,
140+
query,
141+
params_list,
142+
executemany=False,
143+
) as span:
144+
_set_db_data(span, args[0])
145+
res = f(*args, **kwargs)
146+
span.set_data("db.cursor", res)
147+
148+
return res
149+
150+
return _inner
151+
152+
153+
def _wrap_connect_addr(f: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
154+
async def _inner(*args: Any, **kwargs: Any) -> T:
155+
hub = Hub.current
156+
integration = hub.get_integration(AsyncPGIntegration)
157+
158+
if integration is None:
159+
return await f(*args, **kwargs)
160+
161+
user = kwargs["params"].user
162+
database = kwargs["params"].database
163+
164+
with hub.start_span(op=OP.DB, description="connect") as span:
165+
span.set_data(SPANDATA.DB_SYSTEM, "postgresql")
166+
addr = kwargs.get("addr")
167+
if addr:
168+
try:
169+
span.set_data(SPANDATA.SERVER_ADDRESS, addr[0])
170+
span.set_data(SPANDATA.SERVER_PORT, addr[1])
171+
except IndexError:
172+
pass
173+
span.set_data(SPANDATA.DB_NAME, database)
174+
span.set_data(SPANDATA.DB_USER, user)
175+
176+
with capture_internal_exceptions():
177+
hub.add_breadcrumb(message="connect", category="query", data=span._data)
178+
res = await f(*args, **kwargs)
179+
180+
return res
181+
182+
return _inner
183+
184+
185+
def _set_db_data(span: Span, conn: Any) -> None:
186+
span.set_data(SPANDATA.DB_SYSTEM, "postgresql")
187+
188+
addr = conn._addr
189+
if addr:
190+
try:
191+
span.set_data(SPANDATA.SERVER_ADDRESS, addr[0])
192+
span.set_data(SPANDATA.SERVER_PORT, addr[1])
193+
except IndexError:
194+
pass
195+
196+
database = conn._params.database
197+
if database:
198+
span.set_data(SPANDATA.DB_NAME, database)
199+
200+
user = conn._params.user
201+
if user:
202+
span.set_data(SPANDATA.DB_USER, user)

sentry_sdk/tracing_utils.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ def record_sql_queries(
107107
params_list, # type: Any
108108
paramstyle, # type: Optional[str]
109109
executemany, # type: bool
110+
record_cursor_repr=False, # type: bool
110111
):
111112
# type: (...) -> Generator[sentry_sdk.tracing.Span, None, None]
112113

@@ -132,6 +133,8 @@ def record_sql_queries(
132133
data["db.paramstyle"] = paramstyle
133134
if executemany:
134135
data["db.executemany"] = True
136+
if record_cursor_repr and cursor is not None:
137+
data["db.cursor"] = cursor
135138

136139
with capture_internal_exceptions():
137140
hub.add_breadcrumb(message=query, category="query", data=data)

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ def get_file_text(file_name):
4646
extras_require={
4747
"aiohttp": ["aiohttp>=3.5"],
4848
"arq": ["arq>=0.23"],
49+
"asyncpg": ["asyncpg>=0.23"],
4950
"beam": ["apache-beam>=2.12"],
5051
"bottle": ["bottle>=0.12.13"],
5152
"celery": ["celery>=3"],
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
import pytest
2+
3+
pytest.importorskip("asyncpg")

0 commit comments

Comments
 (0)