Skip to content

Commit 2131b66

Browse files
committed
insert payloads into their own table only on insert and then join
1 parent c30a014 commit 2131b66

File tree

6 files changed

+255
-59
lines changed

6 files changed

+255
-59
lines changed

apps/webapp/app/routes/admin.api.v1.runs-replication.start.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@ import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
22
import { prisma } from "~/db.server";
33
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
44
import { runsReplicationInstance } from "~/services/runsReplicationInstance.server";
5+
import { z } from "zod";
6+
7+
const schema = z.object({
8+
insertStrategy: z.enum(["streaming", "batching"]).optional(),
9+
});
510

611
export async function action({ request }: ActionFunctionArgs) {
712
// Next authenticate the request
@@ -26,7 +31,10 @@ export async function action({ request }: ActionFunctionArgs) {
2631
}
2732

2833
try {
29-
await runsReplicationInstance.start();
34+
const body = await request.json();
35+
const { insertStrategy } = schema.parse(body);
36+
37+
await runsReplicationInstance.start(insertStrategy);
3038

3139
return json({
3240
success: true,

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 162 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import type { ClickHouse, TaskRunV1 } from "@internal/clickhouse";
1+
import type { ClickHouse, TaskRunV1, RawTaskRunPayloadV1 } from "@internal/clickhouse";
22
import { RedisOptions } from "@internal/redis";
33
import { LogicalReplicationClient, Transaction, type PgoutputMessage } from "@internal/replication";
44
import { Logger } from "@trigger.dev/core/logger";
@@ -25,6 +25,8 @@ export type RunsReplicationServiceOptions = {
2525
flushBatchSize?: number;
2626
};
2727

28+
type TaskRunInsert = { _version: bigint; run: TaskRun; event: "insert" | "update" };
29+
2830
export class RunsReplicationService {
2931
private _lastLsn: string | null = null;
3032
private _isSubscribed = false;
@@ -36,7 +38,7 @@ export class RunsReplicationService {
3638
| null = null;
3739

3840
private _replicationClient: LogicalReplicationClient;
39-
private _concurrentFlushScheduler: ConcurrentFlushScheduler<{ _version: bigint; run: TaskRun }>;
41+
private _concurrentFlushScheduler: ConcurrentFlushScheduler<TaskRunInsert>;
4042
private logger: Logger;
4143
private _lastReplicationLagMs: number | null = null;
4244
private _transactionCounter?: Counter;
@@ -64,10 +66,7 @@ export class RunsReplicationService {
6466
ackIntervalSeconds: 10,
6567
});
6668

67-
this._concurrentFlushScheduler = new ConcurrentFlushScheduler<{
68-
_version: bigint;
69-
run: TaskRun;
70-
}>({
69+
this._concurrentFlushScheduler = new ConcurrentFlushScheduler<TaskRunInsert>({
7170
batchSize: options.flushBatchSize ?? 50,
7271
flushInterval: options.flushIntervalMs ?? 100,
7372
maxConcurrency: options.maxFlushConcurrency ?? 100,
@@ -128,7 +127,9 @@ export class RunsReplicationService {
128127
}
129128
}
130129

131-
async start() {
130+
async start(insertStrategy?: "streaming" | "batching") {
131+
this._insertStrategy = insertStrategy ?? this._insertStrategy;
132+
132133
this.logger.info("Starting replication client", {
133134
lastLsn: this._lastLsn,
134135
});
@@ -216,6 +217,20 @@ export class RunsReplicationService {
216217
return;
217218
}
218219

220+
const relevantEvents = transaction.events.filter(
221+
(event) => event.tag === "insert" || event.tag === "update"
222+
);
223+
224+
if (relevantEvents.length === 0) {
225+
this.logger.debug("No relevant events", {
226+
transaction,
227+
});
228+
229+
await this._replicationClient.acknowledge(transaction.commitEndLsn);
230+
231+
return;
232+
}
233+
219234
this.logger.debug("Handling transaction", {
220235
transaction,
221236
});
@@ -227,13 +242,21 @@ export class RunsReplicationService {
227242

228243
if (this._insertStrategy === "streaming") {
229244
await this._concurrentFlushScheduler.addToBatch(
230-
transaction.events.map((event) => ({ _version, run: event.data }))
245+
relevantEvents.map((event) => ({
246+
_version,
247+
run: event.data,
248+
event: event.tag as "insert" | "update",
249+
}))
231250
);
232251
} else {
233252
const [flushError] = await tryCatch(
234253
this.#flushBatch(
235254
nanoid(),
236-
transaction.events.map((event) => ({ _version, run: event.data }))
255+
relevantEvents.map((event) => ({
256+
_version,
257+
run: event.data,
258+
event: event.tag as "insert" | "update",
259+
}))
237260
)
238261
);
239262

@@ -247,7 +270,7 @@ export class RunsReplicationService {
247270
await this._replicationClient.acknowledge(transaction.commitEndLsn);
248271
}
249272

250-
async #flushBatch(flushId: string, batch: Array<{ _version: bigint; run: TaskRun }>) {
273+
async #flushBatch(flushId: string, batch: Array<TaskRunInsert>) {
251274
if (batch.length === 0) {
252275
this.logger.debug("No runs to flush", {
253276
flushId,
@@ -260,19 +283,37 @@ export class RunsReplicationService {
260283
batchSize: batch.length,
261284
});
262285

263-
const preparedRuns = await Promise.all(batch.map(this.#prepareRun.bind(this)));
264-
const runsToInsert = preparedRuns.filter(Boolean);
286+
const preparedInserts = await Promise.all(batch.map(this.#prepareRunInserts.bind(this)));
265287

266-
if (runsToInsert.length === 0) {
267-
this.logger.debug("No runs to insert", {
268-
flushId,
269-
batchSize: batch.length,
270-
});
271-
return;
272-
}
288+
const taskRunInserts = preparedInserts
289+
.map(({ taskRunInsert }) => taskRunInsert)
290+
.filter(Boolean);
291+
292+
const payloadInserts = preparedInserts
293+
.map(({ payloadInsert }) => payloadInsert)
294+
.filter(Boolean);
295+
296+
this.logger.info("Flushing inserts", {
297+
flushId,
298+
taskRunInserts: taskRunInserts.length,
299+
payloadInserts: payloadInserts.length,
300+
});
273301

302+
await Promise.all([
303+
this.#insertTaskRunInserts(taskRunInserts),
304+
this.#insertPayloadInserts(payloadInserts),
305+
]);
306+
307+
this.logger.info("Flushed inserts", {
308+
flushId,
309+
taskRunInserts: taskRunInserts.length,
310+
payloadInserts: payloadInserts.length,
311+
});
312+
}
313+
314+
async #insertTaskRunInserts(taskRunInserts: TaskRunV1[]) {
274315
const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insert(
275-
runsToInsert,
316+
taskRunInserts,
276317
{
277318
params: {
278319
clickhouse_settings: {
@@ -283,51 +324,100 @@ export class RunsReplicationService {
283324
);
284325

285326
if (insertError) {
286-
this.logger.error("Error inserting runs", {
327+
this.logger.error("Error inserting task run inserts", {
287328
error: insertError,
288-
flushId,
289-
batchSize: batch.length,
290329
});
291-
} else {
292-
this.logger.info("Flushed batch", {
293-
flushId,
294-
insertResult,
330+
}
331+
332+
return insertResult;
333+
}
334+
335+
async #insertPayloadInserts(payloadInserts: RawTaskRunPayloadV1[]) {
336+
const [insertError, insertResult] = await this.options.clickhouse.taskRuns.insertPayloads(
337+
payloadInserts,
338+
{
339+
params: {
340+
clickhouse_settings: {
341+
wait_for_async_insert: this._insertStrategy === "batching" ? 1 : 0,
342+
},
343+
},
344+
}
345+
);
346+
347+
if (insertError) {
348+
this.logger.error("Error inserting payload inserts", {
349+
error: insertError,
295350
});
296351
}
352+
353+
return insertResult;
297354
}
298355

299-
async #prepareRun(batchedRun: {
300-
run: TaskRun;
301-
_version: bigint;
302-
}): Promise<TaskRunV1 | undefined> {
356+
async #prepareRunInserts(
357+
batchedRun: TaskRunInsert
358+
): Promise<{ taskRunInsert?: TaskRunV1; payloadInsert?: RawTaskRunPayloadV1 }> {
303359
this.logger.debug("Preparing run", {
304360
batchedRun,
305361
});
306362

307-
const { run, _version } = batchedRun;
363+
const { run, _version, event } = batchedRun;
308364

309365
if (!run.environmentType) {
310-
return undefined;
366+
return {
367+
taskRunInsert: undefined,
368+
payloadInsert: undefined,
369+
};
311370
}
312371

313372
if (!run.organizationId) {
314-
return undefined;
373+
return {
374+
taskRunInsert: undefined,
375+
payloadInsert: undefined,
376+
};
315377
}
316378

317-
const [payload, output] = await Promise.all([
318-
this.#prepareJson(run.payload, run.payloadType),
319-
this.#prepareJson(run.output, run.outputType),
379+
if (event === "update") {
380+
const taskRunInsert = await this.#prepareTaskRunInsert(
381+
run,
382+
run.organizationId,
383+
run.environmentType,
384+
_version
385+
);
386+
387+
return {
388+
taskRunInsert,
389+
payloadInsert: undefined,
390+
};
391+
}
392+
393+
const [taskRunInsert, payloadInsert] = await Promise.all([
394+
this.#prepareTaskRunInsert(run, run.organizationId, run.environmentType, _version),
395+
this.#preparePayloadInsert(run, _version),
320396
]);
321397

398+
return {
399+
taskRunInsert,
400+
payloadInsert,
401+
};
402+
}
403+
404+
async #prepareTaskRunInsert(
405+
run: TaskRun,
406+
organizationId: string,
407+
environmentType: string,
408+
_version: bigint
409+
): Promise<TaskRunV1> {
410+
const output = await this.#prepareJson(run.output, run.outputType);
411+
322412
return {
323413
environment_id: run.runtimeEnvironmentId,
324-
organization_id: run.organizationId,
414+
organization_id: organizationId,
325415
project_id: run.projectId,
326416
run_id: run.id,
327417
updated_at: run.updatedAt.getTime(),
328418
created_at: run.createdAt.getTime(),
329419
status: run.status,
330-
environment_type: run.environmentType,
420+
environment_type: environmentType,
331421
friendly_id: run.friendlyId,
332422
engine: run.engine,
333423
task_identifier: run.taskIdentifier,
@@ -347,7 +437,7 @@ export class RunsReplicationService {
347437
usage_duration_ms: run.usageDurationMs,
348438
cost_in_cents: run.costInCents,
349439
base_cost_in_cents: run.baseCostInCents,
350-
tags: run.runTags,
440+
tags: run.runTags ?? [],
351441
task_version: run.taskVersion,
352442
sdk_version: run.sdkVersion,
353443
cli_version: run.cliVersion,
@@ -358,22 +448,31 @@ export class RunsReplicationService {
358448
is_test: run.isTest,
359449
idempotency_key: run.idempotencyKey,
360450
expiration_ttl: run.ttl,
361-
payload,
362451
output,
363452
_version: _version.toString(),
364453
};
365454
}
366455

456+
async #preparePayloadInsert(run: TaskRun, _version: bigint): Promise<RawTaskRunPayloadV1> {
457+
const payload = await this.#prepareJson(run.payload, run.payloadType);
458+
459+
return {
460+
run_id: run.id,
461+
created_at: run.createdAt.getTime(),
462+
payload,
463+
};
464+
}
465+
367466
async #prepareJson(
368467
data: string | undefined | null,
369468
dataType: string
370-
): Promise<unknown | undefined> {
469+
): Promise<{ data: unknown }> {
371470
if (!data) {
372-
return undefined;
471+
return { data: undefined };
373472
}
374473

375474
if (dataType !== "application/json" && dataType !== "application/super+json") {
376-
return undefined;
475+
return { data: undefined };
377476
}
378477

379478
const packet = {
@@ -384,7 +483,7 @@ export class RunsReplicationService {
384483
const parsedData = await parsePacket(packet);
385484

386485
if (!parsedData) {
387-
return undefined;
486+
return { data: undefined };
388487
}
389488

390489
return { data: parsedData };
@@ -453,6 +552,24 @@ export class ConcurrentFlushScheduler<T> {
453552
},
454553
registers: [this.metricsRegister],
455554
});
555+
556+
new Gauge({
557+
name: "concurrent_flush_scheduler_active_concurrency",
558+
help: "Number of active concurrency",
559+
collect() {
560+
this.set(scheduler.concurrencyLimiter.activeCount);
561+
},
562+
registers: [this.metricsRegister],
563+
});
564+
565+
new Gauge({
566+
name: "concurrent_flush_scheduler_pending_concurrency",
567+
help: "Number of pending concurrency",
568+
collect() {
569+
this.set(scheduler.concurrencyLimiter.pendingCount);
570+
},
571+
registers: [this.metricsRegister],
572+
});
456573
}
457574
}
458575

0 commit comments

Comments
 (0)