Skip to content

Commit 2fef889

Browse files
committed
make max dequeue count configurable from supervisor
1 parent 713729d commit 2fef889

File tree

6 files changed

+14
-1
lines changed

6 files changed

+14
-1
lines changed

apps/supervisor/src/env.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ const Env = z.object({
3131
// Dequeue settings (provider mode)
3232
TRIGGER_DEQUEUE_ENABLED: BoolEnv.default("true"),
3333
TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(1000),
34+
TRIGGER_DEQUEUE_MAX_RUN_COUNT: z.coerce.number().int().default(10),
3435

3536
// Optional services
3637
TRIGGER_WARM_START_URL: z.string().optional(),

apps/supervisor/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ class ManagedSupervisor {
113113
managedWorkerSecret: env.MANAGED_WORKER_SECRET,
114114
dequeueIntervalMs: env.TRIGGER_DEQUEUE_INTERVAL_MS,
115115
queueConsumerEnabled: env.TRIGGER_DEQUEUE_ENABLED,
116+
maxRunCount: env.TRIGGER_DEQUEUE_MAX_RUN_COUNT,
116117
runNotificationsEnabled: env.TRIGGER_WORKLOAD_API_ENABLED,
117118
preDequeue: async () => {
118119
if (this.isKubernetes) {

apps/webapp/app/routes/engine.v1.worker-actions.dequeue.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
import { json, TypedResponse } from "@remix-run/server-runtime";
2-
import { WorkerApiDequeueRequestBody, WorkerApiDequeueResponseBody } from "@trigger.dev/core/v3/workers";
2+
import {
3+
WorkerApiDequeueRequestBody,
4+
WorkerApiDequeueResponseBody,
5+
} from "@trigger.dev/core/v3/workers";
36
import { createActionWorkerApiRoute } from "~/services/routeBuilders/apiBuilder.server";
47

58
export const action = createActionWorkerApiRoute(
@@ -10,6 +13,7 @@ export const action = createActionWorkerApiRoute(
1013
return json(
1114
await authenticatedWorker.dequeue({
1215
maxResources: body.maxResources,
16+
maxRunCount: body.maxRunCount,
1317
})
1418
);
1519
}

packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@ type RunQueueConsumerOptions = {
77
intervalMs?: number;
88
preDequeue?: PreDequeueFn;
99
preSkip?: PreSkipFn;
10+
maxRunCount?: number;
1011
onDequeue: (messages: WorkerApiDequeueResponseBody) => Promise<void>;
1112
};
1213

1314
export class RunQueueConsumer {
1415
private readonly client: SupervisorHttpClient;
1516
private readonly preDequeue?: PreDequeueFn;
1617
private readonly preSkip?: PreSkipFn;
18+
private readonly maxRunCount?: number;
1719
private readonly onDequeue: (messages: WorkerApiDequeueResponseBody) => Promise<void>;
1820

1921
private intervalMs: number;
@@ -24,6 +26,7 @@ export class RunQueueConsumer {
2426
this.intervalMs = opts.intervalMs ?? 5_000;
2527
this.preDequeue = opts.preDequeue;
2628
this.preSkip = opts.preSkip;
29+
this.maxRunCount = opts.maxRunCount;
2730
this.onDequeue = opts.onDequeue;
2831
this.client = opts.client;
2932
}
@@ -87,6 +90,7 @@ export class RunQueueConsumer {
8790
try {
8891
const response = await this.client.dequeue({
8992
maxResources: preDequeueResult?.maxResources,
93+
maxRunCount: this.maxRunCount,
9094
});
9195

9296
if (!response.success) {

packages/core/src/v3/runEngineWorker/supervisor/schemas.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ export type WorkerApiConnectResponseBody = z.infer<typeof WorkerApiConnectRespon
6666

6767
export const WorkerApiDequeueRequestBody = z.object({
6868
maxResources: MachineResources.optional(),
69+
maxRunCount: z.number().optional(),
6970
});
7071
export type WorkerApiDequeueRequestBody = z.infer<typeof WorkerApiDequeueRequestBody>;
7172

packages/core/src/v3/runEngineWorker/supervisor/session.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ type SupervisorSessionOptions = SupervisorClientCommonOptions & {
1717
dequeueIntervalMs?: number;
1818
preDequeue?: PreDequeueFn;
1919
preSkip?: PreSkipFn;
20+
maxRunCount?: number;
2021
};
2122

2223
export class SupervisorSession extends EventEmitter<WorkerEvents> {
@@ -44,6 +45,7 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
4445
preSkip: opts.preSkip,
4546
onDequeue: this.onDequeue.bind(this),
4647
intervalMs: opts.dequeueIntervalMs,
48+
maxRunCount: opts.maxRunCount,
4749
});
4850

4951
// TODO: This should be dynamic and set by (or at least overridden by) the platform

0 commit comments

Comments
 (0)