Skip to content

Commit da0565e

Browse files
committed
Handover WIP and tests
1 parent f3e9041 commit da0565e

File tree

5 files changed

+189
-41
lines changed

5 files changed

+189
-41
lines changed

apps/webapp/app/services/runsReplicationInstance.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ function initializeRunsReplicationInstance() {
6161
error,
6262
});
6363
});
64+
65+
process.on("SIGTERM", service.shutdown.bind(service));
66+
process.on("SIGINT", service.shutdown.bind(service));
6467
}
6568

6669
return service;

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 46 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
1-
import type { ClickHouse, TaskRunV1, RawTaskRunPayloadV1 } from "@internal/clickhouse";
1+
import type { ClickHouse, RawTaskRunPayloadV1, TaskRunV1 } from "@internal/clickhouse";
22
import { RedisOptions } from "@internal/redis";
33
import { LogicalReplicationClient, Transaction, type PgoutputMessage } from "@internal/replication";
44
import { Logger } from "@trigger.dev/core/logger";
55
import { tryCatch } from "@trigger.dev/core/utils";
6-
import { TaskRunError } from "@trigger.dev/core/v3/schemas";
76
import { parsePacket } from "@trigger.dev/core/v3/utils/ioSerialization";
87
import { TaskRun } from "@trigger.dev/database";
98
import { nanoid } from "nanoid";
@@ -46,6 +45,8 @@ export class RunsReplicationService {
4645
private _lastReplicationLagMs: number | null = null;
4746
private _transactionCounter?: Counter;
4847
private _insertStrategy: "streaming" | "batching";
48+
private _isShuttingDown = false;
49+
private _isShutDownComplete = false;
4950

5051
constructor(private readonly options: RunsReplicationServiceOptions) {
5152
this.logger = new Logger("RunsReplicationService", "debug");
@@ -62,7 +63,7 @@ export class RunsReplicationService {
6263
table: "TaskRun",
6364
redisOptions: options.redisOptions,
6465
autoAcknowledge: false,
65-
publicationActions: ["insert", "update"],
66+
publicationActions: ["insert", "update", "delete"],
6667
logger: new Logger("RunsReplicationService", "debug"),
6768
leaderLockTimeoutMs: options.leaderLockTimeoutMs ?? 30_000,
6869
leaderLockExtendIntervalMs: options.leaderLockExtendIntervalMs ?? 10_000,
@@ -84,6 +85,9 @@ export class RunsReplicationService {
8485
});
8586

8687
this._replicationClient.events.on("heartbeat", async ({ lsn, shouldRespond }) => {
88+
if (this._isShuttingDown) return;
89+
if (this._isShutDownComplete) return;
90+
8791
if (shouldRespond) {
8892
await this._replicationClient.acknowledge(lsn);
8993
}
@@ -130,6 +134,11 @@ export class RunsReplicationService {
130134
}
131135
}
132136

137+
public shutdown() {
138+
this.logger.info("Initiating shutdown of runs replication service");
139+
this._isShuttingDown = true;
140+
}
141+
133142
async start(insertStrategy?: "streaming" | "batching") {
134143
this._insertStrategy = insertStrategy ?? this._insertStrategy;
135144

@@ -201,11 +210,27 @@ export class RunsReplicationService {
201210
}
202211

203212
async #handleTransaction(transaction: Transaction<TaskRun>) {
213+
if (this._isShutDownComplete) return;
214+
215+
let alreadyAcknowledged = false;
216+
217+
if (this._isShuttingDown) {
218+
// We need to immediately acknowledge the transaction
219+
// And then try and handle this transaction
220+
if (transaction.commitEndLsn) {
221+
await this._replicationClient.acknowledge(transaction.commitEndLsn);
222+
alreadyAcknowledged = true;
223+
}
224+
225+
await this._replicationClient.stop();
226+
this._isShutDownComplete = true;
227+
}
228+
204229
this._lastReplicationLagMs = transaction.replicationLagMs;
205230

206231
// If there are no events, do nothing
207232
if (transaction.events.length === 0) {
208-
if (transaction.commitEndLsn) {
233+
if (transaction.commitEndLsn && !alreadyAcknowledged) {
209234
await this._replicationClient.acknowledge(transaction.commitEndLsn);
210235
}
211236

@@ -222,6 +247,7 @@ export class RunsReplicationService {
222247

223248
this.logger.debug("Handling transaction", {
224249
transaction,
250+
alreadyAcknowledged,
225251
});
226252

227253
// If there are events, we need to handle them
@@ -230,13 +256,19 @@ export class RunsReplicationService {
230256
this._transactionCounter?.inc();
231257

232258
if (this._insertStrategy === "streaming") {
233-
await this._concurrentFlushScheduler.addToBatch(
234-
transaction.events.map((event) => ({
235-
_version,
236-
run: event.data,
237-
event: event.tag,
238-
}))
239-
);
259+
this._concurrentFlushScheduler
260+
.addToBatch(
261+
transaction.events.map((event) => ({
262+
_version,
263+
run: event.data,
264+
event: event.tag,
265+
}))
266+
)
267+
.catch((error) => {
268+
this.logger.error("Error adding to batch", {
269+
error,
270+
});
271+
});
240272
} else {
241273
const [flushError] = await tryCatch(
242274
this.#flushBatch(
@@ -256,7 +288,9 @@ export class RunsReplicationService {
256288
}
257289
}
258290

259-
await this._replicationClient.acknowledge(transaction.commitEndLsn);
291+
if (!alreadyAcknowledged) {
292+
await this._replicationClient.acknowledge(transaction.commitEndLsn);
293+
}
260294
}
261295

262296
async #flushBatch(flushId: string, batch: Array<TaskRunInsert>) {
@@ -497,7 +531,6 @@ export class ConcurrentFlushScheduler<T> {
497531
private readonly MAX_CONCURRENCY: number;
498532
private readonly concurrencyLimiter: ReturnType<typeof pLimit>;
499533
private flushTimer: NodeJS.Timeout | null;
500-
private isShuttingDown;
501534
private failedBatchCount;
502535
private metricsRegister?: MetricsRegister;
503536
private logger: Logger;
@@ -510,7 +543,6 @@ export class ConcurrentFlushScheduler<T> {
510543
this.MAX_CONCURRENCY = config.maxConcurrency || 1;
511544
this.concurrencyLimiter = pLimit(this.MAX_CONCURRENCY);
512545
this.flushTimer = null;
513-
this.isShuttingDown = false;
514546
this.failedBatchCount = 0;
515547

516548
this.logger.info("Initializing ConcurrentFlushScheduler", {
@@ -520,7 +552,6 @@ export class ConcurrentFlushScheduler<T> {
520552
});
521553

522554
this.startFlushTimer();
523-
this.setupShutdownHandlers();
524555

525556
if (!process.env.VITEST && config.metricsRegister) {
526557
this.metricsRegister = config.metricsRegister;
@@ -592,27 +623,6 @@ export class ConcurrentFlushScheduler<T> {
592623
this.logger.debug("Started flush timer", { interval: this.FLUSH_INTERVAL });
593624
}
594625

595-
private setupShutdownHandlers() {
596-
process.on("SIGTERM", this.shutdown.bind(this));
597-
process.on("SIGINT", this.shutdown.bind(this));
598-
this.logger.debug("Shutdown handlers configured");
599-
}
600-
601-
private async shutdown(): Promise<void> {
602-
if (this.isShuttingDown) return;
603-
this.isShuttingDown = true;
604-
this.logger.info("Initiating shutdown of dynamic flush scheduler", {
605-
remainingItems: this.currentBatch.length,
606-
});
607-
608-
await this.checkAndFlush();
609-
this.clearTimer();
610-
611-
this.logger.info("Dynamic flush scheduler shutdown complete", {
612-
totalFailedBatches: this.failedBatchCount,
613-
});
614-
}
615-
616626
private clearTimer(): void {
617627
if (this.flushTimer) {
618628
clearInterval(this.flushTimer);

apps/webapp/test/engine/triggerTask.test.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@ import {
3232
ValidationResult,
3333
} from "~/runEngine/types";
3434
import { RunEngineTriggerTaskService } from "../../app/runEngine/services/triggerTask.server";
35-
import { ClickHouse } from "@internal/clickhouse";
36-
import { RunsDashboardService } from "~/services/runsDashboardService.server";
3735

3836
vi.setConfig({ testTimeout: 30_000 }); // 30 seconds timeout
3937

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
import { describe, vi } from "vitest";
2+
3+
// Mock the db prisma client
4+
vi.mock("~/db.server", () => ({
5+
prisma: {},
6+
}));
7+
8+
vi.mock("~/services/platform.v3.server", () => ({
9+
getEntitlement: vi.fn(),
10+
}));
11+
12+
import { ClickHouse } from "@internal/clickhouse";
13+
import { containerTest } from "@internal/testcontainers";
14+
import { setTimeout } from "node:timers/promises";
15+
import { RunsReplicationService } from "~/services/runsReplicationService.server";
16+
import { z } from "zod";
17+
18+
vi.setConfig({ testTimeout: 60_000 });
19+
20+
describe("RunsReplicationService", () => {
21+
containerTest(
22+
"should replicate runs to clickhouse",
23+
async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {
24+
await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`);
25+
26+
const clickhouse = new ClickHouse({
27+
url: clickhouseContainer.getConnectionUrl(),
28+
name: "runs-replication",
29+
});
30+
31+
const runsReplicationService = new RunsReplicationService({
32+
clickhouse,
33+
pgConnectionUrl: postgresContainer.getConnectionUri(),
34+
serviceName: "runs-replication",
35+
slotName: "task_runs_to_clickhouse_v1",
36+
publicationName: "task_runs_to_clickhouse_v1_publication",
37+
redisOptions,
38+
maxFlushConcurrency: 1,
39+
flushIntervalMs: 100,
40+
flushBatchSize: 1,
41+
insertStrategy: "batching",
42+
leaderLockTimeoutMs: 1000,
43+
leaderLockExtendIntervalMs: 1000,
44+
ackIntervalSeconds: 5,
45+
});
46+
47+
await runsReplicationService.start();
48+
49+
const organization = await prisma.organization.create({
50+
data: {
51+
title: "test",
52+
slug: "test",
53+
},
54+
});
55+
56+
const project = await prisma.project.create({
57+
data: {
58+
name: "test",
59+
slug: "test",
60+
organizationId: organization.id,
61+
externalRef: "test",
62+
},
63+
});
64+
65+
const runtimeEnvironment = await prisma.runtimeEnvironment.create({
66+
data: {
67+
slug: "test",
68+
type: "DEVELOPMENT",
69+
projectId: project.id,
70+
organizationId: organization.id,
71+
apiKey: "test",
72+
pkApiKey: "test",
73+
shortcode: "test",
74+
},
75+
});
76+
77+
// Now we insert a row into the table
78+
const taskRun = await prisma.taskRun.create({
79+
data: {
80+
friendlyId: "run_1234",
81+
taskIdentifier: "my-task",
82+
payload: JSON.stringify({ foo: "bar" }),
83+
traceId: "1234",
84+
spanId: "1234",
85+
queue: "test",
86+
runtimeEnvironmentId: runtimeEnvironment.id,
87+
projectId: project.id,
88+
organizationId: organization.id,
89+
environmentType: "DEVELOPMENT",
90+
engine: "V2",
91+
},
92+
});
93+
94+
await setTimeout(1000);
95+
96+
// Check that the row was replicated to clickhouse
97+
const queryRuns = clickhouse.reader.query({
98+
name: "runs-replication",
99+
query: "SELECT * FROM trigger_dev.task_runs_v1",
100+
schema: z.any(),
101+
});
102+
103+
const [queryError, result] = await queryRuns({});
104+
105+
expect(queryError).toBeNull();
106+
expect(result?.length).toBe(1);
107+
expect(result?.[0]).toEqual(
108+
expect.objectContaining({
109+
run_id: taskRun.id,
110+
friendly_id: taskRun.friendlyId,
111+
task_identifier: taskRun.taskIdentifier,
112+
environment_id: runtimeEnvironment.id,
113+
project_id: project.id,
114+
organization_id: organization.id,
115+
environment_type: "DEVELOPMENT",
116+
engine: "V2",
117+
})
118+
);
119+
120+
await runsReplicationService.stop();
121+
}
122+
);
123+
});

internal-packages/replication/src/client.ts

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,17 @@ export interface LogicalReplicationClientOptions {
5050
* The interval in ms to extend the leader lock (default: 10000)
5151
*/
5252
leaderLockExtendIntervalMs?: number;
53+
54+
/**
55+
* The number of times to retry acquiring the leader lock (default: 120)
56+
*/
57+
leaderLockRetryCount?: number;
58+
59+
/**
60+
* The interval in ms to retry acquiring the leader lock (default: 500)
61+
*/
62+
leaderLockRetryIntervalMs?: number;
63+
5364
/**
5465
* The interval in seconds to automatically acknowledge the last LSN if no ack has been sent (default: 10)
5566
*/
@@ -83,6 +94,8 @@ export class LogicalReplicationClient {
8394
private lastAcknowledgedLsn: string | null = null;
8495
private leaderLockTimeoutMs: number;
8596
private leaderLockExtendIntervalMs: number;
97+
private leaderLockRetryCount: number;
98+
private leaderLockRetryIntervalMs: number;
8699
private leaderLockHeartbeatTimer: NodeJS.Timeout | null = null;
87100
private ackIntervalSeconds: number;
88101
private lastAckTimestamp: number = 0;
@@ -106,6 +119,8 @@ export class LogicalReplicationClient {
106119

107120
this.leaderLockTimeoutMs = options.leaderLockTimeoutMs ?? 30000;
108121
this.leaderLockExtendIntervalMs = options.leaderLockExtendIntervalMs ?? 10000;
122+
this.leaderLockRetryCount = options.leaderLockRetryCount ?? 120;
123+
this.leaderLockRetryIntervalMs = options.leaderLockRetryIntervalMs ?? 500;
109124
this.ackIntervalSeconds = options.ackIntervalSeconds ?? 10;
110125

111126
this.redis = createRedisClient(
@@ -544,9 +559,8 @@ export class LogicalReplicationClient {
544559
[`logical-replication-client:${this.options.name}`],
545560
this.leaderLockTimeoutMs,
546561
{
547-
retryCount: 60,
548-
retryDelay: 1000,
549-
retryJitter: 100,
562+
retryCount: this.leaderLockRetryCount,
563+
retryDelay: this.leaderLockRetryIntervalMs,
550564
}
551565
);
552566
} catch (err) {

0 commit comments

Comments
 (0)