Skip to content

Commit 1150497

Browse files
committed
Add release concurrency metrics sending to otel
1 parent 317135e commit 1150497

File tree

4 files changed

+196
-6
lines changed

4 files changed

+196
-6
lines changed

.vscode/launch.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@
138138
"type": "node-terminal",
139139
"request": "launch",
140140
"name": "Debug RunEngine tests",
141-
"command": "pnpm run test ./src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts -t 'Should provide metrics about queues via getQueueMetrics'",
141+
"command": "pnpm run test ./src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts -t 'Should retrieve metrics for all queues via getQueueMetrics'",
142142
"cwd": "${workspaceFolder}/internal-packages/run-engine",
143143
"sourceMaps": true
144144
},

internal-packages/redis/src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ const defaultOptions: Partial<RedisOptions> = {
88
const delay = Math.min(times * 50, 1000);
99
return delay;
1010
},
11-
maxRetriesPerRequest: process.env.GITHUB_ACTIONS ? 50 : process.env.VITEST ? 1 : 20,
11+
maxRetriesPerRequest: process.env.GITHUB_ACTIONS ? 50 : process.env.VITEST ? 5 : 20,
1212
};
1313

1414
const logger = new Logger("Redis", "debug");

internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts

Lines changed: 160 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
import { Callback, createRedisClient, Redis, Result, type RedisOptions } from "@internal/redis";
2-
import { Tracer } from "@internal/tracing";
2+
import { startSpan, Tracer } from "@internal/tracing";
33
import { Logger } from "@trigger.dev/core/logger";
4-
import { setInterval } from "node:timers/promises";
54
import { z } from "zod";
5+
import { setInterval } from "node:timers/promises";
6+
import { flattenAttributes } from "@trigger.dev/core/v3";
67

78
export type ReleaseConcurrencyQueueRetryOptions = {
89
maxRetries?: number;
@@ -81,6 +82,7 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
8182

8283
if (!options.disableConsumers) {
8384
this.#startConsumers();
85+
this.#startMetricsProducer();
8486
}
8587
}
8688

@@ -397,6 +399,30 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
397399
}
398400
}
399401

402+
async #startMetricsProducer() {
403+
try {
404+
// Produce metrics every 60 seconds, using a tracer span
405+
for await (const _ of setInterval(60_000)) {
406+
const metrics = await this.getQueueMetrics();
407+
this.logger.info("Queue metrics:", { metrics });
408+
409+
await startSpan(
410+
this.options.tracer,
411+
"ReleaseConcurrencyTokenBucketQueue.metrics",
412+
async (span) => {},
413+
{
414+
attributes: {
415+
...flattenAttributes(metrics, "queues"),
416+
forceRecording: true,
417+
},
418+
}
419+
);
420+
}
421+
} catch (error) {
422+
this.logger.error("Error starting metrics producer:", { error });
423+
}
424+
}
425+
400426
#calculateBackoffScore(item: QueueItemMetadata): string {
401427
const delay = Math.min(
402428
this.backoff.maxDelay,
@@ -405,6 +431,137 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
405431
return String(Date.now() + delay);
406432
}
407433

434+
async getQueueMetrics(): Promise<
435+
Array<{ releaseQueue: string; currentTokens: number; queueLength: number }>
436+
> {
437+
const streamRedis = this.redis.duplicate();
438+
const queuePattern = `${this.keyPrefix}*:queue`;
439+
const stream = streamRedis.scanStream({
440+
match: queuePattern,
441+
type: "zset",
442+
count: 100,
443+
});
444+
445+
let resolvePromise: (
446+
value: Array<{ releaseQueue: string; currentTokens: number; queueLength: number }>
447+
) => void;
448+
let rejectPromise: (reason?: any) => void;
449+
450+
const promise = new Promise<
451+
Array<{ releaseQueue: string; currentTokens: number; queueLength: number }>
452+
>((resolve, reject) => {
453+
resolvePromise = resolve;
454+
rejectPromise = reject;
455+
});
456+
457+
const metrics: Map<
458+
string,
459+
{ releaseQueue: string; currentTokens: number; queueLength: number }
460+
> = new Map();
461+
462+
async function getMetricsForKeys(queueKeys: string[]) {
463+
if (queueKeys.length === 0) {
464+
return [];
465+
}
466+
467+
const pipeline = streamRedis.pipeline();
468+
469+
queueKeys.forEach((queueKey) => {
470+
const releaseQueue = queueKey
471+
.replace(":queue", "")
472+
.replace(streamRedis.options.keyPrefix ?? "", "");
473+
const bucketKey = `${releaseQueue}:bucket`;
474+
475+
pipeline.get(bucketKey);
476+
pipeline.zcard(`${releaseQueue}:queue`);
477+
});
478+
479+
const result = await pipeline.exec();
480+
481+
if (!result) {
482+
return [];
483+
}
484+
485+
const results = result.map(([resultError, queueLengthOrCurrentTokens]) => {
486+
if (resultError) {
487+
return null;
488+
}
489+
490+
return queueLengthOrCurrentTokens ? Number(queueLengthOrCurrentTokens) : 0;
491+
});
492+
493+
// Now zip the results with the queue keys
494+
const zippedResults = queueKeys.map((queueKey, index) => {
495+
const releaseQueue = queueKey
496+
.replace(":queue", "")
497+
.replace(streamRedis.options.keyPrefix ?? "", "");
498+
499+
// Current tokens are at indexes 0, 2, 4, 6, etc.
500+
// Queue length are at indexes 1, 3, 5, 7, etc.
501+
502+
const currentTokens = results[index * 2];
503+
const queueLength = results[index * 2 + 1];
504+
505+
if (typeof currentTokens !== "number" || typeof queueLength !== "number") {
506+
return null;
507+
}
508+
509+
return {
510+
releaseQueue,
511+
currentTokens: currentTokens,
512+
queueLength: queueLength,
513+
};
514+
});
515+
516+
return zippedResults.filter((result) => result !== null);
517+
}
518+
519+
stream.on("end", () => {
520+
streamRedis.quit();
521+
resolvePromise(Array.from(metrics.values()));
522+
});
523+
524+
stream.on("error", (error) => {
525+
this.logger.error("Error getting queue metrics:", { error });
526+
527+
stream.pause();
528+
streamRedis.quit();
529+
rejectPromise(error);
530+
});
531+
532+
stream.on("data", async (keys) => {
533+
stream.pause();
534+
535+
const uniqueKeys = Array.from(new Set<string>(keys));
536+
537+
if (uniqueKeys.length === 0) {
538+
stream.resume();
539+
return;
540+
}
541+
542+
const unresolvedKeys = uniqueKeys.filter((key) => !metrics.has(key));
543+
544+
if (unresolvedKeys.length === 0) {
545+
stream.resume();
546+
return;
547+
}
548+
549+
this.logger.debug("Fetching queue metrics for keys", { keys: uniqueKeys });
550+
551+
await getMetricsForKeys(unresolvedKeys).then((results) => {
552+
results.forEach((result) => {
553+
if (result) {
554+
metrics.set(result.releaseQueue, result);
555+
}
556+
});
557+
558+
stream.resume();
559+
});
560+
});
561+
562+
return promise;
563+
}
564+
408565
#registerCommands() {
409566
this.redis.defineCommand("consumeToken", {
410567
numberOfKeys: 4,
@@ -424,7 +581,7 @@ local currentTokens = tonumber(redis.call("GET", bucketKey) or maxTokens)
424581
425582
-- If we have enough tokens, then consume them
426583
if currentTokens >= 1 then
427-
newCurrentTokens = currentTokens - 1
584+
local newCurrentTokens = currentTokens - 1
428585
429586
redis.call("SET", bucketKey, newCurrentTokens)
430587
redis.call("ZREM", queueKey, releaserId)

internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { redisTest, StartedRedisContainer } from "@internal/testcontainers";
22
import { ReleaseConcurrencyTokenBucketQueue } from "../releaseConcurrencyTokenBucketQueue.js";
33
import { setTimeout } from "node:timers/promises";
4-
import { createRedisClient, Redis } from "@internal/redis";
54

65
type TestQueueDescriptor = {
76
name: string;
@@ -680,4 +679,38 @@ describe("ReleaseConcurrencyQueue", () => {
680679
await queue.quit();
681680
}
682681
});
682+
683+
redisTest(
684+
"Should retrieve metrics for all queues via getQueueMetrics",
685+
async ({ redisContainer }) => {
686+
const { queue } = createReleaseConcurrencyQueue(redisContainer, 1);
687+
688+
// Set up multiple queues with different states
689+
await queue.attemptToRelease({ name: "metrics-queue1" }, "run1"); // Consume 1 token from queue1
690+
691+
// Add more items to queue1 that will be queued due to no tokens
692+
await queue.attemptToRelease({ name: "metrics-queue1" }, "run2"); // This will be queued
693+
await queue.attemptToRelease({ name: "metrics-queue1" }, "run3"); // This will be queued
694+
await queue.attemptToRelease({ name: "metrics-queue1" }, "run4"); // This will be queued
695+
696+
const metrics = await queue.getQueueMetrics();
697+
698+
expect(metrics).toHaveLength(1);
699+
expect(metrics[0].releaseQueue).toBe("metrics-queue1");
700+
expect(metrics[0].currentTokens).toBe(0);
701+
expect(metrics[0].queueLength).toBe(3);
702+
703+
// Now add 10 items to 100 different queues
704+
for (let i = 0; i < 100; i++) {
705+
for (let j = 0; j < 10; j++) {
706+
await queue.attemptToRelease({ name: `metrics-queue2-${i}` }, `run${i}-${j}`);
707+
}
708+
}
709+
710+
const metrics2 = await queue.getQueueMetrics();
711+
expect(metrics2.length).toBeGreaterThan(90);
712+
713+
await queue.quit();
714+
}
715+
);
683716
});

0 commit comments

Comments
 (0)