runs replication leader lock expiration fix #2050

Merged · 4 commits · May 14, 2025
4 changes: 3 additions & 1 deletion apps/webapp/app/env.server.ts
@@ -766,9 +766,11 @@ const EnvironmentSchema = z.object({
RUN_REPLICATION_LEADER_LOCK_EXTEND_INTERVAL_MS: z.coerce.number().int().default(10_000),
RUN_REPLICATION_ACK_INTERVAL_SECONDS: z.coerce.number().int().default(10),
RUN_REPLICATION_LOG_LEVEL: z.enum(["log", "error", "warn", "info", "debug"]).default("info"),
RUN_REPLICATION_LEADER_LOCK_RETRY_COUNT: z.coerce.number().int().default(240),
RUN_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS: z.coerce.number().int().default(10_000),
RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS: z.coerce.number().int().default(500),
RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT: z.string().default("0"),
RUN_REPLICATION_KEEP_ALIVE_ENABLED: z.string().default("1"),
RUN_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().default(9_000),
});

export type Environment = z.infer<typeof EnvironmentSchema>;
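
For context, a minimal sketch of how these coerced settings resolve at startup. The fragment below repeats two of the variables above with their defaults; the schema and variable names in the sketch are illustrative only and not part of this PR.

import { z } from "zod";

const LeaderLockEnvSchema = z.object({
  RUN_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS: z.coerce.number().int().default(10_000),
  RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS: z.coerce.number().int().default(500),
});

// Unset variables fall back to their defaults; set variables are coerced from strings to integers.
const leaderLockEnv = LeaderLockEnvSchema.parse({
  RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS: "250",
});
// => { RUN_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS: 10000, RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS: 250 }
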
7 changes: 6 additions & 1 deletion apps/webapp/app/services/runsReplicationInstance.server.ts
@@ -23,6 +23,11 @@ function initializeRunsReplicationInstance() {
const clickhouse = new ClickHouse({
url: env.RUN_REPLICATION_CLICKHOUSE_URL,
name: "runs-replication",
keepAlive: {
enabled: env.RUN_REPLICATION_KEEP_ALIVE_ENABLED === "1",
idleSocketTtl: env.RUN_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS,
},
logLevel: env.RUN_REPLICATION_LOG_LEVEL,
});

const service = new RunsReplicationService({
@@ -45,7 +50,7 @@ function initializeRunsReplicationInstance() {
flushBatchSize: env.RUN_REPLICATION_FLUSH_BATCH_SIZE,
leaderLockTimeoutMs: env.RUN_REPLICATION_LEADER_LOCK_TIMEOUT_MS,
leaderLockExtendIntervalMs: env.RUN_REPLICATION_LEADER_LOCK_EXTEND_INTERVAL_MS,
leaderLockRetryCount: env.RUN_REPLICATION_LEADER_LOCK_RETRY_COUNT,
leaderLockAcquireAdditionalTimeMs: env.RUN_REPLICATION_LEADER_LOCK_ADDITIONAL_TIME_MS,
leaderLockRetryIntervalMs: env.RUN_REPLICATION_LEADER_LOCK_RETRY_INTERVAL_MS,
ackIntervalSeconds: env.RUN_REPLICATION_ACK_INTERVAL_SECONDS,
logLevel: env.RUN_REPLICATION_LOG_LEVEL,
89 changes: 59 additions & 30 deletions apps/webapp/app/services/runsReplicationService.server.ts
@@ -43,7 +43,7 @@ export type RunsReplicationServiceOptions = {
flushBatchSize?: number;
leaderLockTimeoutMs?: number;
leaderLockExtendIntervalMs?: number;
leaderLockRetryCount?: number;
leaderLockAcquireAdditionalTimeMs?: number;
leaderLockRetryIntervalMs?: number;
ackIntervalSeconds?: number;
acknowledgeTimeoutMs?: number;
@@ -102,11 +102,11 @@ export class RunsReplicationService {
redisOptions: options.redisOptions,
autoAcknowledge: false,
publicationActions: ["insert", "update", "delete"],
logger: new Logger("LogicalReplicationClient", options.logLevel ?? "info"),
logger: options.logger ?? new Logger("LogicalReplicationClient", options.logLevel ?? "info"),
leaderLockTimeoutMs: options.leaderLockTimeoutMs ?? 30_000,
leaderLockExtendIntervalMs: options.leaderLockExtendIntervalMs ?? 10_000,
ackIntervalSeconds: options.ackIntervalSeconds ?? 10,
leaderLockRetryCount: options.leaderLockRetryCount ?? 240,
leaderLockAcquireAdditionalTimeMs: options.leaderLockAcquireAdditionalTimeMs ?? 10_000,
leaderLockRetryIntervalMs: options.leaderLockRetryIntervalMs ?? 500,
tracer: options.tracer,
});
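
The fixed leaderLockRetryCount option is replaced by leaderLockAcquireAdditionalTimeMs plus the existing retry interval. The LogicalReplicationClient itself is not part of this diff, so the sketch below is only a hypothetical illustration of how an acquire-retry budget could be derived from the new timing options; the function name and formula are assumptions, not the actual implementation.

// Hypothetical sketch (not from this PR): derive a retry budget from a time
// budget instead of hard-coding a retry count.
function acquireRetryCount(options: {
  leaderLockTimeoutMs: number;
  leaderLockAcquireAdditionalTimeMs: number;
  leaderLockRetryIntervalMs: number;
}): number {
  // Keep polling for the lifetime of the existing lock plus some extra headroom.
  const budgetMs = options.leaderLockTimeoutMs + options.leaderLockAcquireAdditionalTimeMs;
  return Math.ceil(budgetMs / options.leaderLockRetryIntervalMs);
}

// With the defaults above: (30_000 + 10_000) / 500 = 80 attempts.
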
@@ -330,10 +330,6 @@ export class RunsReplicationService {
return;
}

this.logger.debug("Handling transaction", {
transaction,
});

const lsnToUInt64Start = process.hrtime.bigint();

// If there are events, we need to handle them
@@ -349,20 +345,32 @@
}))
);

const currentSpan = this._tracer.startSpan("handle_transaction", {
attributes: {
"transaction.xid": transaction.xid,
"transaction.replication_lag_ms": transaction.replicationLagMs,
"transaction.events": transaction.events.length,
"transaction.commit_end_lsn": transaction.commitEndLsn,
"transaction.parse_duration_ms": this._currentParseDurationMs ?? undefined,
"transaction.lsn_to_uint64_ms": lsnToUInt64DurationMs,
"transaction.version": _version.toString(),
this._tracer
.startSpan("handle_transaction", {
attributes: {
"transaction.xid": transaction.xid,
"transaction.replication_lag_ms": transaction.replicationLagMs,
"transaction.events": transaction.events.length,
"transaction.commit_end_lsn": transaction.commitEndLsn,
"transaction.parse_duration_ms": this._currentParseDurationMs ?? undefined,
"transaction.lsn_to_uint64_ms": lsnToUInt64DurationMs,
"transaction.version": _version.toString(),
},
startTime: transaction.beginStartTimestamp,
})
.end();

this.logger.debug("handle_transaction", {
transaction: {
xid: transaction.xid,
commitLsn: transaction.commitLsn,
commitEndLsn: transaction.commitEndLsn,
events: transaction.events.length,
parseDurationMs: this._currentParseDurationMs,
lsnToUInt64DurationMs,
version: _version.toString(),
},
startTime: transaction.beginStartTimestamp,
});

currentSpan.end();
}

async #acknowledgeLatestTransaction() {
@@ -387,7 +395,7 @@
this._lastAcknowledgedAt = now;
this._lastAcknowledgedLsn = this._latestCommitEndLsn;

this.logger.debug("Acknowledging transaction", {
this.logger.debug("acknowledge_latest_transaction", {
commitEndLsn: this._latestCommitEndLsn,
lastAcknowledgedAt: this._lastAcknowledgedAt,
});
@@ -747,7 +755,7 @@ export class ConcurrentFlushScheduler<T> {
const callback = this.config.callback;

const promise = this.concurrencyLimiter(async () => {
await startSpan(this._tracer, "flushNextBatch", async (span) => {
return await startSpan(this._tracer, "flushNextBatch", async (span) => {
const batchId = nanoid();

span.setAttribute("batch_id", batchId);
@@ -756,26 +764,47 @@
span.setAttribute("concurrency_pending_count", this.concurrencyLimiter.pendingCount);
span.setAttribute("concurrency_concurrency", this.concurrencyLimiter.concurrency);

this.logger.debug("flush_next_batch", {
batchId,
batchSize: batch.length,
concurrencyActiveCount: this.concurrencyLimiter.activeCount,
concurrencyPendingCount: this.concurrencyLimiter.pendingCount,
concurrencyConcurrency: this.concurrencyLimiter.concurrency,
});

const start = performance.now();

await callback(batchId, batch);

const end = performance.now();

const duration = end - start;

return {
batchId,
duration,
};
});
});

const [error] = await tryCatch(promise);
const [error, result] = await tryCatch(promise);

if (error) {
this.logger.error("Error flushing batch", {
this.logger.error("flush_batch_error", {
error,
});

this.failedBatchCount++;
} else {
this.logger.debug("flush_batch_complete", {
totalBatches: 1,
successfulBatches: 1,
failedBatches: 0,
totalFailedBatches: this.failedBatchCount,
duration: result?.duration,
batchId: result?.batchId,
});
}

this.logger.debug("Batch flush complete", {
totalBatches: 1,
successfulBatches: 1,
failedBatches: 0,
totalFailedBatches: this.failedBatchCount,
});
}
}
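
The flush path now destructures an [error, result] tuple from tryCatch. That helper is not shown in this diff; a minimal sketch of the shape the calling code assumes (the signature is an assumption) could look like this.

// Hypothetical sketch of the tryCatch helper assumed above (not from this PR).
// It resolves to [null, value] on success and [error, undefined] on failure, so
// callers can destructure without wrapping the await in try/catch.
async function tryCatch<T>(promise: Promise<T>): Promise<[null, T] | [Error, undefined]> {
  try {
    return [null, await promise];
  } catch (thrown) {
    return [thrown instanceof Error ? thrown : new Error(String(thrown)), undefined];
  }
}
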

129 changes: 129 additions & 0 deletions apps/webapp/test/runsReplicationService.test.ts
@@ -1030,6 +1030,135 @@ describe("RunsReplicationService", () => {
}
);

containerTest(
"should handover leadership to a second service, and the second service should be able to extend the leader lock",
async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {
await prisma.$executeRawUnsafe(`ALTER TABLE public."TaskRun" REPLICA IDENTITY FULL;`);

const clickhouse = new ClickHouse({
url: clickhouseContainer.getConnectionUrl(),
name: "runs-replication-shutdown-handover",
});

// Service A
const runsReplicationServiceA = new RunsReplicationService({
clickhouse,
pgConnectionUrl: postgresContainer.getConnectionUri(),
serviceName: "runs-replication-shutdown-handover",
slotName: "task_runs_to_clickhouse_v1",
publicationName: "task_runs_to_clickhouse_v1_publication",
redisOptions,
maxFlushConcurrency: 1,
flushIntervalMs: 100,
flushBatchSize: 1,
leaderLockTimeoutMs: 5000,
leaderLockExtendIntervalMs: 1000,
leaderLockAcquireAdditionalTimeMs: 10_000,
ackIntervalSeconds: 5,
logger: new Logger("runs-replication-shutdown-handover-a", "debug"),
});

await runsReplicationServiceA.start();

// Service B
const runsReplicationServiceB = new RunsReplicationService({
clickhouse,
pgConnectionUrl: postgresContainer.getConnectionUri(),
serviceName: "runs-replication-shutdown-handover",
slotName: "task_runs_to_clickhouse_v1",
publicationName: "task_runs_to_clickhouse_v1_publication",
redisOptions,
maxFlushConcurrency: 1,
flushIntervalMs: 100,
flushBatchSize: 1,
leaderLockTimeoutMs: 5000,
leaderLockExtendIntervalMs: 1000,
leaderLockAcquireAdditionalTimeMs: 10_000,
ackIntervalSeconds: 5,
logger: new Logger("runs-replication-shutdown-handover-b", "debug"),
});

// Start the second service and shut down the first one after 6 seconds, forcing a leadership handover
await Promise.all([
setTimeout(6000).then(() => runsReplicationServiceA.stop()),
runsReplicationServiceB.start(),
]);

const organization = await prisma.organization.create({
data: {
title: "test",
slug: "test",
},
});

const project = await prisma.project.create({
data: {
name: "test",
slug: "test",
organizationId: organization.id,
externalRef: "test",
},
});

const runtimeEnvironment = await prisma.runtimeEnvironment.create({
data: {
slug: "test",
type: "DEVELOPMENT",
projectId: project.id,
organizationId: organization.id,
apiKey: "test",
pkApiKey: "test",
shortcode: "test",
},
});

// Now we insert a row into the table
const taskRun = await prisma.taskRun.create({
data: {
friendlyId: "run_1234",
taskIdentifier: "my-task",
payload: JSON.stringify({ foo: "bar" }),
traceId: "1234",
spanId: "1234",
queue: "test",
runtimeEnvironmentId: runtimeEnvironment.id,
projectId: project.id,
organizationId: organization.id,
environmentType: "DEVELOPMENT",
engine: "V2",
},
});

await setTimeout(10_000);

// Check that the row was replicated to clickhouse
const queryRuns = clickhouse.reader.query({
name: "runs-replication",
query: "SELECT * FROM trigger_dev.task_runs_v2",
schema: z.any(),
});

const [queryError, result] = await queryRuns({});

expect(queryError).toBeNull();
expect(result?.length).toBe(1);
expect(result?.[0]).toEqual(
expect.objectContaining({
run_id: taskRun.id,
friendly_id: taskRun.friendlyId,
task_identifier: taskRun.taskIdentifier,
environment_id: runtimeEnvironment.id,
project_id: project.id,
organization_id: organization.id,
environment_type: "DEVELOPMENT",
engine: "V2",
})
);

await runsReplicationServiceB.stop();
}
);

containerTest(
"should replicate all 1,000 TaskRuns inserted in bulk to ClickHouse",
async ({ clickhouseContainer, redisOptions, postgresContainer, prisma }) => {
11 changes: 9 additions & 2 deletions docker/scripts/entrypoint.sh
@@ -27,6 +27,13 @@ cp internal-packages/database/prisma/schema.prisma apps/webapp/prisma/
cp node_modules/@prisma/engines/*.node apps/webapp/prisma/

cd /triggerdotdev/apps/webapp
# exec dumb-init pnpm run start:local
NODE_PATH='/triggerdotdev/node_modules/.pnpm/node_modules' exec dumb-init node --max-old-space-size=8192 ./build/server.js


# Decide how much old-space memory Node should get.
# Use $NODE_MAX_OLD_SPACE_SIZE if it’s set; otherwise fall back to 8192.
MAX_OLD_SPACE_SIZE="${NODE_MAX_OLD_SPACE_SIZE:-8192}"

echo "Setting max old space size to ${MAX_OLD_SPACE_SIZE}"

NODE_PATH='/triggerdotdev/node_modules/.pnpm/node_modules' exec dumb-init node --max-old-space-size=${MAX_OLD_SPACE_SIZE} ./build/server.js

15 changes: 12 additions & 3 deletions internal-packages/clickhouse/src/client/client.ts
@@ -15,14 +15,22 @@ import type {
ClickhouseWriter,
} from "./types.js";
import { generateErrorMessage } from "zod-error";
import { Logger } from "@trigger.dev/core/logger";
import { Logger, type LogLevel } from "@trigger.dev/core/logger";
import type { Agent as HttpAgent } from "http";
import type { Agent as HttpsAgent } from "https";

export type ClickhouseConfig = {
name: string;
url: string;
tracer?: Tracer;
keepAlive?: {
enabled?: boolean;
idleSocketTtl?: number;
};
httpAgent?: HttpAgent | HttpsAgent;
clickhouseSettings?: ClickHouseSettings;
logger?: Logger;
logLevel?: LogLevel;
};

export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter {
@@ -33,11 +41,12 @@ export class ClickhouseClient implements ClickhouseReader, ClickhouseWriter {

constructor(config: ClickhouseConfig) {
this.name = config.name;
this.logger = config.logger ?? new Logger("ClickhouseClient", "debug");
this.logger = config.logger ?? new Logger("ClickhouseClient", config.logLevel ?? "info");

this.client = createClient({
url: config.url,

keep_alive: config.keepAlive,
http_agent: config.httpAgent,
clickhouse_settings: {
...config.clickhouseSettings,
output_format_json_quote_64bit_integers: 0,
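
A usage sketch mirroring the webapp wiring earlier in this PR: the new keepAlive option is passed through to createClient as keep_alive, and logLevel controls the fallback Logger. The values and import path below are illustrative, not part of this diff.

// Illustrative only: constructing a client with the new keepAlive/logLevel options.
// Import path is an assumption; adjust to wherever ClickhouseClient is exported from.
import { ClickhouseClient } from "./client/client.js";

const clickhouse = new ClickhouseClient({
  name: "runs-replication",
  url: process.env.RUN_REPLICATION_CLICKHOUSE_URL ?? "http://localhost:8123",
  keepAlive: {
    enabled: true,
    // A TTL below the server's keep-alive timeout reduces the chance of writing
    // into an idle socket the server has already closed; 9_000 matches the new env default.
    idleSocketTtl: 9_000,
  },
  logLevel: "info",
});
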