Skip to content

Commit 3793026

Browse files
committed
v3: WIP maximum queues per env implementation
1 parent e9c6cc7 commit 3793026

File tree

2 files changed

+104
-4
lines changed

2 files changed

+104
-4
lines changed

apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ export type FairDequeuingStrategyOptions = {
4343
biases?: FairDequeuingStrategyBiases;
4444
reuseSnapshotCount?: number;
4545
maximumEnvCount?: number;
46+
/**
47+
* Maximum number of queues to process per environment
48+
* If not provided, all queues in an environment will be processed
49+
*/
50+
maximumQueuePerEnvCount?: number;
4651
};
4752

4853
type FairQueueConcurrency = {
@@ -216,8 +221,6 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
216221
return result;
217222
}
218223

219-
// Helper method to maintain DRY principle
220-
// Update return type
221224
#orderQueuesByEnvs(envs: string[], snapshot: FairQueueSnapshot): Array<EnvQueues> {
222225
const queuesByEnv = snapshot.queues.reduce((acc, queue) => {
223226
if (!acc[queue.env]) {
@@ -231,11 +234,17 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
231234
if (queuesByEnv[envId]) {
232235
// Get ordered queues for this env
233236
const orderedQueues = this.#weightedRandomQueueOrder(queuesByEnv[envId]);
237+
238+
// Apply queue limit if maximumQueuePerEnvCount is set
239+
const limitedQueues = this.options.maximumQueuePerEnvCount
240+
? orderedQueues.slice(0, this.options.maximumQueuePerEnvCount)
241+
: orderedQueues;
242+
234243
// Only add the env if it has queues
235-
if (orderedQueues.length > 0) {
244+
if (limitedQueues.length > 0) {
236245
acc.push({
237246
envId,
238-
queues: orderedQueues.map((queue) => queue.id),
247+
queues: limitedQueues.map((queue) => queue.id),
239248
});
240249
}
241250
}
@@ -512,6 +521,10 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
512521

513522
span.setAttribute("queue_count", result.length);
514523

524+
if (result.length === this.options.parentQueueLimit) {
525+
span.setAttribute("parent_queue_limit_reached", true);
526+
}
527+
515528
return result;
516529
});
517530
}

apps/webapp/test/fairDequeuingStrategy.test.ts

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,6 +1032,93 @@ describe("FairDequeuingStrategy", () => {
10321032
expect(selectionPercentages["env-4"] || 0).toBeLessThan(20);
10331033
}
10341034
);
1035+
1036+
redisTest(
1037+
"should respect maximumQueuePerEnvCount when distributing queues",
1038+
async ({ redisOptions }) => {
1039+
const redis = createRedisClient(redisOptions);
1040+
1041+
const keyProducer = createKeyProducer("test");
1042+
const strategy = new FairDequeuingStrategy({
1043+
tracer,
1044+
redis,
1045+
keys: keyProducer,
1046+
defaultEnvConcurrency: 5,
1047+
parentQueueLimit: 100,
1048+
seed: "test-seed-max-queues",
1049+
maximumQueuePerEnvCount: 2, // Only take 2 queues per env
1050+
});
1051+
1052+
const now = Date.now();
1053+
1054+
// Setup two environments with different numbers of queues
1055+
const envSetups = [
1056+
{
1057+
envId: "env-1",
1058+
queues: [
1059+
{ age: 5000 }, // Oldest
1060+
{ age: 4000 },
1061+
{ age: 3000 }, // This should be excluded due to maximumQueuePerEnvCount
1062+
],
1063+
},
1064+
{
1065+
envId: "env-2",
1066+
queues: [
1067+
{ age: 2000 },
1068+
{ age: 1000 }, // Newest
1069+
],
1070+
},
1071+
];
1072+
1073+
// Setup queues and concurrency for each env
1074+
for (const setup of envSetups) {
1075+
await setupConcurrency({
1076+
redis,
1077+
keyProducer,
1078+
env: { id: setup.envId, currentConcurrency: 0, limit: 5 },
1079+
});
1080+
1081+
for (let i = 0; i < setup.queues.length; i++) {
1082+
await setupQueue({
1083+
redis,
1084+
keyProducer,
1085+
parentQueue: "parent-queue",
1086+
score: now - setup.queues[i].age,
1087+
queueId: `queue-${setup.envId}-${i}`,
1088+
orgId: `org-${setup.envId}`,
1089+
envId: setup.envId,
1090+
});
1091+
}
1092+
}
1093+
1094+
const result = await strategy.distributeFairQueuesFromParentQueue(
1095+
"parent-queue",
1096+
"consumer-1"
1097+
);
1098+
1099+
// Verify that each environment has at most 2 queues
1100+
for (const envQueues of result) {
1101+
expect(envQueues.queues.length).toBeLessThanOrEqual(2);
1102+
}
1103+
1104+
// Get queues for env-1 (which had 3 queues originally)
1105+
const env1Queues = result.find((eq) => eq.envId === "env-1")?.queues ?? [];
1106+
1107+
// Should have exactly 2 queues
1108+
expect(env1Queues.length).toBe(2);
1109+
1110+
// The queues should be the two oldest ones (queue-env-1-0 and queue-env-1-1)
1111+
expect(env1Queues).toContain(keyProducer.queueKey("org-env-1", "env-1", "queue-env-1-0"));
1112+
expect(env1Queues).toContain(keyProducer.queueKey("org-env-1", "env-1", "queue-env-1-1"));
1113+
expect(env1Queues).not.toContain(keyProducer.queueKey("org-env-1", "env-1", "queue-env-1-2"));
1114+
1115+
// Get queues for env-2 (which had 2 queues originally)
1116+
const env2Queues = result.find((eq) => eq.envId === "env-2")?.queues ?? [];
1117+
1118+
// Should still have both queues since it was within the limit
1119+
expect(env2Queues.length).toBe(2);
1120+
}
1121+
);
10351122
});
10361123

10371124
// Helper function to flatten results for counting

0 commit comments

Comments
 (0)