Skip to content

Commit 001c71c

Browse files
committed
build uploadthing/fal demo and change how run metadata is synced to the server
1 parent 3543bc6 commit 001c71c

File tree

25 files changed

+850
-147
lines changed

25 files changed

+850
-147
lines changed

packages/cli-v3/src/entryPoints/deploy-run-worker.ts

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import {
1414
WorkerManifest,
1515
ExecutorToWorkerMessageCatalog,
1616
timeout,
17+
runMetadata,
1718
} from "@trigger.dev/core/v3";
1819
import { TriggerTracer } from "@trigger.dev/core/v3/tracer";
1920
import { ProdRuntimeManager } from "@trigger.dev/core/v3/prod";
@@ -22,6 +23,7 @@ import {
2223
DevUsageManager,
2324
DurableClock,
2425
getEnvVar,
26+
getNumberEnvVar,
2527
logLevels,
2628
OtelTaskLogger,
2729
ProdUsageManager,
@@ -303,6 +305,10 @@ const zodIpc = new ZodIpcConnection({
303305
_execution = execution;
304306
_isRunning = true;
305307

308+
runMetadata.startPeriodicFlush(
309+
getNumberEnvVar("TRIGGER_RUN_METADATA_FLUSH_INTERVAL", 1000)
310+
);
311+
306312
const measurement = usage.start();
307313

308314
// This lives outside of the executor because this will eventually be moved to the controller level
@@ -397,7 +403,11 @@ const zodIpc = new ZodIpcConnection({
397403
async function flushAll(timeoutInMs: number = 10_000) {
398404
const now = performance.now();
399405

400-
await Promise.all([flushUsage(timeoutInMs), flushTracingSDK(timeoutInMs)]);
406+
await Promise.all([
407+
flushUsage(timeoutInMs),
408+
flushTracingSDK(timeoutInMs),
409+
flushMetadata(timeoutInMs),
410+
]);
401411

402412
const duration = performance.now() - now;
403413

@@ -424,6 +434,16 @@ async function flushTracingSDK(timeoutInMs: number = 10_000) {
424434
console.log(`Flushed tracingSDK in ${duration}ms`);
425435
}
426436

437+
async function flushMetadata(timeoutInMs: number = 10_000) {
438+
const now = performance.now();
439+
440+
await Promise.race([runMetadata.flush(), setTimeout(timeoutInMs)]);
441+
442+
const duration = performance.now() - now;
443+
444+
console.log(`Flushed runMetadata in ${duration}ms`);
445+
}
446+
427447
const prodRuntimeManager = new ProdRuntimeManager(zodIpc, {
428448
waitThresholdInMs: parseInt(env.TRIGGER_RUNTIME_WAIT_THRESHOLD_IN_MS ?? "30000", 10),
429449
});

packages/cli-v3/src/entryPoints/dev-run-worker.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import {
1414
WorkerManifest,
1515
ExecutorToWorkerMessageCatalog,
1616
timeout,
17+
runMetadata,
1718
} from "@trigger.dev/core/v3";
1819
import { TriggerTracer } from "@trigger.dev/core/v3/tracer";
1920
import { DevRuntimeManager } from "@trigger.dev/core/v3/dev";
@@ -30,6 +31,7 @@ import {
3031
TracingDiagnosticLogLevel,
3132
TracingSDK,
3233
usage,
34+
getNumberEnvVar,
3335
} from "@trigger.dev/core/v3/workers";
3436
import { ZodIpcConnection } from "@trigger.dev/core/v3/zodIpc";
3537
import { readFile } from "node:fs/promises";
@@ -273,6 +275,9 @@ const zodIpc = new ZodIpcConnection({
273275
_execution = execution;
274276
_isRunning = true;
275277

278+
runMetadata.startPeriodicFlush(
279+
getNumberEnvVar("TRIGGER_RUN_METADATA_FLUSH_INTERVAL", 1000)
280+
);
276281
const measurement = usage.start();
277282

278283
// This lives outside of the executor because this will eventually be moved to the controller level
@@ -345,7 +350,7 @@ const zodIpc = new ZodIpcConnection({
345350
}
346351
},
347352
FLUSH: async ({ timeoutInMs }, sender) => {
348-
await _tracingSDK?.flush();
353+
await Promise.allSettled([_tracingSDK?.flush(), runMetadata.flush()]);
349354
},
350355
},
351356
});

packages/core/package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@
181181
"dependencies": {
182182
"@electric-sql/client": "0.6.1",
183183
"@google-cloud/precise-date": "^4.0.0",
184+
"@jsonhero/path": "^1.0.21",
184185
"@opentelemetry/api": "1.9.0",
185186
"@opentelemetry/api-logs": "0.52.1",
186187
"@opentelemetry/exporter-logs-otlp-http": "0.52.1",
@@ -192,10 +193,11 @@
192193
"@opentelemetry/sdk-trace-base": "1.25.1",
193194
"@opentelemetry/sdk-trace-node": "1.25.1",
194195
"@opentelemetry/semantic-conventions": "1.25.1",
196+
"dequal": "^2.0.3",
195197
"execa": "^8.0.1",
196198
"humanize-duration": "^3.27.3",
197-
"nanoid": "^3.3.4",
198199
"jose": "^5.4.0",
200+
"nanoid": "^3.3.4",
199201
"socket.io-client": "4.7.5",
200202
"superjson": "^2.2.1",
201203
"zod": "3.22.3",

packages/core/src/v3/runMetadata/index.ts

Lines changed: 82 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
1+
import { dequal } from "dequal/lite";
12
import { DeserializedJson } from "../../schemas/json.js";
23
import { apiClientManager } from "../apiClientManager-api.js";
34
import { taskContext } from "../task-context-api.js";
45
import { getGlobal, registerGlobal } from "../utils/globals.js";
56
import { ApiRequestOptions } from "../zodfetch.js";
7+
import { JSONHeroPath } from "@jsonhero/path";
68

79
const API_NAME = "run-metadata";
810

911
export class RunMetadataAPI {
1012
private static _instance?: RunMetadataAPI;
13+
private flushTimeoutId: NodeJS.Timeout | null = null;
14+
private hasChanges: boolean = false;
1115

1216
private constructor() {}
1317

@@ -39,52 +43,53 @@ export class RunMetadataAPI {
3943
return this.store?.[key];
4044
}
4145

42-
public async setKey(
43-
key: string,
44-
value: DeserializedJson,
45-
requestOptions?: ApiRequestOptions
46-
): Promise<void> {
46+
public setKey(key: string, value: DeserializedJson) {
4747
const runId = taskContext.ctx?.run.id;
4848

4949
if (!runId) {
5050
return;
5151
}
5252

53-
const apiClient = apiClientManager.clientOrThrow();
53+
let nextStore: Record<string, DeserializedJson> | undefined = this.store
54+
? structuredClone(this.store)
55+
: undefined;
56+
57+
if (key.startsWith("$.")) {
58+
const path = new JSONHeroPath(key);
59+
path.set(nextStore, value);
60+
} else {
61+
nextStore = {
62+
...(nextStore ?? {}),
63+
[key]: value,
64+
};
65+
}
5466

55-
const nextStore = {
56-
...(this.store ?? {}),
57-
[key]: value,
58-
};
67+
if (!nextStore) {
68+
return;
69+
}
5970

60-
const response = await apiClient.updateRunMetadata(
61-
runId,
62-
{ metadata: nextStore },
63-
requestOptions
64-
);
71+
if (!dequal(this.store, nextStore)) {
72+
this.hasChanges = true;
73+
}
6574

66-
this.store = response.metadata;
75+
this.store = nextStore;
6776
}
6877

69-
public async deleteKey(key: string, requestOptions?: ApiRequestOptions): Promise<void> {
78+
public deleteKey(key: string, requestOptions?: ApiRequestOptions) {
7079
const runId = taskContext.ctx?.run.id;
7180

7281
if (!runId) {
7382
return;
7483
}
7584

76-
const apiClient = apiClientManager.clientOrThrow();
77-
7885
const nextStore = { ...(this.store ?? {}) };
7986
delete nextStore[key];
8087

81-
const response = await apiClient.updateRunMetadata(
82-
runId,
83-
{ metadata: nextStore },
84-
requestOptions
85-
);
88+
if (!dequal(this.store, nextStore)) {
89+
this.hasChanges = true;
90+
}
8691

87-
this.store = response.metadata;
92+
this.store = nextStore;
8893
}
8994

9095
public async update(
@@ -103,4 +108,56 @@ export class RunMetadataAPI {
103108

104109
this.store = response.metadata;
105110
}
111+
112+
public async flush(requestOptions?: ApiRequestOptions): Promise<void> {
113+
const runId = taskContext.ctx?.run.id;
114+
115+
if (!runId) {
116+
return;
117+
}
118+
119+
if (!this.store) {
120+
return;
121+
}
122+
123+
if (!this.hasChanges) {
124+
return;
125+
}
126+
127+
const apiClient = apiClientManager.clientOrThrow();
128+
129+
try {
130+
this.hasChanges = false;
131+
await apiClient.updateRunMetadata(runId, { metadata: this.store }, requestOptions);
132+
} catch (error) {
133+
this.hasChanges = true;
134+
throw error;
135+
}
136+
}
137+
138+
public startPeriodicFlush(intervalMs: number = 1000) {
139+
const periodicFlush = async (intervalMs: number) => {
140+
try {
141+
await this.flush();
142+
} catch (error) {
143+
console.error("Failed to flush metadata", error);
144+
throw error;
145+
} finally {
146+
scheduleNext();
147+
}
148+
};
149+
150+
const scheduleNext = () => {
151+
this.flushTimeoutId = setTimeout(() => periodicFlush(intervalMs), intervalMs);
152+
};
153+
154+
scheduleNext();
155+
}
156+
157+
stopPeriodicFlush(): void {
158+
if (this.flushTimeoutId) {
159+
clearTimeout(this.flushTimeoutId);
160+
this.flushTimeoutId = null;
161+
}
162+
}
106163
}

packages/core/src/v3/utils/getEnv.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,19 @@ export function getEnvVar(name: string): string | undefined {
66

77
return;
88
}
9+
10+
export function getNumberEnvVar(name: string, defaultValue?: number): number | undefined {
11+
const value = getEnvVar(name);
12+
13+
if (value === undefined) {
14+
return defaultValue;
15+
}
16+
17+
const parsed = Number(value);
18+
19+
if (isNaN(parsed)) {
20+
return defaultValue;
21+
}
22+
23+
return parsed;
24+
}

packages/core/src/v3/workers/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
export { TaskExecutor, type TaskExecutorOptions } from "./taskExecutor.js";
22
export type { RuntimeManager } from "../runtime/manager.js";
33
export { PreciseWallClock as DurableClock } from "../clock/preciseWallClock.js";
4-
export { getEnvVar } from "../utils/getEnv.js";
4+
export { getEnvVar, getNumberEnvVar } from "../utils/getEnv.js";
55
export { OtelTaskLogger, logLevels } from "../logger/taskLogger.js";
66
export { ConsoleInterceptor } from "../consoleInterceptor.js";
77
export { TracingSDK, type TracingDiagnosticLogLevel, recordSpanException } from "../otel/index.js";

0 commit comments

Comments
 (0)