Skip to content

v3: better clock management for spans/logs in CRIU envs #980

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/real-planets-stare.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@trigger.dev/sdk": patch
"trigger.dev": patch
"@trigger.dev/core": patch
---

Fixed incorrect span timings around checkpoints by implementing a precise wall clock that resets after restores.
3 changes: 2 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,5 @@ COORDINATOR_SECRET=coordinator-secret # generate the actual secret with `openssl
# These are needed for the object store (for handling large payloads/outputs)
# OBJECT_STORE_BASE_URL="https://{bucket}.{accountId}.r2.cloudflarestorage.com"
# OBJECT_STORE_ACCESS_KEY_ID=
# OBJECT_STORE_SECRET_ACCESS_KEY=
# OBJECT_STORE_SECRET_ACCESS_KEY=
# RUNTIME_WAIT_THRESHOLD_IN_MS=10000
15 changes: 14 additions & 1 deletion apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,26 @@ const EnvironmentSchema = z.object({
CONTAINER_REGISTRY_USERNAME: z.string().optional(),
CONTAINER_REGISTRY_PASSWORD: z.string().optional(),
DEPLOY_REGISTRY_HOST: z.string().optional(),
DEV_OTEL_EXPORTER_OTLP_ENDPOINT: z.string().optional(),
OBJECT_STORE_BASE_URL: z.string().optional(),
OBJECT_STORE_ACCESS_KEY_ID: z.string().optional(),
OBJECT_STORE_SECRET_ACCESS_KEY: z.string().optional(),
EVENTS_BATCH_SIZE: z.coerce.number().int().default(100),
EVENTS_BATCH_INTERVAL: z.coerce.number().int().default(1000),
EVENTS_DEFAULT_LOG_RETENTION: z.coerce.number().int().default(7),

// Development OTEL environment variables
DEV_OTEL_EXPORTER_OTLP_ENDPOINT: z.string().optional(),
// If this is set to 1, then the below variables are used to configure the batch processor for spans and logs
DEV_OTEL_BATCH_PROCESSING_ENABLED: z.string().default("0"),
DEV_OTEL_SPAN_MAX_EXPORT_BATCH_SIZE: z.string().default("64"),
DEV_OTEL_SPAN_SCHEDULED_DELAY_MILLIS: z.string().default("200"),
DEV_OTEL_SPAN_EXPORT_TIMEOUT_MILLIS: z.string().default("30000"),
DEV_OTEL_SPAN_MAX_QUEUE_SIZE: z.string().default("512"),
DEV_OTEL_LOG_MAX_EXPORT_BATCH_SIZE: z.string().default("64"),
DEV_OTEL_LOG_SCHEDULED_DELAY_MILLIS: z.string().default("200"),
DEV_OTEL_LOG_EXPORT_TIMEOUT_MILLIS: z.string().default("30000"),
DEV_OTEL_LOG_MAX_QUEUE_SIZE: z.string().default("512"),
RUNTIME_WAIT_THRESHOLD_IN_MS: z.coerce.number().int().default(30000),
});

export type Environment = z.infer<typeof EnvironmentSchema>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ function parseSecretKey(key: string) {
const SecretValue = z.object({ secret: z.string() });

export class EnvironmentVariablesRepository implements Repository {
constructor(private prismaClient: PrismaClient = prisma) { }
constructor(private prismaClient: PrismaClient = prisma) {}

async create(
projectId: string,
Expand Down Expand Up @@ -419,8 +419,49 @@ export class EnvironmentVariablesRepository implements Repository {
{
key: "OTEL_EXPORTER_OTLP_ENDPOINT",
value: env.DEV_OTEL_EXPORTER_OTLP_ENDPOINT ?? env.APP_ORIGIN,
}
];
},
].concat(
env.DEV_OTEL_BATCH_PROCESSING_ENABLED === "1"
? [
{
key: "OTEL_BATCH_PROCESSING_ENABLED",
value: "1",
},
{
key: "OTEL_SPAN_MAX_EXPORT_BATCH_SIZE",
value: env.DEV_OTEL_SPAN_MAX_EXPORT_BATCH_SIZE,
},
{
key: "OTEL_SPAN_SCHEDULED_DELAY_MILLIS",
value: env.DEV_OTEL_SPAN_SCHEDULED_DELAY_MILLIS,
},
{
key: "OTEL_SPAN_EXPORT_TIMEOUT_MILLIS",
value: env.DEV_OTEL_SPAN_EXPORT_TIMEOUT_MILLIS,
},
{
key: "OTEL_SPAN_MAX_QUEUE_SIZE",
value: env.DEV_OTEL_SPAN_MAX_QUEUE_SIZE,
},
{
key: "OTEL_LOG_MAX_EXPORT_BATCH_SIZE",
value: env.DEV_OTEL_LOG_MAX_EXPORT_BATCH_SIZE,
},
{
key: "OTEL_LOG_SCHEDULED_DELAY_MILLIS",
value: env.DEV_OTEL_LOG_SCHEDULED_DELAY_MILLIS,
},
{
key: "OTEL_LOG_EXPORT_TIMEOUT_MILLIS",
value: env.DEV_OTEL_LOG_EXPORT_TIMEOUT_MILLIS,
},
{
key: "OTEL_LOG_MAX_QUEUE_SIZE",
value: env.DEV_OTEL_LOG_MAX_QUEUE_SIZE,
},
]
: []
);
}

return [
Expand All @@ -432,6 +473,10 @@ export class EnvironmentVariablesRepository implements Repository {
key: "TRIGGER_API_URL",
value: env.APP_ORIGIN,
},
{
key: "TRIGGER_RUNTIME_WAIT_THRESHOLD_IN_MS",
value: String(env.RUNTIME_WAIT_THRESHOLD_IN_MS),
},
];
}

Expand Down
8 changes: 0 additions & 8 deletions apps/webapp/app/v3/marqs.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -591,14 +591,6 @@ export class MarQS {
concurrencyLimitKey: string;
currentConcurrencyKey: string;
}) {
logger.debug("Calling dequeueMessage", {
messageQueue,
parentQueue,
visibilityQueue,
concurrencyLimitKey,
currentConcurrencyKey,
});

const result = await this.redis.dequeueMessage(
messageQueue,
parentQueue,
Expand Down
9 changes: 5 additions & 4 deletions packages/cli-v3/src/workers/dev/worker-facade.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ import {
Config,
ProjectConfig,
TaskExecutor,
preciseDateOriginNow,
type TracingSDK,
type HandleErrorFunction,
DurableClock,
clock,
} from "@trigger.dev/core/v3";

__WORKER_SETUP__;
Expand Down Expand Up @@ -42,10 +43,11 @@ import { TaskMetadataWithFunctions } from "../../types.js";

declare const sender: ZodMessageSender<typeof childToWorkerMessages>;

const preciseDateOrigin = preciseDateOriginNow();
const durableClock = new DurableClock();
clock.setGlobalClock(durableClock);

const tracer = new TriggerTracer({ tracer: otelTracer, logger: otelLogger });
const consoleInterceptor = new ConsoleInterceptor(otelLogger, preciseDateOrigin);
const consoleInterceptor = new ConsoleInterceptor(otelLogger);

const devRuntimeManager = new DevRuntimeManager();

Expand All @@ -55,7 +57,6 @@ const otelTaskLogger = new OtelTaskLogger({
logger: otelLogger,
tracer: tracer,
level: "info",
preciseDateOrigin,
});

logger.setGlobalTaskLogger(otelTaskLogger);
Expand Down
13 changes: 8 additions & 5 deletions packages/cli-v3/src/workers/prod/worker-facade.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import {
TaskExecutor,
ZodIpcConnection,
type TracingSDK,
preciseDateOriginNow,
HandleErrorFunction,
DurableClock,
clock,
} from "@trigger.dev/core/v3";
import "source-map-support/register.js";

Expand Down Expand Up @@ -40,16 +41,16 @@ import * as packageJson from "../../../package.json";

import { TaskMetadataWithFunctions } from "../../types";

const preciseDateOrigin = preciseDateOriginNow();
const durableClock = new DurableClock();
clock.setGlobalClock(durableClock);

const tracer = new TriggerTracer({ tracer: otelTracer, logger: otelLogger });
const consoleInterceptor = new ConsoleInterceptor(otelLogger, preciseDateOrigin);
const consoleInterceptor = new ConsoleInterceptor(otelLogger);

const otelTaskLogger = new OtelTaskLogger({
logger: otelLogger,
tracer: tracer,
level: "info",
preciseDateOrigin,
});

logger.setGlobalTaskLogger(otelTaskLogger);
Expand Down Expand Up @@ -200,7 +201,9 @@ const zodIpc = new ZodIpcConnection({
},
});

const prodRuntimeManager = new ProdRuntimeManager(zodIpc);
const prodRuntimeManager = new ProdRuntimeManager(zodIpc, {
waitThresholdInMs: parseInt(process.env.TRIGGER_RUNTIME_WAIT_THRESHOLD_IN_MS ?? "30000", 10),
});

runtime.setGlobalRuntimeManager(prodRuntimeManager);

Expand Down
5 changes: 5 additions & 0 deletions packages/core/src/v3/clock-api.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// Split module-level variable definition into separate files to allow
// tree-shaking on each api instance.
import { ClockAPI } from "./clock";
/**
 * Entrypoint for the clock API: the shared `ClockAPI` instance returned by
 * `ClockAPI.getInstance()`.
 */
export const clock = ClockAPI.getInstance();
6 changes: 6 additions & 0 deletions packages/core/src/v3/clock/clock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
/**
 * A timestamp expressed as a two-element tuple: whole seconds first,
 * then the nanosecond remainder.
 */
export type ClockTime = [seconds: number, nanoseconds: number];

/** Contract implemented by every clock that can be plugged into the clock API. */
export interface Clock {
  /** Returns the current time as a {@link ClockTime} tuple. */
  preciseNow(): ClockTime;

  /** Asks the clock to reset its internal state; a clock with no state may do nothing. */
  reset(): void;
}
37 changes: 37 additions & 0 deletions packages/core/src/v3/clock/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Key under which the active clock is stored and looked up via registerGlobal/getGlobal.
const API_NAME = "clock";

import { getGlobal, registerGlobal } from "../utils/globals";
import type { Clock, ClockTime } from "./clock";
import { SimpleClock } from "./simpleClock";

// Fallback clock used whenever getGlobal(API_NAME) yields null/undefined.
const SIMPLE_CLOCK = new SimpleClock();

/**
 * Singleton facade over the clock registered in the global registry under
 * `API_NAME`. Calls are forwarded to that clock, or to the module-level
 * `SIMPLE_CLOCK` when the registry lookup yields nothing.
 */
export class ClockAPI {
  private static _instance?: ClockAPI;

  // Instances are only ever created through getInstance().
  private constructor() {}

  /** Returns the shared ClockAPI, creating it on first use. */
  public static getInstance(): ClockAPI {
    const existing = this._instance;
    if (existing) {
      return existing;
    }

    const created = new ClockAPI();
    this._instance = created;
    return created;
  }

  /**
   * Registers `clock` in the global registry.
   *
   * @returns the boolean reported by `registerGlobal`.
   */
  public setGlobalClock(clock: Clock): boolean {
    const registered = registerGlobal(API_NAME, clock);
    return registered;
  }

  /** Forwards to the active clock's `preciseNow()`. */
  public preciseNow(): ClockTime {
    const activeClock = this.#getClock();
    return activeClock.preciseNow();
  }

  /** Forwards to the active clock's `reset()`. */
  public reset(): void {
    const activeClock = this.#getClock();
    activeClock.reset();
  }

  // Resolves the clock to use: the globally registered one, else the fallback.
  #getClock(): Clock {
    const registered = getGlobal(API_NAME);
    return registered ?? SIMPLE_CLOCK;
  }
}
46 changes: 46 additions & 0 deletions packages/core/src/v3/clock/preciseWallClock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { PreciseDate } from "@google-cloud/precise-date";
import { Clock, ClockTime } from "./clock";

/** Optional overrides for the origin captured by the `PreciseWallClock` constructor. */
export type PreciseWallClockOptions = {
  // Reading used as the origin clock time; the constructor falls back to process.hrtime().
  origin?: ClockTime;
  // Date paired with the origin reading; the constructor falls back to new PreciseDate().
  now?: PreciseDate;
};

/**
 * Clock that remembers an origin (a `process.hrtime()` reading together with
 * a `PreciseDate`) and reports the current time as the origin date plus the
 * hrtime elapsed since the origin reading. `reset()` captures a new origin.
 */
export class PreciseWallClock implements Clock {
  private _origin: {
    clockTime: ClockTime;
    preciseDate: PreciseDate;
  };

  /**
   * @param options - optional origin overrides; any missing part is captured
   *   from `process.hrtime()` / `new PreciseDate()` at construction time.
   */
  constructor(options: PreciseWallClockOptions = {}) {
    const { origin, now } = options;
    const clockTime = origin ?? process.hrtime();
    const preciseDate = now ?? new PreciseDate();

    this._origin = { clockTime, preciseDate };
  }

  /** Returns `[seconds, nanos]` of the origin date advanced by the elapsed hrtime. */
  preciseNow(): [number, number] {
    const { clockTime, preciseDate } = this._origin;

    // process.hrtime(previous) yields the [seconds, nanoseconds] elapsed since `previous`.
    const [elapsedSeconds, elapsedNanos] = process.hrtime(clockTime);
    const elapsedNanoseconds = BigInt(elapsedSeconds) * BigInt(1e9) + BigInt(elapsedNanos);

    const current = new PreciseDate(preciseDate.getFullTime() + elapsedNanoseconds);
    const { seconds, nanos } = current.toStruct();

    return [seconds, nanos];
  }

  /** Replaces the origin with a fresh hrtime reading and a fresh PreciseDate. */
  reset() {
    const clockTime = process.hrtime();
    const preciseDate = new PreciseDate();

    this._origin = { clockTime, preciseDate };
  }
}
15 changes: 15 additions & 0 deletions packages/core/src/v3/clock/simpleClock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { PreciseDate } from "@google-cloud/precise-date";
import { Clock } from "./clock";

/**
 * Clock with no internal state: every reading comes from a freshly
 * constructed `PreciseDate`.
 */
export class SimpleClock implements Clock {
  /** Returns `[seconds, nanos]` taken from `new PreciseDate().toStruct()`. */
  preciseNow(): [number, number] {
    const { seconds, nanos } = new PreciseDate().toStruct();

    return [seconds, nanos];
  }

  /** No-op: there is no state to reset. */
  reset() {
    // do nothing
  }
}
26 changes: 15 additions & 11 deletions packages/core/src/v3/consoleInterceptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import util from "node:util";
import { iconStringForSeverity } from "./icons";
import { SemanticInternalAttributes } from "./semanticInternalAttributes";
import { flattenAttributes } from "./utils/flattenAttributes";
import { type PreciseDateOrigin, calculatePreciseDateHrTime } from "./utils/preciseDate";

import { ClockTime } from "./clock/clock";
import { clock } from "./clock-api";

export class ConsoleInterceptor {
constructor(private readonly logger: logsAPI.Logger, private readonly preciseDateOrigin: PreciseDateOrigin) { }
constructor(private readonly logger: logsAPI.Logger) {}

// Intercept the console and send logs to the OpenTelemetry logger
// during the execution of the callback
Expand Down Expand Up @@ -39,24 +39,28 @@ export class ConsoleInterceptor {
}

log(...args: unknown[]): void {
this.#handleLog(SeverityNumber.INFO, "Log", ...args);
this.#handleLog(SeverityNumber.INFO, this.#getTimestampInHrTime(), "Log", ...args);
}

info(...args: unknown[]): void {
this.#handleLog(SeverityNumber.INFO, "Info", ...args);
this.#handleLog(SeverityNumber.INFO, this.#getTimestampInHrTime(), "Info", ...args);
}

warn(...args: unknown[]): void {
this.#handleLog(SeverityNumber.WARN, "Warn", ...args);
this.#handleLog(SeverityNumber.WARN, this.#getTimestampInHrTime(), "Warn", ...args);
}

error(...args: unknown[]): void {
this.#handleLog(SeverityNumber.ERROR, "Error", ...args);
this.#handleLog(SeverityNumber.ERROR, this.#getTimestampInHrTime(), "Error", ...args);
}

#handleLog(severityNumber: SeverityNumber, severityText: string, ...args: unknown[]): void {
#handleLog(
severityNumber: SeverityNumber,
timestamp: ClockTime,
severityText: string,
...args: unknown[]
): void {
const body = util.format(...args);
const timestamp = this.#getTimestampInHrTime();

const parsed = tryParseJSON(body);

Expand All @@ -81,8 +85,8 @@ export class ConsoleInterceptor {
});
}

#getTimestampInHrTime(): [number, number] {
return calculatePreciseDateHrTime(this.preciseDateOrigin);
#getTimestampInHrTime(): ClockTime {
return clock.preciseNow();
}

#getAttributes(severityNumber: SeverityNumber): logsAPI.LogAttributes {
Expand Down
Loading