Skip to content

Commit b558a1e

Browse files
committed
Add ability to stream into parent and root task runs
1 parent c91c6a8 commit b558a1e

File tree

6 files changed

+99
-20
lines changed

6 files changed

+99
-20
lines changed

apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import { ActionFunctionArgs } from "@remix-run/server-runtime";
22
import { z } from "zod";
33
import { $replica } from "~/db.server";
44
import { relayRealtimeStreams } from "~/services/realtime/relayRealtimeStreams.server";
5-
import { v1RealtimeStreams } from "~/services/realtime/v1StreamsGlobal.server";
65
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";
76

87
const ParamsSchema = z.object({
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import { z } from "zod";
2+
import { $replica } from "~/db.server";
3+
import { relayRealtimeStreams } from "~/services/realtime/relayRealtimeStreams.server";
4+
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
5+
6+
const ParamsSchema = z.object({
7+
runId: z.string(),
8+
target: z.enum(["self", "parent", "root"]),
9+
streamId: z.string(),
10+
});
11+
12+
const { action } = createActionApiRoute(
13+
{
14+
params: ParamsSchema,
15+
},
16+
async ({ request, params, authentication }) => {
17+
if (!request.body) {
18+
return new Response("No body provided", { status: 400 });
19+
}
20+
21+
const run = await $replica.taskRun.findFirst({
22+
where: {
23+
friendlyId: params.runId,
24+
runtimeEnvironmentId: authentication.environment.id,
25+
},
26+
select: {
27+
id: true,
28+
friendlyId: true,
29+
parentTaskRun: {
30+
select: {
31+
friendlyId: true,
32+
},
33+
},
34+
rootTaskRun: {
35+
select: {
36+
friendlyId: true,
37+
},
38+
},
39+
},
40+
});
41+
42+
if (!run) {
43+
return new Response("Run not found", { status: 404 });
44+
}
45+
46+
const targetId =
47+
params.target === "self"
48+
? run.friendlyId
49+
: params.target === "parent"
50+
? run.parentTaskRun?.friendlyId
51+
: run.rootTaskRun?.friendlyId;
52+
53+
if (!targetId) {
54+
return new Response("Target not found", { status: 404 });
55+
}
56+
57+
return relayRealtimeStreams.ingestData(request.body, targetId, params.streamId);
58+
}
59+
);
60+
61+
export { action };

packages/core/src/v3/runMetadata/manager.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ export class StandardMetadataManager implements RunMetadataManager {
6060
this.queuedParentOperations.add({ type: "update", value });
6161
return this.parent;
6262
},
63-
stream: (key, value, signal) => this.doStream(key, value, this.parent, signal),
63+
stream: (key, value, signal) => this.doStream(key, value, "parent", this.parent, signal),
6464
};
6565
}
6666

@@ -94,7 +94,7 @@ export class StandardMetadataManager implements RunMetadataManager {
9494
this.queuedParentOperations.add({ type: "update", value });
9595
return this.root;
9696
},
97-
stream: (key, value, signal) => this.doStream(key, value, this.root, signal),
97+
stream: (key, value, signal) => this.doStream(key, value, "root", this.root, signal),
9898
};
9999
}
100100

@@ -194,12 +194,13 @@ export class StandardMetadataManager implements RunMetadataManager {
194194
value: AsyncIterable<T> | ReadableStream<T>,
195195
signal?: AbortSignal
196196
): Promise<AsyncIterable<T>> {
197-
return this.doStream(key, value, this, signal);
197+
return this.doStream(key, value, "self", this, signal);
198198
}
199199

200200
private async doStream<T>(
201201
key: string,
202202
value: AsyncIterable<T> | ReadableStream<T>,
203+
target: "self" | "parent" | "root",
203204
updater: RunMetadataUpdater = this,
204205
signal?: AbortSignal
205206
): Promise<AsyncIterable<T>> {
@@ -236,6 +237,7 @@ export class StandardMetadataManager implements RunMetadataManager {
236237
headers: this.apiClient.getHeaders(),
237238
signal,
238239
version: this.streamsVersion,
240+
target,
239241
});
240242

241243
this.activeStreams.set(key, streamInstance);

packages/core/src/v3/runMetadata/metadataStream.ts

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ export type MetadataOptions<T> = {
66
headers?: Record<string, string>;
77
signal?: AbortSignal;
88
version?: "v1" | "v2";
9+
target?: "self" | "parent" | "root";
910
};
1011

1112
export class MetadataStream<T> {
@@ -45,19 +46,14 @@ export class MetadataStream<T> {
4546
})
4647
);
4748

48-
return fetch(
49-
`${this.options.baseUrl}/realtime/${this.options.version ?? "v1"}/streams/${
50-
this.options.runId
51-
}/${this.options.key}`,
52-
{
53-
method: "POST",
54-
headers: this.options.headers ?? {},
55-
body: serverStream,
56-
// @ts-expect-error
57-
duplex: "half",
58-
signal: this.controller.signal,
59-
}
60-
);
49+
return fetch(this.buildUrl(), {
50+
method: "POST",
51+
headers: this.options.headers ?? {},
52+
body: serverStream,
53+
signal: this.controller.signal,
54+
// @ts-expect-error
55+
duplex: "half",
56+
});
6157
}
6258

6359
public async wait(): Promise<void> {
@@ -67,6 +63,19 @@ export class MetadataStream<T> {
6763
public [Symbol.asyncIterator]() {
6864
return streamToAsyncIterator(this.consumerStream);
6965
}
66+
67+
private buildUrl(): string {
68+
switch (this.options.version ?? "v1") {
69+
case "v1": {
70+
return `${this.options.baseUrl}/realtime/v1/streams/${this.options.runId}/${
71+
this.options.target ?? "self"
72+
}/${this.options.key}`;
73+
}
74+
case "v2": {
75+
return `${this.options.baseUrl}/realtime/v2/streams/${this.options.runId}/${this.options.key}`;
76+
}
77+
}
78+
}
7079
}
7180

7281
async function* streamToAsyncIterator<T>(stream: ReadableStream<T>): AsyncIterableIterator<T> {

references/nextjs-realtime/src/trigger/csv.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,14 @@ export const handleCSVRow = schemaTask({
8383

8484
metadata.parent.increment("processedRows", 1).append("rowRuns", ctx.run.id);
8585

86+
await metadata.parent.stream(
87+
ctx.run.id,
88+
(async function* () {
89+
yield "hello";
90+
yield "world";
91+
})()
92+
);
93+
8694
return row;
8795
},
8896
});

references/v3-catalog/src/trigger/runMetadata.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ export const runMetadataTask = task({
2121
export const runMetadataChildTask = task({
2222
id: "run-metadata-child-task",
2323
run: async (payload: any, { ctx }) => {
24-
metadata.parent.incrementKey("numberOfChildren", 1);
25-
metadata.root.incrementKey("numberOfChildren", 1);
24+
metadata.parent.increment("numberOfChildren", 1);
25+
metadata.root.increment("numberOfChildren", 1);
2626

2727
logger.info("metadata", { metadata: metadata.current() });
2828

@@ -63,7 +63,7 @@ export const runMetadataChildTask = task({
6363
export const runMetadataChildTask2 = task({
6464
id: "run-metadata-child-task-2",
6565
run: async (payload: any, { ctx }) => {
66-
metadata.root.incrementKey("numberOfChildren", 1);
66+
metadata.root.increment("numberOfChildren", 1);
6767
},
6868
});
6969

0 commit comments

Comments
 (0)