Skip to content

Commit c8e8bc9

Browse files
authored
engine v2: Fix selectTopEnvs missing queues (#2069)
1 parent a69621b commit c8e8bc9

File tree

2 files changed

+136
-2
lines changed

2 files changed

+136
-2
lines changed

internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,7 @@ export class FairQueueSelectionStrategy implements RunQueueSelectionStrategy {
448448
// Group queues by env
449449
const queuesByEnv = queues.reduce(
450450
(acc, queue) => {
451-
if (!acc[`${queue.org}:${queue.project}:${queue.env}`]) {
451+
if (!acc[queue.env]) {
452452
acc[queue.env] = [];
453453
}
454454
acc[queue.env].push(queue);

internal-packages/run-engine/src/run-queue/tests/fairQueueSelectionStrategy.test.ts

Lines changed: 135 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1021,7 +1021,141 @@ describe("FairDequeuingStrategy", () => {
10211021
expect(selectionPercentages["env-2"]).toBeGreaterThan(40);
10221022

10231023
// Verify that env-4 (lowest average age) gets selected in less than 20% of iterations
1024-
expect(selectionPercentages["env-4"] || 0).toBeLessThan(20);
1024+
expect(selectionPercentages["env-4"] || 0).toBeLessThan(20);
1025+
}
1026+
);
1027+
1028+
redisTest(
1029+
"#selectTopEnvs groups queues by environment",
1030+
async ({ redisOptions: redis }) => {
1031+
const keyProducer = new RunQueueFullKeyProducer();
1032+
const strategy = new FairQueueSelectionStrategy({
1033+
redis,
1034+
keys: keyProducer,
1035+
defaultEnvConcurrencyLimit: 5,
1036+
parentQueueLimit: 100,
1037+
seed: "group-test",
1038+
maximumEnvCount: 2,
1039+
});
1040+
1041+
const now = Date.now();
1042+
1043+
// env-1 with two queues from different orgs/projects
1044+
await setupQueue({
1045+
redis,
1046+
keyProducer,
1047+
parentQueue: "parent-queue",
1048+
score: now - 1000,
1049+
queueId: "queue-1-old",
1050+
orgId: "org-a",
1051+
projectId: "proj-a",
1052+
envId: "env-1",
1053+
});
1054+
1055+
await setupQueue({
1056+
redis,
1057+
keyProducer,
1058+
parentQueue: "parent-queue",
1059+
score: now - 10,
1060+
queueId: "queue-1-new",
1061+
orgId: "org-b",
1062+
projectId: "proj-b",
1063+
envId: "env-1",
1064+
});
1065+
1066+
await setupQueue({
1067+
redis,
1068+
keyProducer,
1069+
parentQueue: "parent-queue",
1070+
score: now - 400,
1071+
queueId: "queue-2",
1072+
orgId: "org-2",
1073+
projectId: "proj-2",
1074+
envId: "env-2",
1075+
});
1076+
1077+
await setupQueue({
1078+
redis,
1079+
keyProducer,
1080+
parentQueue: "parent-queue",
1081+
score: now - 300,
1082+
queueId: "queue-3",
1083+
orgId: "org-3",
1084+
projectId: "proj-3",
1085+
envId: "env-3",
1086+
});
1087+
1088+
// Setup concurrency limits
1089+
await setupConcurrency({
1090+
redis,
1091+
keyProducer,
1092+
env: {
1093+
envId: "env-1",
1094+
projectId: "proj-a",
1095+
orgId: "org-a",
1096+
currentConcurrency: 0,
1097+
limit: 5,
1098+
},
1099+
});
1100+
1101+
await setupConcurrency({
1102+
redis,
1103+
keyProducer,
1104+
env: {
1105+
envId: "env-1",
1106+
projectId: "proj-b",
1107+
orgId: "org-b",
1108+
currentConcurrency: 0,
1109+
limit: 5,
1110+
},
1111+
});
1112+
1113+
await setupConcurrency({
1114+
redis,
1115+
keyProducer,
1116+
env: {
1117+
envId: "env-2",
1118+
projectId: "proj-2",
1119+
orgId: "org-2",
1120+
currentConcurrency: 0,
1121+
limit: 5,
1122+
},
1123+
});
1124+
1125+
await setupConcurrency({
1126+
redis,
1127+
keyProducer,
1128+
env: {
1129+
envId: "env-3",
1130+
projectId: "proj-3",
1131+
orgId: "org-3",
1132+
currentConcurrency: 0,
1133+
limit: 5,
1134+
},
1135+
});
1136+
1137+
const envResult = await strategy.distributeFairQueuesFromParentQueue(
1138+
"parent-queue",
1139+
"consumer-1"
1140+
);
1141+
1142+
const result = flattenResults(envResult);
1143+
1144+
const queuesByEnv = result.reduce(
1145+
(acc, queueId) => {
1146+
const envId = keyProducer.envIdFromQueue(queueId);
1147+
if (!acc[envId]) {
1148+
acc[envId] = [];
1149+
}
1150+
acc[envId].push(queueId);
1151+
return acc;
1152+
},
1153+
{} as Record<string, string[]>
1154+
);
1155+
1156+
expect(Object.keys(queuesByEnv).length).toBe(2);
1157+
expect(queuesByEnv["env-1"]).toBeDefined();
1158+
expect(queuesByEnv["env-1"].length).toBe(2);
10251159
}
10261160
);
10271161
});

0 commit comments

Comments
 (0)