Skip to content

feat(celery): Allow to override propagate_traces per task #2331

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Sep 6, 2023
107 changes: 56 additions & 51 deletions sentry_sdk/integrations/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,60 +140,65 @@ def apply_async(*args, **kwargs):
# type: (*Any, **Any) -> Any
hub = Hub.current
integration = hub.get_integration(CeleryIntegration)
if integration is not None and integration.propagate_traces:
with hub.start_span(
op=OP.QUEUE_SUBMIT_CELERY, description=args[0].name
) as span:
with capture_internal_exceptions():
headers = dict(hub.iter_trace_propagation_headers(span))
if integration.monitor_beat_tasks:
headers.update(
{
"sentry-monitor-start-timestamp-s": "%.9f"
% _now_seconds_since_epoch(),
}

if integration is None:
return f(*args, **kwargs)

# Note: kwargs can contain headers=None, so no setdefault!
# Unsure which backend though.
kwarg_headers = kwargs.get("headers") or {}
propagate_traces = kwarg_headers.pop(
"sentry-propagate-traces", integration.propagate_traces
)

if not propagate_traces:
return f(*args, **kwargs)

with hub.start_span(
op=OP.QUEUE_SUBMIT_CELERY, description=args[0].name
) as span:
with capture_internal_exceptions():
headers = dict(hub.iter_trace_propagation_headers(span))
if integration.monitor_beat_tasks:
headers.update(
{
"sentry-monitor-start-timestamp-s": "%.9f"
% _now_seconds_since_epoch(),
}
)

if headers:
existing_baggage = kwarg_headers.get(BAGGAGE_HEADER_NAME)
sentry_baggage = headers.get(BAGGAGE_HEADER_NAME)

combined_baggage = sentry_baggage or existing_baggage
if sentry_baggage and existing_baggage:
combined_baggage = "{},{}".format(
existing_baggage,
sentry_baggage,
)

if headers:
# Note: kwargs can contain headers=None, so no setdefault!
# Unsure which backend though.
kwarg_headers = kwargs.get("headers") or {}

existing_baggage = kwarg_headers.get(BAGGAGE_HEADER_NAME)
sentry_baggage = headers.get(BAGGAGE_HEADER_NAME)

combined_baggage = sentry_baggage or existing_baggage
if sentry_baggage and existing_baggage:
combined_baggage = "{},{}".format(
existing_baggage,
sentry_baggage,
)

kwarg_headers.update(headers)
if combined_baggage:
kwarg_headers[BAGGAGE_HEADER_NAME] = combined_baggage

# https://github.com/celery/celery/issues/4875
#
# Need to setdefault the inner headers too since other
# tracing tools (dd-trace-py) also employ this exact
# workaround and we don't want to break them.
kwarg_headers.setdefault("headers", {}).update(headers)
if combined_baggage:
kwarg_headers["headers"][
BAGGAGE_HEADER_NAME
] = combined_baggage

# Add the Sentry options potentially added in `sentry_apply_entry`
# to the headers (done when auto-instrumenting Celery Beat tasks)
for key, value in kwarg_headers.items():
if key.startswith("sentry-"):
kwarg_headers["headers"][key] = value

kwargs["headers"] = kwarg_headers
kwarg_headers.update(headers)
if combined_baggage:
kwarg_headers[BAGGAGE_HEADER_NAME] = combined_baggage

# https://github.com/celery/celery/issues/4875
#
# Need to setdefault the inner headers too since other
# tracing tools (dd-trace-py) also employ this exact
# workaround and we don't want to break them.
kwarg_headers.setdefault("headers", {}).update(headers)
if combined_baggage:
kwarg_headers["headers"][BAGGAGE_HEADER_NAME] = combined_baggage

# Add the Sentry options potentially added in `sentry_apply_entry`
# to the headers (done when auto-instrumenting Celery Beat tasks)
for key, value in kwarg_headers.items():
if key.startswith("sentry-"):
kwarg_headers["headers"][key] = value

kwargs["headers"] = kwarg_headers

return f(*args, **kwargs)
else:
return f(*args, **kwargs)

return apply_async # type: ignore
Expand Down
33 changes: 32 additions & 1 deletion tests/integrations/celery/test_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

pytest.importorskip("celery")

from sentry_sdk import Hub, configure_scope, start_transaction
from sentry_sdk import Hub, configure_scope, start_transaction, get_current_span
from sentry_sdk.integrations.celery import CeleryIntegration, _get_headers

from sentry_sdk._compat import text_type
Expand Down Expand Up @@ -526,3 +526,34 @@ def dummy_task(self, x, y):
"custom=value",
]
)


def test_sentry_propagate_traces_override(init_celery):
"""
Test if the `sentry-propagate-traces` header given to `apply_async`
overrides the `propagate_traces` parameter in the integration constructor.
"""
celery = init_celery(
propagate_traces=True, traces_sample_rate=1.0, release="abcdef"
)

@celery.task(name="dummy_task", bind=True)
def dummy_task(self, message):
trace_id = get_current_span().trace_id
return trace_id

with start_transaction() as transaction:
transaction_trace_id = transaction.trace_id

# should propagate trace
task_transaction_id = dummy_task.apply_async(
args=("some message",),
).get()
assert transaction_trace_id == task_transaction_id

# should NOT propagate trace (overrides `propagate_traces` parameter in integration constructor)
task_transaction_id = dummy_task.apply_async(
args=("another message",),
headers={"sentry-propagate-traces": False},
).get()
assert transaction_trace_id != task_transaction_id