Skip to content

Commit e5ea9cb

Browse files
committed
move the release concurrency queue into the release concurrency system, make it disabled by default, configure the run engine in the webapp with env vars
1 parent 28b3ed0 commit e5ea9cb

File tree

7 files changed

+146
-69
lines changed

7 files changed

+146
-69
lines changed

apps/webapp/app/env.server.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,13 @@ const EnvironmentSchema = z.object({
565565
RUN_ENGINE_RATE_LIMIT_REJECTION_LOGS_ENABLED: z.string().default("1"),
566566
RUN_ENGINE_RATE_LIMIT_LIMITER_LOGS_ENABLED: z.string().default("0"),
567567

568+
RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED: z.string().default("0"),
569+
RUN_ENGINE_RELEASE_CONCURRENCY_MAX_TOKENS_RATIO: z.coerce.number().default(1),
570+
RUN_ENGINE_RELEASE_CONCURRENCY_MAX_RETRIES: z.coerce.number().int().default(3),
571+
RUN_ENGINE_RELEASE_CONCURRENCY_CONSUMERS_COUNT: z.coerce.number().int().default(1),
572+
RUN_ENGINE_RELEASE_CONCURRENCY_POLL_INTERVAL: z.coerce.number().int().default(500),
573+
RUN_ENGINE_RELEASE_CONCURRENCY_BATCH_SIZE: z.coerce.number().int().default(10),
574+
568575
/** How long should the presence ttl last */
569576
DEV_PRESENCE_TTL_MS: z.coerce.number().int().default(30_000),
570577
DEV_PRESENCE_POLL_INTERVAL_MS: z.coerce.number().int().default(5_000),

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import { RunEngine } from "@internal/run-engine";
2+
import { defaultMachine } from "@trigger.dev/platform/v3";
23
import { prisma } from "~/db.server";
34
import { env } from "~/env.server";
4-
import { tracer } from "./tracer.server";
55
import { singleton } from "~/utils/singleton";
6-
import { defaultMachine, machines } from "@trigger.dev/platform/v3";
76
import { allMachines } from "./machinePresets.server";
7+
import { tracer } from "./tracer.server";
88

99
export const engine = singleton("RunEngine", createRunEngine);
1010

@@ -73,6 +73,23 @@ function createRunEngine() {
7373
EXECUTING: env.RUN_ENGINE_TIMEOUT_EXECUTING,
7474
EXECUTING_WITH_WAITPOINTS: env.RUN_ENGINE_TIMEOUT_EXECUTING_WITH_WAITPOINTS,
7575
},
76+
releaseConcurrency: {
77+
disabled: env.RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED === "0",
78+
maxTokensRatio: env.RUN_ENGINE_RELEASE_CONCURRENCY_MAX_TOKENS_RATIO,
79+
maxRetries: env.RUN_ENGINE_RELEASE_CONCURRENCY_MAX_RETRIES,
80+
consumersCount: env.RUN_ENGINE_RELEASE_CONCURRENCY_CONSUMERS_COUNT,
81+
pollInterval: env.RUN_ENGINE_RELEASE_CONCURRENCY_POLL_INTERVAL,
82+
batchSize: env.RUN_ENGINE_RELEASE_CONCURRENCY_BATCH_SIZE,
83+
redis: {
84+
keyPrefix: "engine:",
85+
port: env.RUN_ENGINE_RUN_QUEUE_REDIS_PORT ?? undefined,
86+
host: env.RUN_ENGINE_RUN_QUEUE_REDIS_HOST ?? undefined,
87+
username: env.RUN_ENGINE_RUN_QUEUE_REDIS_USERNAME ?? undefined,
88+
password: env.RUN_ENGINE_RUN_QUEUE_REDIS_PASSWORD ?? undefined,
89+
enableAutoPipelining: true,
90+
...(env.RUN_ENGINE_RUN_QUEUE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
91+
},
92+
},
7693
});
7794

7895
return engine;

internal-packages/run-engine/src/engine/index.ts

Lines changed: 51 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import {
3535
} from "./errors.js";
3636
import { EventBus, EventBusEvents } from "./eventBus.js";
3737
import { RunLocker } from "./locking.js";
38-
import { ReleaseConcurrencyTokenBucketQueue } from "./releaseConcurrencyTokenBucketQueue.js";
3938
import { BatchSystem } from "./systems/batchSystem.js";
4039
import { CheckpointSystem } from "./systems/checkpointSystem.js";
4140
import { DelayedRunSystem } from "./systems/delayedRunSystem.js";
@@ -63,11 +62,6 @@ export class RunEngine {
6362
private logger = new Logger("RunEngine", "debug");
6463
private tracer: Tracer;
6564
private heartbeatTimeouts: HeartbeatTimeouts;
66-
releaseConcurrencyQueue: ReleaseConcurrencyTokenBucketQueue<{
67-
orgId: string;
68-
projectId: string;
69-
envId: string;
70-
}>;
7165
eventBus: EventBus = new EventEmitter<EventBusEvents>();
7266
executionSnapshotSystem: ExecutionSnapshotSystem;
7367
runAttemptSystem: RunAttemptSystem;
@@ -184,51 +178,6 @@ export class RunEngine {
184178
...(options.heartbeatTimeoutsMs ?? {}),
185179
};
186180

187-
// Initialize the ReleaseConcurrencyQueue
188-
this.releaseConcurrencyQueue = new ReleaseConcurrencyTokenBucketQueue({
189-
redis: {
190-
...options.queue.redis, // Use base queue redis options
191-
...options.releaseConcurrency?.redis, // Allow overrides
192-
keyPrefix: `${options.queue.redis.keyPrefix ?? ""}release-concurrency:`,
193-
},
194-
retry: {
195-
maxRetries: options.releaseConcurrency?.maxRetries ?? 5,
196-
backoff: {
197-
minDelay: options.releaseConcurrency?.backoff?.minDelay ?? 1000,
198-
maxDelay: options.releaseConcurrency?.backoff?.maxDelay ?? 10000,
199-
factor: options.releaseConcurrency?.backoff?.factor ?? 2,
200-
},
201-
},
202-
consumersCount: options.releaseConcurrency?.consumersCount ?? 1,
203-
pollInterval: options.releaseConcurrency?.pollInterval ?? 1000,
204-
batchSize: options.releaseConcurrency?.batchSize ?? 10,
205-
executor: async (descriptor, snapshotId) => {
206-
await this.releaseConcurrencySystem.executeReleaseConcurrencyForSnapshot(snapshotId);
207-
},
208-
maxTokens: async (descriptor) => {
209-
const environment = await this.prisma.runtimeEnvironment.findFirstOrThrow({
210-
where: { id: descriptor.envId },
211-
select: {
212-
maximumConcurrencyLimit: true,
213-
},
214-
});
215-
216-
return (
217-
environment.maximumConcurrencyLimit * (options.releaseConcurrency?.maxTokensRatio ?? 1.0)
218-
);
219-
},
220-
keys: {
221-
fromDescriptor: (descriptor) =>
222-
`org:${descriptor.orgId}:proj:${descriptor.projectId}:env:${descriptor.envId}`,
223-
toDescriptor: (name) => ({
224-
orgId: name.split(":")[1],
225-
projectId: name.split(":")[3],
226-
envId: name.split(":")[5],
227-
}),
228-
},
229-
tracer: this.tracer,
230-
});
231-
232181
const resources: SystemResources = {
233182
prisma: this.prisma,
234183
worker: this.worker,
@@ -237,11 +186,60 @@ export class RunEngine {
237186
tracer: this.tracer,
238187
runLock: this.runLock,
239188
runQueue: this.runQueue,
240-
releaseConcurrencyQueue: this.releaseConcurrencyQueue,
241189
};
242190

243191
this.releaseConcurrencySystem = new ReleaseConcurrencySystem({
244192
resources,
193+
queueOptions:
194+
typeof options.releaseConcurrency?.disabled === "boolean" &&
195+
options.releaseConcurrency.disabled
196+
? undefined
197+
: {
198+
redis: {
199+
...options.queue.redis, // Use base queue redis options
200+
...options.releaseConcurrency?.redis, // Allow overrides
201+
keyPrefix: `${options.queue.redis.keyPrefix ?? ""}release-concurrency:`,
202+
},
203+
retry: {
204+
maxRetries: options.releaseConcurrency?.maxRetries ?? 5,
205+
backoff: {
206+
minDelay: options.releaseConcurrency?.backoff?.minDelay ?? 1000,
207+
maxDelay: options.releaseConcurrency?.backoff?.maxDelay ?? 10000,
208+
factor: options.releaseConcurrency?.backoff?.factor ?? 2,
209+
},
210+
},
211+
consumersCount: options.releaseConcurrency?.consumersCount ?? 1,
212+
pollInterval: options.releaseConcurrency?.pollInterval ?? 1000,
213+
batchSize: options.releaseConcurrency?.batchSize ?? 10,
214+
executor: async (descriptor, snapshotId) => {
215+
await this.releaseConcurrencySystem.executeReleaseConcurrencyForSnapshot(
216+
snapshotId
217+
);
218+
},
219+
maxTokens: async (descriptor) => {
220+
const environment = await this.prisma.runtimeEnvironment.findFirstOrThrow({
221+
where: { id: descriptor.envId },
222+
select: {
223+
maximumConcurrencyLimit: true,
224+
},
225+
});
226+
227+
return (
228+
environment.maximumConcurrencyLimit *
229+
(options.releaseConcurrency?.maxTokensRatio ?? 1.0)
230+
);
231+
},
232+
keys: {
233+
fromDescriptor: (descriptor) =>
234+
`org:${descriptor.orgId}:proj:${descriptor.projectId}:env:${descriptor.envId}`,
235+
toDescriptor: (name) => ({
236+
orgId: name.split(":")[1],
237+
projectId: name.split(":")[3],
238+
envId: name.split(":")[5],
239+
}),
240+
},
241+
tracer: this.tracer,
242+
},
245243
});
246244

247245
this.executionSnapshotSystem = new ExecutionSnapshotSystem({
@@ -1213,7 +1211,7 @@ export class RunEngine {
12131211
async quit() {
12141212
try {
12151213
//stop the run queue
1216-
await this.releaseConcurrencyQueue.quit();
1214+
await this.releaseConcurrencySystem.quit();
12171215
await this.runQueue.quit();
12181216
await this.worker.stop();
12191217
await this.runLock.quit();

internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@ import { SystemResources } from "./systems.js";
33
import { getLatestExecutionSnapshot } from "./executionSnapshotSystem.js";
44
import { canReleaseConcurrency } from "../statuses.js";
55
import { z } from "zod";
6+
import {
7+
ReleaseConcurrencyQueueOptions,
8+
ReleaseConcurrencyTokenBucketQueue,
9+
} from "../releaseConcurrencyTokenBucketQueue.js";
610

711
const ReleaseConcurrencyMetadata = z.object({
812
releaseConcurrency: z.boolean().optional(),
@@ -12,17 +16,65 @@ type ReleaseConcurrencyMetadata = z.infer<typeof ReleaseConcurrencyMetadata>;
1216

1317
export type ReleaseConcurrencySystemOptions = {
1418
resources: SystemResources;
19+
queueOptions?: ReleaseConcurrencyQueueOptions<{
20+
orgId: string;
21+
projectId: string;
22+
envId: string;
23+
}>;
1524
};
1625

1726
export class ReleaseConcurrencySystem {
1827
private readonly $: SystemResources;
28+
releaseConcurrencyQueue?: ReleaseConcurrencyTokenBucketQueue<{
29+
orgId: string;
30+
projectId: string;
31+
envId: string;
32+
}>;
1933

2034
constructor(private readonly options: ReleaseConcurrencySystemOptions) {
2135
this.$ = options.resources;
36+
37+
if (options.queueOptions) {
38+
this.releaseConcurrencyQueue = new ReleaseConcurrencyTokenBucketQueue(options.queueOptions);
39+
}
40+
}
41+
42+
public async consumeToken(
43+
descriptor: { orgId: string; projectId: string; envId: string },
44+
releaserId: string
45+
) {
46+
if (!this.releaseConcurrencyQueue) {
47+
return;
48+
}
49+
50+
await this.releaseConcurrencyQueue.consumeToken(descriptor, releaserId);
51+
}
52+
53+
public async returnToken(
54+
descriptor: { orgId: string; projectId: string; envId: string },
55+
releaserId: string
56+
) {
57+
if (!this.releaseConcurrencyQueue) {
58+
return;
59+
}
60+
61+
await this.releaseConcurrencyQueue.returnToken(descriptor, releaserId);
62+
}
63+
64+
public async quit() {
65+
if (!this.releaseConcurrencyQueue) {
66+
return;
67+
}
68+
69+
await this.releaseConcurrencyQueue.quit();
2270
}
2371

2472
public async checkpointCreatedOnEnvironment(environment: RuntimeEnvironment) {
25-
await this.$.releaseConcurrencyQueue.refillTokens(
73+
if (!this.releaseConcurrencyQueue) {
74+
return;
75+
}
76+
77+
await this.releaseConcurrencyQueue.refillTokens(
2678
{
2779
orgId: environment.organizationId,
2880
projectId: environment.projectId,
@@ -33,12 +85,16 @@ export class ReleaseConcurrencySystem {
3385
}
3486

3587
public async releaseConcurrencyForSnapshot(snapshot: TaskRunExecutionSnapshot) {
88+
if (!this.releaseConcurrencyQueue) {
89+
return;
90+
}
91+
3692
// Go ahead and release concurrency immediately if the run is in a development environment
3793
if (snapshot.environmentType === "DEVELOPMENT") {
3894
return await this.executeReleaseConcurrencyForSnapshot(snapshot.id);
3995
}
4096

41-
await this.$.releaseConcurrencyQueue.attemptToRelease(
97+
await this.releaseConcurrencyQueue.attemptToRelease(
4298
{
4399
orgId: snapshot.organizationId,
44100
projectId: snapshot.projectId,
@@ -49,6 +105,10 @@ export class ReleaseConcurrencySystem {
49105
}
50106

51107
public async executeReleaseConcurrencyForSnapshot(snapshotId: string) {
108+
if (!this.releaseConcurrencyQueue) {
109+
return;
110+
}
111+
52112
this.$.logger.debug("Executing released concurrency", {
53113
snapshotId,
54114
});

internal-packages/run-engine/src/engine/systems/systems.ts

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import { RunQueue } from "../../run-queue/index.js";
55
import { EventBus } from "../eventBus.js";
66
import { RunLocker } from "../locking.js";
77
import { EngineWorker } from "../types.js";
8-
import { ReleaseConcurrencyTokenBucketQueue } from "../releaseConcurrencyTokenBucketQueue.js";
98

109
export type SystemResources = {
1110
prisma: PrismaClient;
@@ -15,9 +14,4 @@ export type SystemResources = {
1514
tracer: Tracer;
1615
runLock: RunLocker;
1716
runQueue: RunQueue;
18-
releaseConcurrencyQueue: ReleaseConcurrencyTokenBucketQueue<{
19-
orgId: string;
20-
projectId: string;
21-
envId: string;
22-
}>;
2317
};

internal-packages/run-engine/src/engine/tests/releaseConcurrency.test.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -673,7 +673,7 @@ describe("RunEngine Releasing Concurrency", () => {
673673
projectId: authenticatedEnvironment.projectId,
674674
});
675675

676-
await engine.releaseConcurrencyQueue.consumeToken(
676+
await engine.releaseConcurrencySystem.consumeToken(
677677
{
678678
orgId: authenticatedEnvironment.organizationId,
679679
projectId: authenticatedEnvironment.projectId,
@@ -708,7 +708,7 @@ describe("RunEngine Releasing Concurrency", () => {
708708
expect(envConcurrencyAfter).toBe(1);
709709

710710
// Now we return the token so the concurrency can be released
711-
await engine.releaseConcurrencyQueue.returnToken(
711+
await engine.releaseConcurrencySystem.returnToken(
712712
{
713713
orgId: authenticatedEnvironment.organizationId,
714714
projectId: authenticatedEnvironment.projectId,
@@ -835,7 +835,7 @@ describe("RunEngine Releasing Concurrency", () => {
835835
projectId: authenticatedEnvironment.projectId,
836836
});
837837

838-
await engine.releaseConcurrencyQueue.consumeToken(
838+
await engine.releaseConcurrencySystem.consumeToken(
839839
{
840840
orgId: authenticatedEnvironment.organizationId,
841841
projectId: authenticatedEnvironment.projectId,
@@ -890,7 +890,7 @@ describe("RunEngine Releasing Concurrency", () => {
890890
expect(snapshot.executionStatus).toBe("SUSPENDED");
891891

892892
// Now we return the token so the concurrency can be released
893-
await engine.releaseConcurrencyQueue.returnToken(
893+
await engine.releaseConcurrencySystem.returnToken(
894894
{
895895
orgId: authenticatedEnvironment.organizationId,
896896
projectId: authenticatedEnvironment.projectId,
@@ -1017,7 +1017,7 @@ describe("RunEngine Releasing Concurrency", () => {
10171017
projectId: authenticatedEnvironment.projectId,
10181018
});
10191019

1020-
await engine.releaseConcurrencyQueue.consumeToken(
1020+
await engine.releaseConcurrencySystem.consumeToken(
10211021
{
10221022
orgId: authenticatedEnvironment.organizationId,
10231023
projectId: authenticatedEnvironment.projectId,
@@ -1063,7 +1063,7 @@ describe("RunEngine Releasing Concurrency", () => {
10631063
expect(executionDataAfter?.snapshot.executionStatus).toBe("EXECUTING");
10641064

10651065
// Now we return the token so the concurrency can be released
1066-
await engine.releaseConcurrencyQueue.returnToken(
1066+
await engine.releaseConcurrencySystem.returnToken(
10671067
{
10681068
orgId: authenticatedEnvironment.organizationId,
10691069
projectId: authenticatedEnvironment.projectId,

internal-packages/run-engine/src/engine/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ export type RunEngineOptions = {
3737
queueRunsWaitingForWorkerBatchSize?: number;
3838
tracer: Tracer;
3939
releaseConcurrency?: {
40+
disabled?: boolean;
4041
maxTokensRatio?: number;
4142
redis?: Partial<RedisOptions>;
4243
maxRetries?: number;

0 commit comments

Comments
 (0)