Skip to content

Commit 6c62fa4

Browse files
committed
Upgrades to MarQS
- It’s “queue choosing” algorithm is now MUCH better (pretty much didn’t work before) and should be performant even when we have a bunch of prod queues - There are now concurrency limits at the environment and organization level, as well as the task/queue level. So if any of them are at capacity the message won’t be dequeued. This means we can have an org wide concurrency limit, as well as different limits for dev/prod/staging - I’ve added an admin API that can be used to update the org/env concurrency limits
1 parent 8c4df32 commit 6c62fa4

File tree

9 files changed

+670
-138
lines changed

9 files changed

+670
-138
lines changed

apps/webapp/app/env.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ const EnvironmentSchema = z.object({
6363
REDIS_PASSWORD: z.string().optional(),
6464
REDIS_TLS_DISABLED: z.string().optional(),
6565

66+
DEFAULT_QUEUE_EXECUTION_CONCURRENCY_LIMIT: z.coerce.number().int().default(5),
67+
DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT: z.coerce.number().int().default(10),
6668
DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT: z.coerce.number().int().default(10),
6769
DEFAULT_DEV_ENV_EXECUTION_ATTEMPTS: z.coerce.number().int().positive().default(1),
6870

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
2+
import { z } from "zod";
3+
import { prisma } from "~/db.server";
4+
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
5+
import { marqs } from "~/v3/marqs.server";
6+
7+
const ParamsSchema = z.object({
8+
environmentId: z.string(),
9+
});
10+
11+
const RequestBodySchema = z.object({
12+
envMaximumConcurrencyLimit: z.number(),
13+
orgMaximumConcurrencyLimit: z.number(),
14+
});
15+
16+
export async function action({ request, params }: ActionFunctionArgs) {
17+
// Next authenticate the request
18+
const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request);
19+
20+
if (!authenticationResult) {
21+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
22+
}
23+
24+
const user = await prisma.user.findUnique({
25+
where: {
26+
id: authenticationResult.userId,
27+
},
28+
});
29+
30+
if (!user) {
31+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
32+
}
33+
34+
if (!user.admin) {
35+
return json({ error: "You must be an admin to perform this action" }, { status: 403 });
36+
}
37+
38+
const parsedParams = ParamsSchema.parse(params);
39+
40+
const rawBody = await request.json();
41+
const body = RequestBodySchema.parse(rawBody);
42+
43+
const environment = await prisma.runtimeEnvironment.update({
44+
where: {
45+
id: parsedParams.environmentId,
46+
},
47+
data: {
48+
maximumConcurrencyLimit: body.envMaximumConcurrencyLimit,
49+
organization: {
50+
update: {
51+
data: {
52+
maximumConcurrencyLimit: body.orgMaximumConcurrencyLimit,
53+
},
54+
},
55+
},
56+
},
57+
include: {
58+
organization: true,
59+
project: true,
60+
},
61+
});
62+
63+
await marqs?.updateGlobalConcurrencyLimits(environment);
64+
65+
return json({ success: true });
66+
}

0 commit comments

Comments
 (0)