Skip to content

Commit edd3d70

Browse files
committed
Fixed incorrect span timings around checkpoints by implementing a precise wall clock that resets after restores
1 parent 685153b commit edd3d70

File tree

14 files changed

+109
-62
lines changed

14 files changed

+109
-62
lines changed

.env.example

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,5 @@ COORDINATOR_SECRET=coordinator-secret # generate the actual secret with `openssl
6363
# These are needed for the object store (for handling large payloads/outputs)
6464
# OBJECT_STORE_BASE_URL="https://{bucket}.{accountId}.r2.cloudflarestorage.com"
6565
# OBJECT_STORE_ACCESS_KEY_ID=
66-
# OBJECT_STORE_SECRET_ACCESS_KEY=
66+
# OBJECT_STORE_SECRET_ACCESS_KEY=
67+
# RUNTIME_WAIT_THRESHOLD_IN_MS=10000

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ const EnvironmentSchema = z.object({
116116
DEV_OTEL_LOG_SCHEDULED_DELAY_MILLIS: z.string().default("200"),
117117
DEV_OTEL_LOG_EXPORT_TIMEOUT_MILLIS: z.string().default("30000"),
118118
DEV_OTEL_LOG_MAX_QUEUE_SIZE: z.string().default("512"),
119+
RUNTIME_WAIT_THRESHOLD_IN_MS: z.coerce.number().int().default(30000),
119120
});
120121

121122
export type Environment = z.infer<typeof EnvironmentSchema>;

apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,10 @@ export class EnvironmentVariablesRepository implements Repository {
473473
key: "TRIGGER_API_URL",
474474
value: env.APP_ORIGIN,
475475
},
476+
{
477+
key: "TRIGGER_RUNTIME_WAIT_THRESHOLD_IN_MS",
478+
value: String(env.RUNTIME_WAIT_THRESHOLD_IN_MS),
479+
},
476480
];
477481
}
478482

apps/webapp/app/v3/marqs.server.ts

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -591,14 +591,6 @@ export class MarQS {
591591
concurrencyLimitKey: string;
592592
currentConcurrencyKey: string;
593593
}) {
594-
logger.debug("Calling dequeueMessage", {
595-
messageQueue,
596-
parentQueue,
597-
visibilityQueue,
598-
concurrencyLimitKey,
599-
currentConcurrencyKey,
600-
});
601-
602594
const result = await this.redis.dequeueMessage(
603595
messageQueue,
604596
parentQueue,

packages/cli-v3/src/workers/prod/worker-facade.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,9 @@ const zodIpc = new ZodIpcConnection({
201201
},
202202
});
203203

204-
const prodRuntimeManager = new ProdRuntimeManager(zodIpc);
204+
const prodRuntimeManager = new ProdRuntimeManager(zodIpc, {
205+
waitThresholdInMs: parseInt(process.env.TRIGGER_RUNTIME_WAIT_THRESHOLD_IN_MS ?? "30000", 10),
206+
});
205207

206208
runtime.setGlobalRuntimeManager(prodRuntimeManager);
207209

packages/core/src/v3/clock/clock.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@ export type ClockTime = [number, number];
22

33
export interface Clock {
44
preciseNow(): ClockTime;
5+
reset(): void;
56
}

packages/core/src/v3/clock/durableClock.ts

Lines changed: 0 additions & 27 deletions
This file was deleted.

packages/core/src/v3/clock/index.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ export class ClockAPI {
2727
return this.#getClock().preciseNow();
2828
}
2929

30+
public reset(): void {
31+
this.#getClock().reset();
32+
}
33+
3034
#getClock(): Clock {
3135
return getGlobal(API_NAME) ?? SIMPLE_CLOCK;
3236
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import { PreciseDate } from "@google-cloud/precise-date";
2+
import { Clock, ClockTime } from "./clock";
3+
4+
export type PreciseWallClockOptions = {
5+
origin?: ClockTime;
6+
now?: PreciseDate;
7+
};
8+
9+
export class PreciseWallClock implements Clock {
10+
private _origin: {
11+
clockTime: ClockTime;
12+
preciseDate: PreciseDate;
13+
};
14+
15+
get #originClockTime() {
16+
return this._origin.clockTime;
17+
}
18+
19+
get #originPreciseDate() {
20+
return this._origin.preciseDate;
21+
}
22+
23+
constructor(options: PreciseWallClockOptions = {}) {
24+
this._origin = {
25+
clockTime: options.origin ?? process.hrtime(),
26+
preciseDate: options.now ?? new PreciseDate(),
27+
};
28+
}
29+
30+
preciseNow(): [number, number] {
31+
const elapsedHrTime = process.hrtime(this.#originClockTime);
32+
const elapsedNanoseconds = BigInt(elapsedHrTime[0]) * BigInt(1e9) + BigInt(elapsedHrTime[1]);
33+
34+
const preciseDate = new PreciseDate(this.#originPreciseDate.getFullTime() + elapsedNanoseconds);
35+
const dateStruct = preciseDate.toStruct();
36+
37+
return [dateStruct.seconds, dateStruct.nanos];
38+
}
39+
40+
reset() {
41+
this._origin = {
42+
clockTime: process.hrtime(),
43+
preciseDate: new PreciseDate(),
44+
};
45+
}
46+
}

packages/core/src/v3/clock/simpleClock.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,8 @@ export class SimpleClock implements Clock {
88

99
return [nowStruct.seconds, nowStruct.nanos];
1010
}
11+
12+
reset() {
13+
// do nothing
14+
}
1115
}

packages/core/src/v3/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ export { taskContextManager, TaskContextSpanProcessor } from "./tasks/taskContex
3636
export type { RuntimeManager } from "./runtime/manager";
3737
export { DevRuntimeManager } from "./runtime/devRuntimeManager";
3838
export { ProdRuntimeManager } from "./runtime/prodRuntimeManager";
39-
export { DurableClock } from "./clock/durableClock";
39+
export { PreciseWallClock as DurableClock } from "./clock/preciseWallClock";
4040
export { TriggerTracer } from "./tracer";
4141

4242
export type { TaskLogger } from "./logger/taskLogger";

packages/core/src/v3/otel/tracingSDK.ts

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -136,12 +136,16 @@ export class TracingSDK {
136136

137137
traceProvider.addSpanProcessor(
138138
new TaskContextSpanProcessor(
139-
process.env.OTEL_BATCH_PROCESSING_ENABLED === "1"
139+
getEnvVar("OTEL_BATCH_PROCESSING_ENABLED") === "1"
140140
? new BatchSpanProcessor(spanExporter, {
141-
maxExportBatchSize: parseInt(process.env.OTEL_SPAN_MAX_EXPORT_BATCH_SIZE ?? "64"),
142-
scheduledDelayMillis: parseInt(process.env.OTEL_SPAN_SCHEDULED_DELAY_MILLIS ?? "200"),
143-
exportTimeoutMillis: parseInt(process.env.OTEL_SPAN_EXPORT_TIMEOUT_MILLIS ?? "30000"),
144-
maxQueueSize: parseInt(process.env.OTEL_SPAN_MAX_QUEUE_SIZE ?? "512"),
141+
maxExportBatchSize: parseInt(getEnvVar("OTEL_SPAN_MAX_EXPORT_BATCH_SIZE") ?? "64"),
142+
scheduledDelayMillis: parseInt(
143+
getEnvVar("OTEL_SPAN_SCHEDULED_DELAY_MILLIS") ?? "200"
144+
),
145+
exportTimeoutMillis: parseInt(
146+
getEnvVar("OTEL_SPAN_EXPORT_TIMEOUT_MILLIS") ?? "30000"
147+
),
148+
maxQueueSize: parseInt(getEnvVar("OTEL_SPAN_MAX_QUEUE_SIZE") ?? "512"),
145149
})
146150
: new SimpleSpanProcessor(spanExporter)
147151
)
@@ -169,12 +173,12 @@ export class TracingSDK {
169173

170174
loggerProvider.addLogRecordProcessor(
171175
new TaskContextLogProcessor(
172-
process.env.OTEL_BATCH_PROCESSING_ENABLED === "1"
176+
getEnvVar("OTEL_BATCH_PROCESSING_ENABLED") === "1"
173177
? new BatchLogRecordProcessor(logExporter, {
174-
maxExportBatchSize: parseInt(process.env.OTEL_LOG_MAX_EXPORT_BATCH_SIZE ?? "64"),
175-
scheduledDelayMillis: parseInt(process.env.OTEL_LOG_SCHEDULED_DELAY_MILLIS ?? "200"),
176-
exportTimeoutMillis: parseInt(process.env.OTEL_LOG_EXPORT_TIMEOUT_MILLIS ?? "30000"),
177-
maxQueueSize: parseInt(process.env.OTEL_LOG_MAX_QUEUE_SIZE ?? "512"),
178+
maxExportBatchSize: parseInt(getEnvVar("OTEL_LOG_MAX_EXPORT_BATCH_SIZE") ?? "64"),
179+
scheduledDelayMillis: parseInt(getEnvVar("OTEL_LOG_SCHEDULED_DELAY_MILLIS") ?? "200"),
180+
exportTimeoutMillis: parseInt(getEnvVar("OTEL_LOG_EXPORT_TIMEOUT_MILLIS") ?? "30000"),
181+
maxQueueSize: parseInt(getEnvVar("OTEL_LOG_MAX_QUEUE_SIZE") ?? "512"),
178182
})
179183
: new SimpleLogRecordProcessor(logExporter)
180184
)

packages/core/src/v3/runtime/prodRuntimeManager.ts

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { clock } from "../clock-api";
12
import {
23
BatchTaskRunExecutionResult,
34
ProdChildToWorkerMessages,
@@ -9,6 +10,11 @@ import {
910
} from "../schemas";
1011
import { ZodIpcConnection } from "../zodIpc";
1112
import { RuntimeManager } from "./manager";
13+
import { setTimeout } from "node:timers/promises";
14+
15+
export type ProdRuntimeManagerOptions = {
16+
waitThresholdInMs?: number;
17+
};
1218

1319
export class ProdRuntimeManager implements RuntimeManager {
1420
_taskWaits: Map<
@@ -21,15 +27,16 @@ export class ProdRuntimeManager implements RuntimeManager {
2127
{ resolve: (value: BatchTaskRunExecutionResult) => void; reject: (err?: any) => void }
2228
> = new Map();
2329

24-
_waitForRestore: { resolve: (value?: any) => void; reject: (err?: any) => void } | undefined;
30+
_waitForRestore: { resolve: (value: "restore") => void; reject: (err?: any) => void } | undefined;
2531

2632
_tasks: Map<string, TaskMetadataWithFilePath> = new Map();
2733

2834
constructor(
2935
private ipc: ZodIpcConnection<
3036
typeof ProdWorkerToChildMessages,
3137
typeof ProdChildToWorkerMessages
32-
>
38+
>,
39+
private options: ProdRuntimeManagerOptions = {}
3340
) {}
3441

3542
disable(): void {
@@ -47,20 +54,16 @@ export class ProdRuntimeManager implements RuntimeManager {
4754
}
4855

4956
async waitForDuration(ms: number): Promise<void> {
50-
let timeout: NodeJS.Timeout | undefined;
51-
5257
const now = Date.now();
5358

54-
const resolveAfterDuration = new Promise((resolve) => {
55-
timeout = setTimeout(resolve, ms);
56-
});
59+
const resolveAfterDuration = setTimeout(ms, "duration" as const);
5760

58-
if (ms < 10_000) {
61+
if (ms <= this.waitThresholdInMs) {
5962
await resolveAfterDuration;
6063
return;
6164
}
6265

63-
const waitForRestore = new Promise((resolve, reject) => {
66+
const waitForRestore = new Promise<"restore">((resolve, reject) => {
6467
this._waitForRestore = { resolve, reject };
6568
});
6669

@@ -81,16 +84,17 @@ export class ProdRuntimeManager implements RuntimeManager {
8184

8285
// The coordinator can then cancel any in-progress checkpoints
8386
this.ipc.send("CANCEL_CHECKPOINT", {});
84-
85-
clearTimeout(timeout);
8687
}
8788

8889
resumeAfterRestore(): void {
8990
if (!this._waitForRestore) {
9091
return;
9192
}
9293

93-
this._waitForRestore.resolve();
94+
// Resets the clock to the current time
95+
clock.reset();
96+
97+
this._waitForRestore.resolve("restore");
9498
this._waitForRestore = undefined;
9599
}
96100

@@ -155,4 +159,8 @@ export class ProdRuntimeManager implements RuntimeManager {
155159

156160
this._taskWaits.delete(execution.run.id);
157161
}
162+
163+
private get waitThresholdInMs(): number {
164+
return this.options.waitThresholdInMs ?? 30_000;
165+
}
158166
}

references/v3-catalog/src/trigger/simple.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,14 +107,21 @@ export const parentTask = task({
107107

108108
export const childTask = task({
109109
id: "child-task",
110-
run: async (payload: { message: string; forceError: boolean }, { ctx }) => {
110+
run: async (
111+
payload: { message: string; forceError: boolean; delayInSeconds?: number },
112+
{ ctx }
113+
) => {
111114
logger.info("Child task payload", { payload });
112115
logger.info("Child task payload 2", { payload });
113116
logger.info("Child task payload 3", { payload });
114117
logger.info("Child task payload 4", { payload });
115118
logger.info("Child task payload 5", { payload });
116119

117-
await wait.for({ seconds: 10 });
120+
await wait.for({ seconds: payload.delayInSeconds ?? 5 });
121+
122+
logger.info("Child task payload 6", { payload });
123+
logger.info("Child task payload 7", { payload });
124+
logger.info("Child task payload 8", { payload });
118125

119126
const response = await fetch("https://jsonhero.io/api/create.json", {
120127
method: "POST",

0 commit comments

Comments
 (0)