Skip to content

Commit edd6342

Browse files
committed
Remove context propagation through graphile jobs
1 parent b1d84d2 commit edd6342

File tree

1 file changed

+8
-38
lines changed

1 file changed

+8
-38
lines changed

apps/webapp/app/platform/zodWorker.server.ts

Lines changed: 8 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { SpanKind, SpanStatusCode, trace } from "@opentelemetry/api";
12
import type {
23
CronItem,
34
CronItemOptions,
@@ -11,34 +12,26 @@ import type {
1112
WorkerUtils,
1213
} from "graphile-worker";
1314
import {
15+
Logger as GraphileLogger,
1416
run as graphileRun,
1517
makeWorkerUtils,
1618
parseCronItems,
17-
Logger as GraphileLogger,
1819
} from "graphile-worker";
19-
import { SpanKind, SpanStatusCode, context, propagation, trace } from "@opentelemetry/api";
2020

21+
import { flattenAttributes } from "@trigger.dev/core/v3";
2122
import omit from "lodash.omit";
2223
import { z } from "zod";
2324
import { $replica, PrismaClient, PrismaClientOrTransaction } from "~/db.server";
25+
import { env } from "~/env.server";
2426
import { PgListenService } from "~/services/db/pgListen.server";
2527
import { workerLogger as logger } from "~/services/logger.server";
26-
import { flattenAttributes } from "@trigger.dev/core/v3";
27-
import { env } from "~/env.server";
28-
import { getHttpContext } from "~/services/httpAsyncStorage.server";
2928

3029
const tracer = trace.getTracer("zodWorker", "3.0.0.dp.1");
3130

3231
export interface MessageCatalogSchema {
3332
[key: string]: z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion<any, any>;
3433
}
3534

36-
const ZodWorkerMessageSchema = z.object({
37-
version: z.literal("1"),
38-
payload: z.unknown(),
39-
context: z.record(z.string().optional()).optional(),
40-
});
41-
4235
const RawCronPayloadSchema = z.object({
4336
_cron: z.object({
4437
ts: z.coerce.date(),
@@ -416,18 +409,6 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
416409
) {
417410
const now = performance.now();
418411

419-
let $payload = payload;
420-
421-
if (!getHttpContext()) {
422-
const $context = {};
423-
propagation.inject(context.active(), $context);
424-
$payload = {
425-
version: "1",
426-
payload,
427-
context: $context,
428-
};
429-
}
430-
431412
const results = await tx.$queryRawUnsafe(
432413
`SELECT * FROM ${this.graphileWorkerSchema}.add_job(
433414
identifier => $1::text,
@@ -440,7 +421,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
440421
job_key_mode => $8::text
441422
)`,
442423
identifier,
443-
JSON.stringify($payload),
424+
JSON.stringify(payload),
444425
spec.runAt || null,
445426
spec.maxAttempts || null,
446427
spec.jobKey || null,
@@ -456,7 +437,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
456437
if (!rows.success) {
457438
logger.debug("results returned from add_job could not be parsed", {
458439
identifier,
459-
$payload,
440+
payload,
460441
spec,
461442
});
462443

@@ -481,9 +462,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
481462
const job = AddJobResultsSchema.safeParse(result);
482463

483464
if (!job.success) {
484-
logger.debug("results returned from remove_job could not be parsed", {
485-
error: job.error.flatten(),
486-
result,
465+
logger.debug("could not remove job, job_key did not exist", {
487466
jobKey,
488467
});
489468

@@ -602,11 +581,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
602581
throw new Error(`Unknown message type: ${String(typeName)}`);
603582
}
604583

605-
const messagePayload = ZodWorkerMessageSchema.safeParse(rawPayload);
606-
607-
const payload = messageSchema.parse(
608-
messagePayload.success ? messagePayload.data.payload : rawPayload
609-
);
584+
const payload = messageSchema.parse(rawPayload);
610585
const job = helpers.job;
611586

612587
logger.debug("Received worker task, calling handler", {
@@ -621,10 +596,6 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
621596
throw new Error(`No task for message type: ${String(typeName)}`);
622597
}
623598

624-
const activeContext = messagePayload.success
625-
? propagation.extract(context.active(), messagePayload.data.context ?? {})
626-
: undefined;
627-
628599
await tracer.startActiveSpan(
629600
`Run ${typeName as string}`,
630601
{
@@ -648,7 +619,6 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
648619
"worker.name": this.#name,
649620
},
650621
},
651-
activeContext ?? context.active(),
652622
async (span) => {
653623
try {
654624
await task.handler(payload, job, helpers);

0 commit comments

Comments
 (0)