Commit 31dc285

Add stream tests and improve streaming
1 parent 446386b commit 31dc285

8 files changed: +256 -110 lines changed

.vscode/launch.json

Lines changed: 9 additions & 0 deletions
@@ -13,6 +13,15 @@
       "cwd": "${workspaceFolder}",
       "sourceMaps": true
     },
+    {
+      "type": "node-terminal",
+      "request": "launch",
+      "name": "Debug realtimeStreams.test.ts",
+      "command": "pnpm run test -t RealtimeStreams",
+      "envFile": "${workspaceFolder}/.env",
+      "cwd": "${workspaceFolder}/apps/webapp",
+      "sourceMaps": true
+    },
     {
       "type": "chrome",
       "request": "launch",

apps/webapp/app/services/realtimeStreams.server.ts

Lines changed: 65 additions & 73 deletions
@@ -5,100 +5,93 @@ export type RealtimeStreamsOptions = {
   redis: RedisOptions | undefined;
 };
 
+const END_SENTINEL = "<<CLOSE_STREAM>>";
+
 export class RealtimeStreams {
   constructor(private options: RealtimeStreamsOptions) {}
 
   async streamResponse(runId: string, streamId: string, signal: AbortSignal): Promise<Response> {
     const redis = new Redis(this.options.redis ?? {});
     const streamKey = `stream:${runId}:${streamId}`;
+    let isCleanedUp = false;
+
+    const stream = new ReadableStream({
+      start: async (controller) => {
+        let lastId = "0";
+        let retryCount = 0;
+        const maxRetries = 3;
 
-    const stream = new TransformStream({
-      transform(chunk: string, controller) {
         try {
-          const data = JSON.parse(chunk);
+          while (!signal.aborted) {
+            try {
+              const messages = await redis.xread(
+                "COUNT",
+                100,
+                "BLOCK",
+                5000,
+                "STREAMS",
+                streamKey,
+                lastId
+              );
+
+              retryCount = 0;
+
+              if (messages && messages.length > 0) {
+                const [_key, entries] = messages[0];
+
+                for (const [id, fields] of entries) {
+                  lastId = id;
+
+                  if (fields && fields.length >= 2) {
+                    if (fields[1] === END_SENTINEL) {
+                      controller.close();
+                      return;
+                    }
+                    controller.enqueue(`data: ${fields[1]}\n\n`);
+
+                    if (signal.aborted) {
+                      controller.close();
+                      return;
+                    }
+                  }
+                }
+              }
+            } catch (error) {
+              if (signal.aborted) break;
 
-          if (typeof data === "object" && data !== null && "__end" in data && data.__end === true) {
-            controller.terminate();
-            return;
+              console.error("Error reading from Redis stream:", error);
+              retryCount++;
+              if (retryCount >= maxRetries) throw error;
+              await new Promise((resolve) => setTimeout(resolve, 1000 * retryCount));
+            }
           }
-          controller.enqueue(`data: ${chunk}\n\n`);
         } catch (error) {
-          console.error("Invalid JSON in stream:", error);
+          console.error("Fatal error in stream processing:", error);
+          controller.error(error);
+        } finally {
+          await cleanup();
         }
       },
-    });
-
-    const response = new Response(stream.readable, {
-      headers: {
-        "Content-Type": "text/event-stream",
-        "Cache-Control": "no-cache",
-        Connection: "keep-alive",
+      cancel: async () => {
+        await cleanup();
       },
     });
 
-    let isCleanedUp = false;
-
     async function cleanup() {
       if (isCleanedUp) return;
       isCleanedUp = true;
-      await redis.quit();
-      const writer = stream.writable.getWriter();
-      if (writer) await writer.close().catch(() => {}); // Ensure close doesn't error if already closed
+      await redis.quit().catch(console.error);
     }
 
     signal.addEventListener("abort", cleanup);
 
-    (async () => {
-      let lastId = "0";
-      let retryCount = 0;
-      const maxRetries = 3;
-
-      try {
-        while (!signal.aborted) {
-          try {
-            const messages = await redis.xread(
-              "COUNT",
-              100,
-              "BLOCK",
-              5000,
-              "STREAMS",
-              streamKey,
-              lastId
-            );
-
-            retryCount = 0;
-
-            if (messages && messages.length > 0) {
-              const [_key, entries] = messages[0];
-
-              for (const [id, fields] of entries) {
-                lastId = id;
-
-                if (fields && fields.length >= 2 && !stream.writable.locked) {
-                  const writer = stream.writable.getWriter();
-                  try {
-                    await writer.write(fields[1]);
-                  } finally {
-                    writer.releaseLock();
-                  }
-                }
-              }
-            }
-          } catch (error) {
-            console.error("Error reading from Redis stream:", error);
-            retryCount++;
-            if (retryCount >= maxRetries) throw error;
-            await new Promise((resolve) => setTimeout(resolve, 1000 * retryCount));
-          }
-        }
-      } catch (error) {
-        console.error("Fatal error in stream processing:", error);
-      } finally {
-        await cleanup();
-      }
-    })();
-
-    return response;
+    return new Response(stream.pipeThrough(new TextEncoderStream()), {
+      headers: {
+        "Content-Type": "text/event-stream",
+        "Cache-Control": "no-cache",
+        Connection: "keep-alive",
+      },
+    });
   }
 
   async ingestData(

@@ -166,8 +159,7 @@ export class RealtimeStreams {
     }
 
     // Send the __end message to indicate the end of the stream
-    const endData = JSON.stringify({ __end: true });
-    await redis.xadd(streamKey, "MAXLEN", "~", "1000", "*", "data", endData);
+    await redis.xadd(streamKey, "MAXLEN", "~", "1000", "*", "data", END_SENTINEL);
 
     return new Response(null, { status: 200 });
   } catch (error) {
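
The change above replaces the old TransformStream plus a detached async IIFE with a single ReadableStream whose start callback owns the Redis read loop, and it swaps the JSON { "__end": true } marker for the plain END_SENTINEL string, so chunks no longer have to be parsed as JSON just to detect the end of the stream. For orientation, here is a minimal sketch of the Redis-stream handshake the class relies on, written with ioredis against a hypothetical stream key (an illustration assuming an ESM context with top-level await, not code from this commit):

import Redis from "ioredis";

const redis = new Redis();
const streamKey = "stream:run_123:my-stream"; // hypothetical run and stream ids
const END_SENTINEL = "<<CLOSE_STREAM>>";

// Producer: append each chunk as a "data" field, then the end sentinel.
await redis.xadd(streamKey, "MAXLEN", "~", "1000", "*", "data", "chunk1");
await redis.xadd(streamKey, "MAXLEN", "~", "1000", "*", "data", END_SENTINEL);

// Consumer: blocking XREAD from the last seen entry id until the sentinel
// arrives, mirroring the loop inside streamResponse.
let lastId = "0";
outer: for (;;) {
  const messages = await redis.xread("COUNT", 100, "BLOCK", 5000, "STREAMS", streamKey, lastId);
  if (!messages) continue; // BLOCK timed out with no new entries; read again
  const [, entries] = messages[0];
  for (const [id, fields] of entries) {
    lastId = id;
    if (fields[1] === END_SENTINEL) break outer; // fields is ["data", <value>]
    console.log("received:", fields[1]);
  }
}
await redis.quit();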
apps/webapp/test/realtimeStreams.test.ts

Lines changed: 106 additions & 0 deletions

@@ -0,0 +1,106 @@
+import { redisTest } from "@internal/testcontainers";
+import { describe, expect, vi } from "vitest";
+import { RealtimeStreams } from "../app/services/realtimeStreams.server.js";
+import { convertArrayToReadableStream, convertResponseSSEStreamToArray } from "./utils/streams.js";
+
+vi.setConfig({ testTimeout: 10_000 }); // 10 seconds
+
+// Mock the logger
+vi.mock("./logger.server", () => ({
+  logger: {
+    debug: vi.fn(),
+    error: vi.fn(),
+  },
+}));
+
+describe("RealtimeStreams", () => {
+  redisTest("should stream data from producer to consumer", async ({ redis }) => {
+    const streams = new RealtimeStreams({ redis: redis.options });
+    const runId = "test-run";
+    const streamId = "test-stream";
+
+    // Create a stream of test data
+    const stream = convertArrayToReadableStream(["chunk1", "chunk2", "chunk3"]).pipeThrough(
+      new TextEncoderStream()
+    );
+
+    // Start consuming the stream
+    const abortController = new AbortController();
+    const responsePromise = streams.streamResponse(runId, streamId, abortController.signal);
+
+    // Start ingesting data
+    await streams.ingestData(stream, runId, streamId);
+
+    // Get the response and read the stream
+    const response = await responsePromise;
+    const received = await convertResponseSSEStreamToArray(response);
+
+    expect(received).toEqual(["chunk1", "chunk2", "chunk3"]);
+  });
+
+  redisTest("should handle multiple concurrent streams", async ({ redis }) => {
+    const streams = new RealtimeStreams({ redis: redis.options });
+    const runId = "test-run";
+
+    // Set up two different streams
+    const stream1 = convertArrayToReadableStream(["1a", "1b", "1c"]).pipeThrough(
+      new TextEncoderStream()
+    );
+    const stream2 = convertArrayToReadableStream(["2a", "2b", "2c"]).pipeThrough(
+      new TextEncoderStream()
+    );
+
+    // Start consuming both streams
+    const abortController = new AbortController();
+    const response1Promise = streams.streamResponse(runId, "stream1", abortController.signal);
+    const response2Promise = streams.streamResponse(runId, "stream2", abortController.signal);
+
+    // Ingest data to both streams
+    await Promise.all([
+      streams.ingestData(stream1, runId, "stream1"),
+      streams.ingestData(stream2, runId, "stream2"),
+    ]);
+
+    // Get and verify both responses
+    const [response1, response2] = await Promise.all([response1Promise, response2Promise]);
+    const [received1, received2] = await Promise.all([
+      convertResponseSSEStreamToArray(response1),
+      convertResponseSSEStreamToArray(response2),
+    ]);
+
+    expect(received1).toEqual(["1a", "1b", "1c"]);
+    expect(received2).toEqual(["2a", "2b", "2c"]);
+  });
+
+  redisTest("should handle early consumer abort", async ({ redis }) => {
+    const streams = new RealtimeStreams({ redis: redis.options });
+    const runId = "test-run";
+    const streamId = "test-stream";
+
+    const stream = convertArrayToReadableStream(["chunk1", "chunk2", "chunk3"]).pipeThrough(
+      new TextEncoderStream()
+    );
+
+    // Start consuming but abort early
+    const abortController = new AbortController();
+    const responsePromise = streams.streamResponse(runId, streamId, abortController.signal);
+
+    // Get the response before aborting to ensure the stream is properly set up
+    const response = await responsePromise;
+
+    // Start reading the stream
+    const readPromise = convertResponseSSEStreamToArray(response);
+
+    // Abort after a small delay to ensure everything is set up
+    await new Promise((resolve) => setTimeout(resolve, 100));
+    abortController.abort();
+
+    // Start ingesting data after the abort
+    await streams.ingestData(stream, runId, streamId);
+
+    // Verify the stream was terminated early
+    const received = await readPromise;
+
+    expect(received).toEqual(["chunk1"]);
+  });
+});
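
Taken together, the tests pin down the intended usage: a consumer attaches via streamResponse (holding an AbortSignal for early disconnects) while a producer pipes a byte stream into ingestData, and the two meet in Redis. In route-handler form that might look like the following sketch (hypothetical handler names and ids, not part of this commit):

import { RealtimeStreams } from "../app/services/realtimeStreams.server.js";

const streams = new RealtimeStreams({ redis: undefined }); // undefined falls back to a default local Redis

// SSE endpoint: follows stream:run_123:logs until the end sentinel arrives
// or the client disconnects and aborts the request signal.
export async function consumerLoader(request: Request): Promise<Response> {
  return streams.streamResponse("run_123", "logs", request.signal);
}

// Ingest endpoint: pipes the raw request body into the same Redis stream.
export async function producerAction(request: Request): Promise<Response> {
  return streams.ingestData(request.body!, "run_123", "logs");
}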

apps/webapp/test/utils/streams.ts

Lines changed: 46 additions & 0 deletions
@@ -0,0 +1,46 @@
+export async function convertResponseStreamToArray(response: Response): Promise<string[]> {
+  return convertReadableStreamToArray(response.body!.pipeThrough(new TextDecoderStream()));
+}
+
+export async function convertResponseSSEStreamToArray(response: Response): Promise<string[]> {
+  const parseSSEDataTransform = new TransformStream<string>({
+    async transform(chunk, controller) {
+      for (const line of chunk.split("\n")) {
+        if (line.startsWith("data:")) {
+          controller.enqueue(line.slice(6));
+        }
+      }
+    },
+  });
+
+  return convertReadableStreamToArray(
+    response.body!.pipeThrough(new TextDecoderStream()).pipeThrough(parseSSEDataTransform)
+  );
+}
+
+export async function convertReadableStreamToArray<T>(stream: ReadableStream<T>): Promise<T[]> {
+  const reader = stream.getReader();
+  const result: T[] = [];
+
+  while (true) {
+    const { done, value } = await reader.read();
+    if (done) break;
+    result.push(value);
+  }
+
+  return result;
+}
+
+export function convertArrayToReadableStream<T>(values: T[]): ReadableStream<T> {
+  return new ReadableStream({
+    start(controller) {
+      try {
+        for (const value of values) {
+          controller.enqueue(value);
+        }
+      } finally {
+        controller.close();
+      }
+    },
+  });
+}
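
These helpers round-trip data between arrays and web streams so the tests can assert on plain values. A quick usage sketch (assuming an ESM context with top-level await):

import {
  convertArrayToReadableStream,
  convertReadableStreamToArray,
} from "./streams.js";

// Feed an array through a ReadableStream and collect it back out.
const stream = convertArrayToReadableStream(["a", "b", "c"]);
const values = await convertReadableStreamToArray(stream);
console.log(values); // ["a", "b", "c"]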

packages/core/package.json

Lines changed: 1 addition & 0 deletions
@@ -197,6 +197,7 @@
     "@opentelemetry/sdk-trace-node": "1.25.1",
     "@opentelemetry/semantic-conventions": "1.25.1",
     "dequal": "^2.0.3",
+    "eventsource-parser": "^3.0.0",
     "execa": "^8.0.1",
     "humanize-duration": "^3.27.3",
     "jose": "^5.4.0",
