Skip to content

Commit f3e9041

Browse files
committed
Prepare for using ClickHouse Cloud; ClickHouse migrations now run during boot in entrypoint.sh
1 parent 2131b66 commit f3e9041

14 files changed

+270
-154
lines changed

apps/webapp/app/env.server.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -754,6 +754,18 @@ const EnvironmentSchema = z.object({
754754
.optional()
755755
.transform((v) => v ?? process.env.REDIS_PASSWORD),
756756
RUN_REPLICATION_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
757+
758+
RUN_REPLICATION_CLICKHOUSE_URL: z.string().optional(),
759+
RUN_REPLICATION_ENABLED: z.string().default("0"),
760+
RUN_REPLICATION_SLOT_NAME: z.string().default("task_runs_to_clickhouse_v1"),
761+
RUN_REPLICATION_PUBLICATION_NAME: z.string().default("task_runs_to_clickhouse_v1_publication"),
762+
RUN_REPLICATION_MAX_FLUSH_CONCURRENCY: z.coerce.number().int().default(100),
763+
RUN_REPLICATION_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
764+
RUN_REPLICATION_FLUSH_BATCH_SIZE: z.coerce.number().int().default(100),
765+
RUN_REPLICATION_INSERT_STRATEGY: z.enum(["streaming", "batching"]).default("batching"),
766+
RUN_REPLICATION_LEADER_LOCK_TIMEOUT_MS: z.coerce.number().int().default(30_000),
767+
RUN_REPLICATION_LEADER_LOCK_EXTEND_INTERVAL_MS: z.coerce.number().int().default(10_000),
768+
RUN_REPLICATION_ACK_INTERVAL_SECONDS: z.coerce.number().int().default(10),
757769
});
758770

759771
export type Environment = z.infer<typeof EnvironmentSchema>;

apps/webapp/app/routes/admin.api.v1.runs-replication.start.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ export async function action({ request }: ActionFunctionArgs) {
3434
const body = await request.json();
3535
const { insertStrategy } = schema.parse(body);
3636

37-
await runsReplicationInstance.start(insertStrategy);
37+
await runsReplicationInstance?.start(insertStrategy);
3838

3939
return json({
4040
success: true,

apps/webapp/app/routes/admin.api.v1.runs-replication.stop.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export async function action({ request }: ActionFunctionArgs) {
2626
}
2727

2828
try {
29-
await runsReplicationInstance.stop();
29+
await runsReplicationInstance?.stop();
3030

3131
return json({
3232
success: true,

apps/webapp/app/routes/admin.api.v1.runs-replication.teardown.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export async function action({ request }: ActionFunctionArgs) {
2626
}
2727

2828
try {
29-
await runsReplicationInstance.teardown();
29+
await runsReplicationInstance?.teardown();
3030

3131
return json({
3232
success: true,

apps/webapp/app/services/runsReplicationInstance.server.ts

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { singleton } from "~/utils/singleton";
44
import invariant from "tiny-invariant";
55
import { env } from "~/env.server";
66
import { metricsRegister } from "~/metrics.server";
7+
import { logger } from "./logger.server";
78

89
export const runsReplicationInstance = singleton(
910
"runsReplicationInstance",
@@ -14,14 +15,22 @@ function initializeRunsReplicationInstance() {
1415
const { DATABASE_URL } = process.env;
1516
invariant(typeof DATABASE_URL === "string", "DATABASE_URL env var not set");
1617

17-
const clickhouse = ClickHouse.fromEnv();
18+
if (!env.RUN_REPLICATION_CLICKHOUSE_URL) {
19+
logger.info("🗃️ Runs replication service not enabled");
20+
return;
21+
}
22+
23+
const clickhouse = new ClickHouse({
24+
url: env.RUN_REPLICATION_CLICKHOUSE_URL,
25+
name: "runs-replication",
26+
});
1827

1928
const service = new RunsReplicationService({
2029
clickhouse: clickhouse,
2130
pgConnectionUrl: DATABASE_URL,
2231
serviceName: "runs-replication",
23-
slotName: "task_runs_to_clickhouse_v1",
24-
publicationName: "task_runs_to_clickhouse_v1_publication",
32+
slotName: env.RUN_REPLICATION_SLOT_NAME,
33+
publicationName: env.RUN_REPLICATION_PUBLICATION_NAME,
2534
redisOptions: {
2635
keyPrefix: "runs-replication:",
2736
port: env.RUN_REPLICATION_REDIS_PORT ?? undefined,
@@ -32,7 +41,27 @@ function initializeRunsReplicationInstance() {
3241
...(env.RUN_REPLICATION_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
3342
},
3443
metricsRegister: metricsRegister,
44+
maxFlushConcurrency: env.RUN_REPLICATION_MAX_FLUSH_CONCURRENCY,
45+
flushIntervalMs: env.RUN_REPLICATION_FLUSH_INTERVAL_MS,
46+
flushBatchSize: env.RUN_REPLICATION_FLUSH_BATCH_SIZE,
47+
insertStrategy: env.RUN_REPLICATION_INSERT_STRATEGY,
48+
leaderLockTimeoutMs: env.RUN_REPLICATION_LEADER_LOCK_TIMEOUT_MS,
49+
leaderLockExtendIntervalMs: env.RUN_REPLICATION_LEADER_LOCK_EXTEND_INTERVAL_MS,
50+
ackIntervalSeconds: env.RUN_REPLICATION_ACK_INTERVAL_SECONDS,
3551
});
3652

53+
if (env.RUN_REPLICATION_ENABLED === "1") {
54+
service
55+
.start()
56+
.then(() => {
57+
logger.info("🗃️ Runs replication service started");
58+
})
59+
.catch((error) => {
60+
logger.error("🗃️ Runs replication service failed to start", {
61+
error,
62+
});
63+
});
64+
}
65+
3766
return service;
3867
}

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 27 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,12 @@ export type RunsReplicationServiceOptions = {
2323
maxFlushConcurrency?: number;
2424
flushIntervalMs?: number;
2525
flushBatchSize?: number;
26+
leaderLockTimeoutMs?: number;
27+
leaderLockExtendIntervalMs?: number;
28+
ackIntervalSeconds?: number;
2629
};
2730

28-
type TaskRunInsert = { _version: bigint; run: TaskRun; event: "insert" | "update" };
31+
type TaskRunInsert = { _version: bigint; run: TaskRun; event: "insert" | "update" | "delete" };
2932

3033
export class RunsReplicationService {
3134
private _lastLsn: string | null = null;
@@ -61,9 +64,9 @@ export class RunsReplicationService {
6164
autoAcknowledge: false,
6265
publicationActions: ["insert", "update"],
6366
logger: new Logger("RunsReplicationService", "debug"),
64-
leaderLockTimeoutMs: 30_000,
65-
leaderLockExtendIntervalMs: 10_000,
66-
ackIntervalSeconds: 10,
67+
leaderLockTimeoutMs: options.leaderLockTimeoutMs ?? 30_000,
68+
leaderLockExtendIntervalMs: options.leaderLockExtendIntervalMs ?? 10_000,
69+
ackIntervalSeconds: options.ackIntervalSeconds ?? 10,
6770
});
6871

6972
this._concurrentFlushScheduler = new ConcurrentFlushScheduler<TaskRunInsert>({
@@ -217,20 +220,6 @@ export class RunsReplicationService {
217220
return;
218221
}
219222

220-
const relevantEvents = transaction.events.filter(
221-
(event) => event.tag === "insert" || event.tag === "update"
222-
);
223-
224-
if (relevantEvents.length === 0) {
225-
this.logger.debug("No relevant events", {
226-
transaction,
227-
});
228-
229-
await this._replicationClient.acknowledge(transaction.commitEndLsn);
230-
231-
return;
232-
}
233-
234223
this.logger.debug("Handling transaction", {
235224
transaction,
236225
});
@@ -242,20 +231,20 @@ export class RunsReplicationService {
242231

243232
if (this._insertStrategy === "streaming") {
244233
await this._concurrentFlushScheduler.addToBatch(
245-
relevantEvents.map((event) => ({
234+
transaction.events.map((event) => ({
246235
_version,
247236
run: event.data,
248-
event: event.tag as "insert" | "update",
237+
event: event.tag,
249238
}))
250239
);
251240
} else {
252241
const [flushError] = await tryCatch(
253242
this.#flushBatch(
254243
nanoid(),
255-
relevantEvents.map((event) => ({
244+
transaction.events.map((event) => ({
256245
_version,
257246
run: event.data,
258-
event: event.tag as "insert" | "update",
247+
event: event.tag,
259248
}))
260249
)
261250
);
@@ -376,11 +365,12 @@ export class RunsReplicationService {
376365
};
377366
}
378367

379-
if (event === "update") {
368+
if (event === "update" || event === "delete") {
380369
const taskRunInsert = await this.#prepareTaskRunInsert(
381370
run,
382371
run.organizationId,
383372
run.environmentType,
373+
event,
384374
_version
385375
);
386376

@@ -391,7 +381,7 @@ export class RunsReplicationService {
391381
}
392382

393383
const [taskRunInsert, payloadInsert] = await Promise.all([
394-
this.#prepareTaskRunInsert(run, run.organizationId, run.environmentType, _version),
384+
this.#prepareTaskRunInsert(run, run.organizationId, run.environmentType, event, _version),
395385
this.#preparePayloadInsert(run, _version),
396386
]);
397387

@@ -405,6 +395,7 @@ export class RunsReplicationService {
405395
run: TaskRun,
406396
organizationId: string,
407397
environmentType: string,
398+
event: "insert" | "update" | "delete",
408399
_version: bigint
409400
): Promise<TaskRunV1> {
410401
const output = await this.#prepareJson(run.output, run.outputType);
@@ -424,10 +415,10 @@ export class RunsReplicationService {
424415
queue: run.queue,
425416
span_id: run.spanId,
426417
trace_id: run.traceId,
427-
error: run.error ? (run.error as TaskRunError) : undefined,
418+
error: { data: run.error },
428419
attempt: run.attemptNumber ?? 1,
429-
schedule_id: run.scheduleId,
430-
batch_id: run.batchId,
420+
schedule_id: run.scheduleId ?? "",
421+
batch_id: run.batchId ?? "",
431422
completed_at: run.completedAt?.getTime(),
432423
started_at: run.startedAt?.getTime(),
433424
executed_at: run.executedAt?.getTime(),
@@ -438,18 +429,19 @@ export class RunsReplicationService {
438429
cost_in_cents: run.costInCents,
439430
base_cost_in_cents: run.baseCostInCents,
440431
tags: run.runTags ?? [],
441-
task_version: run.taskVersion,
442-
sdk_version: run.sdkVersion,
443-
cli_version: run.cliVersion,
444-
machine_preset: run.machinePreset,
445-
root_run_id: run.rootTaskRunId,
446-
parent_run_id: run.parentTaskRunId,
432+
task_version: run.taskVersion ?? "",
433+
sdk_version: run.sdkVersion ?? "",
434+
cli_version: run.cliVersion ?? "",
435+
machine_preset: run.machinePreset ?? "",
436+
root_run_id: run.rootTaskRunId ?? "",
437+
parent_run_id: run.parentTaskRunId ?? "",
447438
depth: run.depth,
448439
is_test: run.isTest,
449-
idempotency_key: run.idempotencyKey,
450-
expiration_ttl: run.ttl,
440+
idempotency_key: run.idempotencyKey ?? "",
441+
expiration_ttl: run.ttl ?? "",
451442
output,
452443
_version: _version.toString(),
444+
_is_deleted: event === "delete" ? 1 : 0,
453445
};
454446
}
455447

docker/Dockerfile

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
ARG NODE_IMAGE=node:20.11.1-bullseye-slim@sha256:5a5a92b3a8d392691c983719dbdc65d9f30085d6dcd65376e7a32e6fe9bf4cbe
22

3+
FROM golang:1.23-alpine AS goose_builder
4+
RUN go install github.com/pressly/goose/v3/cmd/goose@latest
5+
36
FROM ${NODE_IMAGE} AS pruner
47

58
WORKDIR /triggerdotdev
@@ -43,6 +46,11 @@ WORKDIR /triggerdotdev
4346
# Corepack is used to install pnpm
4447
RUN corepack enable
4548

49+
# Goose and schemas
50+
COPY --from=goose_builder /go/bin/goose /usr/local/bin/goose
51+
RUN chmod +x /usr/local/bin/goose
52+
COPY --chown=node:node internal-packages/clickhouse/schema /triggerdotdev/internal-packages/clickhouse/schema
53+
4654
COPY --from=pruner --chown=node:node /triggerdotdev/out/full/ .
4755
COPY --from=dev-deps --chown=node:node /triggerdotdev/ .
4856
COPY --chown=node:node turbo.json turbo.json
@@ -70,6 +78,10 @@ COPY --from=builder --chown=node:node /triggerdotdev/apps/webapp/public ./apps/w
7078
COPY --from=builder --chown=node:node /triggerdotdev/apps/webapp/prisma/seed.js ./apps/webapp/prisma/seed.js
7179
COPY --from=builder --chown=node:node /triggerdotdev/scripts ./scripts
7280

81+
# Goose and schemas
82+
COPY --from=builder /usr/local/bin/goose /usr/local/bin/goose
83+
COPY --from=builder --chown=node:node /triggerdotdev/internal-packages/clickhouse/schema /triggerdotdev/internal-packages/clickhouse/schema
84+
7385
EXPOSE 3000
7486

7587
USER node

0 commit comments

Comments
 (0)