-
Notifications
You must be signed in to change notification settings - Fork 549
gRPC integration and aio interceptors #2369
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
antonpirker
merged 63 commits into
getsentry:master
from
fdellekart:feat/grpc-aio-integration
Nov 8, 2023
Merged
Changes from all commits
Commits
Show all changes
63 commits
Select commit
Hold shift + click to select a range
c341091
feat: Add interceptor for async gRPC server
a554078
feat: Add exception handling to async gRPC interceptor
0296157
feat: Add gRPC integration with monkeypatch for synchronous client si…
fdellekart dfb8726
feat: Add patch with gRPC server side interceptors for grpc.server
fdellekart aaf705e
test: Add tests to verify gRPC integration does not break other inter…
fdellekart 155e998
feat: Add monkeypatch for async server to gRPC integrations
fdellekart 0b487e1
feat(grpc): Add async unary unary client interceptor
fdellekart ce73a36
feat(grpc): Add monkeypatching for async channels to integration
fdellekart 96c2c91
test(grpc): Add test for aio client integration
fdellekart 4103929
refactor: Avoid unnecessary code duplication in grpc integration
fdellekart a8ddcf6
fix: gRPC async metadata can be tuple although differently typed
fdellekart 1861aa8
Merge branch 'master' into feat/grpc-aio-integration
antonpirker 9f1bc88
refactor(grpc): consistent naming and imports
fdellekart fac3ee3
Merge branch 'master' into feat/grpc-aio-integration
antonpirker b43784f
Added pytest-asyncio to test deps
antonpirker 34bad4c
Added types to linter requirements
antonpirker 5a89d9b
Made mechanism type lowercase
antonpirker 3c493bc
Merge branch 'master' into feat/grpc-aio-integration
antonpirker 8341a0d
fix: typing of async gRPC integration
fdellekart 69704d4
feat(gRPC): Add async unary-stream interceptor
fdellekart 53bb01c
Merge branch 'master' into feat/grpc-aio-integration
antonpirker 4cde781
Merge branch 'master' into feat/grpc-aio-integration
antonpirker ccacf0a
Trying to make typing work in older Python versions
antonpirker c1c0be7
The tests need the types.
antonpirker 1dff2a7
Merge branch 'master' into feat/grpc-aio-integration
antonpirker 4c71549
Fixed typo
antonpirker c734463
Merge branch 'feat/grpc-aio-integration' of github.com:fdellekart/sen…
antonpirker b0b80fc
Merge branch 'master' into feat/grpc-aio-integration
antonpirker d2e1bbe
Cleaned up tests
antonpirker d4ffa1f
Prevent flake8 from checking generated files
antonpirker d737830
Tell black to not check auto generated files.
antonpirker 02cfbde
Added pyproject.toml to right directory
antonpirker 4bf2c83
Fixed some linting
antonpirker 0fb680e
Merge branch 'feat/grpc-aio-integration' of github.com:fdellekart/sen…
fdellekart 2437f87
test(gRPC): Add test for unary stream client interceptor
fdellekart 30ada2f
refactor(gRPC): Remove external APIs from server side interceptor
fdellekart 8b3e7e6
Cleaned up tests
antonpirker 9aba63e
Updated black config to work with pre-commit
antonpirker b625b2d
cleanup
antonpirker 9c9f3d8
cleanup
antonpirker 7cd1057
Merge branch 'feat/grpc-aio-integration' of github.com:fdellekart/sen…
fdellekart 2276ee4
refactor: gRPC code generation script to be run from project root
fdellekart 119e415
test: Add test endpoint for unary-stream gRPC method
fdellekart c465dcf
fix: Typos in interceptors and status code of stream response stuck f…
fdellekart aec6c7c
test: Add test for async unary-stream client interceptor
fdellekart 7b09a67
fix(gRPC): Make sure that server side interceptor does not break unar…
fdellekart 6a19737
feat(gRPC): Ensure backwards compatibility in case gRPC interceptors …
fdellekart 545f26b
test(gRPC): Add tests for currently unsupported RPC types
fdellekart dac5b60
fix(gRPC): aio integration was breaking unsupported RPC types
fdellekart 9a97c9a
Merge branch 'master' into feat/grpc-aio-integration
antonpirker ff2ff75
Merge branch 'master' into feat/grpc-aio-integration
antonpirker 63b912e
Merge branch 'master' into feat/grpc-aio-integration
antonpirker 7cb78df
Small typing fix
antonpirker b0e6d19
Small linting fix
antonpirker b678001
Merge branch 'master' into feat/grpc-aio-integration
antonpirker ff9372b
Merge branch 'master' into pr/fdellekart/2369
antonpirker afacd37
Consistent naming (was my fault I guess)
antonpirker 7be1306
Set nicer transaction name for async server interceptor
antonpirker 27ab22f
Merge branch 'master' into feat/grpc-aio-integration
antonpirker 2247336
Merge branch 'master' into feat/grpc-aio-integration
antonpirker f83e610
Make linter happy
antonpirker 1271459
Make mypy happy
antonpirker dddfa41
Merge branch 'master' into feat/grpc-aio-integration
antonpirker File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
[tool.black] | ||
# 'extend-exclude' excludes files or directories in addition to the defaults | ||
extend-exclude = ''' | ||
# A regex preceded with ^/ will apply only to files and directories | ||
# in the root of the project. | ||
( | ||
.*_pb2.py # exclude autogenerated Protocol Buffer files anywhere in the project | ||
| .*_pb2_grpc.py # exclude autogenerated Protocol Buffer files anywhere in the project | ||
) | ||
''' |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,152 @@ | ||
from .server import ServerInterceptor # noqa: F401 | ||
from .client import ClientInterceptor # noqa: F401 | ||
from functools import wraps | ||
|
||
import grpc | ||
from grpc import Channel, Server, intercept_channel | ||
from grpc.aio import Channel as AsyncChannel | ||
from grpc.aio import Server as AsyncServer | ||
|
||
from sentry_sdk.integrations import Integration | ||
from sentry_sdk._types import TYPE_CHECKING | ||
|
||
from .client import ClientInterceptor | ||
from .server import ServerInterceptor | ||
from .aio.server import ServerInterceptor as AsyncServerInterceptor | ||
from .aio.client import ( | ||
SentryUnaryUnaryClientInterceptor as AsyncUnaryUnaryClientInterceptor, | ||
) | ||
from .aio.client import ( | ||
SentryUnaryStreamClientInterceptor as AsyncUnaryStreamClientIntercetor, | ||
) | ||
|
||
from typing import Any, Optional, Sequence | ||
|
||
# Hack to get new Python features working in older versions | ||
# without introducing a hard dependency on `typing_extensions` | ||
# from: https://stackoverflow.com/a/71944042/300572 | ||
if TYPE_CHECKING: | ||
from typing import ParamSpec, Callable | ||
else: | ||
# Fake ParamSpec | ||
class ParamSpec: | ||
def __init__(self, _): | ||
self.args = None | ||
self.kwargs = None | ||
|
||
# Callable[anything] will return None | ||
class _Callable: | ||
def __getitem__(self, _): | ||
return None | ||
|
||
# Make instances | ||
Callable = _Callable() | ||
|
||
P = ParamSpec("P") | ||
|
||
|
||
def _wrap_channel_sync(func: Callable[P, Channel]) -> Callable[P, Channel]: | ||
"Wrapper for synchronous secure and insecure channel." | ||
|
||
@wraps(func) | ||
def patched_channel(*args: Any, **kwargs: Any) -> Channel: | ||
channel = func(*args, **kwargs) | ||
if not ClientInterceptor._is_intercepted: | ||
ClientInterceptor._is_intercepted = True | ||
return intercept_channel(channel, ClientInterceptor()) | ||
else: | ||
return channel | ||
|
||
return patched_channel | ||
|
||
|
||
def _wrap_intercept_channel(func: Callable[P, Channel]) -> Callable[P, Channel]: | ||
@wraps(func) | ||
def patched_intercept_channel( | ||
channel: Channel, *interceptors: grpc.ServerInterceptor | ||
) -> Channel: | ||
if ClientInterceptor._is_intercepted: | ||
interceptors = tuple( | ||
[ | ||
interceptor | ||
for interceptor in interceptors | ||
if not isinstance(interceptor, ClientInterceptor) | ||
] | ||
) | ||
else: | ||
interceptors = interceptors | ||
return intercept_channel(channel, *interceptors) | ||
|
||
return patched_intercept_channel # type: ignore | ||
|
||
|
||
def _wrap_channel_async(func: Callable[P, AsyncChannel]) -> Callable[P, AsyncChannel]: | ||
"Wrapper for asynchronous secure and insecure channel." | ||
|
||
@wraps(func) | ||
def patched_channel( | ||
*args: P.args, | ||
interceptors: Optional[Sequence[grpc.aio.ClientInterceptor]] = None, | ||
**kwargs: P.kwargs, | ||
) -> Channel: | ||
sentry_interceptors = [ | ||
AsyncUnaryUnaryClientInterceptor(), | ||
AsyncUnaryStreamClientIntercetor(), | ||
] | ||
interceptors = [*sentry_interceptors, *(interceptors or [])] | ||
return func(*args, interceptors=interceptors, **kwargs) # type: ignore | ||
|
||
return patched_channel # type: ignore | ||
|
||
|
||
def _wrap_sync_server(func: Callable[P, Server]) -> Callable[P, Server]: | ||
"""Wrapper for synchronous server.""" | ||
|
||
@wraps(func) | ||
def patched_server( | ||
*args: P.args, | ||
interceptors: Optional[Sequence[grpc.ServerInterceptor]] = None, | ||
**kwargs: P.kwargs, | ||
) -> Server: | ||
interceptors = [ | ||
interceptor | ||
for interceptor in interceptors or [] | ||
if not isinstance(interceptor, ServerInterceptor) | ||
] | ||
server_interceptor = ServerInterceptor() | ||
interceptors = [server_interceptor, *(interceptors or [])] | ||
return func(*args, interceptors=interceptors, **kwargs) # type: ignore | ||
|
||
return patched_server # type: ignore | ||
|
||
|
||
def _wrap_async_server(func: Callable[P, AsyncServer]) -> Callable[P, AsyncServer]: | ||
"""Wrapper for asynchronous server.""" | ||
|
||
@wraps(func) | ||
def patched_aio_server( | ||
*args: P.args, | ||
interceptors: Optional[Sequence[grpc.ServerInterceptor]] = None, | ||
**kwargs: P.kwargs, | ||
) -> Server: | ||
server_interceptor = AsyncServerInterceptor() | ||
interceptors = [server_interceptor, *(interceptors or [])] | ||
return func(*args, interceptors=interceptors, **kwargs) # type: ignore | ||
|
||
return patched_aio_server # type: ignore | ||
|
||
|
||
class GRPCIntegration(Integration): | ||
identifier = "grpc" | ||
|
||
@staticmethod | ||
def setup_once() -> None: | ||
import grpc | ||
|
||
grpc.insecure_channel = _wrap_channel_sync(grpc.insecure_channel) | ||
grpc.secure_channel = _wrap_channel_sync(grpc.secure_channel) | ||
grpc.intercept_channel = _wrap_intercept_channel(grpc.intercept_channel) | ||
|
||
grpc.aio.insecure_channel = _wrap_channel_async(grpc.aio.insecure_channel) | ||
grpc.aio.secure_channel = _wrap_channel_async(grpc.aio.secure_channel) | ||
|
||
grpc.server = _wrap_sync_server(grpc.server) | ||
grpc.aio.server = _wrap_async_server(grpc.aio.server) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
from .server import ServerInterceptor # noqa: F401 | ||
from .client import ClientInterceptor # noqa: F401 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
from typing import Callable, Union, AsyncIterable, Any | ||
|
||
from grpc.aio import ( | ||
UnaryUnaryClientInterceptor, | ||
UnaryStreamClientInterceptor, | ||
ClientCallDetails, | ||
UnaryUnaryCall, | ||
UnaryStreamCall, | ||
) | ||
from google.protobuf.message import Message | ||
|
||
from sentry_sdk import Hub | ||
from sentry_sdk.consts import OP | ||
|
||
|
||
class ClientInterceptor: | ||
@staticmethod | ||
def _update_client_call_details_metadata_from_hub( | ||
client_call_details: ClientCallDetails, hub: Hub | ||
) -> ClientCallDetails: | ||
metadata = ( | ||
list(client_call_details.metadata) if client_call_details.metadata else [] | ||
) | ||
for key, value in hub.iter_trace_propagation_headers(): | ||
metadata.append((key, value)) | ||
|
||
client_call_details = ClientCallDetails( | ||
method=client_call_details.method, | ||
timeout=client_call_details.timeout, | ||
metadata=metadata, | ||
credentials=client_call_details.credentials, | ||
wait_for_ready=client_call_details.wait_for_ready, | ||
) | ||
|
||
return client_call_details | ||
|
||
|
||
class SentryUnaryUnaryClientInterceptor(ClientInterceptor, UnaryUnaryClientInterceptor): # type: ignore | ||
async def intercept_unary_unary( | ||
self, | ||
continuation: Callable[[ClientCallDetails, Message], UnaryUnaryCall], | ||
client_call_details: ClientCallDetails, | ||
request: Message, | ||
) -> Union[UnaryUnaryCall, Message]: | ||
hub = Hub.current | ||
method = client_call_details.method | ||
|
||
with hub.start_span( | ||
op=OP.GRPC_CLIENT, description="unary unary call to %s" % method.decode() | ||
) as span: | ||
span.set_data("type", "unary unary") | ||
span.set_data("method", method) | ||
|
||
client_call_details = self._update_client_call_details_metadata_from_hub( | ||
client_call_details, hub | ||
) | ||
|
||
response = await continuation(client_call_details, request) | ||
status_code = await response.code() | ||
span.set_data("code", status_code.name) | ||
|
||
return response | ||
|
||
|
||
class SentryUnaryStreamClientInterceptor( | ||
ClientInterceptor, UnaryStreamClientInterceptor # type: ignore | ||
): | ||
async def intercept_unary_stream( | ||
self, | ||
continuation: Callable[[ClientCallDetails, Message], UnaryStreamCall], | ||
client_call_details: ClientCallDetails, | ||
request: Message, | ||
) -> Union[AsyncIterable[Any], UnaryStreamCall]: | ||
hub = Hub.current | ||
method = client_call_details.method | ||
|
||
with hub.start_span( | ||
op=OP.GRPC_CLIENT, description="unary stream call to %s" % method.decode() | ||
) as span: | ||
span.set_data("type", "unary stream") | ||
span.set_data("method", method) | ||
|
||
client_call_details = self._update_client_call_details_metadata_from_hub( | ||
client_call_details, hub | ||
) | ||
|
||
response = await continuation(client_call_details, request) | ||
# status_code = await response.code() | ||
# span.set_data("code", status_code) | ||
|
||
return response |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
from sentry_sdk import Hub | ||
from sentry_sdk._types import MYPY | ||
from sentry_sdk.consts import OP | ||
from sentry_sdk.integrations import DidNotEnable | ||
from sentry_sdk.tracing import Transaction, TRANSACTION_SOURCE_CUSTOM | ||
from sentry_sdk.utils import event_from_exception | ||
|
||
if MYPY: | ||
from collections.abc import Awaitable, Callable | ||
from typing import Any | ||
|
||
|
||
try: | ||
import grpc | ||
from grpc import HandlerCallDetails, RpcMethodHandler | ||
from grpc.aio import ServicerContext | ||
except ImportError: | ||
raise DidNotEnable("grpcio is not installed") | ||
|
||
|
||
class ServerInterceptor(grpc.aio.ServerInterceptor): # type: ignore | ||
def __init__(self, find_name=None): | ||
# type: (ServerInterceptor, Callable[[ServicerContext], str] | None) -> None | ||
self._find_method_name = find_name or self._find_name | ||
|
||
super(ServerInterceptor, self).__init__() | ||
|
||
async def intercept_service(self, continuation, handler_call_details): | ||
# type: (ServerInterceptor, Callable[[HandlerCallDetails], Awaitable[RpcMethodHandler]], HandlerCallDetails) -> Awaitable[RpcMethodHandler] | ||
self._handler_call_details = handler_call_details | ||
handler = await continuation(handler_call_details) | ||
|
||
if not handler.request_streaming and not handler.response_streaming: | ||
handler_factory = grpc.unary_unary_rpc_method_handler | ||
|
||
async def wrapped(request, context): | ||
# type: (Any, ServicerContext) -> Any | ||
name = self._find_method_name(context) | ||
if not name: | ||
return await handler(request, context) | ||
|
||
hub = Hub.current | ||
|
||
# What if the headers are empty? | ||
transaction = Transaction.continue_from_headers( | ||
dict(context.invocation_metadata()), | ||
op=OP.GRPC_SERVER, | ||
name=name, | ||
source=TRANSACTION_SOURCE_CUSTOM, | ||
) | ||
|
||
with hub.start_transaction(transaction=transaction): | ||
try: | ||
return await handler.unary_unary(request, context) | ||
except Exception as exc: | ||
event, hint = event_from_exception( | ||
exc, | ||
mechanism={"type": "grpc", "handled": False}, | ||
) | ||
hub.capture_event(event, hint=hint) | ||
raise | ||
|
||
elif not handler.request_streaming and handler.response_streaming: | ||
handler_factory = grpc.unary_stream_rpc_method_handler | ||
|
||
async def wrapped(request, context): # type: ignore | ||
# type: (Any, ServicerContext) -> Any | ||
async for r in handler.unary_stream(request, context): | ||
yield r | ||
|
||
elif handler.request_streaming and not handler.response_streaming: | ||
handler_factory = grpc.stream_unary_rpc_method_handler | ||
|
||
async def wrapped(request, context): | ||
# type: (Any, ServicerContext) -> Any | ||
response = handler.stream_unary(request, context) | ||
return await response | ||
|
||
elif handler.request_streaming and handler.response_streaming: | ||
handler_factory = grpc.stream_stream_rpc_method_handler | ||
|
||
async def wrapped(request, context): # type: ignore | ||
# type: (Any, ServicerContext) -> Any | ||
async for r in handler.stream_stream(request, context): | ||
yield r | ||
|
||
return handler_factory( | ||
wrapped, | ||
request_deserializer=handler.request_deserializer, | ||
response_serializer=handler.response_serializer, | ||
) | ||
|
||
def _find_name(self, context): | ||
# type: (ServicerContext) -> str | ||
return self._handler_call_details.method |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.