Skip to content

Commit 565762b

Browse files
committed
Add ability to bias env selection by concurrency limit and capacity
1 parent 82eceec commit 565762b

File tree

4 files changed

+227
-13
lines changed

4 files changed

+227
-13
lines changed

apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts

Lines changed: 93 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,20 @@ import seedrandom from "seedrandom";
88
import { Tracer } from "@opentelemetry/api";
99
import { startSpan } from "../tracing.server";
1010

11+
export type FairDequeuingStrategyBiases = {
12+
/**
13+
* How much to bias towards environments with higher concurrency limits
14+
* 0 = no bias, 1 = full bias based on limit differences
15+
*/
16+
concurrencyLimitBias: number;
17+
18+
/**
19+
* How much to bias towards environments with more available capacity
20+
* 0 = no bias, 1 = full bias based on available capacity
21+
*/
22+
availableCapacityBias: number;
23+
};
24+
1125
export type FairDequeuingStrategyOptions = {
1226
redis: Redis;
1327
keys: MarQSKeyProducer;
@@ -17,6 +31,11 @@ export type FairDequeuingStrategyOptions = {
1731
checkForDisabledOrgs: boolean;
1832
tracer: Tracer;
1933
seed?: string;
34+
/**
35+
* Configure biasing for environment shuffling
36+
* If not provided, no biasing will be applied (completely random shuffling)
37+
*/
38+
biases?: FairDequeuingStrategyBiases;
2039
};
2140

2241
type FairQueueConcurrency = {
@@ -33,13 +52,23 @@ type FairQueueSnapshot = {
3352
queues: Array<FairQueue>;
3453
};
3554

55+
type WeightedEnv = {
56+
envId: string;
57+
weight: number;
58+
};
59+
3660
const emptyFairQueueSnapshot: FairQueueSnapshot = {
3761
id: "empty",
3862
orgs: {},
3963
envs: {},
4064
queues: [],
4165
};
4266

67+
const defaultBiases: FairDequeuingStrategyBiases = {
68+
concurrencyLimitBias: 0,
69+
availableCapacityBias: 0,
70+
};
71+
4372
export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
4473
private _cache: UnkeyCache<{
4574
concurrencyLimit: number;
@@ -107,30 +136,85 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
107136
);
108137
}
109138

110-
// Now we need to:
111-
// 1. Shuffle the environments
112-
// 2. Sort the queues by their environment order in the shuffled list
113-
// 3. Keep the queues sorted by their age inside their "environment" slice of the final array
114139
#shuffleQueuesByEnv(snapshot: FairQueueSnapshot): Array<string> {
115140
const envs = Object.keys(snapshot.envs);
141+
const biases = this.options.biases ?? defaultBiases;
142+
143+
if (biases.concurrencyLimitBias === 0 && biases.availableCapacityBias === 0) {
144+
const shuffledEnvs = this.#shuffle(envs);
145+
return this.#orderQueuesByEnvs(shuffledEnvs, snapshot);
146+
}
116147

117-
const shuffledEnvs = this.#shuffle(envs);
148+
// Find the maximum concurrency limit for normalization
149+
const maxLimit = Math.max(...envs.map((envId) => snapshot.envs[envId].concurrency.limit));
118150

151+
// Calculate weights for each environment
152+
const weightedEnvs: WeightedEnv[] = envs.map((envId) => {
153+
const env = snapshot.envs[envId];
154+
155+
// Start with base weight of 1
156+
let weight = 1;
157+
158+
// Add normalized concurrency limit bias if configured
159+
if (biases.concurrencyLimitBias > 0) {
160+
const normalizedLimit = env.concurrency.limit / maxLimit;
161+
// Square or cube the bias to make it more pronounced at higher values
162+
weight *= 1 + Math.pow(normalizedLimit * biases.concurrencyLimitBias, 2);
163+
}
164+
165+
// Add available capacity bias if configured
166+
if (biases.availableCapacityBias > 0) {
167+
const usedCapacityPercentage = env.concurrency.current / env.concurrency.limit;
168+
const availableCapacityBonus = 1 - usedCapacityPercentage;
169+
// Square or cube the bias to make it more pronounced at higher values
170+
weight *= 1 + Math.pow(availableCapacityBonus * biases.availableCapacityBias, 2);
171+
}
172+
173+
return { envId, weight };
174+
});
175+
176+
const shuffledEnvs = this.#weightedShuffle(weightedEnvs);
177+
return this.#orderQueuesByEnvs(shuffledEnvs, snapshot);
178+
}
179+
180+
#weightedShuffle(weightedItems: WeightedEnv[]): string[] {
181+
const totalWeight = weightedItems.reduce((sum, item) => sum + item.weight, 0);
182+
const result: string[] = [];
183+
const items = [...weightedItems];
184+
185+
while (items.length > 0) {
186+
let random = this._rng() * totalWeight;
187+
let index = 0;
188+
189+
// Find item based on weighted random selection
190+
while (random > 0 && index < items.length) {
191+
random -= items[index].weight;
192+
index++;
193+
}
194+
index = Math.max(0, index - 1);
195+
196+
// Add selected item to result and remove from items
197+
result.push(items[index].envId);
198+
items.splice(index, 1);
199+
}
200+
201+
return result;
202+
}
203+
204+
// Helper method to maintain DRY principle
205+
#orderQueuesByEnvs(envs: string[], snapshot: FairQueueSnapshot): Array<string> {
119206
const queuesByEnv = snapshot.queues.reduce((acc, queue) => {
120207
if (!acc[queue.env]) {
121208
acc[queue.env] = [];
122209
}
123-
124210
acc[queue.env].push(queue);
125-
126211
return acc;
127212
}, {} as Record<string, Array<FairQueue>>);
128213

129-
const queues = shuffledEnvs.reduce((acc, envId) => {
214+
const queues = envs.reduce((acc, envId) => {
130215
if (queuesByEnv[envId]) {
131216
acc.push(...queuesByEnv[envId].sort((a, b) => b.age - a.age));
132217
}
133-
134218
return acc;
135219
}, [] as Array<FairQueue>);
136220

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,6 @@ export class MarQS {
6969
public keys: MarQSKeyProducer;
7070
#rebalanceWorkers: Array<AsyncWorker> = [];
7171

72-
private _staleQueues: Set<string> = new Set();
73-
private _staleQueueHits: Map<string, number> = new Map();
74-
7572
constructor(private readonly options: MarQSOptions) {
7673
this.redis = options.redis;
7774

@@ -372,6 +369,8 @@ export class MarQS {
372369
[SemanticAttributes.PARENT_QUEUE]: message.parentQueue,
373370
age_in_seconds: ageOfMessageInMs / 1000,
374371
attempted_queues: queues.indexOf(messageQueue) + 1, // How many queues we tried before success
372+
message_timestamp: message.timestamp,
373+
message_age: Date.now() - message.timestamp,
375374
});
376375

377376
await this.options.subscriber?.messageDequeued(message);

apps/webapp/test/fairDequeuingStrategy.test.ts

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,4 +463,134 @@ describe("FairDequeuingStrategy", () => {
463463
}
464464
}
465465
);
466+
467+
redisTest(
468+
"should bias shuffling based on concurrency limits and available capacity",
469+
async ({ redis }) => {
470+
const keyProducer = createKeyProducer("test");
471+
const now = Date.now();
472+
473+
// Setup three environments with different concurrency settings
474+
const envSetups = [
475+
{
476+
envId: "env-1",
477+
limit: 100,
478+
current: 20, // Lots of available capacity
479+
queueCount: 3,
480+
},
481+
{
482+
envId: "env-2",
483+
limit: 50,
484+
current: 40, // Less available capacity
485+
queueCount: 3,
486+
},
487+
{
488+
envId: "env-3",
489+
limit: 10,
490+
current: 5, // Some available capacity
491+
queueCount: 3,
492+
},
493+
];
494+
495+
// Setup queues and concurrency for each environment
496+
for (const setup of envSetups) {
497+
await setupConcurrency({
498+
redis,
499+
keyProducer,
500+
org: { id: "org-1", currentConcurrency: 0, limit: 200 },
501+
env: {
502+
id: setup.envId,
503+
currentConcurrency: setup.current,
504+
limit: setup.limit,
505+
},
506+
});
507+
508+
for (let i = 0; i < setup.queueCount; i++) {
509+
await setupQueue({
510+
redis,
511+
keyProducer,
512+
parentQueue: "parent-queue",
513+
score: now - 1000 * (i + 1),
514+
queueId: `queue-${i}`,
515+
orgId: "org-1",
516+
envId: setup.envId,
517+
});
518+
}
519+
}
520+
521+
// Create multiple strategies with different seeds
522+
const numStrategies = 5;
523+
const strategies = Array.from(
524+
{ length: numStrategies },
525+
(_, i) =>
526+
new FairDequeuingStrategy({
527+
tracer,
528+
redis,
529+
keys: keyProducer,
530+
defaultOrgConcurrency: 10,
531+
defaultEnvConcurrency: 5,
532+
parentQueueLimit: 100,
533+
checkForDisabledOrgs: true,
534+
seed: `test-seed-${i}`,
535+
biases: {
536+
concurrencyLimitBias: 0.8,
537+
availableCapacityBias: 0.5,
538+
},
539+
})
540+
);
541+
542+
// Run iterations across all strategies
543+
const iterationsPerStrategy = 100;
544+
const allResults: Record<string, number>[] = [];
545+
546+
for (const strategy of strategies) {
547+
const firstPositionCounts: Record<string, number> = {};
548+
549+
for (let i = 0; i < iterationsPerStrategy; i++) {
550+
const result = await strategy.distributeFairQueuesFromParentQueue(
551+
"parent-queue",
552+
`consumer-${i % 3}`
553+
);
554+
555+
expect(result.length).toBeGreaterThan(0);
556+
557+
const firstEnv = keyProducer.envIdFromQueue(result[0]);
558+
firstPositionCounts[firstEnv] = (firstPositionCounts[firstEnv] || 0) + 1;
559+
}
560+
561+
allResults.push(firstPositionCounts);
562+
}
563+
564+
// Calculate average distributions across all strategies
565+
const avgDistribution: Record<string, number> = {};
566+
const envIds = ["env-1", "env-2", "env-3"];
567+
568+
for (const envId of envIds) {
569+
const sum = allResults.reduce((acc, result) => acc + (result[envId] || 0), 0);
570+
avgDistribution[envId] = sum / numStrategies;
571+
}
572+
573+
// Log individual strategy results and the average
574+
console.log("\nResults by strategy:");
575+
allResults.forEach((result, i) => {
576+
console.log(`Strategy ${i + 1}:`, result);
577+
});
578+
579+
console.log("\nAverage distribution:", avgDistribution);
580+
581+
// Calculate percentages from average distribution
582+
const totalCount = Object.values(avgDistribution).reduce((sum, count) => sum + count, 0);
583+
const highLimitPercentage = (avgDistribution["env-1"] / totalCount) * 100;
584+
const lowLimitPercentage = (avgDistribution["env-3"] / totalCount) * 100;
585+
586+
console.log("\nPercentages:");
587+
console.log("High limit percentage:", highLimitPercentage);
588+
console.log("Low limit percentage:", lowLimitPercentage);
589+
590+
// Verify distribution across all strategies
591+
expect(highLimitPercentage).toBeLessThan(60);
592+
expect(lowLimitPercentage).toBeGreaterThan(10);
593+
expect(highLimitPercentage).toBeGreaterThan(lowLimitPercentage);
594+
}
595+
);
466596
});

references/v3-catalog/trigger.config.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ export { handleError } from "./src/handleError.js";
1313

1414
export default defineConfig({
1515
runtime: "node",
16-
project: "yubjwjsfkxnylobaqvqz",
16+
// project: "yubjwjsfkxnylobaqvqz",
17+
project: "proj_oveexhsnehnarnusktpc",
1718
machine: "medium-1x",
1819
instrumentations: [new OpenAIInstrumentation()],
1920
additionalFiles: ["wrangler/wrangler.toml"],

0 commit comments

Comments
 (0)