Skip to content

Commit 614f74f

Browse files
committed
WIP
1 parent 6db88f2 commit 614f74f

File tree

10 files changed

+106
-190
lines changed

10 files changed

+106
-190
lines changed

.vscode/launch.json

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,14 @@
141141
"command": "pnpm run test --filter @internal/run-engine",
142142
"cwd": "${workspaceFolder}",
143143
"sourceMaps": true
144+
},
145+
{
146+
"type": "node-terminal",
147+
"request": "launch",
148+
"name": "Debug RunQueue tests",
149+
"command": "pnpm run test ./src/run-queue/index.test.ts",
150+
"cwd": "${workspaceFolder}/internal-packages/run-engine",
151+
"sourceMaps": true
144152
}
145153
]
146154
}

internal-packages/run-engine/src/engine/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ import {
7070
isPendingExecuting,
7171
} from "./statuses.js";
7272
import { HeartbeatTimeouts, RunEngineOptions, TriggerParams } from "./types.js";
73-
import { RunQueueShortKeyProducer } from "../run-queue/keyProducer.js";
73+
import { RunQueueFullKeyProducer } from "../run-queue/keyProducer.js";
7474
import { retryOutcomeFromCompletion } from "./retrying";
7575

7676
const workerCatalog = {
@@ -153,7 +153,7 @@ export class RunEngine {
153153
);
154154
this.runLock = new RunLocker({ redis: this.runLockRedis });
155155

156-
const keys = new RunQueueShortKeyProducer("rq:");
156+
const keys = new RunQueueFullKeyProducer();
157157

158158
this.runQueue = new RunQueue({
159159
name: "rq",

internal-packages/run-engine/src/engine/tests/waitpoints.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import {
44
setupAuthenticatedEnvironment,
55
setupBackgroundWorker,
66
} from "@internal/testcontainers";
7-
import { trace } from "@opentelemetry/api";
7+
import { trace } from "@internal/tracing";
88
import { expect } from "vitest";
99
import { RunEngine } from "../index.js";
1010
import { setTimeout } from "timers/promises";
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
export { RunEngine, RunDuplicateIdempotencyKeyError } from "./engine/index";
2-
export type { EventBusEventArgs } from "./engine/eventBus";
1+
export { RunEngine, RunDuplicateIdempotencyKeyError } from "./engine/index.js";
2+
export type { EventBusEventArgs } from "./engine/eventBus.js";

internal-packages/run-engine/src/run-queue/fairDequeuingStrategy.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ export type FairDequeuingStrategyBiases = {
3636
export type FairDequeuingStrategyOptions = {
3737
redis: RedisOptions;
3838
keys: RunQueueKeyProducer;
39-
defaultEnvConcurrency?: number;
39+
defaultEnvConcurrencyLimit?: number;
4040
parentQueueLimit?: number;
4141
tracer?: Tracer;
4242
seed?: string;
@@ -97,7 +97,7 @@ export class FairDequeuingStrategy implements RunQueueFairDequeueStrategy {
9797
> = new Map();
9898
private _redis: Redis;
9999

100-
private _defaultEnvConcurrency: number;
100+
private _defaultEnvConcurrencyLimit: number;
101101
private _parentQueueLimit: number;
102102

103103
constructor(private options: FairDequeuingStrategyOptions) {
@@ -115,7 +115,7 @@ export class FairDequeuingStrategy implements RunQueueFairDequeueStrategy {
115115
this._rng = seedrandom(options.seed);
116116
this._redis = createRedisClient(options.redis);
117117

118-
this._defaultEnvConcurrency = options.defaultEnvConcurrency ?? 10;
118+
this._defaultEnvConcurrencyLimit = options.defaultEnvConcurrencyLimit ?? 10;
119119
this._parentQueueLimit = options.parentQueueLimit ?? 100;
120120
}
121121

@@ -561,13 +561,13 @@ export class FairDequeuingStrategy implements RunQueueFairDequeueStrategy {
561561
const value = await this._redis.get(key);
562562

563563
if (!value) {
564-
return this._defaultEnvConcurrency;
564+
return this._defaultEnvConcurrencyLimit;
565565
}
566566

567567
return Number(value);
568568
});
569569

570-
return result.val ?? this._defaultEnvConcurrency;
570+
return result.val ?? this._defaultEnvConcurrencyLimit;
571571
});
572572
}
573573

internal-packages/run-engine/src/run-queue/index.test.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { RunQueue } from "./index.js";
77
import { InputPayload } from "./types.js";
88
import { createRedisClient } from "@internal/redis";
99
import { FairDequeuingStrategy } from "./fairDequeuingStrategy.js";
10-
import { RunQueueShortKeyProducer } from "./keyProducer.js";
10+
import { RunQueueFullKeyProducer } from "./keyProducer.js";
1111

1212
const testOptions = {
1313
name: "rq",
@@ -23,7 +23,7 @@ const testOptions = {
2323
maxTimeoutInMs: 1_000,
2424
randomize: true,
2525
},
26-
keys: new RunQueueShortKeyProducer("rq:"),
26+
keys: new RunQueueFullKeyProducer(),
2727
};
2828

2929
const authenticatedEnvProd = {
@@ -383,7 +383,7 @@ describe("RunQueue", () => {
383383
}
384384
);
385385

386-
redisTest(
386+
redisTest.only(
387387
"Dequeue multiple messages from the queue",
388388
{ timeout: 5_000 },
389389
async ({ redisContainer }) => {
@@ -408,6 +408,7 @@ describe("RunQueue", () => {
408408
// Create 20 messages with different runIds and some with different queues
409409
const messages = Array.from({ length: 20 }, (_, i) => ({
410410
...messageProd,
411+
taskIdentifier: i < 15 ? "task/my-task" : "task/other-task", // Mix up the queues
411412
runId: `r${i + 1}`,
412413
queue: i < 15 ? "task/my-task" : "task/other-task", // Mix up the queues
413414
}));

internal-packages/run-engine/src/run-queue/index.ts

Lines changed: 48 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -259,10 +259,11 @@ export class RunQueue {
259259
return this.#trace(
260260
"dequeueMessageInSharedQueue",
261261
async (span) => {
262-
const envQueues = await this.queuePriorityStrategy.distributeFairQueuesFromParentQueue(
263-
masterQueue,
264-
consumerId
265-
);
262+
const envQueues =
263+
await this.options.queuePriorityStrategy.distributeFairQueuesFromParentQueue(
264+
masterQueue,
265+
consumerId
266+
);
266267

267268
span.setAttribute("environment_count", envQueues.length);
268269

@@ -275,76 +276,57 @@ export class RunQueue {
275276

276277
const messages: DequeuedMessage[] = [];
277278

278-
// Keep track of queues we've tried that didn't return a message
279-
const emptyQueues = new Set<string>();
280-
281-
// Continue until we've hit max count or tried all queues
282-
while (messages.length < maxCount) {
283-
// Calculate how many more messages we need
284-
const remainingCount = maxCount - messages.length;
285-
if (remainingCount <= 0) break;
286-
287-
// Find all available queues across environments that we haven't marked as empty
288-
const availableEnvQueues = envQueues
289-
.map((env) => ({
290-
env: env,
291-
queues: env.queues.filter((queue) => !emptyQueues.has(queue)),
292-
}))
293-
.filter((env) => env.queues.length > 0);
279+
// Each env starts with its list of candidate queues
280+
const tenantQueues: Record<string, string[]> = {};
294281

295-
if (availableEnvQueues.length === 0) break;
282+
// Initialize tenantQueues with the queues for each env
283+
for (const env of envQueues) {
284+
tenantQueues[env.envId] = [...env.queues]; // Create a copy of the queues array
285+
}
296286

297-
attemptedEnvs += availableEnvQueues.length;
287+
// Continue until we've hit max count or all tenants have empty queue lists
288+
while (
289+
messages.length < maxCount &&
290+
Object.values(tenantQueues).some((queues) => queues.length > 0)
291+
) {
292+
for (const env of envQueues) {
293+
attemptedEnvs++;
294+
295+
// Skip if this tenant has no more queues
296+
if (tenantQueues[env.envId].length === 0) {
297+
continue;
298+
}
298299

299-
// Create a dequeue operation for each environment, taking one queue from each
300-
const dequeueOperations = availableEnvQueues.map(({ env, queues }) => {
301-
const queue = queues[0];
300+
// Pop the next queue (using round-robin order)
301+
const queue = tenantQueues[env.envId].shift()!;
302302
attemptedQueues++;
303303

304-
return {
305-
queue,
306-
operation: this.#callDequeueMessage({
307-
messageQueue: queue,
308-
concurrencyLimitKey: this.keys.concurrencyLimitKeyFromQueue(queue),
309-
currentConcurrencyKey: this.keys.currentConcurrencyKeyFromQueue(queue),
310-
envConcurrencyLimitKey: this.keys.envConcurrencyLimitKeyFromQueue(queue),
311-
envCurrentConcurrencyKey: this.keys.envCurrentConcurrencyKeyFromQueue(queue),
312-
projectCurrentConcurrencyKey:
313-
this.keys.projectCurrentConcurrencyKeyFromQueue(queue),
314-
messageKeyPrefix: this.keys.messageKeyPrefixFromQueue(queue),
315-
envQueueKey: this.keys.envQueueKeyFromQueue(queue),
316-
taskCurrentConcurrentKeyPrefix:
317-
this.keys.taskIdentifierCurrentConcurrencyKeyPrefixFromQueue(queue),
318-
}),
319-
};
320-
});
321-
322-
// Execute all dequeue operations in parallel
323-
const results = await Promise.all(
324-
dequeueOperations.map(async ({ queue, operation }) => {
325-
const message = await operation;
326-
return { queue, message };
327-
})
328-
);
304+
// Attempt to dequeue from this queue
305+
const message = await this.#callDequeueMessage({
306+
messageQueue: queue,
307+
concurrencyLimitKey: this.keys.concurrencyLimitKeyFromQueue(queue),
308+
currentConcurrencyKey: this.keys.currentConcurrencyKeyFromQueue(queue),
309+
envConcurrencyLimitKey: this.keys.envConcurrencyLimitKeyFromQueue(queue),
310+
envCurrentConcurrencyKey: this.keys.envCurrentConcurrencyKeyFromQueue(queue),
311+
projectCurrentConcurrencyKey: this.keys.projectCurrentConcurrencyKeyFromQueue(queue),
312+
messageKeyPrefix: this.keys.messageKeyPrefixFromQueue(queue),
313+
envQueueKey: this.keys.envQueueKeyFromQueue(queue),
314+
taskCurrentConcurrentKeyPrefix:
315+
this.keys.taskIdentifierCurrentConcurrencyKeyPrefixFromQueue(queue),
316+
});
329317

330-
// Process results
331-
let foundAnyMessage = false;
332-
for (const { queue, message } of results) {
333318
if (message) {
334319
messages.push(message);
335-
foundAnyMessage = true;
336-
} else {
337-
// Mark this queue as empty
338-
emptyQueues.add(queue);
320+
// Re-add this queue at the end, since it might have more messages
321+
tenantQueues[env.envId].push(queue);
339322
}
340-
}
341-
342-
// If we couldn't get a message from any queue in any env, break
343-
if (!foundAnyMessage) break;
323+
// If message is null, do not re-add the queue in this cycle
344324

345-
// If we've marked all queues as empty, break
346-
const totalQueues = envQueues.reduce((sum, env) => sum + env.queues.length, 0);
347-
if (emptyQueues.size >= totalQueues) break;
325+
// If we've reached maxCount, break out of the loop
326+
if (messages.length >= maxCount) {
327+
break;
328+
}
329+
}
348330
}
349331

350332
span.setAttributes({
@@ -635,42 +617,6 @@ export class RunQueue {
635617
);
636618
}
637619

638-
queueConcurrencyScanStream(
639-
count: number = 100,
640-
onEndCallback?: () => void,
641-
onErrorCallback?: (error: Error) => void
642-
) {
643-
const pattern = this.keys.queueCurrentConcurrencyScanPattern();
644-
645-
this.logger.debug("Starting queue concurrency scan stream", {
646-
pattern,
647-
component: "runqueue",
648-
operation: "queueConcurrencyScanStream",
649-
service: this.name,
650-
count,
651-
});
652-
653-
const redis = this.redis.duplicate();
654-
655-
const stream = redis.scanStream({
656-
match: pattern,
657-
type: "set",
658-
count,
659-
});
660-
661-
stream.on("end", () => {
662-
onEndCallback?.();
663-
redis.quit();
664-
});
665-
666-
stream.on("error", (error) => {
667-
onErrorCallback?.(error);
668-
redis.quit();
669-
});
670-
671-
return { stream, redis };
672-
}
673-
674620
async quit() {
675621
await this.subscriber.unsubscribe();
676622
await this.subscriber.quit();
@@ -1103,9 +1049,9 @@ local earliestMessage = redis.call('ZRANGE', childQueue, 0, 0, 'WITHSCORES')
11031049
for _, parentQueue in ipairs(decodedPayload.masterQueues) do
11041050
local prefixedParentQueue = keyPrefix .. parentQueue
11051051
if #earliestMessage == 0 then
1106-
redis.call('ZREM', prefixedParentQueue, childQueue)
1052+
redis.call('ZREM', prefixedParentQueue, childQueueName)
11071053
else
1108-
redis.call('ZADD', prefixedParentQueue, earliestMessage[2], childQueue)
1054+
redis.call('ZADD', prefixedParentQueue, earliestMessage[2], childQueueName)
11091055
end
11101056
end
11111057

0 commit comments

Comments
 (0)