-
Notifications
You must be signed in to change notification settings - Fork 550
Dramatiq integration from @jacobsvante #3397
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 17 commits
Commits
Show all changes
22 commits
Select commit
Hold shift + click to select a range
1c7f20f
Initial version of dramatiq integration
antonpirker 2f0ee82
Added dramatiq to test matrix
antonpirker bcc19cb
Some typing
antonpirker b0cdc1e
Merge branch 'master' into antonpirker/dramatiq
antonpirker 5eb2ce2
Some cleanup
antonpirker 44850d6
Removed hub
antonpirker ec22393
linting
antonpirker a665552
Merge branch 'master' into antonpirker/dramatiq
antonpirker 3ec5eed
Removed conftest
antonpirker 249d470
linting
antonpirker c83062d
Merge branch 'master' into antonpirker/dramatiq
antonpirker f7cb139
Merge branch 'master' into antonpirker/dramatiq
antonpirker 773432c
Do not create high cardinality tags
antonpirker 12a283c
Merge branch 'antonpirker/dramatiq' of github.com:getsentry/sentry-py…
antonpirker 7bc63ed
Updated test
antonpirker 387b5a2
Added shout out
antonpirker 05000db
make mypy happy
antonpirker 18fce4e
Apply suggestions from code review
antonpirker e0590ef
Implemented suggestions from code review
antonpirker 9f73944
easier readable code
antonpirker 5c75c4f
Cleanup
antonpirker 533d644
Merge branch 'master' into antonpirker/dramatiq
antonpirker File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -80,6 +80,7 @@ | |
"arq", | ||
"beam", | ||
"celery", | ||
"dramatiq", | ||
"huey", | ||
"rq", | ||
"spark", | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,180 @@ | ||
import json | ||
|
||
import sentry_sdk | ||
from sentry_sdk.integrations import Integration | ||
from sentry_sdk._types import TYPE_CHECKING | ||
from sentry_sdk.utils import ( | ||
AnnotatedValue, | ||
capture_internal_exceptions, | ||
event_from_exception, | ||
) | ||
|
||
from dramatiq.broker import Broker # type: ignore | ||
from dramatiq.message import Message # type: ignore | ||
from dramatiq.middleware import Middleware, default_middleware # type: ignore | ||
from dramatiq.errors import Retry # type: ignore | ||
|
||
if TYPE_CHECKING: | ||
from typing import Any, Callable, Dict, Optional, Union | ||
from sentry_sdk._types import Event, Hint | ||
|
||
|
||
class DramatiqIntegration(Integration): | ||
""" | ||
Dramatiq integration for Sentry | ||
|
||
Please make sure that you call `sentry_sdk.init` *before* initializing | ||
your broker, as it monkey patches `Broker.__init__`. | ||
|
||
This integration was originally developed and maintained | ||
by https://github.com/jacobsvante and later donated to the Sentry | ||
project. | ||
""" | ||
|
||
identifier = "dramatiq" | ||
|
||
@staticmethod | ||
def setup_once(): | ||
# type: () -> None | ||
_patch_dramatiq_broker() | ||
|
||
|
||
def _patch_dramatiq_broker(): | ||
# type: () -> None | ||
original_broker__init__ = Broker.__init__ | ||
|
||
def sentry_patched_broker__init__(self, *args, **kw): | ||
# type: (Broker, *Any, **Any) -> None | ||
integration = sentry_sdk.get_client().get_integration(DramatiqIntegration) | ||
|
||
try: | ||
middleware = kw.pop("middleware") | ||
except KeyError: | ||
# Unfortunately Broker and StubBroker allows middleware to be | ||
# passed in as positional arguments, whilst RabbitmqBroker and | ||
# RedisBroker does not. | ||
if len(args) > 0: | ||
assert len(args) < 2 | ||
middleware = None if args[0] is None else args[0] | ||
antonpirker marked this conversation as resolved.
Show resolved
Hide resolved
|
||
args = [] # type: ignore | ||
else: | ||
middleware = None | ||
|
||
if middleware is None: | ||
middleware = list(m() for m in default_middleware) | ||
else: | ||
middleware = list(middleware) | ||
|
||
if integration is not None: | ||
assert SentryMiddleware not in ( | ||
m.__class__ for m in middleware | ||
), "Sentry middleware must not be passed in manually to broker" | ||
middleware.insert(0, SentryMiddleware()) | ||
antonpirker marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
kw["middleware"] = middleware | ||
# raise Exception([args, kw]) | ||
antonpirker marked this conversation as resolved.
Show resolved
Hide resolved
|
||
original_broker__init__(self, *args, **kw) | ||
|
||
Broker.__init__ = sentry_patched_broker__init__ | ||
|
||
|
||
class SentryMiddleware(Middleware): # type: ignore[misc] | ||
""" | ||
A Dramatiq middleware that automatically captures and sends | ||
exceptions to Sentry. | ||
|
||
This is automatically added to every instantiated broker via the | ||
DramatiqIntegration. | ||
""" | ||
|
||
def before_process_message(self, broker, message): | ||
# type: (Broker, Message) -> None | ||
integration = sentry_sdk.get_client().get_integration(DramatiqIntegration) | ||
if integration is None: | ||
return | ||
|
||
message._scope_manager = sentry_sdk.new_scope() | ||
message._scope_manager.__enter__() | ||
|
||
scope = sentry_sdk.get_current_scope() | ||
scope.transaction = message.actor_name | ||
scope.set_extra("dramatiq_message_id", message.message_id) | ||
scope.add_event_processor(_make_message_event_processor(message, integration)) | ||
|
||
def after_process_message(self, broker, message, *, result=None, exception=None): | ||
# type: (Broker, Message, Any, Optional[Any], Optional[Exception]) -> None | ||
integration = sentry_sdk.get_client().get_integration(DramatiqIntegration) | ||
if integration is None: | ||
return | ||
|
||
actor = broker.get_actor(message.actor_name) | ||
throws = message.options.get("throws") or actor.options.get("throws") | ||
|
||
try: | ||
if ( | ||
exception is not None | ||
and not (throws and isinstance(exception, throws)) | ||
and not isinstance(exception, Retry) | ||
): | ||
event, hint = event_from_exception( | ||
exception, | ||
client_options=sentry_sdk.get_client().options, | ||
mechanism={ | ||
"type": DramatiqIntegration.identifier, | ||
"handled": False, | ||
}, | ||
) | ||
sentry_sdk.capture_event(event, hint=hint) | ||
finally: | ||
message._scope_manager.__exit__(None, None, None) | ||
|
||
|
||
def _make_message_event_processor(message, integration): | ||
# type: (Message, DramatiqIntegration) -> Callable[[Event, Hint], Optional[Event]] | ||
|
||
def inner(event, hint): | ||
# type: (Event, Hint) -> Optional[Event] | ||
with capture_internal_exceptions(): | ||
DramatiqMessageExtractor(message).extract_into_event(event) | ||
|
||
return event | ||
|
||
return inner | ||
|
||
|
||
class DramatiqMessageExtractor(object): | ||
def __init__(self, message): | ||
# type: (Message) -> None | ||
self.message_data = dict(message.asdict()) | ||
|
||
def content_length(self): | ||
# type: () -> int | ||
return len(json.dumps(self.message_data)) | ||
|
||
def extract_into_event(self, event): | ||
# type: (Event) -> None | ||
client = sentry_sdk.get_client() | ||
if not client.is_active(): | ||
return | ||
|
||
data = None # type: Optional[Union[AnnotatedValue, Dict[str, Any]]] | ||
|
||
content_length = self.content_length() | ||
contexts = event.setdefault("contexts", {}) | ||
request_info = contexts.setdefault("dramatiq", {}) | ||
request_info["type"] = "dramatiq" | ||
|
||
max_request_body_size = client.options["max_request_body_size"] | ||
if ( | ||
max_request_body_size == "never" | ||
or (max_request_body_size == "small" and content_length > 10**3) | ||
or (max_request_body_size == "medium" and content_length > 10**4) | ||
): | ||
data = AnnotatedValue( | ||
"", | ||
{"rem": [["!config", "x", 0, content_length]], "len": content_length}, | ||
) | ||
antonpirker marked this conversation as resolved.
Show resolved
Hide resolved
|
||
else: | ||
data = self.message_data | ||
|
||
request_info["data"] = data |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
import pytest | ||
|
||
pytest.importorskip("dramatiq") |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.