Skip to content

fix: prevent unbounded looping while dequeueing by exiting early when no queues have messages to dequeue #1946

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions internal-packages/run-engine/src/run-queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export type RunQueueOptions = {
keys: RunQueueKeyProducer;
queueSelectionStrategy: RunQueueSelectionStrategy;
verbose?: boolean;
logger: Logger;
logger?: Logger;
retryOptions?: RetryOptions;
};

Expand Down Expand Up @@ -88,7 +88,7 @@ export class RunQueue {
});
},
});
this.logger = options.logger;
this.logger = options.logger ?? new Logger("RunQueue", "warn");

this.keys = options.keys;
this.queueSelectionStrategy = options.queueSelectionStrategy;
Expand Down Expand Up @@ -404,11 +404,17 @@ export class RunQueue {
tenantQueues[env.envId] = [...env.queues]; // Create a copy of the queues array
}

// Track if we successfully dequeued any message in a complete cycle
let successfulDequeueInCycle = false;

// Continue until we've hit max count or all tenants have empty queue lists
while (
messages.length < maxCount &&
Object.values(tenantQueues).some((queues) => queues.length > 0)
) {
// Reset the success flag at the start of each cycle
successfulDequeueInCycle = false;

for (const env of envQueues) {
attemptedEnvs++;

Expand All @@ -428,6 +434,7 @@ export class RunQueue {

if (message) {
messages.push(message);
successfulDequeueInCycle = true;
// Re-add this queue at the end, since it might have more messages
tenantQueues[env.envId].push(queue);
}
Expand All @@ -438,6 +445,14 @@ export class RunQueue {
break;
}
}

// If we completed a full cycle through all tenants with no successful dequeues,
// exit early as we're likely hitting concurrency limits or have no ready messages
if (!successfulDequeueInCycle) {
// IMPORTANT: Keep this log message as it's used in tests
this.logger.log("No successful dequeues in a full cycle, exiting...");
break;
}
}

span.setAttributes({
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { redisTest } from "@internal/testcontainers";
import { trace } from "@internal/tracing";
import { Logger } from "@trigger.dev/core/logger";
import { describe } from "node:test";
import { FairQueueSelectionStrategy } from "../fairQueueSelectionStrategy.js";
import { RunQueue } from "../index.js";
Expand All @@ -12,7 +11,6 @@ const testOptions = {
tracer: trace.getTracer("rq"),
workers: 1,
defaultEnvConcurrency: 25,
logger: new Logger("RunQueue", "warn"),
retryOptions: {
maxAttempts: 5,
factor: 1.1,
Expand Down Expand Up @@ -264,4 +262,95 @@ describe("RunQueue.dequeueMessageFromMasterQueue", () => {
}
}
);

redisTest(
"should exit early when no messages can be dequeued in a full cycle",
async ({ redisContainer }) => {
const mockLogger = {
log: vi.fn(),
error: vi.fn(),
warn: vi.fn(),
debug: vi.fn(),
name: "test-logger",
level: "debug",
filteredKeys: [],
additionalFields: {},
setLevel: vi.fn(),
setFilteredKeys: vi.fn(),
setAdditionalFields: vi.fn(),
child: vi.fn(),
};

const queue = new RunQueue({
...testOptions,
queueSelectionStrategy: new FairQueueSelectionStrategy({
redis: {
keyPrefix: "runqueue:test:",
host: redisContainer.getHost(),
port: redisContainer.getPort(),
},
keys: testOptions.keys,
}),
redis: {
keyPrefix: "runqueue:test:",
host: redisContainer.getHost(),
port: redisContainer.getPort(),
},
// @ts-expect-error
logger: mockLogger,
});

try {
const envMasterQueue = `env:${authenticatedEnvDev.id}`;
const queueCount = 10; // Reduced for simplicity

// First, create all queues and enqueue initial messages
for (let i = 0; i < queueCount; i++) {
const queueName = `${messageDev.queue}_${i}`;
// Set each queue's concurrency limit to 0 (this guarantees dequeue will fail)
await queue.updateQueueConcurrencyLimits(authenticatedEnvDev, queueName, 0);

// Enqueue a message to each queue
await queue.enqueueMessage({
env: authenticatedEnvDev,
message: { ...messageDev, runId: `r${4321 + i}`, queue: queueName },
masterQueues: ["main", envMasterQueue],
});
}

// Try to dequeue messages - this should exit early due to concurrency limits
const startTime = Date.now();
const dequeued = await queue.dequeueMessageFromMasterQueue(
"test_12345",
envMasterQueue,
queueCount
);
const endTime = Date.now();

// Verify no messages were dequeued
expect(dequeued.length).toBe(0);

// Verify the operation completed quickly (under 1000ms)
const duration = endTime - startTime;
expect(duration).toBeLessThan(1000);

// Verify we only logged one early exit message
expect(mockLogger.log).toHaveBeenCalledWith(
expect.stringContaining("No successful dequeues in a full cycle, exiting")
);
expect(mockLogger.log.mock.calls.length).toBeLessThanOrEqual(2);

// Verify all messages are still in queues
let totalRemaining = 0;
for (let i = 0; i < queueCount; i++) {
const queueName = `${messageDev.queue}_${i}`;
const length = await queue.lengthOfQueue(authenticatedEnvDev, queueName);
totalRemaining += length;
}
expect(totalRemaining).toBe(queueCount);
} finally {
await queue.quit();
}
}
);
});