Skip to content

Commit 4e21c28

Browse files
committed
Adding streaming support
1 parent 4d8896e commit 4e21c28

File tree

15 files changed

+273
-67
lines changed

15 files changed

+273
-67
lines changed

packages/core/src/v3/apiClient/index.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,13 @@ import {
6161
SubscribeToRunsQueryParams,
6262
UpdateEnvironmentVariableParams,
6363
} from "./types.js";
64-
import type { AsyncIterableStream } from "./stream.js";
64+
import { AsyncIterableStream } from "../streams/asyncIterableStream.js";
6565

6666
export type {
6767
CreateEnvironmentVariableParams,
6868
ImportEnvironmentVariablesParams,
6969
SubscribeToRunsQueryParams,
7070
UpdateEnvironmentVariableParams,
71-
AsyncIterableStream,
7271
};
7372

7473
export type ClientTriggerOptions = {

packages/core/src/v3/apiClient/runStream.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,11 @@ import {
1616
} from "../utils/ioSerialization.js";
1717
import { ApiError } from "./errors.js";
1818
import { ApiClient } from "./index.js";
19+
import { LineTransformStream, zodShapeStream } from "./stream.js";
1920
import {
2021
AsyncIterableStream,
2122
createAsyncIterableReadable,
22-
LineTransformStream,
23-
zodShapeStream,
24-
} from "./stream.js";
23+
} from "../streams/asyncIterableStream.js";
2524

2625
export type RunShape<TRunTypes extends AnyRunTypes> = TRunTypes extends AnyRunTypes
2726
? {

packages/core/src/v3/apiClient/stream.ts

Lines changed: 1 addition & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
type Row,
1010
type ShapeStreamInterface,
1111
} from "@electric-sql/client";
12+
import { AsyncIterableStream, createAsyncIterableStream } from "../streams/asyncIterableStream.js";
1213

1314
export type ZodShapeStreamOptions = {
1415
headers?: Record<string, string>;
@@ -82,57 +83,6 @@ export function zodShapeStream<TShapeSchema extends z.ZodTypeAny>(
8283
};
8384
}
8485

85-
export type AsyncIterableStream<T> = AsyncIterable<T> & ReadableStream<T>;
86-
87-
export function createAsyncIterableStream<S, T>(
88-
source: ReadableStream<S>,
89-
transformer: Transformer<S, T>
90-
): AsyncIterableStream<T> {
91-
const transformedStream: any = source.pipeThrough(new TransformStream(transformer));
92-
93-
transformedStream[Symbol.asyncIterator] = () => {
94-
const reader = transformedStream.getReader();
95-
return {
96-
async next(): Promise<IteratorResult<string>> {
97-
const { done, value } = await reader.read();
98-
return done ? { done: true, value: undefined } : { done: false, value };
99-
},
100-
};
101-
};
102-
103-
return transformedStream;
104-
}
105-
106-
export function createAsyncIterableReadable<S, T>(
107-
source: ReadableStream<S>,
108-
transformer: Transformer<S, T>,
109-
signal: AbortSignal
110-
): AsyncIterableStream<T> {
111-
return new ReadableStream<T>({
112-
async start(controller) {
113-
const transformedStream = source.pipeThrough(new TransformStream(transformer));
114-
const reader = transformedStream.getReader();
115-
116-
signal.addEventListener("abort", () => {
117-
queueMicrotask(() => {
118-
reader.cancel();
119-
controller.close();
120-
});
121-
});
122-
123-
while (true) {
124-
const { done, value } = await reader.read();
125-
if (done) {
126-
controller.close();
127-
break;
128-
}
129-
130-
controller.enqueue(value);
131-
}
132-
},
133-
}) as AsyncIterableStream<T>;
134-
}
135-
13686
class ReadableShapeStream<T extends Row<unknown> = Row> {
13787
readonly #stream: ShapeStreamInterface<T>;
13888
readonly #currentState: Map<string, T> = new Map();

packages/core/src/v3/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ export * from "./types/index.js";
2121
export { links } from "./links.js";
2222
export * from "./jwt.js";
2323
export * from "./idempotencyKeys.js";
24+
export * from "./streams/asyncIterableStream.js";
2425
export {
2526
formatDuration,
2627
formatDurationInDays,

packages/core/src/v3/logger/index.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ export class LoggerAPI implements TaskLogger {
5151
return this.#getTaskLogger().trace(name, fn, options);
5252
}
5353

54+
public startSpan(name: string, options?: SpanOptions): Span {
55+
return this.#getTaskLogger().startSpan(name, options);
56+
}
57+
5458
#getTaskLogger(): TaskLogger {
5559
return getGlobal(API_NAME) ?? NOOP_TASK_LOGGER;
5660
}

packages/core/src/v3/logger/taskLogger.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ export interface TaskLogger {
2424
warn(message: string, properties?: Record<string, unknown>): void;
2525
error(message: string, properties?: Record<string, unknown>): void;
2626
trace<T>(name: string, fn: (span: Span) => Promise<T>, options?: SpanOptions): Promise<T>;
27+
startSpan(name: string, options?: SpanOptions): Span;
2728
}
2829

2930
export class OtelTaskLogger implements TaskLogger {
@@ -90,6 +91,10 @@ export class OtelTaskLogger implements TaskLogger {
9091
return this._config.tracer.startActiveSpan(name, fn, options);
9192
}
9293

94+
startSpan(name: string, options?: SpanOptions): Span {
95+
return this._config.tracer.startSpan(name, options);
96+
}
97+
9398
#getTimestampInHrTime(): ClockTime {
9499
return clock.preciseNow();
95100
}
@@ -104,6 +109,9 @@ export class NoopTaskLogger implements TaskLogger {
104109
trace<T>(name: string, fn: (span: Span) => Promise<T>): Promise<T> {
105110
return fn({} as Span);
106111
}
112+
startSpan(): Span {
113+
return {} as Span;
114+
}
107115
}
108116

109117
function safeJsonProcess(value?: Record<string, unknown>): Record<string, unknown> | undefined {

packages/core/src/v3/runMetadata/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { DeserializedJson } from "../../schemas/json.js";
2-
import { AsyncIterableStream } from "../apiClient/stream.js";
2+
import { AsyncIterableStream } from "../streams/asyncIterableStream.js";
33
import { getGlobal, registerGlobal } from "../utils/globals.js";
44
import { ApiRequestOptions } from "../zodfetch.js";
55
import { NoopRunMetadataManager } from "./noopManager.js";

packages/core/src/v3/runMetadata/manager.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import { dequal } from "dequal/lite";
22
import { DeserializedJson } from "../../schemas/json.js";
33
import { ApiClient } from "../apiClient/index.js";
4-
import { AsyncIterableStream } from "../apiClient/stream.js";
54
import { FlushedRunMetadata, RunMetadataChangeOperation } from "../schemas/common.js";
65
import { ApiRequestOptions } from "../zodfetch.js";
76
import { MetadataStream } from "./metadataStream.js";
87
import { applyMetadataOperations } from "./operations.js";
98
import { RunMetadataManager, RunMetadataUpdater } from "./types.js";
9+
import { AsyncIterableStream } from "../streams/asyncIterableStream.js";
1010

1111
const MAXIMUM_ACTIVE_STREAMS = 5;
1212
const MAXIMUM_TOTAL_STREAMS = 10;

packages/core/src/v3/runMetadata/noopManager.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { DeserializedJson } from "../../schemas/json.js";
2-
import { AsyncIterableStream } from "../apiClient/stream.js";
2+
import { AsyncIterableStream } from "../streams/asyncIterableStream.js";
33
import { ApiRequestOptions } from "../zodfetch.js";
44
import type { RunMetadataManager, RunMetadataUpdater } from "./types.js";
55

packages/core/src/v3/runMetadata/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { DeserializedJson } from "../../schemas/json.js";
2-
import { AsyncIterableStream } from "../apiClient/stream.js";
2+
import { AsyncIterableStream } from "../streams/asyncIterableStream.js";
33
import { ApiRequestOptions } from "../zodfetch.js";
44

55
export interface RunMetadataUpdater {
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
export type AsyncIterableStream<T> = AsyncIterable<T> & ReadableStream<T>;
2+
3+
export function createAsyncIterableStream<S, T>(
4+
source: ReadableStream<S>,
5+
transformer: Transformer<S, T>
6+
): AsyncIterableStream<T> {
7+
const transformedStream: any = source.pipeThrough(new TransformStream(transformer));
8+
9+
transformedStream[Symbol.asyncIterator] = () => {
10+
const reader = transformedStream.getReader();
11+
return {
12+
async next(): Promise<IteratorResult<string>> {
13+
const { done, value } = await reader.read();
14+
return done ? { done: true, value: undefined } : { done: false, value };
15+
},
16+
};
17+
};
18+
19+
return transformedStream;
20+
}
21+
22+
export function createAsyncIterableReadable<S, T>(
23+
source: ReadableStream<S>,
24+
transformer: Transformer<S, T>,
25+
signal: AbortSignal
26+
): AsyncIterableStream<T> {
27+
return new ReadableStream<T>({
28+
async start(controller) {
29+
const transformedStream = source.pipeThrough(new TransformStream(transformer));
30+
const reader = transformedStream.getReader();
31+
32+
signal.addEventListener("abort", () => {
33+
queueMicrotask(() => {
34+
reader.cancel();
35+
controller.close();
36+
});
37+
});
38+
39+
while (true) {
40+
const { done, value } = await reader.read();
41+
if (done) {
42+
controller.close();
43+
break;
44+
}
45+
46+
controller.enqueue(value);
47+
}
48+
},
49+
}) as AsyncIterableStream<T>;
50+
}
51+
52+
export function createAsyncIterableStreamFromAsyncIterable<T>(
53+
asyncIterable: AsyncIterable<T>,
54+
transformer: Transformer<T, T>,
55+
signal?: AbortSignal
56+
): AsyncIterableStream<T> {
57+
const stream = new ReadableStream<T>({
58+
async start(controller) {
59+
try {
60+
if (signal) {
61+
signal.addEventListener("abort", () => {
62+
controller.close();
63+
});
64+
}
65+
66+
const iterator = asyncIterable[Symbol.asyncIterator]();
67+
68+
while (true) {
69+
if (signal?.aborted) {
70+
break;
71+
}
72+
73+
const { done, value } = await iterator.next();
74+
75+
if (done) {
76+
controller.close();
77+
break;
78+
}
79+
80+
controller.enqueue(value);
81+
}
82+
} catch (error) {
83+
controller.error(error);
84+
}
85+
},
86+
cancel() {
87+
// If the stream is a tinyexec process with a kill method, kill it
88+
if ("kill" in asyncIterable) {
89+
(asyncIterable as any).kill();
90+
}
91+
},
92+
});
93+
94+
const transformedStream = stream.pipeThrough(new TransformStream(transformer));
95+
96+
return transformedStream as AsyncIterableStream<T>;
97+
}

packages/python/README.md

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,22 @@ export const myScript = task({
7676
return result.stdout;
7777
},
7878
});
79+
80+
export const myStreamingScript = task({
81+
id: "my-streaming-python-script",
82+
run: async () => {
83+
// You can also stream the output of the script
84+
const result = python.stream.runScript("my_script.py", ["hello", "world"]);
85+
86+
// result is an async iterable/readable stream
87+
for await (const chunk of streamingResult) {
88+
logger.debug("convert-url-to-markdown", {
89+
url: payload.url,
90+
chunk,
91+
});
92+
}
93+
},
94+
});
7995
```
8096

8197
### Running Inline Python Code

0 commit comments

Comments
 (0)