
Commit 32271ce

runs replication leader lock expiration fix
1 parent 0661ee5 commit 32271ce

5 files changed, 207 insertions(+), 32 deletions(-)


apps/webapp/app/env.server.ts

Lines changed: 1 addition & 1 deletion
@@ -766,7 +766,7 @@ const EnvironmentSchema = z.object({
   RUN_REPLICATION_LEADER_LOCK_EXTEND_INTERVAL_MS: z.coerce.number().int().default(10_000),
   RUN_REPLICATION_ACK_INTERVAL_SECONDS: z.coerce.number().int().default(10),
   RUN_REPLICATION_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
-  RUN_REPLICATION_LEADER_LOCK_RETRY_COUNT: z.coerce.number().int().default(240),
+  RUN_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS: z.coerce.number().int().default(10_000),
   RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS: z.coerce.number().int().default(500),
   RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT: z.string().default("0"),
 });
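
The fixed retry count (RUN_REPLICATION_LEADER_LOCK_RETRY_COUNT) is replaced by a time budget: the client keeps retrying until the lock timeout plus RUN_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS has elapsed. A rough sketch of how the new defaults combine, assuming the 30 s lock timeout fallback used by RunsReplicationService below (the variable names here are illustrative, not part of the diff):

// Illustrative arithmetic only; these names are not from the codebase.
const leaderLockTimeoutMs = 30_000;   // lock TTL (service fallback below)
const additionalTimeMs = 10_000;      // RUN_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS default
const retryIntervalMs = 500;          // RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS default

const maxWaitTimeMs = leaderLockTimeoutMs + additionalTimeMs;        // 40_000 ms acquisition window
const approxAttempts = Math.floor(maxWaitTimeMs / retryIntervalMs);  // roughly 80 attempts, ignoring acquire latency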

apps/webapp/app/services/runsReplicationInstance.server.ts

Lines changed: 1 addition & 1 deletion
@@ -45,7 +45,7 @@ function initializeRunsReplicationInstance() {
     flushBatchSize: env.RUN_REPLICATION_FLUSH_BATCH_SIZE,
     leaderLockTimeoutMs: env.RUN_REPLICATION_LEADER_LOCK_TIMEOUT_MS,
     leaderLockExtendIntervalMs: env.RUN_REPLICATION_LEADER_LOCK_EXTEND_INTERVAL_MS,
-    leaderLockRetryCount: env.RUN_REPLICATION_LEADER_LOCK_RETRY_COUNT,
+    leaderLockAcquireAdditionalTimeMs: env.RUN_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS,
     leaderLockRetryIntervalMs: env.RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS,
     ackIntervalSeconds: env.RUN_REPLICATION_ACK_INTERVAL_SECONDS,
     logLevel: env.RUN_REPLICATION_LOG_LEVEL,

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 3 additions & 3 deletions
@@ -43,7 +43,7 @@ export type RunsReplicationServiceOptions = {
   flushBatchSize?: number;
   leaderLockTimeoutMs?: number;
   leaderLockExtendIntervalMs?: number;
-  leaderLockRetryCount?: number;
+  leaderLockAcquireAdditionalTimeMs?: number;
   leaderLockRetryIntervalMs?: number;
   ackIntervalSeconds?: number;
   acknowledgeTimeoutMs?: number;

@@ -102,11 +102,11 @@ export class RunsReplicationService {
       redisOptions: options.redisOptions,
       autoAcknowledge: false,
       publicationActions: ["insert", "update", "delete"],
-      logger: new Logger("LogicalReplicationClient", options.logLevel ?? "info"),
+      logger: options.logger ?? new Logger("LogicalReplicationClient", options.logLevel ?? "info"),
       leaderLockTimeoutMs: options.leaderLockTimeoutMs ?? 30_000,
       leaderLockExtendIntervalMs: options.leaderLockExtendIntervalMs ?? 10_000,
       ackIntervalSeconds: options.ackIntervalSeconds ?? 10,
-      leaderLockRetryCount: options.leaderLockRetryCount ?? 240,
+      leaderLockAcquireAdditionalTimeMs: options.leaderLockAcquireAdditionalTimeMs ?? 10_000,
       leaderLockRetryIntervalMs: options.leaderLockRetryIntervalMs ?? 500,
       tracer: options.tracer,
     });
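
Besides the renamed option, the service now forwards an injected logger to the underlying LogicalReplicationClient instead of always constructing its own. A hedged construction sketch mirroring the test below; clickhouse, redisOptions, and Logger are assumed to be in scope, and the connection values are placeholders:

// Sketch only: option wiring after this change (values are placeholders).
const service = new RunsReplicationService({
  clickhouse,
  pgConnectionUrl: process.env.DATABASE_URL!,
  serviceName: "runs-replication",
  slotName: "task_runs_to_clickhouse_v1",
  publicationName: "task_runs_to_clickhouse_v1_publication",
  redisOptions,
  leaderLockTimeoutMs: 30_000,
  leaderLockExtendIntervalMs: 10_000,
  leaderLockAcquireAdditionalTimeMs: 10_000, // replaces leaderLockRetryCount
  leaderLockRetryIntervalMs: 500,
  logger: new Logger("runs-replication", "info"), // now passed through to the replication client
});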

apps/webapp/test/runsReplicationService.test.ts

Lines changed: 129 additions & 0 deletions
@@ -1030,6 +1030,135 @@ describe("RunsReplicationService", () => {
     }
   );
 
+  containerTest(
+    "should handover leadership to a second service, and the second service should be able to extend the leader lock",
+    async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {
+      await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`);
+
+      const clickhouse = new ClickHouse({
+        url: clickhouseContainer.getConnectionUrl(),
+        name: "runs-replication-shutdown-handover",
+      });
+
+      // Service A
+      const runsReplicationServiceA = new RunsReplicationService({
+        clickhouse,
+        pgConnectionUrl: postgresContainer.getConnectionUri(),
+        serviceName: "runs-replication-shutdown-handover",
+        slotName: "task_runs_to_clickhouse_v1",
+        publicationName: "task_runs_to_clickhouse_v1_publication",
+        redisOptions,
+        maxFlushConcurrency: 1,
+        flushIntervalMs: 100,
+        flushBatchSize: 1,
+        leaderLockTimeoutMs: 5000,
+        leaderLockExtendIntervalMs: 1000,
+        leaderLockAcquireAdditionalTimeMs: 10_000,
+        ackIntervalSeconds: 5,
+        logger: new Logger("runs-replication-shutdown-handover-a", "debug"),
+      });
+
+      await runsReplicationServiceA.start();
+
+      // Service B
+      const runsReplicationServiceB = new RunsReplicationService({
+        clickhouse,
+        pgConnectionUrl: postgresContainer.getConnectionUri(),
+        serviceName: "runs-replication-shutdown-handover",
+        slotName: "task_runs_to_clickhouse_v1",
+        publicationName: "task_runs_to_clickhouse_v1_publication",
+        redisOptions,
+        maxFlushConcurrency: 1,
+        flushIntervalMs: 100,
+        flushBatchSize: 1,
+        leaderLockTimeoutMs: 5000,
+        leaderLockExtendIntervalMs: 1000,
+        leaderLockAcquireAdditionalTimeMs: 10_000,
+        ackIntervalSeconds: 5,
+        logger: new Logger("runs-replication-shutdown-handover-b", "debug"),
+      });
+
+      // Start the second service, then shut down the first service after 6 seconds
+      await Promise.all([
+        setTimeout(6000).then(() => runsReplicationServiceA.stop()),
+        runsReplicationServiceB.start(),
+      ]);
+
+      const organization = await prisma.organization.create({
+        data: {
+          title: "test",
+          slug: "test",
+        },
+      });
+
+      const project = await prisma.project.create({
+        data: {
+          name: "test",
+          slug: "test",
+          organizationId: organization.id,
+          externalRef: "test",
+        },
+      });
+
+      const runtimeEnvironment = await prisma.runtimeEnvironment.create({
+        data: {
+          slug: "test",
+          type: "DEVELOPMENT",
+          projectId: project.id,
+          organizationId: organization.id,
+          apiKey: "test",
+          pkApiKey: "test",
+          shortcode: "test",
+        },
+      });
+
+      // Now we insert a row into the table
+      const taskRun = await prisma.taskRun.create({
+        data: {
+          friendlyId: "run_1234",
+          taskIdentifier: "my-task",
+          payload: JSON.stringify({ foo: "bar" }),
+          traceId: "1234",
+          spanId: "1234",
+          queue: "test",
+          runtimeEnvironmentId: runtimeEnvironment.id,
+          projectId: project.id,
+          organizationId: organization.id,
+          environmentType: "DEVELOPMENT",
+          engine: "V2",
+        },
+      });
+
+      await setTimeout(10_000);
+
+      // Check that the row was replicated to clickhouse
+      const queryRuns = clickhouse.reader.query({
+        name: "runs-replication",
+        query: "SELECT * FROM trigger_dev.task_runs_v2",
+        schema: z.any(),
+      });
+
+      const [queryError, result] = await queryRuns({});
+
+      expect(queryError).toBeNull();
+      expect(result?.length).toBe(1);
+      expect(result?.[0]).toEqual(
+        expect.objectContaining({
+          run_id: taskRun.id,
+          friendly_id: taskRun.friendlyId,
+          task_identifier: taskRun.taskIdentifier,
+          environment_id: runtimeEnvironment.id,
+          project_id: project.id,
+          organization_id: organization.id,
+          environment_type: "DEVELOPMENT",
+          engine: "V2",
+        })
+      );
+
+      await runsReplicationServiceB.stop();
+    }
+  );
+
   containerTest(
     "should replicate all 1,000 TaskRuns inserted in bulk to ClickHouse",
     async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {
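
The timing in this test only works with the new time-based budget: service B is allowed to keep retrying for the full lock TTL plus the additional time, which comfortably covers the ~6 seconds until service A releases its lock. A quick sanity check of the numbers used above (illustrative arithmetic, not part of the diff):

// Handover timing in the test above (illustrative).
const leaderLockTimeoutMs = 5_000;   // A's lock TTL, extended every 1_000 ms while A runs
const additionalTimeMs = 10_000;     // leaderLockAcquireAdditionalTimeMs passed to B
const maxWaitForB = leaderLockTimeoutMs + additionalTimeMs; // B retries for up to 15 s
const aStopsAfterMs = 6_000;         // A is stopped ~6 s after B starts acquiring
// 6_000 < 15_000, so B acquires the lock inside its window and can then extend it every second.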

internal-packages/replication/src/client.ts

Lines changed: 73 additions & 27 deletions
@@ -53,14 +53,14 @@ export interface LogicalReplicationClientOptions {
   leaderLockExtendIntervalMs?: number;
 
   /**
-   * The number of times to retry acquiring the leader lock (default: 120)
+   * The interval in ms to retry acquiring the leader lock (default: 500)
    */
-  leaderLockRetryCount?: number;
+  leaderLockRetryIntervalMs?: number;
 
   /**
-   * The interval in ms to retry acquiring the leader lock (default: 500)
+   * The additional time in ms, on top of the leader lock timeout, to keep retrying acquisition of the leader lock (default: 1000)
    */
-  leaderLockRetryIntervalMs?: number;
+  leaderLockAcquireAdditionalTimeMs?: number;
 
   /**
    * The interval in seconds to automatically acknowledge the last LSN if no ack has been sent (default: 10)
@@ -97,7 +97,7 @@ export class LogicalReplicationClient {
   private lastAcknowledgedLsn: string | null = null;
   private leaderLockTimeoutMs: number;
   private leaderLockExtendIntervalMs: number;
-  private leaderLockRetryCount: number;
+  private leaderLockAcquireAdditionalTimeMs: number;
   private leaderLockRetryIntervalMs: number;
   private leaderLockHeartbeatTimer: NodeJS.Timeout | null = null;
   private ackIntervalSeconds: number;

@@ -124,7 +124,7 @@ export class LogicalReplicationClient {
 
     this.leaderLockTimeoutMs = options.leaderLockTimeoutMs ?? 30000;
     this.leaderLockExtendIntervalMs = options.leaderLockExtendIntervalMs ?? 10000;
-    this.leaderLockRetryCount = options.leaderLockRetryCount ?? 120;
+    this.leaderLockAcquireAdditionalTimeMs = options.leaderLockAcquireAdditionalTimeMs ?? 1000;
    this.leaderLockRetryIntervalMs = options.leaderLockRetryIntervalMs ?? 500;
     this.ackIntervalSeconds = options.ackIntervalSeconds ?? 10;
 

@@ -578,34 +578,74 @@ export class LogicalReplicationClient {
   }
 
   async #acquireLeaderLock(): Promise<boolean> {
-    try {
-      this.leaderLock = await this.redlock.acquire(
-        [`logical-replication-client:${this.options.name}`],
-        this.leaderLockTimeoutMs,
-        {
-          retryCount: this.leaderLockRetryCount,
-          retryDelay: this.leaderLockRetryIntervalMs,
-        }
-      );
-    } catch (err) {
-      this.logger.error("Leader election failed", {
-        name: this.options.name,
-        table: this.options.table,
-        slotName: this.options.slotName,
-        publicationName: this.options.publicationName,
-        retryCount: this.leaderLockRetryCount,
-        retryIntervalMs: this.leaderLockRetryIntervalMs,
-        error: err,
-      });
+    const startTime = Date.now();
+    const maxWaitTime = this.leaderLockTimeoutMs + this.leaderLockAcquireAdditionalTimeMs;
 
-      return false;
+    this.logger.debug("Acquiring leader lock", {
+      name: this.options.name,
+      slotName: this.options.slotName,
+      publicationName: this.options.publicationName,
+      maxWaitTime,
+    });
+
+    let attempt = 0;
+
+    while (Date.now() - startTime < maxWaitTime) {
+      try {
+        this.leaderLock = await this.redlock.acquire(
+          [`logical-replication-client:${this.options.name}`],
+          this.leaderLockTimeoutMs
+        );
+
+        this.logger.debug("Acquired leader lock", {
+          name: this.options.name,
+          slotName: this.options.slotName,
+          publicationName: this.options.publicationName,
+          lockTimeoutMs: this.leaderLockTimeoutMs,
+          lockExtendIntervalMs: this.leaderLockExtendIntervalMs,
+          lock: this.leaderLock,
+          attempt,
+        });
+        return true;
+      } catch (err) {
+        attempt++;
+
+        this.logger.debug("Failed to acquire leader lock, retrying", {
+          name: this.options.name,
+          slotName: this.options.slotName,
+          publicationName: this.options.publicationName,
+          attempt,
+          retryIntervalMs: this.leaderLockRetryIntervalMs,
+          error: err,
+        });
+
+        await new Promise((resolve) => setTimeout(resolve, this.leaderLockRetryIntervalMs));
+      }
     }
 
-    return true;
+    this.logger.error("Leader election failed after retries", {
+      name: this.options.name,
+      table: this.options.table,
+      slotName: this.options.slotName,
+      publicationName: this.options.publicationName,
+      totalAttempts: attempt,
+      totalWaitTimeMs: Date.now() - startTime,
+    });
+    return false;
   }
 
   async #releaseLeaderLock() {
     if (!this.leaderLock) return;
+
+    this.logger.debug("Releasing leader lock", {
+      name: this.options.name,
+      slotName: this.options.slotName,
+      publicationName: this.options.publicationName,
+      lockTimeoutMs: this.leaderLockTimeoutMs,
+      lockExtendIntervalMs: this.leaderLockExtendIntervalMs,
+      lock: this.leaderLock,
+    });
+
     const [releaseError] = await tryCatch(this.leaderLock.release());
     this.leaderLock = null;
 

@@ -631,13 +671,19 @@ export class LogicalReplicationClient {
           name: this.options.name,
           slotName: this.options.slotName,
           publicationName: this.options.publicationName,
+          lockTimeoutMs: this.leaderLockTimeoutMs,
+          lockExtendIntervalMs: this.leaderLockExtendIntervalMs,
+          lock: this.leaderLock,
         });
       } catch (err) {
         this.logger.error("Failed to extend leader lock", {
          name: this.options.name,
           slotName: this.options.slotName,
           publicationName: this.options.publicationName,
           error: err,
+          lockTimeoutMs: this.leaderLockTimeoutMs,
+          lockExtendIntervalMs: this.leaderLockExtendIntervalMs,
+          lock: this.leaderLock,
         });
         // Optionally emit an error or handle loss of leadership
         this.events.emit("error", err instanceof Error ? err : new Error(String(err)));