Skip to content

Commit f65e0f0

Browse files
committed
Improve the stream throttling
1 parent e7ca7d3 commit f65e0f0

File tree

3 files changed

+100
-34
lines changed

3 files changed

+100
-34
lines changed

packages/react-hooks/src/hooks/useRealtime.ts

Lines changed: 43 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
import { AnyTask, ApiClient, InferRunTypes, RealtimeRun } from "@trigger.dev/core/v3";
44
import { useCallback, useEffect, useId, useRef, useState } from "react";
5-
import { throttle } from "../utils/throttle.js";
65
import { KeyedMutator, useSWR } from "../utils/trigger-swr.js";
76
import { useApiClient, UseApiClientOptions } from "./useApiClient.js";
7+
import { createThrottledQueue } from "../utils/throttle.js";
88

99
export type UseRealtimeRunOptions = UseApiClientOptions & {
1010
id?: string;
@@ -78,12 +78,7 @@ export function useRealtimeRun<TTask extends AnyTask>(
7878
const abortController = new AbortController();
7979
abortControllerRef.current = abortController;
8080

81-
await processRealtimeRun(
82-
runId,
83-
apiClient,
84-
throttle(mutateRun, options?.experimental_throttleInMs),
85-
abortControllerRef
86-
);
81+
await processRealtimeRun(runId, apiClient, mutateRun, abortControllerRef);
8782
} catch (err) {
8883
// Ignore abort errors as they are expected.
8984
if ((err as any).name === "AbortError") {
@@ -199,10 +194,11 @@ export function useRealtimeRunWithStreams<
199194
await processRealtimeRunWithStreams(
200195
runId,
201196
apiClient,
202-
throttle(mutateRun, options?.experimental_throttleInMs),
203-
throttle(mutateStreams, options?.experimental_throttleInMs),
197+
mutateRun,
198+
mutateStreams,
204199
streamsRef,
205-
abortControllerRef
200+
abortControllerRef,
201+
options?.experimental_throttleInMs
206202
);
207203
} catch (err) {
208204
// Ignore abort errors as they are expected.
@@ -285,13 +281,7 @@ export function useRealtimeRunsWithTag<TTask extends AnyTask>(
285281
const abortController = new AbortController();
286282
abortControllerRef.current = abortController;
287283

288-
await processRealtimeRunsWithTag(
289-
tag,
290-
apiClient,
291-
throttle(mutateRuns, options?.experimental_throttleInMs),
292-
runsRef,
293-
abortControllerRef
294-
);
284+
await processRealtimeRunsWithTag(tag, apiClient, mutateRuns, runsRef, abortControllerRef);
295285
} catch (err) {
296286
// Ignore abort errors as they are expected.
297287
if ((err as any).name === "AbortError") {
@@ -372,13 +362,7 @@ export function useRealtimeBatch<TTask extends AnyTask>(
372362
const abortController = new AbortController();
373363
abortControllerRef.current = abortController;
374364

375-
await processRealtimeBatch(
376-
batchId,
377-
apiClient,
378-
throttle(mutateRuns, options?.experimental_throttleInMs),
379-
runsRef,
380-
abortControllerRef
381-
);
365+
await processRealtimeBatch(batchId, apiClient, mutateRuns, runsRef, abortControllerRef);
382366
} catch (err) {
383367
// Ignore abort errors as they are expected.
384368
if ((err as any).name === "AbortError") {
@@ -486,23 +470,51 @@ async function processRealtimeRunWithStreams<
486470
mutateRunData: KeyedMutator<RealtimeRun<TTask>>,
487471
mutateStreamData: KeyedMutator<StreamResults<TStreams>>,
488472
existingDataRef: React.MutableRefObject<StreamResults<TStreams>>,
489-
abortControllerRef: React.MutableRefObject<AbortController | null>
473+
abortControllerRef: React.MutableRefObject<AbortController | null>,
474+
throttleInMs?: number
490475
) {
491476
const subscription = apiClient.subscribeToRun<InferRunTypes<TTask>>(runId, {
492477
signal: abortControllerRef.current?.signal,
493478
});
494479

480+
type StreamUpdate = {
481+
type: keyof TStreams;
482+
chunk: any;
483+
};
484+
485+
const streamQueue = createThrottledQueue<StreamUpdate>(async (updates) => {
486+
const nextStreamData = { ...existingDataRef.current };
487+
488+
// Group updates by type
489+
const updatesByType = updates.reduce(
490+
(acc, update) => {
491+
if (!acc[update.type]) {
492+
acc[update.type] = [];
493+
}
494+
acc[update.type].push(update.chunk);
495+
return acc;
496+
},
497+
{} as Record<keyof TStreams, any[]>
498+
);
499+
500+
// Apply all updates
501+
for (const [type, chunks] of Object.entries(updatesByType)) {
502+
// @ts-ignore
503+
nextStreamData[type] = [...(existingDataRef.current[type] || []), ...chunks];
504+
}
505+
506+
await mutateStreamData(nextStreamData);
507+
}, throttleInMs);
508+
495509
for await (const part of subscription.withStreams<TStreams>()) {
496510
if (part.type === "run") {
497511
mutateRunData(part.run);
498512
} else {
499-
const nextStreamData = {
500-
...existingDataRef.current,
513+
streamQueue.add({
514+
type: part.type,
501515
// @ts-ignore
502-
[part.type]: [...(existingDataRef.current[part.type] || []), part.chunk],
503-
};
504-
505-
mutateStreamData(nextStreamData);
516+
chunk: part.chunk,
517+
});
506518
}
507519
}
508520
}
Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,58 @@
1-
import throttleFunction from "throttleit";
1+
// Reusable throttle utility
2+
export type ThrottledQueue<T> = {
3+
add: (item: T) => void;
4+
flush: () => Promise<void>;
5+
isEmpty: () => boolean;
6+
};
27

3-
export function throttle<T extends (...args: any[]) => any>(fn: T, waitMs: number | undefined): T {
4-
return waitMs != null ? throttleFunction(fn, waitMs) : fn;
8+
export function createThrottledQueue<T>(
9+
onFlush: (items: T[]) => Promise<void>,
10+
throttleInMs?: number
11+
): ThrottledQueue<T> {
12+
let queue: T[] = [];
13+
let lastFlushTime = 0;
14+
let flushPromise: Promise<void> | null = null;
15+
16+
const scheduleFlush = async () => {
17+
// If no throttle specified or there's already a flush in progress, return
18+
if (!throttleInMs) {
19+
// Immediately flush when no throttling is specified
20+
const itemsToFlush = [...queue];
21+
queue = [];
22+
await onFlush(itemsToFlush);
23+
return;
24+
}
25+
26+
if (queue.length === 0 || flushPromise) return;
27+
28+
const now = Date.now();
29+
const timeUntilNextFlush = Math.max(0, lastFlushTime + throttleInMs - now);
30+
31+
if (timeUntilNextFlush === 0) {
32+
const itemsToFlush = [...queue];
33+
queue = [];
34+
lastFlushTime = now;
35+
flushPromise = onFlush(itemsToFlush).finally(() => {
36+
flushPromise = null;
37+
// Check if more items accumulated during flush
38+
scheduleFlush();
39+
});
40+
} else {
41+
setTimeout(scheduleFlush, timeUntilNextFlush);
42+
}
43+
};
44+
45+
return {
46+
add: (item: T) => {
47+
queue.push(item);
48+
scheduleFlush();
49+
},
50+
flush: async () => {
51+
if (queue.length === 0) return;
52+
const itemsToFlush = [...queue];
53+
queue = [];
54+
await onFlush(itemsToFlush);
55+
},
56+
isEmpty: () => queue.length === 0,
57+
};
558
}

references/nextjs-realtime/src/components/TriggerButtonWithStreaming.tsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ export default function TriggerButton({ accessToken }: { accessToken: string })
2323
>("openai-streaming", {
2424
accessToken,
2525
baseURL: process.env.NEXT_PUBLIC_TRIGGER_API_URL,
26+
experimental_throttleInMs: 100,
2627
});
2728

2829
const openWeatherReport = useCallback(() => {

0 commit comments

Comments
 (0)