Skip to content

Commit 76af9d2

Browse files
fdellekartali.sorouraminiantonpirker
authored
gRPC integration and aio interceptors (#2369)
Automatically add client and server interceptors to gRPC calls. Make it work with async gRPC servers and async gRPC client channels. --------- Co-authored-by: ali.sorouramini <[email protected]> Co-authored-by: Anton Pirker <[email protected]> Co-authored-by: Anton Pirker <[email protected]>
1 parent 2cb232e commit 76af9d2

19 files changed

+934
-111
lines changed

.pre-commit-config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ repos:
1111
rev: 22.6.0
1212
hooks:
1313
- id: black
14+
exclude: ^(.*_pb2.py|.*_pb2_grpc.py)
1415

1516
- repo: https://github.com/pycqa/flake8
1617
rev: 5.0.4

linter-requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ mypy
22
black
33
flake8==5.0.4 # flake8 depends on pyflakes>=3.0.0 and this dropped support for Python 2 "# type:" comments
44
types-certifi
5+
types-protobuf
56
types-redis
67
types-setuptools
78
pymongo # There is no separate types module.

pyproject.toml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
[tool.black]
2+
# 'extend-exclude' excludes files or directories in addition to the defaults
3+
extend-exclude = '''
4+
# A regex preceded with ^/ will apply only to files and directories
5+
# in the root of the project.
6+
(
7+
.*_pb2.py # exclude autogenerated Protocol Buffer files anywhere in the project
8+
| .*_pb2_grpc.py # exclude autogenerated Protocol Buffer files anywhere in the project
9+
)
10+
'''
Lines changed: 152 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,152 @@
1-
from .server import ServerInterceptor # noqa: F401
2-
from .client import ClientInterceptor # noqa: F401
1+
from functools import wraps
2+
3+
import grpc
4+
from grpc import Channel, Server, intercept_channel
5+
from grpc.aio import Channel as AsyncChannel
6+
from grpc.aio import Server as AsyncServer
7+
8+
from sentry_sdk.integrations import Integration
9+
from sentry_sdk._types import TYPE_CHECKING
10+
11+
from .client import ClientInterceptor
12+
from .server import ServerInterceptor
13+
from .aio.server import ServerInterceptor as AsyncServerInterceptor
14+
from .aio.client import (
15+
SentryUnaryUnaryClientInterceptor as AsyncUnaryUnaryClientInterceptor,
16+
)
17+
from .aio.client import (
18+
SentryUnaryStreamClientInterceptor as AsyncUnaryStreamClientIntercetor,
19+
)
20+
21+
from typing import Any, Optional, Sequence
22+
23+
# Hack to get new Python features working in older versions
24+
# without introducing a hard dependency on `typing_extensions`
25+
# from: https://stackoverflow.com/a/71944042/300572
26+
if TYPE_CHECKING:
27+
from typing import ParamSpec, Callable
28+
else:
29+
# Fake ParamSpec
30+
class ParamSpec:
31+
def __init__(self, _):
32+
self.args = None
33+
self.kwargs = None
34+
35+
# Callable[anything] will return None
36+
class _Callable:
37+
def __getitem__(self, _):
38+
return None
39+
40+
# Make instances
41+
Callable = _Callable()
42+
43+
P = ParamSpec("P")
44+
45+
46+
def _wrap_channel_sync(func: Callable[P, Channel]) -> Callable[P, Channel]:
47+
"Wrapper for synchronous secure and insecure channel."
48+
49+
@wraps(func)
50+
def patched_channel(*args: Any, **kwargs: Any) -> Channel:
51+
channel = func(*args, **kwargs)
52+
if not ClientInterceptor._is_intercepted:
53+
ClientInterceptor._is_intercepted = True
54+
return intercept_channel(channel, ClientInterceptor())
55+
else:
56+
return channel
57+
58+
return patched_channel
59+
60+
61+
def _wrap_intercept_channel(func: Callable[P, Channel]) -> Callable[P, Channel]:
62+
@wraps(func)
63+
def patched_intercept_channel(
64+
channel: Channel, *interceptors: grpc.ServerInterceptor
65+
) -> Channel:
66+
if ClientInterceptor._is_intercepted:
67+
interceptors = tuple(
68+
[
69+
interceptor
70+
for interceptor in interceptors
71+
if not isinstance(interceptor, ClientInterceptor)
72+
]
73+
)
74+
else:
75+
interceptors = interceptors
76+
return intercept_channel(channel, *interceptors)
77+
78+
return patched_intercept_channel # type: ignore
79+
80+
81+
def _wrap_channel_async(func: Callable[P, AsyncChannel]) -> Callable[P, AsyncChannel]:
82+
"Wrapper for asynchronous secure and insecure channel."
83+
84+
@wraps(func)
85+
def patched_channel(
86+
*args: P.args,
87+
interceptors: Optional[Sequence[grpc.aio.ClientInterceptor]] = None,
88+
**kwargs: P.kwargs,
89+
) -> Channel:
90+
sentry_interceptors = [
91+
AsyncUnaryUnaryClientInterceptor(),
92+
AsyncUnaryStreamClientIntercetor(),
93+
]
94+
interceptors = [*sentry_interceptors, *(interceptors or [])]
95+
return func(*args, interceptors=interceptors, **kwargs) # type: ignore
96+
97+
return patched_channel # type: ignore
98+
99+
100+
def _wrap_sync_server(func: Callable[P, Server]) -> Callable[P, Server]:
101+
"""Wrapper for synchronous server."""
102+
103+
@wraps(func)
104+
def patched_server(
105+
*args: P.args,
106+
interceptors: Optional[Sequence[grpc.ServerInterceptor]] = None,
107+
**kwargs: P.kwargs,
108+
) -> Server:
109+
interceptors = [
110+
interceptor
111+
for interceptor in interceptors or []
112+
if not isinstance(interceptor, ServerInterceptor)
113+
]
114+
server_interceptor = ServerInterceptor()
115+
interceptors = [server_interceptor, *(interceptors or [])]
116+
return func(*args, interceptors=interceptors, **kwargs) # type: ignore
117+
118+
return patched_server # type: ignore
119+
120+
121+
def _wrap_async_server(func: Callable[P, AsyncServer]) -> Callable[P, AsyncServer]:
122+
"""Wrapper for asynchronous server."""
123+
124+
@wraps(func)
125+
def patched_aio_server(
126+
*args: P.args,
127+
interceptors: Optional[Sequence[grpc.ServerInterceptor]] = None,
128+
**kwargs: P.kwargs,
129+
) -> Server:
130+
server_interceptor = AsyncServerInterceptor()
131+
interceptors = [server_interceptor, *(interceptors or [])]
132+
return func(*args, interceptors=interceptors, **kwargs) # type: ignore
133+
134+
return patched_aio_server # type: ignore
135+
136+
137+
class GRPCIntegration(Integration):
138+
identifier = "grpc"
139+
140+
@staticmethod
141+
def setup_once() -> None:
142+
import grpc
143+
144+
grpc.insecure_channel = _wrap_channel_sync(grpc.insecure_channel)
145+
grpc.secure_channel = _wrap_channel_sync(grpc.secure_channel)
146+
grpc.intercept_channel = _wrap_intercept_channel(grpc.intercept_channel)
147+
148+
grpc.aio.insecure_channel = _wrap_channel_async(grpc.aio.insecure_channel)
149+
grpc.aio.secure_channel = _wrap_channel_async(grpc.aio.secure_channel)
150+
151+
grpc.server = _wrap_sync_server(grpc.server)
152+
grpc.aio.server = _wrap_async_server(grpc.aio.server)
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
from .server import ServerInterceptor # noqa: F401
2+
from .client import ClientInterceptor # noqa: F401
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
from typing import Callable, Union, AsyncIterable, Any
2+
3+
from grpc.aio import (
4+
UnaryUnaryClientInterceptor,
5+
UnaryStreamClientInterceptor,
6+
ClientCallDetails,
7+
UnaryUnaryCall,
8+
UnaryStreamCall,
9+
)
10+
from google.protobuf.message import Message
11+
12+
from sentry_sdk import Hub
13+
from sentry_sdk.consts import OP
14+
15+
16+
class ClientInterceptor:
17+
@staticmethod
18+
def _update_client_call_details_metadata_from_hub(
19+
client_call_details: ClientCallDetails, hub: Hub
20+
) -> ClientCallDetails:
21+
metadata = (
22+
list(client_call_details.metadata) if client_call_details.metadata else []
23+
)
24+
for key, value in hub.iter_trace_propagation_headers():
25+
metadata.append((key, value))
26+
27+
client_call_details = ClientCallDetails(
28+
method=client_call_details.method,
29+
timeout=client_call_details.timeout,
30+
metadata=metadata,
31+
credentials=client_call_details.credentials,
32+
wait_for_ready=client_call_details.wait_for_ready,
33+
)
34+
35+
return client_call_details
36+
37+
38+
class SentryUnaryUnaryClientInterceptor(ClientInterceptor, UnaryUnaryClientInterceptor): # type: ignore
39+
async def intercept_unary_unary(
40+
self,
41+
continuation: Callable[[ClientCallDetails, Message], UnaryUnaryCall],
42+
client_call_details: ClientCallDetails,
43+
request: Message,
44+
) -> Union[UnaryUnaryCall, Message]:
45+
hub = Hub.current
46+
method = client_call_details.method
47+
48+
with hub.start_span(
49+
op=OP.GRPC_CLIENT, description="unary unary call to %s" % method.decode()
50+
) as span:
51+
span.set_data("type", "unary unary")
52+
span.set_data("method", method)
53+
54+
client_call_details = self._update_client_call_details_metadata_from_hub(
55+
client_call_details, hub
56+
)
57+
58+
response = await continuation(client_call_details, request)
59+
status_code = await response.code()
60+
span.set_data("code", status_code.name)
61+
62+
return response
63+
64+
65+
class SentryUnaryStreamClientInterceptor(
66+
ClientInterceptor, UnaryStreamClientInterceptor # type: ignore
67+
):
68+
async def intercept_unary_stream(
69+
self,
70+
continuation: Callable[[ClientCallDetails, Message], UnaryStreamCall],
71+
client_call_details: ClientCallDetails,
72+
request: Message,
73+
) -> Union[AsyncIterable[Any], UnaryStreamCall]:
74+
hub = Hub.current
75+
method = client_call_details.method
76+
77+
with hub.start_span(
78+
op=OP.GRPC_CLIENT, description="unary stream call to %s" % method.decode()
79+
) as span:
80+
span.set_data("type", "unary stream")
81+
span.set_data("method", method)
82+
83+
client_call_details = self._update_client_call_details_metadata_from_hub(
84+
client_call_details, hub
85+
)
86+
87+
response = await continuation(client_call_details, request)
88+
# status_code = await response.code()
89+
# span.set_data("code", status_code)
90+
91+
return response
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
from sentry_sdk import Hub
2+
from sentry_sdk._types import MYPY
3+
from sentry_sdk.consts import OP
4+
from sentry_sdk.integrations import DidNotEnable
5+
from sentry_sdk.tracing import Transaction, TRANSACTION_SOURCE_CUSTOM
6+
from sentry_sdk.utils import event_from_exception
7+
8+
if MYPY:
9+
from collections.abc import Awaitable, Callable
10+
from typing import Any
11+
12+
13+
try:
14+
import grpc
15+
from grpc import HandlerCallDetails, RpcMethodHandler
16+
from grpc.aio import ServicerContext
17+
except ImportError:
18+
raise DidNotEnable("grpcio is not installed")
19+
20+
21+
class ServerInterceptor(grpc.aio.ServerInterceptor): # type: ignore
22+
def __init__(self, find_name=None):
23+
# type: (ServerInterceptor, Callable[[ServicerContext], str] | None) -> None
24+
self._find_method_name = find_name or self._find_name
25+
26+
super(ServerInterceptor, self).__init__()
27+
28+
async def intercept_service(self, continuation, handler_call_details):
29+
# type: (ServerInterceptor, Callable[[HandlerCallDetails], Awaitable[RpcMethodHandler]], HandlerCallDetails) -> Awaitable[RpcMethodHandler]
30+
self._handler_call_details = handler_call_details
31+
handler = await continuation(handler_call_details)
32+
33+
if not handler.request_streaming and not handler.response_streaming:
34+
handler_factory = grpc.unary_unary_rpc_method_handler
35+
36+
async def wrapped(request, context):
37+
# type: (Any, ServicerContext) -> Any
38+
name = self._find_method_name(context)
39+
if not name:
40+
return await handler(request, context)
41+
42+
hub = Hub.current
43+
44+
# What if the headers are empty?
45+
transaction = Transaction.continue_from_headers(
46+
dict(context.invocation_metadata()),
47+
op=OP.GRPC_SERVER,
48+
name=name,
49+
source=TRANSACTION_SOURCE_CUSTOM,
50+
)
51+
52+
with hub.start_transaction(transaction=transaction):
53+
try:
54+
return await handler.unary_unary(request, context)
55+
except Exception as exc:
56+
event, hint = event_from_exception(
57+
exc,
58+
mechanism={"type": "grpc", "handled": False},
59+
)
60+
hub.capture_event(event, hint=hint)
61+
raise
62+
63+
elif not handler.request_streaming and handler.response_streaming:
64+
handler_factory = grpc.unary_stream_rpc_method_handler
65+
66+
async def wrapped(request, context): # type: ignore
67+
# type: (Any, ServicerContext) -> Any
68+
async for r in handler.unary_stream(request, context):
69+
yield r
70+
71+
elif handler.request_streaming and not handler.response_streaming:
72+
handler_factory = grpc.stream_unary_rpc_method_handler
73+
74+
async def wrapped(request, context):
75+
# type: (Any, ServicerContext) -> Any
76+
response = handler.stream_unary(request, context)
77+
return await response
78+
79+
elif handler.request_streaming and handler.response_streaming:
80+
handler_factory = grpc.stream_stream_rpc_method_handler
81+
82+
async def wrapped(request, context): # type: ignore
83+
# type: (Any, ServicerContext) -> Any
84+
async for r in handler.stream_stream(request, context):
85+
yield r
86+
87+
return handler_factory(
88+
wrapped,
89+
request_deserializer=handler.request_deserializer,
90+
response_serializer=handler.response_serializer,
91+
)
92+
93+
def _find_name(self, context):
94+
# type: (ServicerContext) -> str
95+
return self._handler_call_details.method

0 commit comments

Comments
 (0)