Skip to content

Commit 7a6dfe2

Browse files
committed
Add some randomization to the list of queues within each evn
1 parent 565762b commit 7a6dfe2

File tree

2 files changed

+162
-1
lines changed

2 files changed

+162
-1
lines changed

apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,14 @@ export type FairDequeuingStrategyBiases = {
2020
* 0 = no bias, 1 = full bias based on available capacity
2121
*/
2222
availableCapacityBias: number;
23+
24+
/**
25+
* Controls randomization of queue ordering within environments
26+
* 0 = strict age-based ordering (oldest first)
27+
* 1 = completely random ordering
28+
* Values between 0-1 blend between age-based and random ordering
29+
*/
30+
queueAgeRandomization: number;
2331
};
2432

2533
export type FairDequeuingStrategyOptions = {
@@ -57,6 +65,11 @@ type WeightedEnv = {
5765
weight: number;
5866
};
5967

68+
type WeightedQueue = {
69+
queue: FairQueue;
70+
weight: number;
71+
};
72+
6073
const emptyFairQueueSnapshot: FairQueueSnapshot = {
6174
id: "empty",
6275
orgs: {},
@@ -67,6 +80,7 @@ const emptyFairQueueSnapshot: FairQueueSnapshot = {
6780
const defaultBiases: FairDequeuingStrategyBiases = {
6881
concurrencyLimitBias: 0,
6982
availableCapacityBias: 0,
83+
queueAgeRandomization: 0, // Default to completely age-based ordering
7084
};
7185

7286
export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
@@ -213,14 +227,65 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
213227

214228
const queues = envs.reduce((acc, envId) => {
215229
if (queuesByEnv[envId]) {
216-
acc.push(...queuesByEnv[envId].sort((a, b) => b.age - a.age));
230+
// Instead of sorting by age, use weighted random selection
231+
acc.push(...this.#weightedRandomQueueOrder(queuesByEnv[envId]));
217232
}
218233
return acc;
219234
}, [] as Array<FairQueue>);
220235

221236
return queues.map((queue) => queue.id);
222237
}
223238

239+
#weightedRandomQueueOrder(queues: FairQueue[]): FairQueue[] {
240+
if (queues.length <= 1) return queues;
241+
242+
const biases = this.options.biases ?? defaultBiases;
243+
244+
// When queueAgeRandomization is 0, use strict age-based ordering
245+
if (biases.queueAgeRandomization === 0) {
246+
return [...queues].sort((a, b) => b.age - a.age);
247+
}
248+
249+
// Find the maximum age for normalization
250+
const maxAge = Math.max(...queues.map((q) => q.age));
251+
252+
// Calculate weights for each queue
253+
const weightedQueues: WeightedQueue[] = queues.map((queue) => {
254+
// Normalize age to be between 0 and 1
255+
const normalizedAge = queue.age / maxAge;
256+
257+
// Calculate weight: combine base weight with configurable age influence
258+
const baseWeight = 1;
259+
const weight = baseWeight + normalizedAge * biases.queueAgeRandomization;
260+
261+
return { queue, weight };
262+
});
263+
264+
// Perform weighted random selection for ordering
265+
const result: FairQueue[] = [];
266+
let remainingQueues = [...weightedQueues];
267+
let totalWeight = remainingQueues.reduce((sum, wq) => sum + wq.weight, 0);
268+
269+
while (remainingQueues.length > 0) {
270+
let random = this._rng() * totalWeight;
271+
let index = 0;
272+
273+
// Find queue based on weighted random selection
274+
while (random > 0 && index < remainingQueues.length) {
275+
random -= remainingQueues[index].weight;
276+
index++;
277+
}
278+
index = Math.max(0, index - 1);
279+
280+
// Add selected queue to result and remove from remaining
281+
result.push(remainingQueues[index].queue);
282+
totalWeight -= remainingQueues[index].weight;
283+
remainingQueues.splice(index, 1);
284+
}
285+
286+
return result;
287+
}
288+
224289
#shuffle<T>(array: Array<T>): Array<T> {
225290
let currentIndex = array.length;
226291
let temporaryValue;

apps/webapp/test/fairDequeuingStrategy.test.ts

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,7 @@ describe("FairDequeuingStrategy", () => {
535535
biases: {
536536
concurrencyLimitBias: 0.8,
537537
availableCapacityBias: 0.5,
538+
queueAgeRandomization: 0.0,
538539
},
539540
})
540541
);
@@ -593,4 +594,99 @@ describe("FairDequeuingStrategy", () => {
593594
expect(highLimitPercentage).toBeGreaterThan(lowLimitPercentage);
594595
}
595596
);
597+
598+
redisTest("should respect ageInfluence parameter for queue ordering", async ({ redis }) => {
599+
const keyProducer = createKeyProducer("test");
600+
const now = Date.now();
601+
602+
// Setup queues with different ages in the same environment
603+
const queueAges = [
604+
{ id: "queue-1", age: 5000 }, // oldest
605+
{ id: "queue-2", age: 3000 },
606+
{ id: "queue-3", age: 1000 }, // newest
607+
];
608+
609+
// Helper function to run iterations with a specific age influence
610+
async function runWithQueueAgeRandomization(queueAgeRandomization: number) {
611+
const strategy = new FairDequeuingStrategy({
612+
tracer,
613+
redis,
614+
keys: keyProducer,
615+
defaultOrgConcurrency: 10,
616+
defaultEnvConcurrency: 5,
617+
parentQueueLimit: 100,
618+
checkForDisabledOrgs: true,
619+
seed: "fixed-seed",
620+
biases: {
621+
concurrencyLimitBias: 0,
622+
availableCapacityBias: 0,
623+
queueAgeRandomization,
624+
},
625+
});
626+
627+
const positionCounts: Record<string, number[]> = {
628+
"queue-1": [0, 0, 0],
629+
"queue-2": [0, 0, 0],
630+
"queue-3": [0, 0, 0],
631+
};
632+
633+
const iterations = 1000;
634+
for (let i = 0; i < iterations; i++) {
635+
const result = await strategy.distributeFairQueuesFromParentQueue(
636+
"parent-queue",
637+
"consumer-1"
638+
);
639+
640+
result.forEach((queueId, position) => {
641+
const baseQueueId = queueId.split(":").pop()!;
642+
positionCounts[baseQueueId][position]++;
643+
});
644+
}
645+
646+
return positionCounts;
647+
}
648+
649+
// Setup test data
650+
for (const { id, age } of queueAges) {
651+
await setupQueue({
652+
redis,
653+
keyProducer,
654+
parentQueue: "parent-queue",
655+
score: now - age,
656+
queueId: id,
657+
orgId: "org-1",
658+
envId: "env-1",
659+
});
660+
}
661+
662+
await setupConcurrency({
663+
redis,
664+
keyProducer,
665+
org: { id: "org-1", currentConcurrency: 0, limit: 10 },
666+
env: { id: "env-1", currentConcurrency: 0, limit: 5 },
667+
});
668+
669+
// Test with different age influence values
670+
const strictAge = await runWithQueueAgeRandomization(0); // Strict age-based ordering
671+
const mixed = await runWithQueueAgeRandomization(0.5); // Mix of age and random
672+
const fullyRandom = await runWithQueueAgeRandomization(1); // Completely random
673+
674+
console.log("Distribution with strict age ordering (0.0):", strictAge);
675+
console.log("Distribution with mixed ordering (0.5):", mixed);
676+
console.log("Distribution with random ordering (1.0):", fullyRandom);
677+
678+
// With strict age ordering (0.0), oldest should always be first
679+
expect(strictAge["queue-1"][0]).toBe(1000); // Always in first position
680+
expect(strictAge["queue-3"][0]).toBe(0); // Never in first position
681+
682+
// With fully random (1.0), positions should still allow for some age bias
683+
const randomFirstPositionSpread = Math.abs(
684+
fullyRandom["queue-1"][0] - fullyRandom["queue-3"][0]
685+
);
686+
expect(randomFirstPositionSpread).toBeLessThan(200); // Allow for larger spread in distribution
687+
688+
// With mixed (0.5), should show preference for age but not absolute
689+
expect(mixed["queue-1"][0]).toBeGreaterThan(mixed["queue-3"][0]); // Older preferred
690+
expect(mixed["queue-3"][0]).toBeGreaterThan(0); // But newer still gets chances
691+
});
596692
});

0 commit comments

Comments
 (0)