Skip to content

Commit 5d7e490

Browse files
committed
Configure the new queue selection strategy in the webapp and get it all building and typechecks passing
1 parent a3b7d4c commit 5d7e490

File tree

13 files changed

+81
-64
lines changed

13 files changed

+81
-64
lines changed

apps/webapp/app/env.server.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,12 @@ const EnvironmentSchema = z.object({
419419
RUN_ENGINE_TIMEOUT_EXECUTING: z.coerce.number().int().default(60_000),
420420
RUN_ENGINE_TIMEOUT_EXECUTING_WITH_WAITPOINTS: z.coerce.number().int().default(60_000),
421421
RUN_ENGINE_DEBUG_WORKER_NOTIFICATIONS: z.coerce.boolean().default(false),
422+
RUN_ENGINE_PARENT_QUEUE_LIMIT: z.coerce.number().int().default(1000),
423+
RUN_ENGINE_CONCURRENCY_LIMIT_BIAS: z.coerce.number().default(0.75),
424+
RUN_ENGINE_AVAILABLE_CAPACITY_BIAS: z.coerce.number().default(0.3),
425+
RUN_ENGINE_QUEUE_AGE_RANDOMIZATION_BIAS: z.coerce.number().default(0.25),
426+
RUN_ENGINE_REUSE_SNAPSHOT_COUNT: z.coerce.number().int().default(0),
427+
RUN_ENGINE_MAXIMUM_ENV_COUNT: z.coerce.number().int().optional(),
422428

423429
RUN_ENGINE_WORKER_REDIS_HOST: z
424430
.string()

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,17 @@ function createRunEngine() {
4343
enableAutoPipelining: true,
4444
...(env.RUN_ENGINE_RUN_QUEUE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
4545
},
46+
queueSelectionStrategyOptions: {
47+
parentQueueLimit: env.RUN_ENGINE_PARENT_QUEUE_LIMIT,
48+
biases: {
49+
concurrencyLimitBias: env.RUN_ENGINE_CONCURRENCY_LIMIT_BIAS,
50+
availableCapacityBias: env.RUN_ENGINE_AVAILABLE_CAPACITY_BIAS,
51+
queueAgeRandomization: env.RUN_ENGINE_QUEUE_AGE_RANDOMIZATION_BIAS,
52+
},
53+
reuseSnapshotCount: env.RUN_ENGINE_REUSE_SNAPSHOT_COUNT,
54+
maximumEnvCount: env.RUN_ENGINE_MAXIMUM_ENV_COUNT,
55+
tracer,
56+
},
4657
},
4758
runLock: {
4859
redis: {

apps/webapp/remix.config.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ module.exports = {
2626
"superjson",
2727
"prismjs/components/prism-json",
2828
"prismjs/components/prism-typescript",
29+
"@internal/run-engine",
30+
"@internal/redis",
31+
"@internal/tracing",
2932
],
3033
browserNodeBuiltinsPolyfill: { modules: { path: true, os: true, crypto: true } },
3134
watchPaths: async () => {

apps/webapp/tsconfig.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@
4040
"@internal/redis-worker": ["../../internal-packages/redis-worker/src/index"],
4141
"@internal/redis-worker/*": ["../../internal-packages/redis-worker/src/*"],
4242
"@internal/redis": ["../../internal-packages/redis/src/index"],
43-
"@internal/redis/*": ["../../internal-packages/redis/src/*"]
43+
"@internal/redis/*": ["../../internal-packages/redis/src/*"],
44+
"@internal/tracing": ["../../internal-packages/tracing/src/index"],
45+
"@internal/tracing/*": ["../../internal-packages/tracing/src/*"]
4446
},
4547
"noEmit": true
4648
}

internal-packages/run-engine/src/engine/index.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ import { nanoid } from "nanoid";
5353
import { EventEmitter } from "node:events";
5454
import { z } from "zod";
5555
import { RunQueue } from "../run-queue/index.js";
56-
import { FairDequeuingStrategy } from "../run-queue/fairDequeuingStrategy.js";
56+
import { FairQueueSelectionStrategy } from "../run-queue/fairQueueSelectionStrategy.js";
5757
import { MinimalAuthenticatedEnvironment } from "../shared/index.js";
5858
import { MAX_TASK_RUN_ATTEMPTS } from "./consts.js";
5959
import { getRunWithBackgroundWorkerTasks } from "./db/worker.js";
@@ -159,9 +159,10 @@ export class RunEngine {
159159
name: "rq",
160160
tracer: trace.getTracer("rq"),
161161
keys,
162-
queuePriorityStrategy: new FairDequeuingStrategy({
162+
queueSelectionStrategy: new FairQueueSelectionStrategy({
163163
keys,
164164
redis: { ...options.queue.redis, keyPrefix: `${options.queue.redis.keyPrefix}runqueue:` },
165+
defaultEnvConcurrencyLimit: options.queue?.defaultEnvConcurrency ?? 10,
165166
}),
166167
defaultEnvConcurrency: options.queue?.defaultEnvConcurrency ?? 10,
167168
logger: new Logger("RunQueue", "debug"),

internal-packages/run-engine/src/engine/locking.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
1-
import Redlock, { RedlockAbortSignal } from "redlock";
1+
// import { default: Redlock } from "redlock";
2+
const { default: Redlock } = require("redlock");
23
import { AsyncLocalStorage } from "async_hooks";
34
import { Redis } from "@internal/redis";
5+
import * as redlock from "redlock";
46

57
interface LockContext {
68
resources: string;
7-
signal: RedlockAbortSignal;
9+
signal: redlock.RedlockAbortSignal;
810
}
911

1012
export class RunLocker {
11-
private redlock: Redlock;
13+
private redlock: InstanceType<typeof redlock.default>;
1214
private asyncLocalStorage: AsyncLocalStorage<LockContext>;
1315

1416
constructor(options: { redis: Redis }) {
@@ -26,7 +28,7 @@ export class RunLocker {
2628
async lock<T>(
2729
resources: string[],
2830
duration: number,
29-
routine: (signal: RedlockAbortSignal) => Promise<T>
31+
routine: (signal: redlock.RedlockAbortSignal) => Promise<T>
3032
): Promise<T> {
3133
const currentContext = this.asyncLocalStorage.getStore();
3234
const joinedResources = resources.sort().join(",");

internal-packages/run-engine/src/engine/types.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { MachinePreset, MachinePresetName, QueueOptions, RetryOptions } from "@t
44
import { PrismaClient } from "@trigger.dev/database";
55
import { type RedisOptions } from "@internal/redis";
66
import { MinimalAuthenticatedEnvironment } from "../shared/index.js";
7+
import { FairQueueSelectionStrategyOptions } from "../run-queue/fairQueueSelectionStrategy.js";
78

89
export type RunEngineOptions = {
910
prisma: PrismaClient;
@@ -21,6 +22,10 @@ export type RunEngineOptions = {
2122
redis: RedisOptions;
2223
retryOptions?: RetryOptions;
2324
defaultEnvConcurrency?: number;
25+
queueSelectionStrategyOptions?: Pick<
26+
FairQueueSelectionStrategyOptions,
27+
"parentQueueLimit" | "tracer" | "biases" | "reuseSnapshotCount" | "maximumEnvCount"
28+
>;
2429
};
2530
runLock: {
2631
redis: RedisOptions;

internal-packages/run-engine/src/run-queue/fairDequeuingStrategy.test.ts renamed to internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.test.ts

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { redisTest } from "@internal/testcontainers";
22
import { describe, expect, vi } from "vitest";
3-
import { FairDequeuingStrategy } from "./fairDequeuingStrategy.js";
3+
import { FairQueueSelectionStrategy } from "./fairQueueSelectionStrategy.js";
44
import { RunQueueFullKeyProducer } from "./keyProducer.js";
55
import { createRedisClient, Redis, RedisOptions } from "@internal/redis";
66
import { EnvQueues, RunQueueKeyProducer } from "./types.js";
@@ -13,7 +13,7 @@ describe("FairDequeuingStrategy", () => {
1313
"should distribute a single queue from a single env",
1414
async ({ redisOptions: redis }) => {
1515
const keyProducer = new RunQueueFullKeyProducer();
16-
const strategy = new FairDequeuingStrategy({
16+
const strategy = new FairQueueSelectionStrategy({
1717
redis,
1818
keys: keyProducer,
1919
defaultEnvConcurrencyLimit: 5,
@@ -47,7 +47,7 @@ describe("FairDequeuingStrategy", () => {
4747

4848
redisTest("should respect env concurrency limits", async ({ redisOptions: redis }) => {
4949
const keyProducer = new RunQueueFullKeyProducer();
50-
const strategy = new FairDequeuingStrategy({
50+
const strategy = new FairQueueSelectionStrategy({
5151
redis,
5252
keys: keyProducer,
5353
defaultEnvConcurrencyLimit: 2,
@@ -80,7 +80,7 @@ describe("FairDequeuingStrategy", () => {
8080
"should give extra concurrency when the env has reserve concurrency",
8181
async ({ redisOptions: redis }) => {
8282
const keyProducer = new RunQueueFullKeyProducer();
83-
const strategy = new FairDequeuingStrategy({
83+
const strategy = new FairQueueSelectionStrategy({
8484
redis,
8585
keys: keyProducer,
8686
defaultEnvConcurrencyLimit: 2,
@@ -126,7 +126,7 @@ describe("FairDequeuingStrategy", () => {
126126

127127
redisTest("should respect parentQueueLimit", async ({ redisOptions: redis }) => {
128128
const keyProducer = new RunQueueFullKeyProducer();
129-
const strategy = new FairDequeuingStrategy({
129+
const strategy = new FairQueueSelectionStrategy({
130130
redis,
131131
keys: keyProducer,
132132
defaultEnvConcurrencyLimit: 5,
@@ -185,7 +185,7 @@ describe("FairDequeuingStrategy", () => {
185185
"should reuse snapshots across calls for the same consumer",
186186
async ({ redisOptions: redis }) => {
187187
const keyProducer = new RunQueueFullKeyProducer();
188-
const strategy = new FairDequeuingStrategy({
188+
const strategy = new FairQueueSelectionStrategy({
189189
redis,
190190
keys: keyProducer,
191191
defaultEnvConcurrencyLimit: 5,
@@ -282,7 +282,7 @@ describe("FairDequeuingStrategy", () => {
282282
"should fairly distribute queues across environments over time",
283283
async ({ redisOptions: redis }) => {
284284
const keyProducer = new RunQueueFullKeyProducer();
285-
const strategy = new FairDequeuingStrategy({
285+
const strategy = new FairQueueSelectionStrategy({
286286
redis,
287287
keys: keyProducer,
288288
defaultEnvConcurrencyLimit: 5,
@@ -440,7 +440,7 @@ describe("FairDequeuingStrategy", () => {
440440
"should shuffle environments while maintaining age order within environments",
441441
async ({ redisOptions: redis }) => {
442442
const keyProducer = new RunQueueFullKeyProducer();
443-
const strategy = new FairDequeuingStrategy({
443+
const strategy = new FairQueueSelectionStrategy({
444444
redis,
445445
keys: keyProducer,
446446
defaultEnvConcurrencyLimit: 5,
@@ -623,7 +623,7 @@ describe("FairDequeuingStrategy", () => {
623623
const strategies = Array.from(
624624
{ length: numStrategies },
625625
(_, i) =>
626-
new FairDequeuingStrategy({
626+
new FairQueueSelectionStrategy({
627627
redis,
628628
keys: keyProducer,
629629
defaultEnvConcurrencyLimit: 5,
@@ -708,7 +708,7 @@ describe("FairDequeuingStrategy", () => {
708708

709709
// Helper function to run iterations with a specific age influence
710710
async function runWithQueueAgeRandomization(queueAgeRandomization: number) {
711-
const strategy = new FairDequeuingStrategy({
711+
const strategy = new FairQueueSelectionStrategy({
712712
redis,
713713
keys: keyProducer,
714714
defaultEnvConcurrencyLimit: 5,
@@ -799,7 +799,7 @@ describe("FairDequeuingStrategy", () => {
799799
"should respect maximumEnvCount and select envs based on queue ages",
800800
async ({ redisOptions: redis }) => {
801801
const keyProducer = new RunQueueFullKeyProducer();
802-
const strategy = new FairDequeuingStrategy({
802+
const strategy = new FairQueueSelectionStrategy({
803803
redis,
804804
keys: keyProducer,
805805
defaultEnvConcurrencyLimit: 5,
@@ -942,7 +942,7 @@ describe("FairDequeuingStrategy", () => {
942942
"should not overly bias picking environments when queue have priority offset ages",
943943
async ({ redisOptions: redis }) => {
944944
const keyProducer = new RunQueueFullKeyProducer();
945-
const strategy = new FairDequeuingStrategy({
945+
const strategy = new FairQueueSelectionStrategy({
946946
redis,
947947
keys: keyProducer,
948948
defaultEnvConcurrencyLimit: 5,

internal-packages/run-engine/src/run-queue/fairDequeuingStrategy.ts renamed to internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
1+
import { createRedisClient, Redis, type RedisOptions } from "@internal/redis";
2+
import { startSpan, type Tracer } from "@internal/tracing";
13
import { createCache, DefaultStatefulContext, Namespace, Cache as UnkeyCache } from "@unkey/cache";
24
import { MemoryStore } from "@unkey/cache/stores";
35
import { randomUUID } from "crypto";
6+
import seedrandom from "seedrandom";
47
import {
58
EnvDescriptor,
69
EnvQueues,
7-
RunQueueFairDequeueStrategy,
810
RunQueueKeyProducer,
11+
RunQueueSelectionStrategy,
912
} from "./types.js";
10-
import seedrandom from "seedrandom";
11-
import { startSpan, type Tracer } from "@internal/tracing";
12-
import { Redis, type RedisOptions, createRedisClient } from "@internal/redis";
1313

14-
export type FairDequeuingStrategyBiases = {
14+
export type FairQueueSelectionStrategyBiases = {
1515
/**
1616
* How much to bias towards environments with higher concurrency limits
1717
* 0 = no bias, 1 = full bias based on limit differences
@@ -33,7 +33,7 @@ export type FairDequeuingStrategyBiases = {
3333
queueAgeRandomization: number;
3434
};
3535

36-
export type FairDequeuingStrategyOptions = {
36+
export type FairQueueSelectionStrategyOptions = {
3737
redis: RedisOptions;
3838
keys: RunQueueKeyProducer;
3939
defaultEnvConcurrencyLimit?: number;
@@ -44,7 +44,7 @@ export type FairDequeuingStrategyOptions = {
4444
* Configure biasing for environment shuffling
4545
* If not provided, no biasing will be applied (completely random shuffling)
4646
*/
47-
biases?: FairDequeuingStrategyBiases;
47+
biases?: FairQueueSelectionStrategyBiases;
4848
reuseSnapshotCount?: number;
4949
maximumEnvCount?: number;
5050
};
@@ -79,13 +79,13 @@ const emptyFairQueueSnapshot: FairQueueSnapshot = {
7979
queues: [],
8080
};
8181

82-
const defaultBiases: FairDequeuingStrategyBiases = {
82+
const defaultBiases: FairQueueSelectionStrategyBiases = {
8383
concurrencyLimitBias: 0,
8484
availableCapacityBias: 0,
8585
queueAgeRandomization: 0, // Default to completely age-based ordering
8686
};
8787

88-
export class FairDequeuingStrategy implements RunQueueFairDequeueStrategy {
88+
export class FairQueueSelectionStrategy implements RunQueueSelectionStrategy {
8989
private _cache: UnkeyCache<{
9090
concurrencyLimit: number;
9191
}>;
@@ -100,7 +100,7 @@ export class FairDequeuingStrategy implements RunQueueFairDequeueStrategy {
100100
private _defaultEnvConcurrencyLimit: number;
101101
private _parentQueueLimit: number;
102102

103-
constructor(private options: FairDequeuingStrategyOptions) {
103+
constructor(private options: FairQueueSelectionStrategyOptions) {
104104
const ctx = new DefaultStatefulContext();
105105
const memory = new MemoryStore({ persistentMap: new Map() });
106106

@@ -612,7 +612,7 @@ export class FairDequeuingStrategy implements RunQueueFairDequeueStrategy {
612612
}
613613
}
614614

615-
export class NoopFairDequeuingStrategy implements RunQueueFairDequeueStrategy {
615+
export class NoopFairDequeuingStrategy implements RunQueueSelectionStrategy {
616616
async distributeFairQueuesFromParentQueue(
617617
parentQueue: string,
618618
consumerId: string

0 commit comments

Comments
 (0)