Skip to content

Make sure each task that is started by Celery Beat has its own trace. #2249

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 25 additions & 21 deletions sentry_sdk/integrations/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,30 +462,34 @@ def sentry_apply_entry(*args, **kwargs):
if match_regex_list(monitor_name, integration.exclude_beat_tasks):
return original_apply_entry(*args, **kwargs)

monitor_config = _get_monitor_config(celery_schedule, app)

is_supported_schedule = bool(monitor_config)
if is_supported_schedule:
headers = schedule_entry.options.pop("headers", {})
headers.update(
{
"sentry-monitor-slug": monitor_name,
"sentry-monitor-config": monitor_config,
}
)
with hub.configure_scope() as scope:
# When tasks are started from Celery Beat, make sure each task has its own trace.
scope.set_new_propagation_context()

monitor_config = _get_monitor_config(celery_schedule, app)

is_supported_schedule = bool(monitor_config)
if is_supported_schedule:
headers = schedule_entry.options.pop("headers", {})
headers.update(
{
"sentry-monitor-slug": monitor_name,
"sentry-monitor-config": monitor_config,
}
)

check_in_id = capture_checkin(
monitor_slug=monitor_name,
monitor_config=monitor_config,
status=MonitorStatus.IN_PROGRESS,
)
headers.update({"sentry-monitor-check-in-id": check_in_id})
check_in_id = capture_checkin(
monitor_slug=monitor_name,
monitor_config=monitor_config,
status=MonitorStatus.IN_PROGRESS,
)
headers.update({"sentry-monitor-check-in-id": check_in_id})

# Set the Sentry configuration in the options of the ScheduleEntry.
# Those will be picked up in `apply_async` and added to the headers.
schedule_entry.options["headers"] = headers
# Set the Sentry configuration in the options of the ScheduleEntry.
# Those will be picked up in `apply_async` and added to the headers.
schedule_entry.options["headers"] = headers

return original_apply_entry(*args, **kwargs)
return original_apply_entry(*args, **kwargs)

Scheduler.apply_entry = sentry_apply_entry

Expand Down
21 changes: 15 additions & 6 deletions sentry_sdk/scope.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,23 @@ def _create_new_propagation_context(self):
"dynamic_sampling_context": None,
}

def set_new_propagation_context(self):
# type: () -> None
"""
Creates a new propagation context and sets it as `_propagation_context`. Overwriting existing one.
"""
self._propagation_context = self._create_new_propagation_context()
logger.debug(
"[Tracing] Create new propagation context: %s",
self._propagation_context,
)

def generate_propagation_context(self, incoming_data=None):
# type: (Optional[Dict[str, str]]) -> None
"""
Populates `_propagation_context`. Either from `incoming_data` or with a new propagation context.
Makes sure `_propagation_context` is set.
If there is `incoming_data` overwrite existing `_propagation_context`.
if there is no `incoming_data` create new `_propagation_context`, but do NOT overwrite if already existing.
"""
if incoming_data:
context = self._extract_propagation_context(incoming_data)
Expand All @@ -212,11 +225,7 @@ def generate_propagation_context(self, incoming_data=None):
)

if self._propagation_context is None:
self._propagation_context = self._create_new_propagation_context()
logger.debug(
"[Tracing] Create new propagation context: %s",
self._propagation_context,
)
self.set_new_propagation_context()

def get_dynamic_sampling_context(self):
# type: () -> Optional[Dict[str, str]]
Expand Down