Skip to content

Commit c30a014

Browse files
committed
get it working
1 parent ee13ebb commit c30a014

19 files changed

+799
-15
lines changed

.configs/prometheus.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
global:
2+
scrape_interval: 15s # how often to scrape targets
3+
4+
scrape_configs:
5+
- job_name: "trigger-dev"
6+
static_configs:
7+
- targets: ["localhost:3030"]

apps/webapp/app/env.server.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -725,6 +725,35 @@ const EnvironmentSchema = z.object({
725725
// BetterStack
726726
BETTERSTACK_API_KEY: z.string().optional(),
727727
BETTERSTACK_STATUS_PAGE_ID: z.string().optional(),
728+
729+
RUN_REPLICATION_REDIS_HOST: z
730+
.string()
731+
.optional()
732+
.transform((v) => v ?? process.env.REDIS_HOST),
733+
RUN_REPLICATION_REDIS_READER_HOST: z
734+
.string()
735+
.optional()
736+
.transform((v) => v ?? process.env.REDIS_READER_HOST),
737+
RUN_REPLICATION_REDIS_READER_PORT: z.coerce
738+
.number()
739+
.optional()
740+
.transform(
741+
(v) =>
742+
v ?? (process.env.REDIS_READER_PORT ? parseInt(process.env.REDIS_READER_PORT) : undefined)
743+
),
744+
RUN_REPLICATION_REDIS_PORT: z.coerce
745+
.number()
746+
.optional()
747+
.transform((v) => v ?? (process.env.REDIS_PORT ? parseInt(process.env.REDIS_PORT) : undefined)),
748+
RUN_REPLICATION_REDIS_USERNAME: z
749+
.string()
750+
.optional()
751+
.transform((v) => v ?? process.env.REDIS_USERNAME),
752+
RUN_REPLICATION_REDIS_PASSWORD: z
753+
.string()
754+
.optional()
755+
.transform((v) => v ?? process.env.REDIS_PASSWORD),
756+
RUN_REPLICATION_REDIS_TLS_DISABLED: z.string().default(process.env.REDIS_TLS_DISABLED ?? "false"),
728757
});
729758

730759
export type Environment = z.infer<typeof EnvironmentSchema>;

apps/webapp/app/metrics.server.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import { env } from "./env.server";
44

55
export const metricsRegister = singleton("metricsRegister", initializeMetricsRegister);
66

7-
function initializeMetricsRegister() {
7+
export type MetricsRegister = Registry<OpenMetricsContentType>;
8+
9+
function initializeMetricsRegister(): MetricsRegister {
810
const registry = new Registry<OpenMetricsContentType>();
911

1012
register.setDefaultLabels({
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
2+
import { prisma } from "~/db.server";
3+
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
4+
import { runsReplicationInstance } from "~/services/runsReplicationInstance.server";
5+
6+
export async function action({ request }: ActionFunctionArgs) {
7+
// Next authenticate the request
8+
const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request);
9+
10+
if (!authenticationResult) {
11+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
12+
}
13+
14+
const user = await prisma.user.findUnique({
15+
where: {
16+
id: authenticationResult.userId,
17+
},
18+
});
19+
20+
if (!user) {
21+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
22+
}
23+
24+
if (!user.admin) {
25+
return json({ error: "You must be an admin to perform this action" }, { status: 403 });
26+
}
27+
28+
try {
29+
await runsReplicationInstance.start();
30+
31+
return json({
32+
success: true,
33+
});
34+
} catch (error) {
35+
return json({ error: error instanceof Error ? error.message : error }, { status: 400 });
36+
}
37+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
2+
import { prisma } from "~/db.server";
3+
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
4+
import { runsReplicationInstance } from "~/services/runsReplicationInstance.server";
5+
6+
export async function action({ request }: ActionFunctionArgs) {
7+
// Next authenticate the request
8+
const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request);
9+
10+
if (!authenticationResult) {
11+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
12+
}
13+
14+
const user = await prisma.user.findUnique({
15+
where: {
16+
id: authenticationResult.userId,
17+
},
18+
});
19+
20+
if (!user) {
21+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
22+
}
23+
24+
if (!user.admin) {
25+
return json({ error: "You must be an admin to perform this action" }, { status: 403 });
26+
}
27+
28+
try {
29+
await runsReplicationInstance.stop();
30+
31+
return json({
32+
success: true,
33+
});
34+
} catch (error) {
35+
return json({ error: error instanceof Error ? error.message : error }, { status: 400 });
36+
}
37+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
2+
import { prisma } from "~/db.server";
3+
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
4+
import { runsReplicationInstance } from "~/services/runsReplicationInstance.server";
5+
6+
export async function action({ request }: ActionFunctionArgs) {
7+
// Next authenticate the request
8+
const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request);
9+
10+
if (!authenticationResult) {
11+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
12+
}
13+
14+
const user = await prisma.user.findUnique({
15+
where: {
16+
id: authenticationResult.userId,
17+
},
18+
});
19+
20+
if (!user) {
21+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
22+
}
23+
24+
if (!user.admin) {
25+
return json({ error: "You must be an admin to perform this action" }, { status: 403 });
26+
}
27+
28+
try {
29+
await runsReplicationInstance.teardown();
30+
31+
return json({
32+
success: true,
33+
});
34+
} catch (error) {
35+
return json({ error: error instanceof Error ? error.message : error }, { status: 400 });
36+
}
37+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import { ClickHouse } from "@internal/clickhouse";
2+
import { RunsReplicationService } from "./runsReplicationService.server";
3+
import { singleton } from "~/utils/singleton";
4+
import invariant from "tiny-invariant";
5+
import { env } from "~/env.server";
6+
import { metricsRegister } from "~/metrics.server";
7+
8+
export const runsReplicationInstance = singleton(
9+
"runsReplicationInstance",
10+
initializeRunsReplicationInstance
11+
);
12+
13+
function initializeRunsReplicationInstance() {
14+
const { DATABASE_URL } = process.env;
15+
invariant(typeof DATABASE_URL === "string", "DATABASE_URL env var not set");
16+
17+
const clickhouse = ClickHouse.fromEnv();
18+
19+
const service = new RunsReplicationService({
20+
clickhouse: clickhouse,
21+
pgConnectionUrl: DATABASE_URL,
22+
serviceName: "runs-replication",
23+
slotName: "task_runs_to_clickhouse_v1",
24+
publicationName: "task_runs_to_clickhouse_v1_publication",
25+
redisOptions: {
26+
keyPrefix: "runs-replication:",
27+
port: env.RUN_REPLICATION_REDIS_PORT ?? undefined,
28+
host: env.RUN_REPLICATION_REDIS_HOST ?? undefined,
29+
username: env.RUN_REPLICATION_REDIS_USERNAME ?? undefined,
30+
password: env.RUN_REPLICATION_REDIS_PASSWORD ?? undefined,
31+
enableAutoPipelining: true,
32+
...(env.RUN_REPLICATION_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
33+
},
34+
metricsRegister: metricsRegister,
35+
});
36+
37+
return service;
38+
}

0 commit comments

Comments
 (0)