fix: realtime streams from tasks now retry if they receive a 408 timeout error from the realtime/trigger.dev server #1952

Merged · 1 commit · Apr 18, 2025
5 changes: 5 additions & 0 deletions .changeset/hip-cups-wave.md
@@ -0,0 +1,5 @@
---
"@trigger.dev/sdk": patch
---

Fix issue where realtime streams would cut off after 5 minutes
129 changes: 108 additions & 21 deletions packages/core/src/v3/runMetadata/metadataStream.ts
@@ -1,3 +1,7 @@
import { request as httpsRequest } from "node:https";
import { request as httpRequest } from "node:http";
import { URL } from "node:url";

export type MetadataOptions<T> = {
baseUrl: string;
runId: string;
@@ -7,57 +11,140 @@ export type MetadataOptions<T> = {
signal?: AbortSignal;
version?: "v1" | "v2";
target?: "self" | "parent" | "root";
maxRetries?: number;
};

export class MetadataStream<T> {
private controller = new AbortController();
private serverStream: ReadableStream<T>;
private consumerStream: ReadableStream<T>;
private streamPromise: Promise<void | Response>;
private streamPromise: Promise<void>;
private retryCount = 0;
private readonly maxRetries: number;
private currentChunkIndex = 0;
private reader: ReadableStreamDefaultReader<T>;

constructor(private options: MetadataOptions<T>) {
const [serverStream, consumerStream] = this.createTeeStreams();
this.serverStream = serverStream;
this.consumerStream = consumerStream;
this.maxRetries = options.maxRetries ?? 10;
this.reader = this.serverStream.getReader();

this.streamPromise = this.initializeServerStream();
}

private createTeeStreams() {
const readableSource = new ReadableStream<T>({
start: async (controller) => {
for await (const value of this.options.source) {
controller.enqueue(value);
try {
for await (const value of this.options.source) {
controller.enqueue(value);
}
controller.close();
} catch (error) {
controller.error(error);
}

controller.close();
},
});
Comment on lines 38 to 49 (Contributor)

🛠️ Refactor suggestion

Error‑first propagation is good, but release reader lock on failure.

controller.error(error) is great for signalling downstream consumers; however, the reader lock on this.serverStream is still held.
If an upstream error is thrown, subsequent retries will fail to acquire the lock, leaving the pipeline stalled.

         } catch (error) {
-          controller.error(error);
+          controller.error(error);
+          // Free the lock so a retry can acquire it.
+          this.reader?.releaseLock?.();
         }


return readableSource.tee();
}
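The tee() call above splits one source stream into two independent branches, which is how MetadataStream can feed both the server upload and local consumers from a single source. A minimal standalone sketch of that behavior (the function names here are illustrative, not the PR's API; it assumes the web-streams globals available in Node 18+):

```typescript
// Illustrative sketch: ReadableStream.tee() returns two branches that
// replay the same chunks, so each branch can be consumed independently.
function makeBranches(
  values: number[]
): [ReadableStream<number>, ReadableStream<number>] {
  const source = new ReadableStream<number>({
    start(controller) {
      for (const v of values) controller.enqueue(v);
      controller.close();
    },
  });
  return source.tee();
}

// Drain a branch into an array using an explicit reader.
async function collect<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader();
  const out: T[] = [];
  while (true) {
    const { done, value } = await reader.read();
    if (done) return out;
    out.push(value as T);
  }
}
```

Both branches of makeBranches([1, 2, 3]) deliver the same three chunks. One caveat worth knowing: tee() buffers chunks for the slower branch, so a stalled consumer on one side can grow memory.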

private initializeServerStream(): Promise<Response> {
const serverStream = this.serverStream.pipeThrough(
new TransformStream<T, string>({
async transform(chunk, controller) {
controller.enqueue(JSON.stringify(chunk) + "\n");
private async makeRequest(startFromChunk: number = 0): Promise<void> {
return new Promise((resolve, reject) => {
const url = new URL(this.buildUrl());
const timeout = 15 * 60 * 1000; // 15 minutes

const requestFn = url.protocol === "https:" ? httpsRequest : httpRequest;
const req = requestFn({
method: "POST",
hostname: url.hostname,
port: url.port || (url.protocol === "https:" ? 443 : 80),
path: url.pathname + url.search,
headers: {
...this.options.headers,
"Content-Type": "application/json",
"X-Resume-From-Chunk": startFromChunk.toString(),
},
})
);

return fetch(this.buildUrl(), {
method: "POST",
headers: this.options.headers ?? {},
body: serverStream,
signal: this.controller.signal,
// @ts-expect-error
duplex: "half",
timeout,
});

req.on("error", (error) => {
reject(error);
});

req.on("timeout", () => {
req.destroy(new Error("Request timed out"));
});

req.on("response", (res) => {
if (res.statusCode === 408) {
if (this.retryCount < this.maxRetries) {
this.retryCount++;

resolve(this.makeRequest(this.currentChunkIndex));
return;
}
reject(new Error(`Max retries (${this.maxRetries}) exceeded after timeout`));
return;
}

if (res.statusCode && (res.statusCode < 200 || res.statusCode >= 300)) {
const error = new Error(`HTTP error! status: ${res.statusCode}`);
reject(error);
return;
}

res.on("end", () => {
resolve();
});

res.resume();
});

if (this.options.signal) {
this.options.signal.addEventListener("abort", () => {
req.destroy(new Error("Request aborted"));
});
}

const processStream = async () => {
try {
while (true) {
const { done, value } = await this.reader.read();

if (done) {
req.end();
break;
}

const stringified = JSON.stringify(value) + "\n";
req.write(stringified);
this.currentChunkIndex++;
}
Comment on lines +117 to +125 (Contributor)

🛠️ Refactor suggestion

Release reader lock once the final chunk is flushed.

After done is returned, the lock is still held.
This prevents the stream from being garbage-collected and can block callers that want to re-read.

-            if (done) {
-              req.end();
-              break;
-            }
+            if (done) {
+              req.end();
+              this.reader.releaseLock();
+              break;
+            }
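The lock semantics the reviewer is pointing at can be demonstrated in isolation. In this standalone sketch (names are mine, assuming Node 18+ web-streams globals), a stream whose reader lock is never released rejects any later getReader() call:

```typescript
// While a ReadableStreamDefaultReader holds the lock, stream.getReader()
// throws a TypeError, so any retry path that re-acquires a reader must
// release the lock first.
async function drainOnce(stream: ReadableStream<number>): Promise<number[]> {
  const reader = stream.getReader();
  const out: number[] = [];
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      out.push(value as number);
    }
  } finally {
    // Without this, a later stream.getReader() throws.
    reader.releaseLock();
  }
  return out;
}
```

After drainOnce resolves, stream.getReader() succeeds again (the stream is closed but lockable); dropping the finally block makes that same call throw.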

} catch (error) {
req.destroy(error as Error);
}
};

processStream().catch((error) => {
reject(error);
});
});
Comment on lines +54 to 134 (Contributor)

⚠️ Potential issue

Potential socket & memory leak on 408 retry path.

  1. When a 408 is received, you resolve(this.makeRequest(...)) but never:
    • destroy the current socket
    • drain res (to prevent "socket hang up" warnings)
    • release the reader lock

  2. makeRequest is invoked recursively, which builds an ever-growing call stack if many retries occur.
    A simple loop is safer.

  3. currentChunkIndex is incremented after the chunk is written, so the value passed in "X-Resume-From-Chunk" on retry points to the next chunk rather than the last confirmed one; the server may miss one chunk.

@@
-        if (res.statusCode === 408) {
-          if (this.retryCount < this.maxRetries) {
-            this.retryCount++;
-
-            resolve(this.makeRequest(this.currentChunkIndex));
-            return;
-          }
-          reject(new Error(`Max retries (${this.maxRetries}) exceeded after timeout`));
-          return;
-        }
+        if (res.statusCode === 408) {
+          res.resume();            // consume data
+          req.destroy();           // close socket
+          this.reader.releaseLock();
+
+          if (this.retryCount < this.maxRetries) {
+            this.retryCount++;
+            // Pass the *last* successful chunk index
+            resolve(this.makeRequest(this.currentChunkIndex - 1));
+            return;
+          }
+          reject(new Error(`Max retries (${this.maxRetries}) exceeded after timeout`));
+          return;
+        }

Optionally, replace the recursive strategy with a while loop to avoid unbounded stack growth.
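A loop-shaped version of that suggestion might look like the sketch below. Here attemptOnce is a hypothetical stand-in for a single makeRequest attempt that either finishes or reports the chunk index to resume from; the names are illustrative, not the PR's actual API.

```typescript
// The recursive 408 retry expressed as a bounded loop, so deep retry
// chains cannot grow the call stack.
type Attempt = (startFromChunk: number) => Promise<number | "done">;

async function requestWithRetries(
  attemptOnce: Attempt,
  maxRetries: number
): Promise<void> {
  let startFromChunk = 0;
  for (let retry = 0; ; retry++) {
    const result = await attemptOnce(startFromChunk);
    if (result === "done") return; // stream fully delivered
    if (retry >= maxRetries) {
      throw new Error(`Max retries (${maxRetries}) exceeded after timeout`);
    }
    startFromChunk = result; // resume from the last confirmed chunk
  }
}
```

Each iteration awaits a full attempt before looping, so at most one stack frame is live no matter how many 408s occur.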

}

private async initializeServerStream(): Promise<void> {
try {
await this.makeRequest(0);
} catch (error) {
this.reader.releaseLock();
throw error;
}
}

public async wait(): Promise<void> {
return this.streamPromise.then(() => void 0);
return this.streamPromise;
}

public [Symbol.asyncIterator]() {