Skip to content

Commit 3aa634d

Browse files
committed
Add a metric for number of event repo subscribers and project subscribers
1 parent 7de3fb0 commit 3aa634d

File tree

3 files changed

+57
-14
lines changed

3 files changed

+57
-14
lines changed

apps/webapp/app/v3/eventRepository.server.ts

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ import { env } from "~/env.server";
2626
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
2727
import { logger } from "~/services/logger.server";
2828
import { DynamicFlushScheduler } from "./dynamicFlushScheduler.server";
29+
import { singleton } from "~/utils/singleton";
30+
import { Gauge } from "prom-client";
31+
import { metricsRegister } from "~/metrics.server";
2932

3033
export type CreatableEvent = Omit<
3134
Prisma.TaskEventCreateInput,
@@ -148,6 +151,11 @@ export class EventRepository {
148151
private readonly _flushScheduler: DynamicFlushScheduler<CreatableEvent>;
149152
private _randomIdGenerator = new RandomIdGenerator();
150153
private _redisPublishClient: Redis;
154+
private _subscriberCount = 0;
155+
156+
get subscriberCount() {
157+
return this._subscriberCount;
158+
}
151159

152160
constructor(private db: PrismaClient = prisma, private readonly _config: EventRepoConfig) {
153161
this._flushScheduler = new DynamicFlushScheduler({
@@ -721,6 +729,9 @@ export class EventRepository {
721729
// Subscribe to the channel.
722730
await redis.psubscribe(channel);
723731

732+
// Increment the subscriber count.
733+
this._subscriberCount++;
734+
724735
const eventEmitter = new EventEmitter();
725736

726737
// Define the message handler.
@@ -733,6 +744,8 @@ export class EventRepository {
733744
// Return a function that can be used to unsubscribe.
734745
const unsubscribe = async () => {
735746
await redis.punsubscribe(channel);
747+
redis.quit();
748+
this._subscriberCount--;
736749
};
737750

738751
return {
@@ -796,19 +809,34 @@ export class EventRepository {
796809
}
797810
}
798811

799-
export const eventRepository = new EventRepository(prisma, {
800-
batchSize: env.EVENTS_BATCH_SIZE,
801-
batchInterval: env.EVENTS_BATCH_INTERVAL,
802-
retentionInDays: env.EVENTS_DEFAULT_LOG_RETENTION,
803-
redis: {
804-
port: env.REDIS_PORT,
805-
host: env.REDIS_HOST,
806-
username: env.REDIS_USERNAME,
807-
password: env.REDIS_PASSWORD,
808-
enableAutoPipelining: true,
809-
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
810-
},
811-
});
812+
export const eventRepository = singleton("eventRepo", initializeEventRepo);
813+
814+
function initializeEventRepo() {
815+
const repo = new EventRepository(prisma, {
816+
batchSize: env.EVENTS_BATCH_SIZE,
817+
batchInterval: env.EVENTS_BATCH_INTERVAL,
818+
retentionInDays: env.EVENTS_DEFAULT_LOG_RETENTION,
819+
redis: {
820+
port: env.REDIS_PORT,
821+
host: env.REDIS_HOST,
822+
username: env.REDIS_USERNAME,
823+
password: env.REDIS_PASSWORD,
824+
enableAutoPipelining: true,
825+
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
826+
},
827+
});
828+
829+
new Gauge({
830+
name: "event_repository_subscriber_count",
831+
help: "Number of event repository subscribers",
832+
collect() {
833+
this.set(repo.subscriberCount);
834+
},
835+
registers: [metricsRegister],
836+
});
837+
838+
return repo;
839+
}
812840

813841
export function stripAttributePrefix(attributes: Attributes, prefix: string) {
814842
const result: Attributes = {};

apps/webapp/app/v3/services/projectPubSub.server.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ import { z } from "zod";
22
import { singleton } from "~/utils/singleton";
33
import { ZodPubSub, ZodSubscriber } from "../utils/zodPubSub.server";
44
import { env } from "~/env.server";
5+
import { Gauge } from "prom-client";
6+
import { metricsRegister } from "~/metrics.server";
57

68
const messageCatalog = {
79
WORKER_CREATED: z.object({
@@ -18,7 +20,7 @@ export type ProjectSubscriber = ZodSubscriber<typeof messageCatalog>;
1820
export const projectPubSub = singleton("projectPubSub", initializeProjectPubSub);
1921

2022
function initializeProjectPubSub() {
21-
return new ZodPubSub({
23+
const pubSub = new ZodPubSub({
2224
redis: {
2325
port: env.REDIS_PORT,
2426
host: env.REDIS_HOST,
@@ -29,4 +31,15 @@ function initializeProjectPubSub() {
2931
},
3032
schema: messageCatalog,
3133
});
34+
35+
new Gauge({
36+
name: "project_pub_sub_subscribers",
37+
help: "Number of project pub sub subscribers",
38+
collect() {
39+
this.set(pubSub.subscriberCount);
40+
},
41+
registers: [metricsRegister],
42+
});
43+
44+
return pubSub;
3245
}

apps/webapp/app/v3/utils/zodPubSub.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ class RedisZodSubscriber<TMessageCatalog extends ZodMessageCatalogSchema>
5959
await this._subscriber.punsubscribe();
6060

6161
this.onUnsubscribed.post({ pattern: this._pattern });
62+
63+
this._subscriber.quit();
6264
}
6365

6466
async #onMessage(pattern: string, channel: string, serializedMessage: string) {

0 commit comments

Comments
 (0)