
V4 dequeue performance (return faster) #1989


Merged · 2 commits · Apr 28, 2025
1 change: 0 additions & 1 deletion apps/webapp/app/env.server.ts
@@ -460,7 +460,6 @@ const EnvironmentSchema = z.object({
RUN_ENGINE_REUSE_SNAPSHOT_COUNT: z.coerce.number().int().default(0),
RUN_ENGINE_MAXIMUM_ENV_COUNT: z.coerce.number().int().optional(),
RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
RUN_ENGINE_MAX_DEQUEUE_LOOP_ATTEMPTS: z.coerce.number().int().default(10),

RUN_ENGINE_WORKER_REDIS_HOST: z
.string()
1 change: 0 additions & 1 deletion apps/webapp/app/v3/runEngine.server.ts
@@ -58,7 +58,6 @@ function createRunEngine() {
maximumEnvCount: env.RUN_ENGINE_MAXIMUM_ENV_COUNT,
tracer,
},
maxDequeueLoopAttempts: env.RUN_ENGINE_MAX_DEQUEUE_LOOP_ATTEMPTS,
},
runLock: {
redis: {
1 change: 0 additions & 1 deletion internal-packages/run-engine/src/engine/index.ts
@@ -113,7 +113,6 @@ export class RunEngine {
logger: new Logger("RunQueue", "debug"),
redis: { ...options.queue.redis, keyPrefix: `${options.queue.redis.keyPrefix}runqueue:` },
retryOptions: options.queue?.retryOptions,
maxDequeueLoopAttempts: options.queue?.maxDequeueLoopAttempts ?? 10,
});

this.worker = new Worker({
1 change: 0 additions & 1 deletion internal-packages/run-engine/src/engine/types.ts
@@ -35,7 +35,6 @@ export type RunEngineOptions = {
FairQueueSelectionStrategyOptions,
"parentQueueLimit" | "tracer" | "biases" | "reuseSnapshotCount" | "maximumEnvCount"
>;
maxDequeueLoopAttempts?: number;
};
runLock: {
redis: RedisOptions;
58 changes: 23 additions & 35 deletions internal-packages/run-engine/src/run-queue/index.ts
@@ -30,6 +30,7 @@ import {
type Result,
} from "@internal/redis";
import { MessageNotFoundError } from "./errors.js";
import { tryCatch } from "@trigger.dev/core";

const SemanticAttributes = {
QUEUE: "runqueue.queue",
@@ -51,7 +52,6 @@ export type RunQueueOptions = {
verbose?: boolean;
logger?: Logger;
retryOptions?: RetryOptions;
maxDequeueLoopAttempts?: number;
};

type DequeuedMessage = {
@@ -78,7 +78,6 @@ export class RunQueue {
private redis: Redis;
public keys: RunQueueKeyProducer;
private queueSelectionStrategy: RunQueueSelectionStrategy;
private maxDequeueLoopAttempts: number;

constructor(private readonly options: RunQueueOptions) {
this.retryOptions = options.retryOptions ?? defaultRetrySettings;
@@ -94,7 +93,6 @@

this.keys = options.keys;
this.queueSelectionStrategy = options.queueSelectionStrategy;
this.maxDequeueLoopAttempts = options.maxDequeueLoopAttempts ?? 10;

this.subscriber = createRedisClient(options.redis, {
onError: (error) => {
@@ -396,55 +394,45 @@

let attemptedEnvs = 0;
let attemptedQueues = 0;
let dequeueLoopAttempts = 0;

const messages: DequeuedMessage[] = [];

// Each env starts with its list of candidate queues
const tenantQueues: Record<string, string[]> = {};

// Initialize tenantQueues with the queues for each env
for (const env of envQueues) {
tenantQueues[env.envId] = [...env.queues]; // Create a copy of the queues array
}

// Continue until we've hit max count or all tenants have empty queue lists
while (
messages.length < maxCount &&
Object.values(tenantQueues).some((queues) => queues.length > 0) &&
dequeueLoopAttempts < this.maxDequeueLoopAttempts
) {
dequeueLoopAttempts++;
attemptedEnvs++;

for (const env of envQueues) {
attemptedEnvs++;

// Skip if this tenant has no more queues
if (tenantQueues[env.envId].length === 0) {
continue;
}

// Pop the next queue (using round-robin order)
const queue = tenantQueues[env.envId].shift()!;
for (const queue of env.queues) {
attemptedQueues++;

// Attempt to dequeue from this queue
const message = await this.#callDequeueMessage({
messageQueue: queue,
});
const [error, message] = await tryCatch(
this.#callDequeueMessage({
messageQueue: queue,
})
);

if (error) {
this.logger.error(
`[dequeueMessageInSharedQueue][${this.name}] Failed to dequeue from queue ${queue}`,
{
error,
}
);
}

Comment on lines +407 to 421

🛠️ Refactor suggestion

Errors are swallowed – consider surfacing them to callers

tryCatch captures the error and the handler logs it, but execution then continues as if nothing happened.
In production this can create “black-hole” failures where a queue is permanently unhealthy yet no call-site notices. At minimum:

 if (error) {
   this.logger.error(
     `[dequeueMessageInSharedQueue][${this.name}] Failed to dequeue from queue ${queue}`,
     {
       error,
     }
   );
+  // Bubble a warning to the tracing span so alerts can fire
+  span.recordException(error as Error);
 }

Better: propagate a metric / circuit-breaker so repeated failures trigger recovery logic.

Committable suggestion skipped: line range outside the PR's diff.
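
Building on that suggestion, here is a minimal sketch of how repeated dequeue failures could be tracked so an unhealthy queue gets skipped instead of silently retried, assuming access to the surrounding span and the tryCatch result. The QueueHealth class, its method names, and the failure threshold are illustrative and not part of the existing RunQueue code.

// Hypothetical sketch (TypeScript): per-queue consecutive-failure tracking.
class QueueHealth {
  private failures = new Map<string, number>();

  constructor(private readonly maxConsecutiveFailures = 5) {}

  // True once a queue has failed this many times in a row.
  isUnhealthy(queue: string): boolean {
    return (this.failures.get(queue) ?? 0) >= this.maxConsecutiveFailures;
  }

  recordFailure(queue: string): void {
    this.failures.set(queue, (this.failures.get(queue) ?? 0) + 1);
  }

  recordSuccess(queue: string): void {
    this.failures.delete(queue);
  }
}

// Possible usage inside the dequeue loop (sketch only):
//
//   if (health.isUnhealthy(queue)) continue;
//
//   const [error, message] = await tryCatch(
//     this.#callDequeueMessage({ messageQueue: queue })
//   );
//
//   if (error) {
//     health.recordFailure(queue);
//     span.recordException(error as Error); // surface to tracing, as suggested above
//   } else {
//     health.recordSuccess(queue);
//   }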

if (message) {
messages.push(message);
// Re-add this queue at the end, since it might have more messages
tenantQueues[env.envId].push(queue);
}
// If message is null, do not re-add the queue in this cycle

// If we've reached maxCount, break out of the loop
// If we've reached maxCount, we don't want to look at this env anymore
if (messages.length >= maxCount) {
break;
}
}

// If we've reached maxCount, we're completely done
if (messages.length >= maxCount) {
break;
}
}

span.setAttributes({
Expand Down
38 changes: 6 additions & 32 deletions references/hello-world/src/trigger/deadlocks.ts
@@ -14,45 +14,19 @@ export const deadlockReleasingQueue = queue({
export const deadlockTester = task({
id: "deadlock-tester",
run: async (payload: any, { ctx }) => {
// await deadlockNestedTask.triggerAndWait({
// message: "Hello, world!",
// });

await deadlockNestedTask.batchTriggerAndWait([
{
payload: {
message: "Hello, world!",
},
},
{
payload: {
message: "Hello, world!",
},
},
]);
await deadlockNestedTask.triggerAndWait({
message: "Hello, world!",
});
Comment on lines +17 to +19

⚠️ Potential issue

Infinite mutual triggering – system can still dead-lock

deadlockTester now triggers deadlockNestedTask once (instead of twice), but nothing prevents the callee from immediately triggering the caller again (see lines 27-29). Without a depth or retry guard, this still creates an unbounded recursion chain:

Tester → Nested → Tester → Nested → …

Add a counter or a unique correlation id to detect and stop after n hops, or use trigger (fire-and-forget) instead of triggerAndWait if synchronous wait isn’t required.
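
A minimal sketch of the hop-count guard described above, assuming the same task()/triggerAndWait() API from @trigger.dev/sdk/v3 used by this reference project; the task ids, the hops payload field, and MAX_HOPS are illustrative and not part of the existing deadlock tasks.

import { task } from "@trigger.dev/sdk/v3";

// Hypothetical guard: stop the mutual-trigger chain after a bounded number of hops.
const MAX_HOPS = 3;

export const guardedTester = task({
  id: "guarded-deadlock-tester",
  run: async (payload: { message: string; hops?: number }) => {
    const hops = payload.hops ?? 0;

    if (hops < MAX_HOPS) {
      // Pass the incremented hop count so the callee applies the same check.
      await guardedNested.triggerAndWait({ message: payload.message, hops: hops + 1 });
    }

    return { message: payload.message, hops };
  },
});

export const guardedNested = task({
  id: "guarded-deadlock-nested",
  run: async (payload: { message: string; hops?: number }) => {
    const hops = payload.hops ?? 0;

    if (hops < MAX_HOPS) {
      await guardedTester.triggerAndWait({ message: payload.message, hops: hops + 1 });
    }

    return { message: payload.message, hops };
  },
});

If a synchronous result is not needed, replacing triggerAndWait with fire-and-forget trigger, as the comment notes, avoids the wait chain entirely.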

},
});

export const deadlockNestedTask = task({
id: "deadlock-nested-task",
queue: deadlockQueue,
run: async (payload: any, { ctx }) => {
// await deadlockTester.triggerAndWait({
// message: "Hello, world!",
// });

await deadlockTester.batchTriggerAndWait([
{
payload: {
message: "Hello, world!",
},
},
{
payload: {
message: "Hello, world!",
},
},
]);
await deadlockTester.triggerAndWait({
message: "Hello, world!",
});

return {
message: "Hello, world!",