Skip to content

Commit 6a8db74

Browse files
committed
Add support for re-using queue snapshots across dequeues
1 parent c2e859b commit 6a8db74

File tree

4 files changed

+143
-8
lines changed

4 files changed

+143
-8
lines changed

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ const EnvironmentSchema = z.object({
232232
MARQS_CONCURRENCY_LIMIT_BIAS: z.coerce.number().default(0.75),
233233
MARQS_AVAILABLE_CAPACITY_BIAS: z.coerce.number().default(0.3),
234234
MARQS_QUEUE_AGE_RANDOMIZATION_BIAS: z.coerce.number().default(0.25),
235+
MARQS_REUSE_SNAPSHOT_COUNT: z.coerce.number().int().default(0),
235236

236237
PROD_TASK_HEARTBEAT_INTERVAL_MS: z.coerce.number().int().optional(),
237238

apps/webapp/app/v3/marqs/fairDequeuingStrategy.server.ts

Lines changed: 51 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ export type FairDequeuingStrategyOptions = {
4444
* If not provided, no biasing will be applied (completely random shuffling)
4545
*/
4646
biases?: FairDequeuingStrategyBiases;
47+
reuseSnapshotCount?: number;
4748
};
4849

4950
type FairQueueConcurrency = {
@@ -90,6 +91,10 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
9091
}>;
9192

9293
private _rng: seedrandom.PRNG;
94+
private _reusedSnapshotForConsumer: Map<
95+
string,
96+
{ snapshot: FairQueueSnapshot; reuseCount: number }
97+
> = new Map();
9398

9499
constructor(private options: FairDequeuingStrategyOptions) {
95100
const ctx = new DefaultStatefulContext();
@@ -310,6 +315,31 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
310315
span.setAttribute("consumer_id", consumerId);
311316
span.setAttribute("parent_queue", parentQueue);
312317

318+
if (
319+
typeof this.options.reuseSnapshotCount === "number" &&
320+
this.options.reuseSnapshotCount > 0
321+
) {
322+
const key = `${parentQueue}:${consumerId}`;
323+
const reusedSnapshot = this._reusedSnapshotForConsumer.get(key);
324+
325+
if (reusedSnapshot) {
326+
if (reusedSnapshot.reuseCount < this.options.reuseSnapshotCount) {
327+
span.setAttribute("reused_snapshot", true);
328+
329+
this._reusedSnapshotForConsumer.set(key, {
330+
snapshot: reusedSnapshot.snapshot,
331+
reuseCount: reusedSnapshot.reuseCount + 1,
332+
});
333+
334+
return reusedSnapshot.snapshot;
335+
} else {
336+
this._reusedSnapshotForConsumer.delete(key);
337+
}
338+
}
339+
}
340+
341+
span.setAttribute("reused_snapshot", false);
342+
313343
const now = Date.now();
314344

315345
const queues = await this.#allChildQueuesByScore(parentQueue, consumerId, now);
@@ -341,10 +371,6 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
341371
(org) => org.concurrency.current >= org.concurrency.limit
342372
);
343373

344-
span.setAttributes({
345-
...flattenAttributes(orgsAtFullConcurrency, "orgs_at_full_concurrency"),
346-
});
347-
348374
const orgIdsAtFullConcurrency = new Set(orgsAtFullConcurrency.map((org) => org.id));
349375

350376
const orgsSnapshot = orgs.reduce((acc, org) => {
@@ -355,6 +381,12 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
355381
return acc;
356382
}, {} as Record<string, { concurrency: FairQueueConcurrency }>);
357383

384+
span.setAttributes({
385+
org_count: orgs.length,
386+
orgs_at_full_concurrency_count: orgsAtFullConcurrency.length,
387+
orgs_snapshot_count: Object.keys(orgsSnapshot).length,
388+
});
389+
358390
if (Object.keys(orgsSnapshot).length === 0) {
359391
return emptyFairQueueSnapshot;
360392
}
@@ -376,10 +408,6 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
376408
(env) => env.concurrency.current >= env.concurrency.limit
377409
);
378410

379-
span.setAttributes({
380-
...flattenAttributes(envsAtFullConcurrency, "envs_at_full_concurrency"),
381-
});
382-
383411
const envIdsAtFullConcurrency = new Set(envsAtFullConcurrency.map((env) => env.id));
384412

385413
const envsSnapshot = envs.reduce((acc, env) => {
@@ -390,6 +418,11 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
390418
return acc;
391419
}, {} as Record<string, { concurrency: FairQueueConcurrency }>);
392420

421+
span.setAttributes({
422+
env_count: envs.length,
423+
envs_at_full_concurrency_count: envsAtFullConcurrency.length,
424+
});
425+
393426
const queuesSnapshot = queues.filter(
394427
(queue) =>
395428
!orgIdsAtFullConcurrency.has(queue.org) && !envIdsAtFullConcurrency.has(queue.env)
@@ -402,6 +435,16 @@ export class FairDequeuingStrategy implements MarQSFairDequeueStrategy {
402435
queues: queuesSnapshot,
403436
};
404437

438+
if (
439+
typeof this.options.reuseSnapshotCount === "number" &&
440+
this.options.reuseSnapshotCount > 0
441+
) {
442+
this._reusedSnapshotForConsumer.set(`${parentQueue}:${consumerId}`, {
443+
snapshot,
444+
reuseCount: 0,
445+
});
446+
}
447+
405448
return snapshot;
406449
});
407450
}

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1625,6 +1625,7 @@ function getMarQSClient() {
16251625
availableCapacityBias: env.MARQS_AVAILABLE_CAPACITY_BIAS,
16261626
queueAgeRandomization: env.MARQS_QUEUE_AGE_RANDOMIZATION_BIAS,
16271627
},
1628+
reuseSnapshotCount: env.MARQS_REUSE_SNAPSHOT_COUNT,
16281629
}),
16291630
envQueuePriorityStrategy: new FairDequeuingStrategy({
16301631
tracer: tracer,

apps/webapp/test/fairDequeuingStrategy.test.ts

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,96 @@ describe("FairDequeuingStrategy", () => {
203203
expect(result).toEqual([queue1, queue2]);
204204
});
205205

206+
redisTest("should reuse snapshots across calls for the same consumer", async ({ redis }) => {
207+
const keyProducer = createKeyProducer("test");
208+
const strategy = new FairDequeuingStrategy({
209+
tracer,
210+
redis,
211+
keys: keyProducer,
212+
defaultOrgConcurrency: 10,
213+
defaultEnvConcurrency: 5,
214+
parentQueueLimit: 10,
215+
checkForDisabledOrgs: true,
216+
seed: "test-seed-reuse-1",
217+
reuseSnapshotCount: 1,
218+
});
219+
220+
const now = Date.now();
221+
222+
await setupQueue({
223+
redis,
224+
keyProducer,
225+
parentQueue: "parent-queue",
226+
score: now - 3000,
227+
queueId: "queue-1",
228+
orgId: "org-1",
229+
envId: "env-1",
230+
});
231+
232+
await setupQueue({
233+
redis,
234+
keyProducer,
235+
parentQueue: "parent-queue",
236+
score: now - 2000,
237+
queueId: "queue-2",
238+
orgId: "org-2",
239+
envId: "env-2",
240+
});
241+
242+
await setupQueue({
243+
redis,
244+
keyProducer,
245+
parentQueue: "parent-queue",
246+
score: now - 1000,
247+
queueId: "queue-3",
248+
orgId: "org-3",
249+
envId: "env-3",
250+
});
251+
252+
const startDistribute1 = performance.now();
253+
254+
const result = await strategy.distributeFairQueuesFromParentQueue("parent-queue", "consumer-1");
255+
256+
const distribute1Duration = performance.now() - startDistribute1;
257+
258+
console.log("First distribution took", distribute1Duration, "ms");
259+
260+
expect(result).toHaveLength(3);
261+
// Should only get the two oldest queues
262+
const queue1 = keyProducer.queueKey("org-1", "env-1", "queue-1");
263+
const queue2 = keyProducer.queueKey("org-2", "env-2", "queue-2");
264+
const queue3 = keyProducer.queueKey("org-3", "env-3", "queue-3");
265+
expect(result).toEqual([queue2, queue1, queue3]);
266+
267+
const startDistribute2 = performance.now();
268+
269+
const result2 = await strategy.distributeFairQueuesFromParentQueue(
270+
"parent-queue",
271+
"consumer-1"
272+
);
273+
274+
const distribute2Duration = performance.now() - startDistribute2;
275+
276+
console.log("Second distribution took", distribute2Duration, "ms");
277+
278+
// Make sure the second call is more than 10 times faster than the first
279+
expect(distribute2Duration).toBeLessThan(distribute1Duration / 10);
280+
281+
const startDistribute3 = performance.now();
282+
283+
const result3 = await strategy.distributeFairQueuesFromParentQueue(
284+
"parent-queue",
285+
"consumer-1"
286+
);
287+
288+
const distribute3Duration = performance.now() - startDistribute3;
289+
290+
console.log("Third distribution took", distribute3Duration, "ms");
291+
292+
// Make sure the third call is more than 4 times the second
293+
expect(distribute3Duration).toBeGreaterThan(distribute2Duration * 4);
294+
});
295+
206296
redisTest("should fairly distribute queues across environments over time", async ({ redis }) => {
207297
const keyProducer = createKeyProducer("test");
208298
const strategy = new FairDequeuingStrategy({

0 commit comments

Comments
 (0)