-
-
Notifications
You must be signed in to change notification settings - Fork 728
fix: realtime streams from tasks now retry if they receive a 408 timeout error from the realtime/trigger.dev server #1952
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
--- | ||
"@trigger.dev/sdk": patch | ||
--- | ||
|
||
Fix issue where realtime streams would cut off after 5 minutes |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -1,3 +1,7 @@ | ||||||||||||||||||||||||||||||||||||||||
import { request as httpsRequest } from "node:https"; | ||||||||||||||||||||||||||||||||||||||||
import { request as httpRequest } from "node:http"; | ||||||||||||||||||||||||||||||||||||||||
import { URL } from "node:url"; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
export type MetadataOptions<T> = { | ||||||||||||||||||||||||||||||||||||||||
baseUrl: string; | ||||||||||||||||||||||||||||||||||||||||
runId: string; | ||||||||||||||||||||||||||||||||||||||||
|
@@ -7,57 +11,140 @@ export type MetadataOptions<T> = { | |||||||||||||||||||||||||||||||||||||||
signal?: AbortSignal; | ||||||||||||||||||||||||||||||||||||||||
version?: "v1" | "v2"; | ||||||||||||||||||||||||||||||||||||||||
target?: "self" | "parent" | "root"; | ||||||||||||||||||||||||||||||||||||||||
maxRetries?: number; | ||||||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
export class MetadataStream<T> { | ||||||||||||||||||||||||||||||||||||||||
private controller = new AbortController(); | ||||||||||||||||||||||||||||||||||||||||
private serverStream: ReadableStream<T>; | ||||||||||||||||||||||||||||||||||||||||
private consumerStream: ReadableStream<T>; | ||||||||||||||||||||||||||||||||||||||||
private streamPromise: Promise<void | Response>; | ||||||||||||||||||||||||||||||||||||||||
private streamPromise: Promise<void>; | ||||||||||||||||||||||||||||||||||||||||
private retryCount = 0; | ||||||||||||||||||||||||||||||||||||||||
private readonly maxRetries: number; | ||||||||||||||||||||||||||||||||||||||||
private currentChunkIndex = 0; | ||||||||||||||||||||||||||||||||||||||||
private reader: ReadableStreamDefaultReader<T>; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
constructor(private options: MetadataOptions<T>) { | ||||||||||||||||||||||||||||||||||||||||
const [serverStream, consumerStream] = this.createTeeStreams(); | ||||||||||||||||||||||||||||||||||||||||
this.serverStream = serverStream; | ||||||||||||||||||||||||||||||||||||||||
this.consumerStream = consumerStream; | ||||||||||||||||||||||||||||||||||||||||
this.maxRetries = options.maxRetries ?? 10; | ||||||||||||||||||||||||||||||||||||||||
this.reader = this.serverStream.getReader(); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
this.streamPromise = this.initializeServerStream(); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
private createTeeStreams() { | ||||||||||||||||||||||||||||||||||||||||
const readableSource = new ReadableStream<T>({ | ||||||||||||||||||||||||||||||||||||||||
start: async (controller) => { | ||||||||||||||||||||||||||||||||||||||||
for await (const value of this.options.source) { | ||||||||||||||||||||||||||||||||||||||||
controller.enqueue(value); | ||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||
for await (const value of this.options.source) { | ||||||||||||||||||||||||||||||||||||||||
controller.enqueue(value); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
controller.close(); | ||||||||||||||||||||||||||||||||||||||||
} catch (error) { | ||||||||||||||||||||||||||||||||||||||||
controller.error(error); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
controller.close(); | ||||||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
return readableSource.tee(); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
private initializeServerStream(): Promise<Response> { | ||||||||||||||||||||||||||||||||||||||||
const serverStream = this.serverStream.pipeThrough( | ||||||||||||||||||||||||||||||||||||||||
new TransformStream<T, string>({ | ||||||||||||||||||||||||||||||||||||||||
async transform(chunk, controller) { | ||||||||||||||||||||||||||||||||||||||||
controller.enqueue(JSON.stringify(chunk) + "\n"); | ||||||||||||||||||||||||||||||||||||||||
private async makeRequest(startFromChunk: number = 0): Promise<void> { | ||||||||||||||||||||||||||||||||||||||||
return new Promise((resolve, reject) => { | ||||||||||||||||||||||||||||||||||||||||
const url = new URL(this.buildUrl()); | ||||||||||||||||||||||||||||||||||||||||
const timeout = 15 * 60 * 1000; // 15 minutes | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
const requestFn = url.protocol === "https:" ? httpsRequest : httpRequest; | ||||||||||||||||||||||||||||||||||||||||
const req = requestFn({ | ||||||||||||||||||||||||||||||||||||||||
method: "POST", | ||||||||||||||||||||||||||||||||||||||||
hostname: url.hostname, | ||||||||||||||||||||||||||||||||||||||||
port: url.port || (url.protocol === "https:" ? 443 : 80), | ||||||||||||||||||||||||||||||||||||||||
path: url.pathname + url.search, | ||||||||||||||||||||||||||||||||||||||||
headers: { | ||||||||||||||||||||||||||||||||||||||||
...this.options.headers, | ||||||||||||||||||||||||||||||||||||||||
"Content-Type": "application/json", | ||||||||||||||||||||||||||||||||||||||||
"X-Resume-From-Chunk": startFromChunk.toString(), | ||||||||||||||||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
return fetch(this.buildUrl(), { | ||||||||||||||||||||||||||||||||||||||||
method: "POST", | ||||||||||||||||||||||||||||||||||||||||
headers: this.options.headers ?? {}, | ||||||||||||||||||||||||||||||||||||||||
body: serverStream, | ||||||||||||||||||||||||||||||||||||||||
signal: this.controller.signal, | ||||||||||||||||||||||||||||||||||||||||
// @ts-expect-error | ||||||||||||||||||||||||||||||||||||||||
duplex: "half", | ||||||||||||||||||||||||||||||||||||||||
timeout, | ||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
req.on("error", (error) => { | ||||||||||||||||||||||||||||||||||||||||
reject(error); | ||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
req.on("timeout", () => { | ||||||||||||||||||||||||||||||||||||||||
req.destroy(new Error("Request timed out")); | ||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
req.on("response", (res) => { | ||||||||||||||||||||||||||||||||||||||||
if (res.statusCode === 408) { | ||||||||||||||||||||||||||||||||||||||||
if (this.retryCount < this.maxRetries) { | ||||||||||||||||||||||||||||||||||||||||
this.retryCount++; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
resolve(this.makeRequest(this.currentChunkIndex)); | ||||||||||||||||||||||||||||||||||||||||
return; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
reject(new Error(`Max retries (${this.maxRetries}) exceeded after timeout`)); | ||||||||||||||||||||||||||||||||||||||||
return; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
if (res.statusCode && (res.statusCode < 200 || res.statusCode >= 300)) { | ||||||||||||||||||||||||||||||||||||||||
const error = new Error(`HTTP error! status: ${res.statusCode}`); | ||||||||||||||||||||||||||||||||||||||||
reject(error); | ||||||||||||||||||||||||||||||||||||||||
return; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
res.on("end", () => { | ||||||||||||||||||||||||||||||||||||||||
resolve(); | ||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
res.resume(); | ||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
if (this.options.signal) { | ||||||||||||||||||||||||||||||||||||||||
this.options.signal.addEventListener("abort", () => { | ||||||||||||||||||||||||||||||||||||||||
req.destroy(new Error("Request aborted")); | ||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
const processStream = async () => { | ||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||
while (true) { | ||||||||||||||||||||||||||||||||||||||||
const { done, value } = await this.reader.read(); | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
if (done) { | ||||||||||||||||||||||||||||||||||||||||
req.end(); | ||||||||||||||||||||||||||||||||||||||||
break; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
const stringified = JSON.stringify(value) + "\n"; | ||||||||||||||||||||||||||||||||||||||||
req.write(stringified); | ||||||||||||||||||||||||||||||||||||||||
this.currentChunkIndex++; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
Comment on lines
+117
to
+125
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Release reader lock once the final chunk is flushed. After - if (done) {
- req.end();
- break;
- }
+ if (done) {
+ req.end();
+ this.reader.releaseLock();
+ break;
+ } 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||
} catch (error) { | ||||||||||||||||||||||||||||||||||||||||
req.destroy(error as Error); | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
}; | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
processStream().catch((error) => { | ||||||||||||||||||||||||||||||||||||||||
reject(error); | ||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||
}); | ||||||||||||||||||||||||||||||||||||||||
Comment on lines
+54
to
134
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Potential socket & memory leak on 408 retry path.
@@
- if (res.statusCode === 408) {
- if (this.retryCount < this.maxRetries) {
- this.retryCount++;
-
- resolve(this.makeRequest(this.currentChunkIndex));
- return;
- }
- reject(new Error(`Max retries (${this.maxRetries}) exceeded after timeout`));
- return;
- }
+ if (res.statusCode === 408) {
+ res.resume(); // consume data
+ req.destroy(); // close socket
+ this.reader.releaseLock();
+
+ if (this.retryCount < this.maxRetries) {
+ this.retryCount++;
+ // Pass the *last* successful chunk index
+ resolve(this.makeRequest(this.currentChunkIndex - 1));
+ return;
+ }
+ reject(new Error(`Max retries (${this.maxRetries}) exceeded after timeout`));
+ return;
+ } Optionally, replace the recursive strategy with a |
||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
private async initializeServerStream(): Promise<void> { | ||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||
await this.makeRequest(0); | ||||||||||||||||||||||||||||||||||||||||
} catch (error) { | ||||||||||||||||||||||||||||||||||||||||
this.reader.releaseLock(); | ||||||||||||||||||||||||||||||||||||||||
throw error; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
public async wait(): Promise<void> { | ||||||||||||||||||||||||||||||||||||||||
return this.streamPromise.then(() => void 0); | ||||||||||||||||||||||||||||||||||||||||
return this.streamPromise; | ||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||
public [Symbol.asyncIterator]() { | ||||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Error‑first propagation is good, but release reader lock on failure.
controller.error(error)
is great for signalling downstream consumers, however the reader lock onthis.serverStream
is still held.If an upstream error is thrown, subsequent retries will fail to acquire the lock, leading to a stalled pipeline.
📝 Committable suggestion