Skip to content

Commit 685153b

Browse files
committed
durable clock WIP
1 parent a22b586 commit 685153b

File tree

19 files changed

+221
-81
lines changed

19 files changed

+221
-81
lines changed

apps/webapp/app/env.server.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,13 +97,25 @@ const EnvironmentSchema = z.object({
9797
CONTAINER_REGISTRY_USERNAME: z.string().optional(),
9898
CONTAINER_REGISTRY_PASSWORD: z.string().optional(),
9999
DEPLOY_REGISTRY_HOST: z.string().optional(),
100-
DEV_OTEL_EXPORTER_OTLP_ENDPOINT: z.string().optional(),
101100
OBJECT_STORE_BASE_URL: z.string().optional(),
102101
OBJECT_STORE_ACCESS_KEY_ID: z.string().optional(),
103102
OBJECT_STORE_SECRET_ACCESS_KEY: z.string().optional(),
104103
EVENTS_BATCH_SIZE: z.coerce.number().int().default(100),
105104
EVENTS_BATCH_INTERVAL: z.coerce.number().int().default(1000),
106105
EVENTS_DEFAULT_LOG_RETENTION: z.coerce.number().int().default(7),
106+
107+
// Development OTEL environment variables
108+
DEV_OTEL_EXPORTER_OTLP_ENDPOINT: z.string().optional(),
109+
// If this is set to 1, then the below variables are used to configure the batch processor for spans and logs
110+
DEV_OTEL_BATCH_PROCESSING_ENABLED: z.string().default("0"),
111+
DEV_OTEL_SPAN_MAX_EXPORT_BATCH_SIZE: z.string().default("64"),
112+
DEV_OTEL_SPAN_SCHEDULED_DELAY_MILLIS: z.string().default("200"),
113+
DEV_OTEL_SPAN_EXPORT_TIMEOUT_MILLIS: z.string().default("30000"),
114+
DEV_OTEL_SPAN_MAX_QUEUE_SIZE: z.string().default("512"),
115+
DEV_OTEL_LOG_MAX_EXPORT_BATCH_SIZE: z.string().default("64"),
116+
DEV_OTEL_LOG_SCHEDULED_DELAY_MILLIS: z.string().default("200"),
117+
DEV_OTEL_LOG_EXPORT_TIMEOUT_MILLIS: z.string().default("30000"),
118+
DEV_OTEL_LOG_MAX_QUEUE_SIZE: z.string().default("512"),
107119
});
108120

109121
export type Environment = z.infer<typeof EnvironmentSchema>;

apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts

Lines changed: 44 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ function parseSecretKey(key: string) {
3030
const SecretValue = z.object({ secret: z.string() });
3131

3232
export class EnvironmentVariablesRepository implements Repository {
33-
constructor(private prismaClient: PrismaClient = prisma) { }
33+
constructor(private prismaClient: PrismaClient = prisma) {}
3434

3535
async create(
3636
projectId: string,
@@ -419,8 +419,49 @@ export class EnvironmentVariablesRepository implements Repository {
419419
{
420420
key: "OTEL_EXPORTER_OTLP_ENDPOINT",
421421
value: env.DEV_OTEL_EXPORTER_OTLP_ENDPOINT ?? env.APP_ORIGIN,
422-
}
423-
];
422+
},
423+
].concat(
424+
env.DEV_OTEL_BATCH_PROCESSING_ENABLED === "1"
425+
? [
426+
{
427+
key: "OTEL_BATCH_PROCESSING_ENABLED",
428+
value: "1",
429+
},
430+
{
431+
key: "OTEL_SPAN_MAX_EXPORT_BATCH_SIZE",
432+
value: env.DEV_OTEL_SPAN_MAX_EXPORT_BATCH_SIZE,
433+
},
434+
{
435+
key: "OTEL_SPAN_SCHEDULED_DELAY_MILLIS",
436+
value: env.DEV_OTEL_SPAN_SCHEDULED_DELAY_MILLIS,
437+
},
438+
{
439+
key: "OTEL_SPAN_EXPORT_TIMEOUT_MILLIS",
440+
value: env.DEV_OTEL_SPAN_EXPORT_TIMEOUT_MILLIS,
441+
},
442+
{
443+
key: "OTEL_SPAN_MAX_QUEUE_SIZE",
444+
value: env.DEV_OTEL_SPAN_MAX_QUEUE_SIZE,
445+
},
446+
{
447+
key: "OTEL_LOG_MAX_EXPORT_BATCH_SIZE",
448+
value: env.DEV_OTEL_LOG_MAX_EXPORT_BATCH_SIZE,
449+
},
450+
{
451+
key: "OTEL_LOG_SCHEDULED_DELAY_MILLIS",
452+
value: env.DEV_OTEL_LOG_SCHEDULED_DELAY_MILLIS,
453+
},
454+
{
455+
key: "OTEL_LOG_EXPORT_TIMEOUT_MILLIS",
456+
value: env.DEV_OTEL_LOG_EXPORT_TIMEOUT_MILLIS,
457+
},
458+
{
459+
key: "OTEL_LOG_MAX_QUEUE_SIZE",
460+
value: env.DEV_OTEL_LOG_MAX_QUEUE_SIZE,
461+
},
462+
]
463+
: []
464+
);
424465
}
425466

426467
return [

packages/cli-v3/src/workers/dev/worker-facade.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ import {
22
Config,
33
ProjectConfig,
44
TaskExecutor,
5-
preciseDateOriginNow,
65
type TracingSDK,
76
type HandleErrorFunction,
7+
DurableClock,
8+
clock,
89
} from "@trigger.dev/core/v3";
910

1011
__WORKER_SETUP__;
@@ -42,10 +43,11 @@ import { TaskMetadataWithFunctions } from "../../types.js";
4243

4344
declare const sender: ZodMessageSender<typeof childToWorkerMessages>;
4445

45-
const preciseDateOrigin = preciseDateOriginNow();
46+
const durableClock = new DurableClock();
47+
clock.setGlobalClock(durableClock);
4648

4749
const tracer = new TriggerTracer({ tracer: otelTracer, logger: otelLogger });
48-
const consoleInterceptor = new ConsoleInterceptor(otelLogger, preciseDateOrigin);
50+
const consoleInterceptor = new ConsoleInterceptor(otelLogger);
4951

5052
const devRuntimeManager = new DevRuntimeManager();
5153

@@ -55,7 +57,6 @@ const otelTaskLogger = new OtelTaskLogger({
5557
logger: otelLogger,
5658
tracer: tracer,
5759
level: "info",
58-
preciseDateOrigin,
5960
});
6061

6162
logger.setGlobalTaskLogger(otelTaskLogger);

packages/cli-v3/src/workers/prod/worker-facade.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,9 @@ import {
66
TaskExecutor,
77
ZodIpcConnection,
88
type TracingSDK,
9-
preciseDateOriginNow,
109
HandleErrorFunction,
10+
DurableClock,
11+
clock,
1112
} from "@trigger.dev/core/v3";
1213
import "source-map-support/register.js";
1314

@@ -40,16 +41,16 @@ import * as packageJson from "../../../package.json";
4041

4142
import { TaskMetadataWithFunctions } from "../../types";
4243

43-
const preciseDateOrigin = preciseDateOriginNow();
44+
const durableClock = new DurableClock();
45+
clock.setGlobalClock(durableClock);
4446

4547
const tracer = new TriggerTracer({ tracer: otelTracer, logger: otelLogger });
46-
const consoleInterceptor = new ConsoleInterceptor(otelLogger, preciseDateOrigin);
48+
const consoleInterceptor = new ConsoleInterceptor(otelLogger);
4749

4850
const otelTaskLogger = new OtelTaskLogger({
4951
logger: otelLogger,
5052
tracer: tracer,
5153
level: "info",
52-
preciseDateOrigin,
5354
});
5455

5556
logger.setGlobalTaskLogger(otelTaskLogger);

packages/core/src/v3/clock-api.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
// Split module-level variable definition into separate files to allow
2+
// tree-shaking on each api instance.
3+
import { ClockAPI } from "./clock";
4+
/** Entrypoint for clock API */
5+
export const clock = ClockAPI.getInstance();

packages/core/src/v3/clock/clock.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
export type ClockTime = [number, number];
2+
3+
export interface Clock {
4+
preciseNow(): ClockTime;
5+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import { PreciseDate } from "@google-cloud/precise-date";
2+
import { Clock, ClockTime } from "./clock";
3+
4+
export type DurableClockOptions = {
5+
origin?: ClockTime;
6+
now?: PreciseDate;
7+
};
8+
9+
export class DurableClock implements Clock {
10+
private _originClockTime: ClockTime;
11+
private _originPreciseDate: PreciseDate;
12+
13+
constructor(options: DurableClockOptions = {}) {
14+
this._originClockTime = options.origin ?? process.hrtime();
15+
this._originPreciseDate = options.now ?? new PreciseDate();
16+
}
17+
18+
preciseNow(): [number, number] {
19+
const elapsedHrTime = process.hrtime(this._originClockTime);
20+
const elapsedNanoseconds = BigInt(elapsedHrTime[0]) * BigInt(1e9) + BigInt(elapsedHrTime[1]);
21+
22+
const preciseDate = new PreciseDate(this._originPreciseDate.getFullTime() + elapsedNanoseconds);
23+
const dateStruct = preciseDate.toStruct();
24+
25+
return [dateStruct.seconds, dateStruct.nanos];
26+
}
27+
}

packages/core/src/v3/clock/index.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
const API_NAME = "clock";
2+
3+
import { getGlobal, registerGlobal } from "../utils/globals";
4+
import type { Clock, ClockTime } from "./clock";
5+
import { SimpleClock } from "./simpleClock";
6+
7+
const SIMPLE_CLOCK = new SimpleClock();
8+
9+
export class ClockAPI {
10+
private static _instance?: ClockAPI;
11+
12+
private constructor() {}
13+
14+
public static getInstance(): ClockAPI {
15+
if (!this._instance) {
16+
this._instance = new ClockAPI();
17+
}
18+
19+
return this._instance;
20+
}
21+
22+
public setGlobalClock(clock: Clock): boolean {
23+
return registerGlobal(API_NAME, clock);
24+
}
25+
26+
public preciseNow(): ClockTime {
27+
return this.#getClock().preciseNow();
28+
}
29+
30+
#getClock(): Clock {
31+
return getGlobal(API_NAME) ?? SIMPLE_CLOCK;
32+
}
33+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
import { PreciseDate } from "@google-cloud/precise-date";
2+
import { Clock } from "./clock";
3+
4+
export class SimpleClock implements Clock {
5+
preciseNow(): [number, number] {
6+
const now = new PreciseDate();
7+
const nowStruct = now.toStruct();
8+
9+
return [nowStruct.seconds, nowStruct.nanos];
10+
}
11+
}

packages/core/src/v3/consoleInterceptor.ts

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@ import util from "node:util";
44
import { iconStringForSeverity } from "./icons";
55
import { SemanticInternalAttributes } from "./semanticInternalAttributes";
66
import { flattenAttributes } from "./utils/flattenAttributes";
7-
import { type PreciseDateOrigin, calculatePreciseDateHrTime } from "./utils/preciseDate";
8-
7+
import { ClockTime } from "./clock/clock";
8+
import { clock } from "./clock-api";
99

1010
export class ConsoleInterceptor {
11-
constructor(private readonly logger: logsAPI.Logger, private readonly preciseDateOrigin: PreciseDateOrigin) { }
11+
constructor(private readonly logger: logsAPI.Logger) {}
1212

1313
// Intercept the console and send logs to the OpenTelemetry logger
1414
// during the execution of the callback
@@ -39,24 +39,28 @@ export class ConsoleInterceptor {
3939
}
4040

4141
log(...args: unknown[]): void {
42-
this.#handleLog(SeverityNumber.INFO, "Log", ...args);
42+
this.#handleLog(SeverityNumber.INFO, this.#getTimestampInHrTime(), "Log", ...args);
4343
}
4444

4545
info(...args: unknown[]): void {
46-
this.#handleLog(SeverityNumber.INFO, "Info", ...args);
46+
this.#handleLog(SeverityNumber.INFO, this.#getTimestampInHrTime(), "Info", ...args);
4747
}
4848

4949
warn(...args: unknown[]): void {
50-
this.#handleLog(SeverityNumber.WARN, "Warn", ...args);
50+
this.#handleLog(SeverityNumber.WARN, this.#getTimestampInHrTime(), "Warn", ...args);
5151
}
5252

5353
error(...args: unknown[]): void {
54-
this.#handleLog(SeverityNumber.ERROR, "Error", ...args);
54+
this.#handleLog(SeverityNumber.ERROR, this.#getTimestampInHrTime(), "Error", ...args);
5555
}
5656

57-
#handleLog(severityNumber: SeverityNumber, severityText: string, ...args: unknown[]): void {
57+
#handleLog(
58+
severityNumber: SeverityNumber,
59+
timestamp: ClockTime,
60+
severityText: string,
61+
...args: unknown[]
62+
): void {
5863
const body = util.format(...args);
59-
const timestamp = this.#getTimestampInHrTime();
6064

6165
const parsed = tryParseJSON(body);
6266

@@ -81,8 +85,8 @@ export class ConsoleInterceptor {
8185
});
8286
}
8387

84-
#getTimestampInHrTime(): [number, number] {
85-
return calculatePreciseDateHrTime(this.preciseDateOrigin);
88+
#getTimestampInHrTime(): ClockTime {
89+
return clock.preciseNow();
8690
}
8791

8892
#getAttributes(severityNumber: SeverityNumber): logsAPI.LogAttributes {

packages/core/src/v3/index.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ export * from "./zodIpc";
99
export * from "./errors";
1010
export * from "./runtime-api";
1111
export * from "./logger-api";
12+
export * from "./clock-api";
1213
export * from "./types";
1314
export * from "./limits";
1415
export { SemanticInternalAttributes } from "./semanticInternalAttributes";
@@ -35,6 +36,7 @@ export { taskContextManager, TaskContextSpanProcessor } from "./tasks/taskContex
3536
export type { RuntimeManager } from "./runtime/manager";
3637
export { DevRuntimeManager } from "./runtime/devRuntimeManager";
3738
export { ProdRuntimeManager } from "./runtime/prodRuntimeManager";
39+
export { DurableClock } from "./clock/durableClock";
3840
export { TriggerTracer } from "./tracer";
3941

4042
export type { TaskLogger } from "./logger/taskLogger";
@@ -57,11 +59,6 @@ export { omit } from "./utils/omit";
5759
export { TracingSDK, type TracingDiagnosticLogLevel, recordSpanException } from "./otel";
5860
export { TaskExecutor, type TaskExecutorOptions } from "./workers/taskExecutor";
5961
export { detectDependencyVersion } from "./utils/detectDependencyVersion";
60-
export {
61-
type PreciseDateOrigin,
62-
calculatePreciseDateHrTime,
63-
preciseDateOriginNow,
64-
} from "./utils/preciseDate";
6562
export {
6663
parsePacket,
6764
stringifyIO,

0 commit comments

Comments
 (0)