Skip to content

Commit a946797

Browse files
authored
v3: Upgrades to MarQS (#989)
* Upgrades to MarQS - It’s “queue choosing” algorithm is now MUCH better (pretty much didn’t work before) and should be performant even when we have a bunch of prod queues - There are now concurrency limits at the environment and organization level, as well as the task/queue level. So if any of them are at capacity the message won’t be dequeued. This means we can have an org wide concurrency limit, as well as different limits for dev/prod/staging - I’ve added an admin API that can be used to update the org/env concurrency limits * More MarQS upgrades - Extract the queue priortity choosing strategy into an interface - Implement a much better weighted average strategy - “Slide the window” of queue candidates if the parent queue sends all at-capacity queues (so we won’t get stuck attempting to choose the same 12 full queues) - Added some unit tests for the priority stuff - Added some ideas for expanding the priority choosing strategy with more dynamic features * Fixes an issue with the shared queue consumer relying on the queue name to get the env id
1 parent 8c4df32 commit a946797

26 files changed

+1880
-242
lines changed

apps/webapp/app/env.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ const EnvironmentSchema = z.object({
6363
REDIS_PASSWORD: z.string().optional(),
6464
REDIS_TLS_DISABLED: z.string().optional(),
6565

66+
DEFAULT_QUEUE_EXECUTION_CONCURRENCY_LIMIT: z.coerce.number().int().default(5),
67+
DEFAULT_ENV_EXECUTION_CONCURRENCY_LIMIT: z.coerce.number().int().default(10),
6668
DEFAULT_ORG_EXECUTION_CONCURRENCY_LIMIT: z.coerce.number().int().default(10),
6769
DEFAULT_DEV_ENV_EXECUTION_ATTEMPTS: z.coerce.number().int().positive().default(1),
6870

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
2+
import { z } from "zod";
3+
import { prisma } from "~/db.server";
4+
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
5+
import { marqs } from "~/v3/marqs/index.server";
6+
7+
const ParamsSchema = z.object({
8+
environmentId: z.string(),
9+
});
10+
11+
const RequestBodySchema = z.object({
12+
envMaximumConcurrencyLimit: z.number(),
13+
orgMaximumConcurrencyLimit: z.number(),
14+
});
15+
16+
export async function action({ request, params }: ActionFunctionArgs) {
17+
// Next authenticate the request
18+
const authenticationResult = await authenticateApiRequestWithPersonalAccessToken(request);
19+
20+
if (!authenticationResult) {
21+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
22+
}
23+
24+
const user = await prisma.user.findUnique({
25+
where: {
26+
id: authenticationResult.userId,
27+
},
28+
});
29+
30+
if (!user) {
31+
return json({ error: "Invalid or Missing API key" }, { status: 401 });
32+
}
33+
34+
if (!user.admin) {
35+
return json({ error: "You must be an admin to perform this action" }, { status: 403 });
36+
}
37+
38+
const parsedParams = ParamsSchema.parse(params);
39+
40+
const rawBody = await request.json();
41+
const body = RequestBodySchema.parse(rawBody);
42+
43+
const environment = await prisma.runtimeEnvironment.update({
44+
where: {
45+
id: parsedParams.environmentId,
46+
},
47+
data: {
48+
maximumConcurrencyLimit: body.envMaximumConcurrencyLimit,
49+
organization: {
50+
update: {
51+
data: {
52+
maximumConcurrencyLimit: body.orgMaximumConcurrencyLimit,
53+
},
54+
},
55+
},
56+
},
57+
include: {
58+
organization: true,
59+
project: true,
60+
},
61+
});
62+
63+
await marqs?.updateEnvConcurrencyLimits(environment);
64+
65+
return json({ success: true });
66+
}

apps/webapp/app/v3/marqs/devQueueConsumer.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
1313
import { logger } from "~/services/logger.server";
1414
import { EnvironmentVariablesRepository } from "../environmentVariables/environmentVariablesRepository.server";
1515
import { generateFriendlyId } from "../friendlyIdentifiers";
16-
import { marqs } from "../marqs.server";
16+
import { marqs } from "~/v3/marqs/index.server";
1717
import { CancelAttemptService } from "../services/cancelAttempt.server";
1818
import { CompleteAttemptService } from "../services/completeAttempt.server";
1919
import { attributesFromAuthenticatedEnv } from "../tracer.server";

0 commit comments

Comments
 (0)