Skip to content

Commit 4f773a1

Browse files
feat(celery): Allow to override propagate_traces per task (#2331)
Adds support for a sentry-propagate-traces header on apply_async that overrides the default behavior set through the Celery integration's propagate_traces flag. Example usage: my_task.apply_async(..., headers={"sentry-propagate-traces": False}) Example use case: We ourselves have a task that runs once every two weeks and is sampled at ~0.01 percent, so we get about one transaction from this task a year (give or take). This task starts hundreds of child tasks. All those child tasks inherit the sampling decision from the original task and thus are dropped most of the time. But we want to have those child tasks' transactions in our backend no matter the sampling decision of the parent. --------- Co-authored-by: Anton Pirker <[email protected]>
1 parent 0fb0dea commit 4f773a1

File tree

2 files changed

+88
-52
lines changed

2 files changed

+88
-52
lines changed

sentry_sdk/integrations/celery.py

Lines changed: 56 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -140,60 +140,65 @@ def apply_async(*args, **kwargs):
140140
# type: (*Any, **Any) -> Any
141141
hub = Hub.current
142142
integration = hub.get_integration(CeleryIntegration)
143-
if integration is not None and integration.propagate_traces:
144-
with hub.start_span(
145-
op=OP.QUEUE_SUBMIT_CELERY, description=args[0].name
146-
) as span:
147-
with capture_internal_exceptions():
148-
headers = dict(hub.iter_trace_propagation_headers(span))
149-
if integration.monitor_beat_tasks:
150-
headers.update(
151-
{
152-
"sentry-monitor-start-timestamp-s": "%.9f"
153-
% _now_seconds_since_epoch(),
154-
}
143+
144+
if integration is None:
145+
return f(*args, **kwargs)
146+
147+
# Note: kwargs can contain headers=None, so no setdefault!
148+
# Unsure which backend though.
149+
kwarg_headers = kwargs.get("headers") or {}
150+
propagate_traces = kwarg_headers.pop(
151+
"sentry-propagate-traces", integration.propagate_traces
152+
)
153+
154+
if not propagate_traces:
155+
return f(*args, **kwargs)
156+
157+
with hub.start_span(
158+
op=OP.QUEUE_SUBMIT_CELERY, description=args[0].name
159+
) as span:
160+
with capture_internal_exceptions():
161+
headers = dict(hub.iter_trace_propagation_headers(span))
162+
if integration.monitor_beat_tasks:
163+
headers.update(
164+
{
165+
"sentry-monitor-start-timestamp-s": "%.9f"
166+
% _now_seconds_since_epoch(),
167+
}
168+
)
169+
170+
if headers:
171+
existing_baggage = kwarg_headers.get(BAGGAGE_HEADER_NAME)
172+
sentry_baggage = headers.get(BAGGAGE_HEADER_NAME)
173+
174+
combined_baggage = sentry_baggage or existing_baggage
175+
if sentry_baggage and existing_baggage:
176+
combined_baggage = "{},{}".format(
177+
existing_baggage,
178+
sentry_baggage,
155179
)
156180

157-
if headers:
158-
# Note: kwargs can contain headers=None, so no setdefault!
159-
# Unsure which backend though.
160-
kwarg_headers = kwargs.get("headers") or {}
161-
162-
existing_baggage = kwarg_headers.get(BAGGAGE_HEADER_NAME)
163-
sentry_baggage = headers.get(BAGGAGE_HEADER_NAME)
164-
165-
combined_baggage = sentry_baggage or existing_baggage
166-
if sentry_baggage and existing_baggage:
167-
combined_baggage = "{},{}".format(
168-
existing_baggage,
169-
sentry_baggage,
170-
)
171-
172-
kwarg_headers.update(headers)
173-
if combined_baggage:
174-
kwarg_headers[BAGGAGE_HEADER_NAME] = combined_baggage
175-
176-
# https://github.com/celery/celery/issues/4875
177-
#
178-
# Need to setdefault the inner headers too since other
179-
# tracing tools (dd-trace-py) also employ this exact
180-
# workaround and we don't want to break them.
181-
kwarg_headers.setdefault("headers", {}).update(headers)
182-
if combined_baggage:
183-
kwarg_headers["headers"][
184-
BAGGAGE_HEADER_NAME
185-
] = combined_baggage
186-
187-
# Add the Sentry options potentially added in `sentry_apply_entry`
188-
# to the headers (done when auto-instrumenting Celery Beat tasks)
189-
for key, value in kwarg_headers.items():
190-
if key.startswith("sentry-"):
191-
kwarg_headers["headers"][key] = value
192-
193-
kwargs["headers"] = kwarg_headers
181+
kwarg_headers.update(headers)
182+
if combined_baggage:
183+
kwarg_headers[BAGGAGE_HEADER_NAME] = combined_baggage
184+
185+
# https://github.com/celery/celery/issues/4875
186+
#
187+
# Need to setdefault the inner headers too since other
188+
# tracing tools (dd-trace-py) also employ this exact
189+
# workaround and we don't want to break them.
190+
kwarg_headers.setdefault("headers", {}).update(headers)
191+
if combined_baggage:
192+
kwarg_headers["headers"][BAGGAGE_HEADER_NAME] = combined_baggage
193+
194+
# Add the Sentry options potentially added in `sentry_apply_entry`
195+
# to the headers (done when auto-instrumenting Celery Beat tasks)
196+
for key, value in kwarg_headers.items():
197+
if key.startswith("sentry-"):
198+
kwarg_headers["headers"][key] = value
199+
200+
kwargs["headers"] = kwarg_headers
194201

195-
return f(*args, **kwargs)
196-
else:
197202
return f(*args, **kwargs)
198203

199204
return apply_async # type: ignore

tests/integrations/celery/test_celery.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
pytest.importorskip("celery")
66

7-
from sentry_sdk import Hub, configure_scope, start_transaction
7+
from sentry_sdk import Hub, configure_scope, start_transaction, get_current_span
88
from sentry_sdk.integrations.celery import CeleryIntegration, _get_headers
99

1010
from sentry_sdk._compat import text_type
@@ -526,3 +526,34 @@ def dummy_task(self, x, y):
526526
"custom=value",
527527
]
528528
)
529+
530+
531+
def test_sentry_propagate_traces_override(init_celery):
    """
    Check that a `sentry-propagate-traces` header passed to `apply_async`
    takes precedence over the `propagate_traces` option given to the
    integration constructor.
    """
    celery = init_celery(
        propagate_traces=True, traces_sample_rate=1.0, release="abcdef"
    )

    @celery.task(name="dummy_task", bind=True)
    def dummy_task(self, message):
        # Report the trace id of the span that is current inside the task.
        return get_current_span().trace_id

    with start_transaction() as transaction:
        parent_trace_id = transaction.trace_id

        # No override header: the trace should be propagated, so the task
        # reports the same trace id as the surrounding transaction.
        propagated_result = dummy_task.apply_async(
            args=("some message",),
        )
        assert parent_trace_id == propagated_result.get()

        # Override header set to False: the trace should NOT be propagated,
        # even though the integration was created with propagate_traces=True.
        detached_result = dummy_task.apply_async(
            args=("another message",),
            headers={"sentry-propagate-traces": False},
        )
        assert parent_trace_id != detached_result.get()

0 commit comments

Comments
 (0)