Skip to content

Commit a8b3c70

Browse files
authored
Merge pull request #1804 from triggerdotdev/re2-reserve-concurrency
re2: New release concurrency system
2 parents 71c7b53 + 38e1887 commit a8b3c70

File tree

83 files changed

+11287
-4868
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

83 files changed

+11287
-4868
lines changed

.cursor/mcp.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@
44
"url": "http://localhost:3333/sse"
55
}
66
}
7-
}
7+
}

.vscode/launch.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -138,15 +138,15 @@
138138
"type": "node-terminal",
139139
"request": "launch",
140140
"name": "Debug RunEngine tests",
141-
"command": "pnpm run test --filter @internal/run-engine",
142-
"cwd": "${workspaceFolder}",
141+
"command": "pnpm run test ./src/engine/tests/releaseConcurrencyQueue.test.ts -t 'Should manage token bucket and queue correctly'",
142+
"cwd": "${workspaceFolder}/internal-packages/run-engine",
143143
"sourceMaps": true
144144
},
145145
{
146146
"type": "node-terminal",
147147
"request": "launch",
148148
"name": "Debug RunQueue tests",
149-
"command": "pnpm run test ./src/engine/tests/waitpoints.test.ts",
149+
"command": "pnpm run test ./src/run-queue/index.test.ts",
150150
"cwd": "${workspaceFolder}/internal-packages/run-engine",
151151
"sourceMaps": true
152152
}

apps/webapp/app/env.server.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,13 @@ const EnvironmentSchema = z.object({
565565
RUN_ENGINE_RATE_LIMIT_REJECTION_LOGS_ENABLED: z.string().default("1"),
566566
RUN_ENGINE_RATE_LIMIT_LIMITER_LOGS_ENABLED: z.string().default("0"),
567567

568+
RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED: z.string().default("0"),
569+
RUN_ENGINE_RELEASE_CONCURRENCY_MAX_TOKENS_RATIO: z.coerce.number().default(1),
570+
RUN_ENGINE_RELEASE_CONCURRENCY_MAX_RETRIES: z.coerce.number().int().default(3),
571+
RUN_ENGINE_RELEASE_CONCURRENCY_CONSUMERS_COUNT: z.coerce.number().int().default(1),
572+
RUN_ENGINE_RELEASE_CONCURRENCY_POLL_INTERVAL: z.coerce.number().int().default(500),
573+
RUN_ENGINE_RELEASE_CONCURRENCY_BATCH_SIZE: z.coerce.number().int().default(10),
574+
568575
/** How long should the presence ttl last */
569576
DEV_PRESENCE_TTL_MS: z.coerce.number().int().default(30_000),
570577
DEV_PRESENCE_POLL_INTERVAL_MS: z.coerce.number().int().default(5_000),

apps/webapp/app/routes/admin.api.v1.environments.$environmentId.ts

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { ActionFunctionArgs, json, LoaderFunctionArgs } from "@remix-run/server-
22
import { z } from "zod";
33
import { prisma } from "~/db.server";
44
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
5-
import { marqs } from "~/v3/marqs/index.server";
5+
import { engine } from "~/v3/runEngine.server";
66
import { updateEnvConcurrencyLimits } from "~/v3/runQueue.server";
77

88
const ParamsSchema = z.object({
@@ -113,20 +113,15 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
113113
Object.fromEntries(requestUrl.searchParams.entries())
114114
);
115115

116-
const concurrencyLimit = await marqs.getEnvConcurrencyLimit(environment);
117-
const currentConcurrency = await marqs.currentConcurrencyOfEnvironment(environment);
118-
const reserveConcurrency = await marqs.reserveConcurrencyOfEnvironment(environment);
116+
const concurrencyLimit = await engine.runQueue.getEnvConcurrencyLimit(environment);
117+
const currentConcurrency = await engine.runQueue.currentConcurrencyOfEnvironment(environment);
119118

120119
if (searchParams.queue) {
121-
const queueConcurrencyLimit = await marqs.getQueueConcurrencyLimit(
120+
const queueConcurrencyLimit = await engine.runQueue.getQueueConcurrencyLimit(
122121
environment,
123122
searchParams.queue
124123
);
125-
const queueCurrentConcurrency = await marqs.currentConcurrencyOfQueue(
126-
environment,
127-
searchParams.queue
128-
);
129-
const queueReserveConcurrency = await marqs.reserveConcurrencyOfQueue(
124+
const queueCurrentConcurrency = await engine.runQueue.currentConcurrencyOfQueue(
130125
environment,
131126
searchParams.queue
132127
);
@@ -135,12 +130,10 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
135130
id: environment.id,
136131
concurrencyLimit,
137132
currentConcurrency,
138-
reserveConcurrency,
139133
queueConcurrencyLimit,
140134
queueCurrentConcurrency,
141-
queueReserveConcurrency,
142135
});
143136
}
144137

145-
return json({ id: environment.id, concurrencyLimit, currentConcurrency, reserveConcurrency });
138+
return json({ id: environment.id, concurrencyLimit, currentConcurrency });
146139
}

apps/webapp/app/routes/engine.v1.runs.$runFriendlyId.wait.duration.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,9 @@ const { action } = createActionApiRoute(
4848
const waitResult = await engine.blockRunWithWaitpoint({
4949
runId: run.id,
5050
waitpoints: waitpoint.id,
51-
environmentId: authentication.environment.id,
5251
projectId: authentication.environment.project.id,
5352
organizationId: authentication.environment.organization.id,
54-
releaseConcurrency: {
55-
releaseQueue: true,
56-
},
53+
releaseConcurrency: body.releaseConcurrency,
5754
});
5855

5956
return json({

apps/webapp/app/routes/engine.v1.runs.$runFriendlyId.waitpoints.tokens.$waitpointFriendlyId.wait.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ const { action } = createActionApiRoute(
3434
throw json({ error: "Waitpoint not found" }, { status: 404 });
3535
}
3636

37+
// TODO: Add releaseConcurrency from the body
3738
const result = await engine.blockRunWithWaitpoint({
3839
runId,
3940
waitpoints: [waitpointId],
40-
environmentId: authentication.environment.id,
4141
projectId: authentication.environment.project.id,
4242
organizationId: authentication.environment.organization.id,
4343
});

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import { RunEngine } from "@internal/run-engine";
2+
import { defaultMachine } from "@trigger.dev/platform/v3";
23
import { prisma } from "~/db.server";
34
import { env } from "~/env.server";
4-
import { tracer } from "./tracer.server";
55
import { singleton } from "~/utils/singleton";
6-
import { defaultMachine, machines } from "@trigger.dev/platform/v3";
76
import { allMachines } from "./machinePresets.server";
7+
import { tracer } from "./tracer.server";
88

99
export const engine = singleton("RunEngine", createRunEngine);
1010

@@ -73,6 +73,23 @@ function createRunEngine() {
7373
EXECUTING: env.RUN_ENGINE_TIMEOUT_EXECUTING,
7474
EXECUTING_WITH_WAITPOINTS: env.RUN_ENGINE_TIMEOUT_EXECUTING_WITH_WAITPOINTS,
7575
},
76+
releaseConcurrency: {
77+
disabled: env.RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED === "0",
78+
maxTokensRatio: env.RUN_ENGINE_RELEASE_CONCURRENCY_MAX_TOKENS_RATIO,
79+
maxRetries: env.RUN_ENGINE_RELEASE_CONCURRENCY_MAX_RETRIES,
80+
consumersCount: env.RUN_ENGINE_RELEASE_CONCURRENCY_CONSUMERS_COUNT,
81+
pollInterval: env.RUN_ENGINE_RELEASE_CONCURRENCY_POLL_INTERVAL,
82+
batchSize: env.RUN_ENGINE_RELEASE_CONCURRENCY_BATCH_SIZE,
83+
redis: {
84+
keyPrefix: "engine:",
85+
port: env.RUN_ENGINE_RUN_QUEUE_REDIS_PORT ?? undefined,
86+
host: env.RUN_ENGINE_RUN_QUEUE_REDIS_HOST ?? undefined,
87+
username: env.RUN_ENGINE_RUN_QUEUE_REDIS_USERNAME ?? undefined,
88+
password: env.RUN_ENGINE_RUN_QUEUE_REDIS_PASSWORD ?? undefined,
89+
enableAutoPipelining: true,
90+
...(env.RUN_ENGINE_RUN_QUEUE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
91+
},
92+
},
7693
});
7794

7895
return engine;

apps/webapp/app/v3/services/triggerTaskV2.server.ts

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ import {
44
packetRequiresOffloading,
55
QueueOptions,
66
SemanticInternalAttributes,
7+
TaskRunError,
8+
taskRunErrorEnhancer,
9+
taskRunErrorToString,
710
TriggerTaskRequestBody,
811
} from "@trigger.dev/core/v3";
912
import {
@@ -164,10 +167,10 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
164167
index: options.batchIndex ?? 0,
165168
}
166169
: undefined,
167-
environmentId: environment.id,
168170
projectId: environment.projectId,
169171
organizationId: environment.organizationId,
170172
tx: this._prisma,
173+
releaseConcurrency: body.options?.releaseConcurrency,
171174
});
172175
}
173176
);
@@ -271,7 +274,7 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
271274
immediate: true,
272275
},
273276
async (event, traceContext, traceparent) => {
274-
const run = await autoIncrementCounter.incrementInTransaction(
277+
const result = await autoIncrementCounter.incrementInTransaction(
275278
`v3-run:${environment.id}:${taskId}`,
276279
async (num, tx) => {
277280
const lockedToBackgroundWorker = body.options?.lockToVersion
@@ -370,11 +373,18 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
370373
: undefined,
371374
machine: body.options?.machine,
372375
priorityMs: body.options?.priority ? body.options.priority * 1_000 : undefined,
376+
releaseConcurrency: body.options?.releaseConcurrency,
373377
},
374378
this._prisma
375379
);
376380

377-
return { run: taskRun, isCached: false };
381+
const error = taskRun.error ? TaskRunError.parse(taskRun.error) : undefined;
382+
383+
if (error) {
384+
event.failWithError(error);
385+
}
386+
387+
return { run: taskRun, error, isCached: false };
378388
},
379389
async (_, tx) => {
380390
const counter = await tx.taskRunNumberCounter.findFirst({
@@ -390,7 +400,13 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
390400
this._prisma
391401
);
392402

393-
return run;
403+
if (result?.error) {
404+
throw new ServiceValidationError(
405+
taskRunErrorToString(taskRunErrorEnhancer(result.error))
406+
);
407+
}
408+
409+
return result;
394410
}
395411
);
396412
} catch (error) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
-- AlterEnum
2+
ALTER TYPE "TaskRunExecutionStatus"
3+
ADD
4+
VALUE 'QUEUED_EXECUTING';
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
-- DropIndex
2+
DROP INDEX "SecretStore_key_idx";
3+
4+
-- AlterTable
5+
ALTER TABLE "TaskRunExecutionSnapshot" ADD COLUMN "previousSnapshotId" TEXT;
6+
7+
-- CreateIndex
8+
CREATE INDEX "SecretStore_key_idx" ON "SecretStore"("key" text_pattern_ops);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
-- AlterTable
2+
ALTER TABLE
3+
"TaskQueue"
4+
ADD
5+
COLUMN "releaseConcurrencyOnWaitpoint" BOOLEAN NOT NULL DEFAULT false;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
Warnings:
3+
4+
- Added the required column `organizationId` to the `TaskRunExecutionSnapshot` table without a default value. This is not possible if the table is not empty.
5+
- Added the required column `projectId` to the `TaskRunExecutionSnapshot` table without a default value. This is not possible if the table is not empty.
6+
7+
*/
8+
-- AlterTable
9+
ALTER TABLE
10+
"TaskRunExecutionSnapshot"
11+
ADD
12+
COLUMN "organizationId" TEXT NOT NULL,
13+
ADD
14+
COLUMN "projectId" TEXT NOT NULL;
15+
16+
-- AddForeignKey
17+
ALTER TABLE
18+
"TaskRunExecutionSnapshot"
19+
ADD
20+
CONSTRAINT "TaskRunExecutionSnapshot_projectId_fkey" FOREIGN KEY ("projectId") REFERENCES "Project"("id") ON DELETE RESTRICT ON UPDATE CASCADE;
21+
22+
-- AddForeignKey
23+
ALTER TABLE
24+
"TaskRunExecutionSnapshot"
25+
ADD
26+
CONSTRAINT "TaskRunExecutionSnapshot_organizationId_fkey" FOREIGN KEY ("organizationId") REFERENCES "Organization"("id") ON DELETE RESTRICT ON UPDATE CASCADE;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
-- AlterTable
2+
ALTER TABLE
3+
"TaskRunExecutionSnapshot"
4+
ADD
5+
COLUMN "metadata" JSONB;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
-- AlterTable
2+
ALTER TABLE
3+
"TaskRun"
4+
ADD
5+
COLUMN "lockedQueueId" TEXT;

internal-packages/database/prisma/schema.prisma

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ model Organization {
164164
organizationIntegrations OrganizationIntegration[]
165165
workerGroups WorkerInstanceGroup[]
166166
workerInstances WorkerInstance[]
167+
executionSnapshots TaskRunExecutionSnapshot[]
167168
}
168169

169170
model ExternalAccount {
@@ -504,6 +505,7 @@ model Project {
504505
waitpoints Waitpoint[]
505506
taskRunWaitpoints TaskRunWaitpoint[]
506507
taskRunCheckpoints TaskRunCheckpoint[]
508+
executionSnapshots TaskRunExecutionSnapshot[]
507509
}
508510

509511
enum ProjectVersion {
@@ -1724,7 +1726,9 @@ model TaskRun {
17241726
projectId String
17251727
17261728
// The specific queue this run is in
1727-
queue String
1729+
queue String
1730+
// The queueId is set when the run is locked to a specific queue
1731+
lockedQueueId String?
17281732
17291733
/// The main queue that this run is part of
17301734
masterQueue String @default("main")
@@ -1965,6 +1969,9 @@ model TaskRunExecutionSnapshot {
19651969
isValid Boolean @default(true)
19661970
error String?
19671971
1972+
/// The previous snapshot ID
1973+
previousSnapshotId String?
1974+
19681975
/// Run
19691976
runId String
19701977
run TaskRun @relation(fields: [runId], references: [id])
@@ -1982,6 +1989,12 @@ model TaskRunExecutionSnapshot {
19821989
environment RuntimeEnvironment @relation(fields: [environmentId], references: [id])
19831990
environmentType RuntimeEnvironmentType
19841991
1992+
projectId String
1993+
project Project @relation(fields: [projectId], references: [id])
1994+
1995+
organizationId String
1996+
organization Organization @relation(fields: [organizationId], references: [id])
1997+
19851998
/// Waitpoints that have been completed for this execution
19861999
completedWaitpoints Waitpoint[] @relation("completedWaitpoints")
19872000
@@ -2003,6 +2016,9 @@ model TaskRunExecutionSnapshot {
20032016
20042017
lastHeartbeatAt DateTime?
20052018
2019+
/// Metadata used by various systems in the run engine
2020+
metadata Json?
2021+
20062022
/// Used to get the latest valid snapshot quickly
20072023
@@index([runId, isValid, createdAt(sort: Desc)])
20082024
}
@@ -2012,6 +2028,8 @@ enum TaskRunExecutionStatus {
20122028
RUN_CREATED
20132029
/// Run is in the RunQueue
20142030
QUEUED
2031+
/// Run is in the RunQueue, and is also executing. This happens when a run is continued cannot reacquire concurrency
2032+
QUEUED_EXECUTING
20152033
/// Run has been pulled from the queue, but isn't executing yet
20162034
PENDING_EXECUTING
20172035
/// Run is executing on a worker
@@ -2526,6 +2544,9 @@ model TaskQueue {
25262544
25272545
paused Boolean @default(false)
25282546
2547+
/// If true, when a run is paused and waiting for waitpoints to be completed, the run will release the concurrency capacity.
2548+
releaseConcurrencyOnWaitpoint Boolean @default(false)
2549+
25292550
createdAt DateTime @default(now())
25302551
updatedAt DateTime @updatedAt
25312552

0 commit comments

Comments
 (0)