Skip to content

Commit fc6b69b

Browse files
committed
Testing the replication service
1 parent da0565e commit fc6b69b

File tree

3 files changed

+1277
-25
lines changed

3 files changed

+1277
-25
lines changed

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { tryCatch } from "@trigger.dev/core/utils";
66
import { parsePacket } from "@trigger.dev/core/v3/utils/ioSerialization";
77
import { TaskRun } from "@trigger.dev/database";
88
import { nanoid } from "nanoid";
9+
import EventEmitter from "node:events";
910
import pLimit from "p-limit";
1011
import { Counter, Gauge } from "prom-client";
1112
import type { MetricsRegister } from "~/metrics.server";
@@ -25,10 +26,15 @@ export type RunsReplicationServiceOptions = {
2526
leaderLockTimeoutMs?: number;
2627
leaderLockExtendIntervalMs?: number;
2728
ackIntervalSeconds?: number;
29+
logger?: Logger;
2830
};
2931

3032
type TaskRunInsert = { _version: bigint; run: TaskRun; event: "insert" | "update" | "delete" };
3133

34+
export type RunsReplicationServiceEvents = {
35+
message: [{ lsn: string; message: PgoutputMessage; service: RunsReplicationService }];
36+
};
37+
3238
export class RunsReplicationService {
3339
private _lastLsn: string | null = null;
3440
private _isSubscribed = false;
@@ -44,12 +50,16 @@ export class RunsReplicationService {
4450
private logger: Logger;
4551
private _lastReplicationLagMs: number | null = null;
4652
private _transactionCounter?: Counter;
53+
private _lagGauge?: Gauge;
4754
private _insertStrategy: "streaming" | "batching";
4855
private _isShuttingDown = false;
4956
private _isShutDownComplete = false;
5057

58+
public readonly events: EventEmitter<RunsReplicationServiceEvents>;
59+
5160
constructor(private readonly options: RunsReplicationServiceOptions) {
52-
this.logger = new Logger("RunsReplicationService", "debug");
61+
this.logger = options.logger ?? new Logger("RunsReplicationService", "debug");
62+
this.events = new EventEmitter();
5363

5464
this._insertStrategy = options.insertStrategy ?? "streaming";
5565

@@ -113,7 +123,7 @@ export class RunsReplicationService {
113123

114124
if (options.metricsRegister) {
115125
const replicationService = this;
116-
new Gauge({
126+
this._lagGauge = new Gauge({
117127
name: "runs_replication_service_replication_lag_ms",
118128
help: "The replication lag in milliseconds",
119129
collect() {
@@ -134,9 +144,25 @@ export class RunsReplicationService {
134144
}
135145
}
136146

137-
public shutdown() {
138-
this.logger.info("Initiating shutdown of runs replication service");
147+
public async getTransactionCountMetric() {
148+
return this._transactionCounter?.get();
149+
}
150+
151+
public async getLagGaugeMetric() {
152+
return this._lagGauge?.get();
153+
}
154+
155+
public async shutdown() {
139156
this._isShuttingDown = true;
157+
158+
this.logger.info("Initiating shutdown of runs replication service");
159+
160+
if (!this._currentTransaction) {
161+
this.logger.info("No transaction to commit, shutting down immediately");
162+
await this._replicationClient.stop();
163+
this._isShutDownComplete = true;
164+
return;
165+
}
140166
}
141167

142168
async start(insertStrategy?: "streaming" | "batching") {
@@ -162,8 +188,19 @@ export class RunsReplicationService {
162188
}
163189

164190
async #handleData(lsn: string, message: PgoutputMessage) {
191+
this.logger.debug("Handling data", {
192+
lsn,
193+
tag: message.tag,
194+
});
195+
196+
this.events.emit("message", { lsn, message, service: this });
197+
165198
switch (message.tag) {
166199
case "begin": {
200+
if (this._isShuttingDown || this._isShutDownComplete) {
201+
return;
202+
}
203+
167204
this._currentTransaction = {
168205
commitLsn: message.commitLsn,
169206
xid: message.xid,
@@ -195,15 +232,29 @@ export class RunsReplicationService {
195232
});
196233
break;
197234
}
235+
case "delete": {
236+
if (!this._currentTransaction) {
237+
return;
238+
}
239+
240+
this._currentTransaction.events.push({
241+
tag: message.tag,
242+
data: message.old as TaskRun,
243+
raw: message,
244+
});
245+
246+
break;
247+
}
198248
case "commit": {
199249
if (!this._currentTransaction) {
200250
return;
201251
}
202252
const replicationLagMs = Date.now() - Number(message.commitTime / 1000n);
203253
this._currentTransaction.commitEndLsn = message.commitEndLsn;
204254
this._currentTransaction.replicationLagMs = replicationLagMs;
205-
await this.#handleTransaction(this._currentTransaction as Transaction<TaskRun>);
255+
const transaction = this._currentTransaction as Transaction<TaskRun>;
206256
this._currentTransaction = null;
257+
await this.#handleTransaction(transaction);
207258
break;
208259
}
209260
}

0 commit comments

Comments
 (0)