Skip to content

Commit d1e4064

Browse files
authored
Allow creating and monitoring run replication services with different settings (#2055)
* Allow creating and monitoring run replication services with different settings * Fix test
1 parent a8e38d9 commit d1e4064

14 files changed

+406
-13
lines changed

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -771,6 +771,7 @@ const EnvironmentSchema = z.object({
771771
RUN_REPLICATION_WAIT_FOR_ASYNC_INSERT: z.string().default("0"),
772772
RUN_REPLICATION_KEEP_ALIVE_ENABLED: z.string().default("1"),
773773
RUN_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS: z.coerce.number().int().default(9_000),
774+
RUN_REPLICATION_MAX_OPEN_CONNECTIONS: z.coerce.number().int().default(10),
774775
});
775776

776777
export type Environment = z.infer<typeof EnvironmentSchema>;
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
2+
import { prisma } from "~/db.server";
3+
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
4+
import { z } from "zod";
5+
import { ClickHouse } from "@internal/clickhouse";
6+
import { env } from "~/env.server";
7+
import { RunsReplicationService } from "~/services/runsReplicationService.server";
8+
import {
9+
getRunsReplicationGlobal,
10+
setRunsReplicationGlobal,
11+
} from "~/services/runsReplicationGlobal.server";
12+
13+
// Counts, batch sizes and millisecond/second intervals must be whole numbers
// greater than zero; a plain `z.number()` would also accept negatives, zero
// and fractions. The environment-variable equivalents of these settings are
// validated with `.int()` as well.
const positiveInteger = z.number().int().positive();

/**
 * Request body accepted by the "create runs replication service" admin route.
 *
 * Every field is required; the values are passed to the ClickHouse client and
 * to the RunsReplicationService constructor.
 */
const CreateRunReplicationServiceParams = z.object({
  // Used both as the ClickHouse client name and as the replication service name.
  name: z.string().min(1),
  keepAliveEnabled: z.boolean(),
  keepAliveIdleSocketTtl: positiveInteger,
  maxOpenConnections: positiveInteger,
  maxFlushConcurrency: positiveInteger,
  flushIntervalMs: positiveInteger,
  flushBatchSize: positiveInteger,
  leaderLockTimeoutMs: positiveInteger,
  leaderLockExtendIntervalMs: positiveInteger,
  // NOTE(review): zero is allowed here on the assumption that "no additional
  // time" is a meaningful setting — confirm against RunsReplicationService.
  leaderLockAcquireAdditionalTimeMs: z.number().int().nonnegative(),
  leaderLockRetryIntervalMs: positiveInteger,
  ackIntervalSeconds: positiveInteger,
  waitForAsyncInsert: z.boolean(),
});

type CreateRunReplicationServiceParams = z.infer<typeof CreateRunReplicationServiceParams>;
30+
31+
/**
 * Admin-only endpoint that creates a runs replication service from the JSON
 * request body, registers it as the global service and starts it.
 *
 * Responses:
 * - 401 when the personal access token is missing/invalid or its user no
 *   longer exists
 * - 403 when the user is not an admin
 * - 400 when the body fails validation, a global service is already
 *   registered, or creating/starting the service throws
 * - 200 `{ success: true }` once the service has started
 */
export async function action({ request }: ActionFunctionArgs) {
  // Authenticate the request before doing anything else.
  const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request);

  if (!authenticationResult) {
    return json({ error: "Invalid or Missing API key" }, { status: 401 });
  }

  const user = await prisma.user.findUnique({
    where: {
      id: authenticationResult.userId,
    },
  });

  if (!user) {
    return json({ error: "Invalid or Missing API key" }, { status: 401 });
  }

  if (!user.admin) {
    return json({ error: "You must be an admin to perform this action" }, { status: 403 });
  }

  try {
    // Read and validate the body BEFORE looking at the global registry. All
    // awaiting happens here, so the check → create → register sequence below
    // runs without yielding and two concurrent requests cannot both pass the
    // "already exists" check.
    const params = CreateRunReplicationServiceParams.parse(await request.json());

    const globalService = getRunsReplicationGlobal();

    if (globalService) {
      return json(
        { error: "Global runs replication service already exists. Stop it first." },
        { status: 400 }
      );
    }

    const service = createRunReplicationService(params);

    // Registered before start() is awaited, so the service stays reachable
    // through the global registry even if startup fails part-way through.
    setRunsReplicationGlobal(service);

    await service.start();

    return json({
      success: true,
    });
  } catch (error) {
    return json({ error: error instanceof Error ? error.message : error }, { status: 400 });
  }
}
78+
79+
/**
 * Builds a RunsReplicationService (and the ClickHouse client it writes to)
 * from request-supplied tuning parameters.
 *
 * Connection details (ClickHouse URL, Postgres URL, replication slot and
 * publication names, Redis settings) come from the environment; everything
 * else comes from `params`. Both the client and the service are created with
 * debug-level logging. The returned service has not been started.
 */
function createRunReplicationService(params: CreateRunReplicationServiceParams) {
  const {
    name,
    keepAliveEnabled,
    keepAliveIdleSocketTtl,
    maxOpenConnections,
    maxFlushConcurrency,
    flushIntervalMs,
    flushBatchSize,
    leaderLockTimeoutMs,
    leaderLockExtendIntervalMs,
    leaderLockAcquireAdditionalTimeMs,
    leaderLockRetryIntervalMs,
    ackIntervalSeconds,
    waitForAsyncInsert,
  } = params;

  const clickhouseClient = new ClickHouse({
    url: env.RUN_REPLICATION_CLICKHOUSE_URL,
    name,
    keepAlive: { enabled: keepAliveEnabled, idleSocketTtl: keepAliveIdleSocketTtl },
    logLevel: "debug",
    compression: { request: true },
    maxOpenConnections,
  });

  // TLS is on unless it has been explicitly disabled through the environment.
  const tlsOptions = env.RUN_REPLICATION_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} };

  return new RunsReplicationService({
    clickhouse: clickhouseClient,
    pgConnectionUrl: env.DATABASE_URL,
    serviceName: name,
    slotName: env.RUN_REPLICATION_SLOT_NAME,
    publicationName: env.RUN_REPLICATION_PUBLICATION_NAME,
    redisOptions: {
      keyPrefix: "runs-replication:",
      port: env.RUN_REPLICATION_REDIS_PORT ?? undefined,
      host: env.RUN_REPLICATION_REDIS_HOST ?? undefined,
      username: env.RUN_REPLICATION_REDIS_USERNAME ?? undefined,
      password: env.RUN_REPLICATION_REDIS_PASSWORD ?? undefined,
      enableAutoPipelining: true,
      ...tlsOptions,
    },
    maxFlushConcurrency,
    flushIntervalMs,
    flushBatchSize,
    leaderLockTimeoutMs,
    leaderLockExtendIntervalMs,
    leaderLockAcquireAdditionalTimeMs,
    leaderLockRetryIntervalMs,
    ackIntervalSeconds,
    logLevel: "debug",
    waitForAsyncInsert,
  });
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
2+
import { z } from "zod";
3+
import { prisma } from "~/db.server";
4+
import { startTcpBufferMonitor } from "~/services/monitorTcpBuffers.server";
5+
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
6+
import { getTcpMonitorGlobal, setTcpMonitorGlobal } from "~/services/runsReplicationGlobal.server";
7+
8+
/**
 * Request body for starting the TCP buffer monitor. `intervalMs` is optional
 * and defaults to 5 seconds; it must lie between 1 second and 60 seconds.
 */
const schema = z.object({
  intervalMs: z.number().min(1000).max(60_000).default(5_000),
});

/**
 * Admin-only endpoint that starts the TCP buffer monitor and registers its
 * timer globally so it can be stopped later.
 *
 * Responses:
 * - 401 when the personal access token is missing/invalid or its user no
 *   longer exists
 * - 403 when the user is not an admin
 * - 400 when the body is not valid JSON, fails validation, or a monitor is
 *   already registered
 * - 200 `{ success: true }` once the monitor has been started
 */
export async function action({ request }: ActionFunctionArgs) {
  // Authenticate the request before doing anything else.
  const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request);

  if (!authenticationResult) {
    return json({ error: "Invalid or Missing API key" }, { status: 401 });
  }

  const user = await prisma.user.findUnique({
    where: {
      id: authenticationResult.userId,
    },
  });

  if (!user) {
    return json({ error: "Invalid or Missing API key" }, { status: 401 });
  }

  if (!user.admin) {
    return json({ error: "You must be an admin to perform this action" }, { status: 403 });
  }

  try {
    // `intervalMs` has a default, so a request without a body is legitimate.
    // `request.json()` throws on an empty body, which would make that default
    // unreachable; treat an empty body as `{}` instead.
    const rawBody = await request.text();
    const body: unknown = rawBody.trim() === "" ? {} : JSON.parse(rawBody);
    const { intervalMs } = schema.parse(body);

    const globalMonitor = getTcpMonitorGlobal();

    if (globalMonitor) {
      return json(
        {
          error: "Tcp buffer monitor already running, you must stop it before starting a new one",
        },
        {
          status: 400,
        }
      );
    }

    setTcpMonitorGlobal(startTcpBufferMonitor(intervalMs));

    return json({
      success: true,
    });
  } catch (error) {
    return json({ error: error instanceof Error ? error.message : error }, { status: 400 });
  }
}

apps/webapp/app/routes/admin.api.v1.runs-replication.start.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
22
import { prisma } from "~/db.server";
33
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
4+
import { getRunsReplicationGlobal } from "~/services/runsReplicationGlobal.server";
45
import { runsReplicationInstance } from "~/services/runsReplicationInstance.server";
56

67
export async function action({ request }: ActionFunctionArgs) {
@@ -26,7 +27,13 @@ export async function action({ request }: ActionFunctionArgs) {
2627
}
2728

2829
try {
29-
await runsReplicationInstance?.start();
30+
const globalService = getRunsReplicationGlobal();
31+
32+
if (globalService) {
33+
await globalService.start();
34+
} else {
35+
await runsReplicationInstance?.start();
36+
}
3037

3138
return json({
3239
success: true,
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
2+
import { prisma } from "~/db.server";
3+
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
4+
import {
5+
getTcpMonitorGlobal,
6+
unregisterTcpMonitorGlobal,
7+
} from "~/services/runsReplicationGlobal.server";
8+
9+
/**
 * Admin-only endpoint that stops the globally registered TCP buffer monitor.
 *
 * Responds with 401 when the personal access token is missing/invalid or its
 * user cannot be found, 403 when the user is not an admin, 400 when no
 * monitor is registered (or anything throws while stopping it), and
 * `{ success: true }` once the timer has been cleared and unregistered.
 */
export async function action({ request }: ActionFunctionArgs) {
  // The caller must present a valid personal access token.
  const tokenAuth = await authenticateApiRequestWithPersonalAccessToken(request);
  if (!tokenAuth) {
    return json({ error: "Invalid or Missing API key" }, { status: 401 });
  }

  // The token must belong to an existing user with the admin flag set.
  const account = await prisma.user.findUnique({ where: { id: tokenAuth.userId } });
  if (!account) {
    return json({ error: "Invalid or Missing API key" }, { status: 401 });
  }
  if (!account.admin) {
    return json({ error: "You must be an admin to perform this action" }, { status: 403 });
  }

  try {
    const activeMonitor = getTcpMonitorGlobal();

    if (activeMonitor) {
      // Stop the timer first, then drop the global reference to it.
      clearInterval(activeMonitor);
      unregisterTcpMonitorGlobal();

      return json({ success: true });
    }

    return json({ error: "Tcp buffer monitor not running" }, { status: 400 });
  } catch (caught) {
    const detail = caught instanceof Error ? caught.message : caught;
    return json({ error: detail }, { status: 400 });
  }
}

apps/webapp/app/routes/admin.api.v1.runs-replication.stop.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
22
import { prisma } from "~/db.server";
33
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
4+
import { getRunsReplicationGlobal } from "~/services/runsReplicationGlobal.server";
45
import { runsReplicationInstance } from "~/services/runsReplicationInstance.server";
56

67
export async function action({ request }: ActionFunctionArgs) {
@@ -26,7 +27,13 @@ export async function action({ request }: ActionFunctionArgs) {
2627
}
2728

2829
try {
29-
await runsReplicationInstance?.stop();
30+
const globalService = getRunsReplicationGlobal();
31+
32+
if (globalService) {
33+
await globalService.stop();
34+
} else {
35+
await runsReplicationInstance?.stop();
36+
}
3037

3138
return json({
3239
success: true,

apps/webapp/app/routes/admin.api.v1.runs-replication.teardown.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
22
import { prisma } from "~/db.server";
33
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
4+
import {
5+
getRunsReplicationGlobal,
6+
unregisterRunsReplicationGlobal,
7+
} from "~/services/runsReplicationGlobal.server";
48
import { runsReplicationInstance } from "~/services/runsReplicationInstance.server";
59

610
export async function action({ request }: ActionFunctionArgs) {
@@ -26,7 +30,14 @@ export async function action({ request }: ActionFunctionArgs) {
2630
}
2731

2832
try {
29-
await runsReplicationInstance?.teardown();
33+
const globalService = getRunsReplicationGlobal();
34+
35+
if (globalService) {
36+
await globalService.teardown();
37+
unregisterRunsReplicationGlobal();
38+
} else {
39+
await runsReplicationInstance?.teardown();
40+
}
3041

3142
return json({
3243
success: true,
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
// monitorTcpBuffers.ts
2+
import fs from "fs/promises";
3+
import os from "os";
4+
import { logger } from "./logger.server";
5+
6+
// Bytes per kernel memory page, used to convert page counts into bytes.
// NOTE(review): assumes 4 KiB pages, as the original code did — confirm for
// the architecture this is deployed on.
const TCP_PAGE_SIZE_BYTES = 4096;

/**
 * Extracts the named counters from one protocol line of /proc/net/sockstat.
 *
 * A line looks like `TCP: inuse 5 orphan 0 tw 0 alloc 6 mem 409`, i.e. the
 * protocol label followed by `name value` pairs. Looking counters up by name
 * rather than by token position keeps the result correct if the line has
 * fewer, more, or differently ordered fields.
 *
 * @param sockstat Full contents of /proc/net/sockstat.
 * @param protocol Protocol label without the trailing colon, e.g. `"TCP"`.
 * @returns Map from counter name to value; empty when the line is absent.
 */
function parseSockstatCounters(sockstat: string, protocol: string): Map<string, number> {
  const counters = new Map<string, number>();
  const line = sockstat.split("\n").find((l) => l.startsWith(`${protocol}:`));

  if (line === undefined) {
    return counters;
  }

  // Drop the leading "TCP:" label; what remains is name/value pairs.
  const tokens = line.trim().split(/\s+/).slice(1);

  for (let i = 0; i + 1 < tokens.length; i += 2) {
    const name = tokens[i];
    const value = tokens[i + 1];

    if (name !== undefined && value !== undefined) {
      counters.set(name, Number(value));
    }
  }

  return counters;
}

/**
 * Reads /proc/net/sockstat, /proc/sys/net/core/wmem_max and
 * /proc/sys/net/ipv4/tcp_mem every `intervalMs` and logs the numbers at
 * debug level under the message "tcp-buffer-monitor". You can pivot these
 * logs into CloudWatch metrics with a filter pattern if you like.
 *
 * Each sample produces two log entries: the raw file contents, then the
 * parsed values. Counters missing from the sockstat TCP line are reported as
 * NaN. A failed sample is logged and does not stop the monitor.
 *
 * @param intervalMs Delay between samples in milliseconds (default 5000).
 * @returns The interval handle; pass it to `clearInterval` to stop sampling.
 */
export function startTcpBufferMonitor(intervalMs = 5_000) {
  async function sampleOnce() {
    try {
      const [sockstat, wmemMax, tcpMem] = await Promise.all([
        fs.readFile("/proc/net/sockstat", "utf8"),
        fs.readFile("/proc/sys/net/core/wmem_max", "utf8"),
        fs.readFile("/proc/sys/net/ipv4/tcp_mem", "utf8"),
      ]);

      logger.debug("tcp-buffer-monitor", {
        sockstat,
        wmemMax,
        tcpMem,
      });

      const tcpCounters = parseSockstatCounters(sockstat, "TCP");
      const inUse = tcpCounters.get("inuse") ?? Number.NaN; // open sockets
      const alloc = tcpCounters.get("alloc") ?? Number.NaN; // total sockets with buffers
      const memPages = tcpCounters.get("mem") ?? Number.NaN; // pages, converted to bytes below

      // tcp_mem holds three page counts: low, pressure and high thresholds.
      const [low, pressure, high] = tcpMem
        .trim()
        .split(/\s+/)
        .map((n) => Number(n) * TCP_PAGE_SIZE_BYTES); // pages → bytes

      logger.debug("tcp-buffer-monitor", {
        t: Date.now(),
        host: os.hostname(),
        sockets_in_use: inUse,
        sockets_alloc: alloc,
        tcp_mem_bytes: memPages * TCP_PAGE_SIZE_BYTES,
        tcp_mem_low: low,
        tcp_mem_pressure: pressure,
        tcp_mem_high: high,
        wmem_max: Number(wmemMax.trim()),
      });
    } catch (err) {
      // Log and keep going; most errors are a file disappearing for a moment.
      // Catching here also means the timer callback never rejects.
      console.error("tcp-buffer-monitor error", err);
    }
  }

  return setInterval(sampleOnce, intervalMs);
}

0 commit comments

Comments
 (0)