Skip to content

Commit f8bd464

Browse files
committed
Associate child runs with the span ID of the span in the parent run that triggered the child run
1 parent f333537 commit f8bd464

File tree

7 files changed

+40
-7
lines changed

7 files changed

+40
-7
lines changed

.changeset/short-tomatoes-beam.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/core": patch
3+
---
4+
5+
Add otel propagation headers "below" the API fetch span, to attribute the child runs with the proper parent span ID

apps/webapp/app/v3/eventRepository.server.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -835,7 +835,8 @@ export class EventRepository {
835835
options: TraceEventOptions & { incomplete?: boolean },
836836
callback: (
837837
e: EventBuilder,
838-
traceContext: Record<string, string | undefined>
838+
traceContext: Record<string, string | undefined>,
839+
traceparent?: { traceId: string; spanId: string }
839840
) => Promise<TResult>
840841
): Promise<TResult> {
841842
const propagatedContext = extractContextFromCarrier(options.context ?? {});
@@ -892,7 +893,7 @@ export class EventRepository {
892893
},
893894
};
894895

895-
const result = await callback(eventBuilder, traceContext);
896+
const result = await callback(eventBuilder, traceContext, propagatedContext?.traceparent);
896897

897898
const duration = process.hrtime.bigint() - start;
898899

apps/webapp/app/v3/services/triggerTask.server.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ export class TriggerTaskService extends BaseService {
240240
incomplete: true,
241241
immediate: true,
242242
},
243-
async (event, traceContext) => {
243+
async (event, traceContext, traceparent) => {
244244
const run = await autoIncrementCounter.incrementInTransaction(
245245
`v3-run:${environment.id}:${taskId}`,
246246
async (num, tx) => {
@@ -307,6 +307,8 @@ export class TriggerTaskService extends BaseService {
307307
traceContext: traceContext,
308308
traceId: event.traceId,
309309
spanId: event.spanId,
310+
parentSpanId:
311+
options.parentAsLinkType === "replay" ? undefined : traceparent?.spanId,
310312
lockedToVersionId: lockedToBackgroundWorker?.id,
311313
concurrencyKey: body.options?.concurrencyKey,
312314
queue: queueName,

packages/core/src/v3/apiClient/core.ts

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { RetryOptions } from "../schemas/index.js";
44
import { calculateNextRetryDelay } from "../utils/retries.js";
55
import { ApiConnectionError, ApiError, ApiSchemaValidationError } from "./errors.js";
66

7-
import { Attributes, Span } from "@opentelemetry/api";
7+
import { Attributes, Span, context, propagation } from "@opentelemetry/api";
88
import { SemanticInternalAttributes } from "../semanticInternalAttributes.js";
99
import { TriggerTracer } from "../tracer.js";
1010
import { accessoryAttributes } from "../utils/styleAttributes.js";
@@ -184,9 +184,11 @@ async function _doZodFetch<TResponseBodySchema extends z.ZodTypeAny>(
184184
requestInit?: PromiseOrValue<RequestInit>,
185185
options?: ZodFetchOptions
186186
): Promise<ZodFetchResult<z.output<TResponseBodySchema>>> {
187-
const $requestInit = await requestInit;
187+
let $requestInit = await requestInit;
188188

189189
return traceZodFetch({ url, requestInit: $requestInit, options }, async (span) => {
190+
$requestInit = injectPropagationHeadersIfInWorker($requestInit);
191+
190192
const result = await _doZodFetchWithRetries(schema, url, $requestInit, options);
191193

192194
if (options?.onResponseBody && span) {
@@ -577,3 +579,23 @@ export function isEmptyObj(obj: Object | null | undefined): boolean {
577579
export function hasOwn(obj: Object, key: string): boolean {
578580
return Object.prototype.hasOwnProperty.call(obj, key);
579581
}
582+
583+
// If the requestInit has a header x-trigger-worker = true, then we will do
584+
// propagation.inject(context.active(), headers);
585+
// and return the new requestInit.
586+
function injectPropagationHeadersIfInWorker(requestInit?: RequestInit): RequestInit | undefined {
587+
const headers = new Headers(requestInit?.headers);
588+
589+
if (headers.get("x-trigger-worker") !== "true") {
590+
return requestInit;
591+
}
592+
593+
const headersObject = Object.fromEntries(headers.entries());
594+
595+
propagation.inject(context.active(), headersObject);
596+
597+
return {
598+
...requestInit,
599+
headers: new Headers(headersObject),
600+
};
601+
}

packages/core/src/v3/apiClient/index.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { context, propagation } from "@opentelemetry/api";
21
import { z } from "zod";
32
import {
43
AddTagsRequestBody,
@@ -509,7 +508,6 @@ export class ApiClient {
509508
// Only inject the context if we are inside a task
510509
if (taskContext.isInsideTask) {
511510
headers["x-trigger-worker"] = "true";
512-
propagation.inject(context.active(), headers);
513511

514512
if (spanParentAsLink) {
515513
headers["x-trigger-span-parent-as-link"] = "1";
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- AlterTable
2+
ALTER TABLE "TaskRun" ADD COLUMN "parentSpanId" TEXT;

packages/database/prisma/schema.prisma

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1748,6 +1748,9 @@ model TaskRun {
17481748
/// The depth of this task run in the task run hierarchy
17491749
depth Int @default(0)
17501750
1751+
/// The span ID of the "trigger" span in the parent task run
1752+
parentSpanId String?
1753+
17511754
@@unique([runtimeEnvironmentId, taskIdentifier, idempotencyKey])
17521755
// Finding child runs
17531756
@@index([parentTaskRunId])

0 commit comments

Comments
 (0)