Skip to content

Commit bf4660f

Browse files
committed
Add ability to fetch streams through metadata
1 parent 8ff1438 commit bf4660f

File tree

7 files changed

+50
-0
lines changed

7 files changed

+50
-0
lines changed

packages/core/src/v3/apiClient/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,14 @@ import {
5959
SubscribeToRunsQueryParams,
6060
UpdateEnvironmentVariableParams,
6161
} from "./types.js";
62+
import type { AsyncIterableStream } from "./stream.js";
6263

6364
export type {
6465
CreateEnvironmentVariableParams,
6566
ImportEnvironmentVariablesParams,
6667
SubscribeToRunsQueryParams,
6768
UpdateEnvironmentVariableParams,
69+
AsyncIterableStream,
6870
};
6971

7072
export type ClientTriggerOptions = {

packages/core/src/v3/runMetadata/index.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { DeserializedJson } from "../../schemas/json.js";
2+
import { AsyncIterableStream } from "../apiClient/stream.js";
23
import { getGlobal, registerGlobal } from "../utils/globals.js";
34
import { ApiRequestOptions } from "../zodfetch.js";
45
import { NoopRunMetadataManager } from "./noopManager.js";
@@ -84,6 +85,10 @@ export class RunMetadataAPI implements RunMetadataManager {
8485
return this.#getManager().stream(key, value, signal);
8586
}
8687

88+
public fetchStream<T>(key: string, signal?: AbortSignal): Promise<AsyncIterableStream<T>> {
89+
return this.#getManager().fetchStream(key, signal);
90+
}
91+
8792
flush(requestOptions?: ApiRequestOptions): Promise<void> {
8893
return this.#getManager().flush(requestOptions);
8994
}

packages/core/src/v3/runMetadata/manager.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import { MetadataStream } from "./metadataStream.js";
77
import { ApiClient } from "../apiClient/index.js";
88
import { FlushedRunMetadata, RunMetadataChangeOperation } from "../schemas/common.js";
99
import { applyMetadataOperations } from "./operations.js";
10+
import { SSEStreamSubscriptionFactory } from "../apiClient/runStream.js";
11+
import { AsyncIterableStream } from "../apiClient/stream.js";
1012

1113
const MAXIMUM_ACTIVE_STREAMS = 5;
1214
const MAXIMUM_TOTAL_STREAMS = 10;
@@ -197,6 +199,25 @@ export class StandardMetadataManager implements RunMetadataManager {
197199
return this.doStream(key, value, "self", this, signal);
198200
}
199201

202+
public async fetchStream<T>(key: string, signal?: AbortSignal): Promise<AsyncIterableStream<T>> {
203+
if (!this.runId) {
204+
throw new Error("Run ID is required to fetch metadata streams.");
205+
}
206+
207+
const baseUrl = this.getKey("$$streamsBaseUrl");
208+
209+
const $baseUrl = typeof baseUrl === "string" ? baseUrl : this.streamsBaseUrl;
210+
211+
const streamFactory = new SSEStreamSubscriptionFactory($baseUrl, {
212+
headers: this.apiClient.getHeaders(),
213+
signal,
214+
});
215+
216+
const subscription = streamFactory.createSubscription(this.store ?? {}, this.runId, key);
217+
218+
return (await subscription.subscribe()) as AsyncIterableStream<T>;
219+
}
220+
200221
private async doStream<T>(
201222
key: string,
202223
value: AsyncIterable<T> | ReadableStream<T>,

packages/core/src/v3/runMetadata/noopManager.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { DeserializedJson } from "../../schemas/json.js";
2+
import { AsyncIterableStream } from "../apiClient/stream.js";
23
import { ApiRequestOptions } from "../zodfetch.js";
34
import type { RunMetadataManager, RunMetadataUpdater } from "./types.js";
45

@@ -18,6 +19,9 @@ export class NoopRunMetadataManager implements RunMetadataManager {
1819
stream<T>(key: string, value: AsyncIterable<T>): Promise<AsyncIterable<T>> {
1920
throw new Error("Method not implemented.");
2021
}
22+
fetchStream<T>(key: string, signal?: AbortSignal): Promise<AsyncIterableStream<T>> {
23+
throw new Error("Method not implemented.");
24+
}
2125
flush(requestOptions?: ApiRequestOptions): Promise<void> {
2226
throw new Error("Method not implemented.");
2327
}

packages/core/src/v3/runMetadata/types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { DeserializedJson } from "../../schemas/json.js";
2+
import { AsyncIterableStream } from "../apiClient/stream.js";
23
import { ApiRequestOptions } from "../zodfetch.js";
34

45
export interface RunMetadataUpdater {
@@ -23,6 +24,7 @@ export interface RunMetadataManager extends RunMetadataUpdater {
2324
getKey(key: string): DeserializedJson | undefined;
2425
flush(requestOptions?: ApiRequestOptions): Promise<void>;
2526
refresh(requestOptions?: ApiRequestOptions): Promise<void>;
27+
fetchStream<T>(key: string, signal?: AbortSignal): Promise<AsyncIterableStream<T>>;
2628

2729
get parent(): RunMetadataUpdater;
2830
get root(): RunMetadataUpdater;

packages/trigger-sdk/src/v3/metadata.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {
44
mergeRequestOptions,
55
runMetadata,
66
type RunMetadataUpdater,
7+
type AsyncIterableStream,
78
} from "@trigger.dev/core/v3";
89
import { tracer } from "./tracer.js";
910

@@ -36,6 +37,7 @@ export const metadata = {
3637
save: saveMetadata,
3738
replace: replaceMetadata,
3839
stream: stream,
40+
fetchStream: fetchStream,
3941
parent: parentMetadataUpdater,
4042
root: rootMetadataUpdater,
4143
refresh: refreshMetadata,
@@ -233,3 +235,7 @@ async function stream<T>(
233235
): Promise<AsyncIterable<T>> {
234236
return runMetadata.stream(key, value, signal);
235237
}
238+
239+
async function fetchStream<T>(key: string, signal?: AbortSignal): Promise<AsyncIterableStream<T>> {
240+
return runMetadata.fetchStream<T>(key, signal);
241+
}

references/nextjs-realtime/src/trigger/csv.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,16 @@ export const handleCSVUpload = schemaTask({
6262
const successfulRows = results.runs.filter((r) => r.ok);
6363
const failedRows = results.runs.filter((r) => !r.ok);
6464

65+
const firstSuccessfulRow = successfulRows[0];
66+
67+
if (firstSuccessfulRow) {
68+
const stream = await metadata.fetchStream<string>(firstSuccessfulRow.id);
69+
70+
for await (const value of stream) {
71+
logger.info(`Stream value from ${firstSuccessfulRow.id}`, { value });
72+
}
73+
}
74+
6575
return {
6676
file,
6777
rows,

0 commit comments

Comments
 (0)