Skip to content

Commit 01c2ca9

Browse files
committed
Make sure streams are subscribed in the "background"
1 parent fbcb749 commit 01c2ca9

File tree

1 file changed

+28
-23
lines changed

1 file changed

+28
-23
lines changed

packages/core/src/v3/apiClient/runStream.ts

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -391,30 +391,35 @@ export class RunSubscription<TRunTypes extends AnyRunTypes> {
391391
this.options.client?.baseUrl
392392
);
393393

394-
const stream = await subscription.subscribe();
395-
396-
// Create the pipeline and start it
397-
stream
398-
.pipeThrough(
399-
new TransformStream({
400-
transform(chunk, controller) {
401-
controller.enqueue({
402-
type: streamKey,
403-
chunk: chunk as TStreams[typeof streamKey],
404-
run,
405-
} as StreamPartResult<RunShape<TRunTypes>, TStreams>);
406-
},
407-
})
408-
)
409-
.pipeTo(
410-
new WritableStream({
411-
write(chunk) {
412-
controller.enqueue(chunk);
413-
},
414-
})
415-
)
394+
// Start stream processing in the background
395+
subscription
396+
.subscribe()
397+
.then((stream) => {
398+
stream
399+
.pipeThrough(
400+
new TransformStream({
401+
transform(chunk, controller) {
402+
controller.enqueue({
403+
type: streamKey,
404+
chunk: chunk as TStreams[typeof streamKey],
405+
run,
406+
});
407+
},
408+
})
409+
)
410+
.pipeTo(
411+
new WritableStream({
412+
write(chunk) {
413+
controller.enqueue(chunk);
414+
},
415+
})
416+
)
417+
.catch((error) => {
418+
console.error(`Error in stream ${streamKey}:`, error);
419+
});
420+
})
416421
.catch((error) => {
417-
console.error(`Error in stream ${streamKey}:`, error);
422+
console.error(`Error subscribing to stream ${streamKey}:`, error);
418423
});
419424
}
420425
}

0 commit comments

Comments
 (0)