Skip to content

Commit 719c0a0

Browse files
authored
v3: better clock management for spans/logs in CRIU envs (#980)
* durable clock WIP * Fixed incorrect span timings around checkpoints by implementing a precise wall clock that resets after restores * Add changeset
1 parent f1c768a commit 719c0a0

File tree

23 files changed

+299
-105
lines changed

23 files changed

+299
-105
lines changed

.changeset/real-planets-stare.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
"trigger.dev": patch
4+
"@trigger.dev/core": patch
5+
---
6+
7+
Fixed incorrect span timings around checkpoints by implementing a precise wall clock that resets after restores

.env.example

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,5 @@ COORDINATOR_SECRET=coordinator-secret # generate the actual secret with `openssl
6363
# These are needed for the object store (for handling large payloads/outputs)
6464
# OBJECT_STORE_BASE_URL="https://{bucket}.{accountId}.r2.cloudflarestorage.com"
6565
# OBJECT_STORE_ACCESS_KEY_ID=
66-
# OBJECT_STORE_SECRET_ACCESS_KEY=
66+
# OBJECT_STORE_SECRET_ACCESS_KEY=
67+
# RUNTIME_WAIT_THRESHOLD_IN_MS=10000

apps/webapp/app/env.server.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,13 +97,26 @@ const EnvironmentSchema = z.object({
9797
CONTAINER_REGISTRY_USERNAME: z.string().optional(),
9898
CONTAINER_REGISTRY_PASSWORD: z.string().optional(),
9999
DEPLOY_REGISTRY_HOST: z.string().optional(),
100-
DEV_OTEL_EXPORTER_OTLP_ENDPOINT: z.string().optional(),
101100
OBJECT_STORE_BASE_URL: z.string().optional(),
102101
OBJECT_STORE_ACCESS_KEY_ID: z.string().optional(),
103102
OBJECT_STORE_SECRET_ACCESS_KEY: z.string().optional(),
104103
EVENTS_BATCH_SIZE: z.coerce.number().int().default(100),
105104
EVENTS_BATCH_INTERVAL: z.coerce.number().int().default(1000),
106105
EVENTS_DEFAULT_LOG_RETENTION: z.coerce.number().int().default(7),
106+
107+
// Development OTEL environment variables
108+
DEV_OTEL_EXPORTER_OTLP_ENDPOINT: z.string().optional(),
109+
// If this is set to 1, then the below variables are used to configure the batch processor for spans and logs
110+
DEV_OTEL_BATCH_PROCESSING_ENABLED: z.string().default("0"),
111+
DEV_OTEL_SPAN_MAX_EXPORT_BATCH_SIZE: z.string().default("64"),
112+
DEV_OTEL_SPAN_SCHEDULED_DELAY_MILLIS: z.string().default("200"),
113+
DEV_OTEL_SPAN_EXPORT_TIMEOUT_MILLIS: z.string().default("30000"),
114+
DEV_OTEL_SPAN_MAX_QUEUE_SIZE: z.string().default("512"),
115+
DEV_OTEL_LOG_MAX_EXPORT_BATCH_SIZE: z.string().default("64"),
116+
DEV_OTEL_LOG_SCHEDULED_DELAY_MILLIS: z.string().default("200"),
117+
DEV_OTEL_LOG_EXPORT_TIMEOUT_MILLIS: z.string().default("30000"),
118+
DEV_OTEL_LOG_MAX_QUEUE_SIZE: z.string().default("512"),
119+
RUNTIME_WAIT_THRESHOLD_IN_MS: z.coerce.number().int().default(30000),
107120
});
108121

109122
export type Environment = z.infer<typeof EnvironmentSchema>;

apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ function parseSecretKey(key: string) {
3030
const SecretValue = z.object({ secret: z.string() });
3131

3232
export class EnvironmentVariablesRepository implements Repository {
33-
constructor(private prismaClient: PrismaClient = prisma) { }
33+
constructor(private prismaClient: PrismaClient = prisma) {}
3434

3535
async create(
3636
projectId: string,
@@ -419,8 +419,49 @@ export class EnvironmentVariablesRepository implements Repository {
419419
{
420420
key: "OTEL_EXPORTER_OTLP_ENDPOINT",
421421
value: env.DEV_OTEL_EXPORTER_OTLP_ENDPOINT ?? env.APP_ORIGIN,
422-
}
423-
];
422+
},
423+
].concat(
424+
env.DEV_OTEL_BATCH_PROCESSING_ENABLED === "1"
425+
? [
426+
{
427+
key: "OTEL_BATCH_PROCESSING_ENABLED",
428+
value: "1",
429+
},
430+
{
431+
key: "OTEL_SPAN_MAX_EXPORT_BATCH_SIZE",
432+
value: env.DEV_OTEL_SPAN_MAX_EXPORT_BATCH_SIZE,
433+
},
434+
{
435+
key: "OTEL_SPAN_SCHEDULED_DELAY_MILLIS",
436+
value: env.DEV_OTEL_SPAN_SCHEDULED_DELAY_MILLIS,
437+
},
438+
{
439+
key: "OTEL_SPAN_EXPORT_TIMEOUT_MILLIS",
440+
value: env.DEV_OTEL_SPAN_EXPORT_TIMEOUT_MILLIS,
441+
},
442+
{
443+
key: "OTEL_SPAN_MAX_QUEUE_SIZE",
444+
value: env.DEV_OTEL_SPAN_MAX_QUEUE_SIZE,
445+
},
446+
{
447+
key: "OTEL_LOG_MAX_EXPORT_BATCH_SIZE",
448+
value: env.DEV_OTEL_LOG_MAX_EXPORT_BATCH_SIZE,
449+
},
450+
{
451+
key: "OTEL_LOG_SCHEDULED_DELAY_MILLIS",
452+
value: env.DEV_OTEL_LOG_SCHEDULED_DELAY_MILLIS,
453+
},
454+
{
455+
key: "OTEL_LOG_EXPORT_TIMEOUT_MILLIS",
456+
value: env.DEV_OTEL_LOG_EXPORT_TIMEOUT_MILLIS,
457+
},
458+
{
459+
key: "OTEL_LOG_MAX_QUEUE_SIZE",
460+
value: env.DEV_OTEL_LOG_MAX_QUEUE_SIZE,
461+
},
462+
]
463+
: []
464+
);
424465
}
425466

426467
return [
@@ -432,6 +473,10 @@ export class EnvironmentVariablesRepository implements Repository {
432473
key: "TRIGGER_API_URL",
433474
value: env.APP_ORIGIN,
434475
},
476+
{
477+
key: "TRIGGER_RUNTIME_WAIT_THRESHOLD_IN_MS",
478+
value: String(env.RUNTIME_WAIT_THRESHOLD_IN_MS),
479+
},
435480
];
436481
}
437482

apps/webapp/app/v3/marqs.server.ts

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -591,14 +591,6 @@ export class MarQS {
591591
concurrencyLimitKey: string;
592592
currentConcurrencyKey: string;
593593
}) {
594-
logger.debug("Calling dequeueMessage", {
595-
messageQueue,
596-
parentQueue,
597-
visibilityQueue,
598-
concurrencyLimitKey,
599-
currentConcurrencyKey,
600-
});
601-
602594
const result = await this.redis.dequeueMessage(
603595
messageQueue,
604596
parentQueue,

packages/cli-v3/src/workers/dev/worker-facade.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ import {
22
Config,
33
ProjectConfig,
44
TaskExecutor,
5-
preciseDateOriginNow,
65
type TracingSDK,
76
type HandleErrorFunction,
7+
DurableClock,
8+
clock,
89
} from "@trigger.dev/core/v3";
910

1011
__WORKER_SETUP__;
@@ -42,10 +43,11 @@ import { TaskMetadataWithFunctions } from "../../types.js";
4243

4344
declare const sender: ZodMessageSender<typeof childToWorkerMessages>;
4445

45-
const preciseDateOrigin = preciseDateOriginNow();
46+
const durableClock = new DurableClock();
47+
clock.setGlobalClock(durableClock);
4648

4749
const tracer = new TriggerTracer({ tracer: otelTracer, logger: otelLogger });
48-
const consoleInterceptor = new ConsoleInterceptor(otelLogger, preciseDateOrigin);
50+
const consoleInterceptor = new ConsoleInterceptor(otelLogger);
4951

5052
const devRuntimeManager = new DevRuntimeManager();
5153

@@ -55,7 +57,6 @@ const otelTaskLogger = new OtelTaskLogger({
5557
logger: otelLogger,
5658
tracer: tracer,
5759
level: "info",
58-
preciseDateOrigin,
5960
});
6061

6162
logger.setGlobalTaskLogger(otelTaskLogger);

packages/cli-v3/src/workers/prod/worker-facade.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ import {
66
TaskExecutor,
77
ZodIpcConnection,
88
type TracingSDK,
9-
preciseDateOriginNow,
109
HandleErrorFunction,
10+
DurableClock,
11+
clock,
1112
} from "@trigger.dev/core/v3";
1213
import "source-map-support/register.js";
1314

@@ -40,16 +41,16 @@ import * as packageJson from "../../../package.json";
4041

4142
import { TaskMetadataWithFunctions } from "../../types";
4243

43-
const preciseDateOrigin = preciseDateOriginNow();
44+
const durableClock = new DurableClock();
45+
clock.setGlobalClock(durableClock);
4446

4547
const tracer = new TriggerTracer({ tracer: otelTracer, logger: otelLogger });
46-
const consoleInterceptor = new ConsoleInterceptor(otelLogger, preciseDateOrigin);
48+
const consoleInterceptor = new ConsoleInterceptor(otelLogger);
4749

4850
const otelTaskLogger = new OtelTaskLogger({
4951
logger: otelLogger,
5052
tracer: tracer,
5153
level: "info",
52-
preciseDateOrigin,
5354
});
5455

5556
logger.setGlobalTaskLogger(otelTaskLogger);
@@ -200,7 +201,9 @@ const zodIpc = new ZodIpcConnection({
200201
},
201202
});
202203

203-
const prodRuntimeManager = new ProdRuntimeManager(zodIpc);
204+
const prodRuntimeManager = new ProdRuntimeManager(zodIpc, {
205+
waitThresholdInMs: parseInt(process.env.TRIGGER_RUNTIME_WAIT_THRESHOLD_IN_MS ?? "30000", 10),
206+
});
204207

205208
runtime.setGlobalRuntimeManager(prodRuntimeManager);
206209

packages/core/src/v3/clock-api.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
// Split module-level variable definition into separate files to allow
2+
// tree-shaking on each api instance.
3+
import { ClockAPI } from "./clock";
4+
/** Entrypoint for clock API */
5+
export const clock = ClockAPI.getInstance();

packages/core/src/v3/clock/clock.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
export type ClockTime = [number, number];
2+
3+
export interface Clock {
4+
preciseNow(): ClockTime;
5+
reset(): void;
6+
}

packages/core/src/v3/clock/index.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
const API_NAME = "clock";
2+
3+
import { getGlobal, registerGlobal } from "../utils/globals";
4+
import type { Clock, ClockTime } from "./clock";
5+
import { SimpleClock } from "./simpleClock";
6+
7+
const SIMPLE_CLOCK = new SimpleClock();
8+
9+
export class ClockAPI {
10+
private static _instance?: ClockAPI;
11+
12+
private constructor() {}
13+
14+
public static getInstance(): ClockAPI {
15+
if (!this._instance) {
16+
this._instance = new ClockAPI();
17+
}
18+
19+
return this._instance;
20+
}
21+
22+
public setGlobalClock(clock: Clock): boolean {
23+
return registerGlobal(API_NAME, clock);
24+
}
25+
26+
public preciseNow(): ClockTime {
27+
return this.#getClock().preciseNow();
28+
}
29+
30+
public reset(): void {
31+
this.#getClock().reset();
32+
}
33+
34+
#getClock(): Clock {
35+
return getGlobal(API_NAME) ?? SIMPLE_CLOCK;
36+
}
37+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import { PreciseDate } from "@google-cloud/precise-date";
2+
import { Clock, ClockTime } from "./clock";
3+
4+
export type PreciseWallClockOptions = {
5+
origin?: ClockTime;
6+
now?: PreciseDate;
7+
};
8+
9+
export class PreciseWallClock implements Clock {
10+
private _origin: {
11+
clockTime: ClockTime;
12+
preciseDate: PreciseDate;
13+
};
14+
15+
get #originClockTime() {
16+
return this._origin.clockTime;
17+
}
18+
19+
get #originPreciseDate() {
20+
return this._origin.preciseDate;
21+
}
22+
23+
constructor(options: PreciseWallClockOptions = {}) {
24+
this._origin = {
25+
clockTime: options.origin ?? process.hrtime(),
26+
preciseDate: options.now ?? new PreciseDate(),
27+
};
28+
}
29+
30+
preciseNow(): [number, number] {
31+
const elapsedHrTime = process.hrtime(this.#originClockTime);
32+
const elapsedNanoseconds = BigInt(elapsedHrTime[0]) * BigInt(1e9) + BigInt(elapsedHrTime[1]);
33+
34+
const preciseDate = new PreciseDate(this.#originPreciseDate.getFullTime() + elapsedNanoseconds);
35+
const dateStruct = preciseDate.toStruct();
36+
37+
return [dateStruct.seconds, dateStruct.nanos];
38+
}
39+
40+
reset() {
41+
this._origin = {
42+
clockTime: process.hrtime(),
43+
preciseDate: new PreciseDate(),
44+
};
45+
}
46+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import { PreciseDate } from "@google-cloud/precise-date";
2+
import { Clock } from "./clock";
3+
4+
export class SimpleClock implements Clock {
5+
preciseNow(): [number, number] {
6+
const now = new PreciseDate();
7+
const nowStruct = now.toStruct();
8+
9+
return [nowStruct.seconds, nowStruct.nanos];
10+
}
11+
12+
reset() {
13+
// do nothing
14+
}
15+
}

packages/core/src/v3/consoleInterceptor.ts

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@ import util from "node:util";
44
import { iconStringForSeverity } from "./icons";
55
import { SemanticInternalAttributes } from "./semanticInternalAttributes";
66
import { flattenAttributes } from "./utils/flattenAttributes";
7-
import { type PreciseDateOrigin, calculatePreciseDateHrTime } from "./utils/preciseDate";
8-
7+
import { ClockTime } from "./clock/clock";
8+
import { clock } from "./clock-api";
99

1010
export class ConsoleInterceptor {
11-
constructor(private readonly logger: logsAPI.Logger, private readonly preciseDateOrigin: PreciseDateOrigin) { }
11+
constructor(private readonly logger: logsAPI.Logger) {}
1212

1313
// Intercept the console and send logs to the OpenTelemetry logger
1414
// during the execution of the callback
@@ -39,24 +39,28 @@ export class ConsoleInterceptor {
3939
}
4040

4141
log(...args: unknown[]): void {
42-
this.#handleLog(SeverityNumber.INFO, "Log", ...args);
42+
this.#handleLog(SeverityNumber.INFO, this.#getTimestampInHrTime(), "Log", ...args);
4343
}
4444

4545
info(...args: unknown[]): void {
46-
this.#handleLog(SeverityNumber.INFO, "Info", ...args);
46+
this.#handleLog(SeverityNumber.INFO, this.#getTimestampInHrTime(), "Info", ...args);
4747
}
4848

4949
warn(...args: unknown[]): void {
50-
this.#handleLog(SeverityNumber.WARN, "Warn", ...args);
50+
this.#handleLog(SeverityNumber.WARN, this.#getTimestampInHrTime(), "Warn", ...args);
5151
}
5252

5353
error(...args: unknown[]): void {
54-
this.#handleLog(SeverityNumber.ERROR, "Error", ...args);
54+
this.#handleLog(SeverityNumber.ERROR, this.#getTimestampInHrTime(), "Error", ...args);
5555
}
5656

57-
#handleLog(severityNumber: SeverityNumber, severityText: string, ...args: unknown[]): void {
57+
#handleLog(
58+
severityNumber: SeverityNumber,
59+
timestamp: ClockTime,
60+
severityText: string,
61+
...args: unknown[]
62+
): void {
5863
const body = util.format(...args);
59-
const timestamp = this.#getTimestampInHrTime();
6064

6165
const parsed = tryParseJSON(body);
6266

@@ -81,8 +85,8 @@ export class ConsoleInterceptor {
8185
});
8286
}
8387

84-
#getTimestampInHrTime(): [number, number] {
85-
return calculatePreciseDateHrTime(this.preciseDateOrigin);
88+
#getTimestampInHrTime(): ClockTime {
89+
return clock.preciseNow();
8690
}
8791

8892
#getAttributes(severityNumber: SeverityNumber): logsAPI.LogAttributes {

0 commit comments

Comments
 (0)