Skip to content

Commit 648b581

Browse files
committed
Efficiency improvements to SharedQueueConsumer and MarQS for run engine v1
1 parent e1497c8 commit 648b581

File tree

11 files changed

+898
-685
lines changed

11 files changed

+898
-685
lines changed

apps/webapp/app/env.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,8 @@ const EnvironmentSchema = z.object({
162162
SHARED_QUEUE_CONSUMER_POOL_SIZE: z.coerce.number().int().default(10),
163163
SHARED_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(100),
164164
SHARED_QUEUE_CONSUMER_NEXT_TICK_INTERVAL_MS: z.coerce.number().int().default(100),
165+
SHARED_QUEUE_CONSUMER_EMIT_RESUME_DEPENDENCY_TIMEOUT_MS: z.coerce.number().int().default(1000),
166+
SHARED_QUEUE_CONSUMER_RESOLVE_PAYLOADS_BATCH_SIZE: z.coerce.number().int().default(25),
165167

166168
// Development OTEL environment variables
167169
DEV_OTEL_EXPORTER_OTLP_ENDPOINT: z.string().optional(),
@@ -219,6 +221,8 @@ const EnvironmentSchema = z.object({
219221
.number()
220222
.int()
221223
.default(60 * 1000 * 15),
224+
MARQS_SHARED_QUEUE_SELECTION_COUNT: z.coerce.number().int().default(36),
225+
MARQS_DEV_QUEUE_SELECTION_COUNT: z.coerce.number().int().default(12),
222226
PROD_TASK_HEARTBEAT_INTERVAL_MS: z.coerce.number().int().optional(),
223227

224228
VERBOSE_GRAPHILE_LOGGING: z.string().default("false"),

apps/webapp/app/routes/admin.api.v1.marqs.ts

Lines changed: 0 additions & 31 deletions
This file was deleted.

apps/webapp/app/services/apiAuth.server.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import {
1717
isPersonalAccessToken,
1818
} from "./personalAccessToken.server";
1919
import { isPublicJWT, validatePublicJwtKey } from "./realtime/jwtAuth.server";
20+
import { RuntimeEnvironmentForEnvRepo } from "~/v3/environmentVariables/environmentVariablesRepository.server";
2021

2122
const ClaimsSchema = z.object({
2223
scopes: z.array(z.string()).optional(),
@@ -410,7 +411,7 @@ const JWT_ALGORITHM = "HS256";
410411
const DEFAULT_JWT_EXPIRATION_IN_MS = 1000 * 60 * 60; // 1 hour
411412

412413
export async function generateJWTTokenForEnvironment(
413-
environment: RuntimeEnvironment,
414+
environment: RuntimeEnvironmentForEnvRepo,
414415
payload: Record<string, string>
415416
) {
416417
const jwt = await new SignJWT({

apps/webapp/app/v3/environmentVariables/environmentVariablesRepository.server.ts

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -654,9 +654,26 @@ export class EnvironmentVariablesRepository implements Repository {
654654
}
655655
}
656656

657+
export const RuntimeEnvironmentForEnvRepoPayload = {
658+
select: {
659+
id: true,
660+
slug: true,
661+
type: true,
662+
projectId: true,
663+
apiKey: true,
664+
organizationId: true,
665+
},
666+
} as const;
667+
668+
export type RuntimeEnvironmentForEnvRepo = Prisma.RuntimeEnvironmentGetPayload<
669+
typeof RuntimeEnvironmentForEnvRepoPayload
670+
>;
671+
657672
export const environmentVariablesRepository = new EnvironmentVariablesRepository();
658673

659-
export async function resolveVariablesForEnvironment(runtimeEnvironment: RuntimeEnvironment) {
674+
export async function resolveVariablesForEnvironment(
675+
runtimeEnvironment: RuntimeEnvironmentForEnvRepo
676+
) {
660677
const projectSecrets = await environmentVariablesRepository.getEnvironmentVariables(
661678
runtimeEnvironment.projectId,
662679
runtimeEnvironment.id
@@ -672,7 +689,9 @@ export async function resolveVariablesForEnvironment(runtimeEnvironment: Runtime
672689
return [...overridableTriggerVariables, ...projectSecrets, ...builtInVariables];
673690
}
674691

675-
async function resolveOverridableTriggerVariables(runtimeEnvironment: RuntimeEnvironment) {
692+
async function resolveOverridableTriggerVariables(
693+
runtimeEnvironment: RuntimeEnvironmentForEnvRepo
694+
) {
676695
let result: Array<EnvironmentVariable> = [
677696
{
678697
key: "TRIGGER_REALTIME_STREAM_VERSION",
@@ -683,7 +702,7 @@ async function resolveOverridableTriggerVariables(runtimeEnvironment: RuntimeEnv
683702
return result;
684703
}
685704

686-
async function resolveBuiltInDevVariables(runtimeEnvironment: RuntimeEnvironment) {
705+
async function resolveBuiltInDevVariables(runtimeEnvironment: RuntimeEnvironmentForEnvRepo) {
687706
let result: Array<EnvironmentVariable> = [
688707
{
689708
key: "OTEL_EXPORTER_OTLP_ENDPOINT",
@@ -745,7 +764,7 @@ async function resolveBuiltInDevVariables(runtimeEnvironment: RuntimeEnvironment
745764
return [...result, ...commonVariables];
746765
}
747766

748-
async function resolveBuiltInProdVariables(runtimeEnvironment: RuntimeEnvironment) {
767+
async function resolveBuiltInProdVariables(runtimeEnvironment: RuntimeEnvironmentForEnvRepo) {
749768
let result: Array<EnvironmentVariable> = [
750769
{
751770
key: "TRIGGER_SECRET_KEY",
@@ -838,7 +857,7 @@ async function resolveBuiltInProdVariables(runtimeEnvironment: RuntimeEnvironmen
838857
}
839858

840859
async function resolveCommonBuiltInVariables(
841-
runtimeEnvironment: RuntimeEnvironment
860+
runtimeEnvironment: RuntimeEnvironmentForEnvRepo
842861
): Promise<Array<EnvironmentVariable>> {
843862
return [];
844863
}

apps/webapp/app/v3/marqs/devQueueConsumer.server.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,9 @@ import { resolveVariablesForEnvironment } from "../environmentVariables/environm
1818
import { FailedTaskRunService } from "../failedTaskRun.server";
1919
import { CancelDevSessionRunsService } from "../services/cancelDevSessionRuns.server";
2020
import { CompleteAttemptService } from "../services/completeAttempt.server";
21-
import {
22-
SEMINTATTRS_FORCE_RECORDING,
23-
attributesFromAuthenticatedEnv,
24-
tracer,
25-
} from "../tracer.server";
26-
import { DevSubscriber, devPubSub } from "./devPubSub.server";
21+
import { attributesFromAuthenticatedEnv, tracer } from "../tracer.server";
2722
import { getMaxDuration } from "../utils/maxDuration";
23+
import { DevSubscriber, devPubSub } from "./devPubSub.server";
2824

2925
const MessageBody = z.discriminatedUnion("type", [
3026
z.object({

0 commit comments

Comments
 (0)