Skip to content

Commit 9336397

Browse files
committed
Fix streaming splits in realtime streams v2
1 parent 30ea5eb commit 9336397

File tree

3 files changed

+79
-28
lines changed

3 files changed

+79
-28
lines changed

apps/webapp/app/services/realtime/databaseRealtimeStreams.server.ts

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,14 @@ export class DatabaseRealtimeStreams implements StreamIngestor, StreamResponder
3737
): Promise<Response> {
3838
try {
3939
const textStream = stream.pipeThrough(new TextDecoderStream());
40+
4041
const reader = textStream.getReader();
4142
let sequence = 0;
4243

4344
while (true) {
4445
const { done, value } = await reader.read();
4546

46-
if (done) {
47+
if (done || !value) {
4748
break;
4849
}
4950

@@ -53,25 +54,13 @@ export class DatabaseRealtimeStreams implements StreamIngestor, StreamResponder
5354
value,
5455
});
5556

56-
const chunks = value
57-
.split("\n")
58-
.filter((chunk) => chunk) // Remove empty lines
59-
.map((line) => {
60-
return {
61-
sequence: sequence++,
62-
value: line,
63-
};
64-
});
65-
66-
await this.options.prisma.realtimeStreamChunk.createMany({
67-
data: chunks.map((chunk) => {
68-
return {
69-
runId,
70-
key: streamId,
71-
sequence: chunk.sequence,
72-
value: chunk.value,
73-
};
74-
}),
57+
await this.options.prisma.realtimeStreamChunk.create({
58+
data: {
59+
runId,
60+
key: streamId,
61+
sequence: sequence++,
62+
value,
63+
},
7564
});
7665
}
7766

packages/core/src/v3/apiClient/runStream.ts

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,12 @@ import {
1616
} from "../utils/ioSerialization.js";
1717
import { ApiError } from "./errors.js";
1818
import { ApiClient } from "./index.js";
19-
import { AsyncIterableStream, createAsyncIterableReadable, zodShapeStream } from "./stream.js";
19+
import {
20+
AsyncIterableStream,
21+
createAsyncIterableReadable,
22+
LineTransformStream,
23+
zodShapeStream,
24+
} from "./stream.js";
2025

2126
export type RunShape<TRunTypes extends AnyRunTypes> = TRunTypes extends AnyRunTypes
2227
? {
@@ -209,13 +214,28 @@ export class ElectricStreamSubscription implements StreamSubscription {
209214
) {}
210215

211216
async subscribe(): Promise<ReadableStream<unknown>> {
212-
return zodShapeStream(SubscribeRealtimeStreamChunkRawShape, this.url, this.options).pipeThrough(
213-
new TransformStream({
214-
transform(chunk, controller) {
215-
controller.enqueue(safeParseJSON(chunk.value));
216-
},
217-
})
218-
);
217+
return zodShapeStream(SubscribeRealtimeStreamChunkRawShape, this.url, this.options)
218+
.pipeThrough(
219+
new TransformStream({
220+
transform(chunk, controller) {
221+
console.log("ElectricStreamSubscription chunk.value", chunk.value);
222+
223+
controller.enqueue(chunk.value);
224+
},
225+
})
226+
)
227+
.pipeThrough(new LineTransformStream(this.url))
228+
.pipeThrough(
229+
new TransformStream({
230+
transform(chunk, controller) {
231+
for (const line of chunk) {
232+
console.log("ElectricStreamSubscription line", line);
233+
234+
controller.enqueue(safeParseJSON(line));
235+
}
236+
},
237+
})
238+
);
219239
}
220240
}
221241

packages/core/src/v3/apiClient/stream.ts

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,3 +203,45 @@ class ReadableShapeStream<T extends Row<unknown> = Row> {
203203
}
204204
}
205205
}
206+
207+
export class LineTransformStream extends TransformStream<string, string[]> {
208+
private buffer = "";
209+
210+
constructor(streamId: string) {
211+
super({
212+
transform: (chunk, controller) => {
213+
// Append the chunk to the buffer
214+
this.buffer += chunk;
215+
216+
// Split on newlines
217+
const lines = this.buffer.split("\n");
218+
219+
// The last element might be incomplete, hold it back in buffer
220+
this.buffer = lines.pop() || "";
221+
222+
// Filter out empty or whitespace-only lines
223+
const fullLines = lines.filter((line) => line.trim().length > 0);
224+
225+
console.log("LineTransformStream", {
226+
chunk,
227+
lines,
228+
fullLines,
229+
buffer: this.buffer,
230+
streamId,
231+
});
232+
233+
// If we got any complete lines, emit them as an array
234+
if (fullLines.length > 0) {
235+
controller.enqueue(fullLines);
236+
}
237+
},
238+
flush: (controller) => {
239+
// On stream end, if there's leftover text, emit it as a single-element array
240+
const trimmed = this.buffer.trim();
241+
if (trimmed.length > 0) {
242+
controller.enqueue([trimmed]);
243+
}
244+
},
245+
});
246+
}
247+
}

0 commit comments

Comments
 (0)