
Commit 05214e8

add prom metrics to redis worker
1 parent 4d45f52 commit 05214e8


3 files changed (+136, -16 lines)


packages/redis-worker/package.json

Lines changed: 4 additions & 3 deletions
@@ -38,21 +38,22 @@
     "test": "vitest --sequence.concurrent=false --no-file-parallelism"
   },
   "dependencies": {
-    "@internal/tracing": "workspace:*",
     "@internal/redis": "workspace:*",
+    "@internal/tracing": "workspace:*",
     "@trigger.dev/core": "workspace:*",
     "lodash.omit": "^4.5.0",
     "nanoid": "^5.0.7",
     "p-limit": "^6.2.0",
+    "prom-client": "^15.1.0",
     "zod": "3.23.8"
   },
   "devDependencies": {
     "@internal/testcontainers": "workspace:*",
     "@types/lodash.omit": "^4.5.7",
-    "vitest": "^1.4.0",
     "rimraf": "6.0.1",
     "tshy": "^3.0.2",
-    "tsx": "4.17.0"
+    "tsx": "4.17.0",
+    "vitest": "^1.4.0"
   },
   "engines": {
     "node": ">=18.20.0"

packages/redis-worker/src/worker.ts

Lines changed: 129 additions & 13 deletions
@@ -9,6 +9,7 @@ import { nanoid } from "nanoid";
 import pLimit from "p-limit";
 import { createRedisClient } from "@internal/redis";
 import { shutdownManager } from "@trigger.dev/core/v3/serverOnly";
+import { Registry, Histogram } from "prom-client";
 
 export type WorkerCatalog = {
   [key: string]: {
@@ -48,6 +49,9 @@ type WorkerOptions<TCatalog extends WorkerCatalog> = {
   shutdownTimeoutMs?: number;
   logger?: Logger;
   tracer?: Tracer;
+  metrics?: {
+    register: Registry;
+  };
 };
 
 // This results in attempt 12 being a delay of 1 hour
@@ -65,6 +69,16 @@ class Worker<TCatalog extends WorkerCatalog> {
   private subscriber: Redis | undefined;
   private tracer: Tracer;
 
+  private metrics: {
+    register?: Registry;
+    enqueueDuration?: Histogram;
+    dequeueDuration?: Histogram;
+    jobDuration?: Histogram;
+    ackDuration?: Histogram;
+    redriveDuration?: Histogram;
+    rescheduleDuration?: Histogram;
+  } = {};
+
   queue: SimpleQueue<QueueCatalogFromWorkerCatalog<TCatalog>>;
   private jobs: WorkerOptions<TCatalog>["jobs"];
   private logger: Logger;
@@ -100,6 +114,61 @@ class Worker<TCatalog extends WorkerCatalog> {
 
     // Create a p-limit instance using this limit.
     this.limiter = pLimit(this.concurrency.limit);
+
+    this.metrics.register = options.metrics?.register;
+
+    if (!this.metrics.register) {
+      return;
+    }
+
+    this.metrics.enqueueDuration = new Histogram({
+      name: "redis_worker_enqueue_duration_seconds",
+      help: "The duration of enqueue operations",
+      labelNames: ["worker_name", "job_type", "has_available_at"],
+      buckets: [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1],
+      registers: [this.metrics.register],
+    });
+
+    this.metrics.dequeueDuration = new Histogram({
+      name: "redis_worker_dequeue_duration_seconds",
+      help: "The duration of dequeue operations",
+      labelNames: ["worker_name", "worker_id", "task_count"],
+      buckets: [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1],
+      registers: [this.metrics.register],
+    });
+
+    this.metrics.jobDuration = new Histogram({
+      name: "redis_worker_job_duration_seconds",
+      help: "The duration of job operations",
+      labelNames: ["worker_name", "worker_id", "batch_size", "job_type", "attempt"],
+      // use different buckets here as jobs can take a while to run
+      buckets: [0.1, 0.25, 0.5, 1, 2.5, 5, 10, 20, 30, 45, 60],
+      registers: [this.metrics.register],
+    });
+
+    this.metrics.ackDuration = new Histogram({
+      name: "redis_worker_ack_duration_seconds",
+      help: "The duration of ack operations",
+      labelNames: ["worker_name"],
+      buckets: [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1],
+      registers: [this.metrics.register],
+    });
+
+    this.metrics.redriveDuration = new Histogram({
+      name: "redis_worker_redrive_duration_seconds",
+      help: "The duration of redrive operations",
+      labelNames: ["worker_name"],
+      buckets: [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1],
+      registers: [this.metrics.register],
+    });
+
+    this.metrics.rescheduleDuration = new Histogram({
+      name: "redis_worker_reschedule_duration_seconds",
+      help: "The duration of reschedule operations",
+      labelNames: ["worker_name"],
+      buckets: [0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1],
+      registers: [this.metrics.register],
+    });
   }
 
   public start() {
@@ -160,18 +229,25 @@ class Worker<TCatalog extends WorkerCatalog> {
 
         span.setAttribute("job_visibility_timeout_ms", timeout);
 
-        return this.queue.enqueue({
-          id,
-          job,
-          item: payload,
-          visibilityTimeoutMs: timeout,
-          availableAt,
-        });
+        return this.withHistogram(
+          this.metrics.enqueueDuration,
+          this.queue.enqueue({
+            id,
+            job,
+            item: payload,
+            visibilityTimeoutMs: timeout,
+            availableAt,
+          }),
+          {
+            job_type: String(job),
+            has_available_at: availableAt ? "true" : "false",
+          }
+        );
       },
       {
         kind: SpanKind.PRODUCER,
         attributes: {
-          job_type: job as string,
+          job_type: String(job),
          job_id: id,
        },
      }
@@ -187,7 +263,10 @@ class Worker<TCatalog extends WorkerCatalog> {
      this.tracer,
      "reschedule",
      async (span) => {
-        return this.withHistogram(
+        return this.withHistogram(
          this.metrics.rescheduleDuration,
          this.queue.reschedule(id, availableAt)
        );
      },
      {
        kind: SpanKind.PRODUCER,
@@ -203,7 +282,7 @@ class Worker<TCatalog extends WorkerCatalog> {
      this.tracer,
      "ack",
      () => {
-        return this.queue.ack(id);
+        return this.withHistogram(this.metrics.ackDuration, this.queue.ack(id));
      },
      {
        attributes: {
@@ -229,7 +308,14 @@ class Worker<TCatalog extends WorkerCatalog> {
      }
 
      try {
-        const items = await this.queue.dequeue(taskCount);
+        const items = await this.withHistogram(
+          this.metrics.dequeueDuration,
+          this.queue.dequeue(taskCount),
+          {
+            worker_id: workerId,
+            task_count: taskCount,
+          }
+        );
 
        if (items.length === 0) {
          await Worker.delay(pollIntervalMs);
@@ -274,7 +360,17 @@ class Worker<TCatalog extends WorkerCatalog> {
          this.tracer,
          "processItem",
          async () => {
-            await handler({ id, payload: item, visibilityTimeoutMs, attempt });
+            await this.withHistogram(
+              this.metrics.jobDuration,
+              handler({ id, payload: item, visibilityTimeoutMs, attempt }),
+              {
+                worker_id: workerId,
+                batch_size: batchSize,
+                job_type: job,
+                attempt,
+              }
+            );
+
            // On success, acknowledge the item.
            await this.queue.ack(id);
          },
@@ -363,6 +459,23 @@ class Worker<TCatalog extends WorkerCatalog> {
    });
  }
 
+  private async withHistogram<T>(
+    histogram: Histogram<string> | undefined,
+    promise: Promise<T>,
+    labels?: Record<string, string | number>
+  ): Promise<T> {
+    if (!histogram || !this.metrics.register) {
+      return promise;
+    }
+
+    const end = histogram.startTimer({ worker_name: this.options.name, ...labels });
+    try {
+      return await promise;
+    } finally {
+      end();
+    }
+  }
+
  // A simple helper to delay for a given number of milliseconds.
  private static delay(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
@@ -387,7 +500,10 @@ class Worker<TCatalog extends WorkerCatalog> {
        if (typeof id !== "string") {
          throw new Error("Invalid message format: id must be a string");
        }
-        await this.queue.redriveFromDeadLetterQueue(id);
+        await this.withHistogram(
+          this.metrics.redriveDuration,
+          this.queue.redriveFromDeadLetterQueue(id)
+        );
        this.logger.log(`Redrived item ${id} from Dead Letter Queue`);
      } catch (error) {
        this.logger.error("Error processing redrive message", { error, message });
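None of these histograms are recorded unless the caller passes the new metrics option when constructing the worker; when metrics.register is undefined the constructor skips creating the histograms and withHistogram simply awaits the underlying promise. A sketch of the wiring, assuming the registry from the sketch above; the import path, catalog, job name, Redis options, and concurrency shape are illustrative assumptions, and only the metrics option (plus the use of options.name as the worker_name label) comes from this commit:

import { Registry } from "prom-client";
import { z } from "zod";
import { Worker } from "@internal/redis-worker"; // assumed import path for this package

// The same registry that is exposed on /metrics.
const register = new Registry();

const worker = new Worker({
  name: "email-worker", // becomes the worker_name label on every histogram
  redisOptions: { host: "localhost", port: 6379 }, // illustrative
  catalog: {
    sendEmail: {
      schema: z.object({ to: z.string() }), // assumed catalog entry shape
      visibilityTimeoutMs: 30_000,
      retry: { maxAttempts: 3 },
    },
  },
  jobs: {
    // The handler's runtime is what redis_worker_job_duration_seconds observes.
    sendEmail: async ({ payload }) => {
      console.log("sending email to", payload.to);
    },
  },
  concurrency: { workers: 1, tasksPerWorker: 10 }, // assumed shape
  // New in this commit: hand the worker a prom-client Registry and it creates and
  // registers its duration histograms; omit it and no metrics are recorded.
  metrics: { register },
});

worker.start();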

pnpm-lock.yaml

Lines changed: 3 additions & 0 deletions
Some generated files (pnpm-lock.yaml) are not rendered by default.

0 commit comments
