Skip to content

Commit 28b3ed0

Browse files
committed
Implement release concurrency system
1 parent 7eaf81a commit 28b3ed0

File tree

38 files changed

+1659
-190
lines changed

38 files changed

+1659
-190
lines changed

apps/webapp/app/routes/engine.v1.runs.$runFriendlyId.wait.duration.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,9 @@ const { action } = createActionApiRoute(
4848
const waitResult = await engine.blockRunWithWaitpoint({
4949
runId: run.id,
5050
waitpoints: waitpoint.id,
51-
environmentId: authentication.environment.id,
5251
projectId: authentication.environment.project.id,
5352
organizationId: authentication.environment.organization.id,
54-
releaseConcurrency: true,
53+
releaseConcurrency: body.releaseConcurrency,
5554
});
5655

5756
return json({

apps/webapp/app/routes/engine.v1.runs.$runFriendlyId.waitpoints.tokens.$waitpointFriendlyId.wait.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,12 @@ const { action } = createActionApiRoute(
3434
throw json({ error: "Waitpoint not found" }, { status: 404 });
3535
}
3636

37+
// TODO: Add releaseConcurrency from the body
3738
const result = await engine.blockRunWithWaitpoint({
3839
runId,
3940
waitpoints: [waitpointId],
40-
environmentId: authentication.environment.id,
4141
projectId: authentication.environment.project.id,
4242
organizationId: authentication.environment.organization.id,
43-
releaseConcurrency: true,
4443
});
4544

4645
return json<WaitForWaitpointTokenResponseBody>(

apps/webapp/app/v3/services/triggerTaskV2.server.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,10 +167,10 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
167167
index: options.batchIndex ?? 0,
168168
}
169169
: undefined,
170-
environmentId: environment.id,
171170
projectId: environment.projectId,
172171
organizationId: environment.organizationId,
173172
tx: this._prisma,
173+
releaseConcurrency: body.options?.releaseConcurrency,
174174
});
175175
}
176176
);
@@ -373,6 +373,7 @@ export class TriggerTaskServiceV2 extends WithRunEngine {
373373
: undefined,
374374
machine: body.options?.machine,
375375
priorityMs: body.options?.priority ? body.options.priority * 1_000 : undefined,
376+
releaseConcurrency: body.options?.releaseConcurrency,
376377
},
377378
this._prisma
378379
);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
-- AlterTable
2+
ALTER TABLE
3+
"TaskQueue"
4+
ADD
5+
COLUMN "releaseConcurrencyOnWaitpoint" BOOLEAN NOT NULL DEFAULT false;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
Warnings:
3+
4+
- Added the required column `organizationId` to the `TaskRunExecutionSnapshot` table without a default value. This is not possible if the table is not empty.
5+
- Added the required column `projectId` to the `TaskRunExecutionSnapshot` table without a default value. This is not possible if the table is not empty.
6+
7+
*/
8+
-- AlterTable
9+
ALTER TABLE
10+
"TaskRunExecutionSnapshot"
11+
ADD
12+
COLUMN "organizationId" TEXT NOT NULL,
13+
ADD
14+
COLUMN "projectId" TEXT NOT NULL;
15+
16+
-- AddForeignKey
17+
ALTER TABLE
18+
"TaskRunExecutionSnapshot"
19+
ADD
20+
CONSTRAINT "TaskRunExecutionSnapshot_projectId_fkey" FOREIGN KEY ("projectId") REFERENCES "Project"("id") ON DELETE RESTRICT ON UPDATE CASCADE;
21+
22+
-- AddForeignKey
23+
ALTER TABLE
24+
"TaskRunExecutionSnapshot"
25+
ADD
26+
CONSTRAINT "TaskRunExecutionSnapshot_organizationId_fkey" FOREIGN KEY ("organizationId") REFERENCES "Organization"("id") ON DELETE RESTRICT ON UPDATE CASCADE;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
-- AlterTable
2+
ALTER TABLE
3+
"TaskRunExecutionSnapshot"
4+
ADD
5+
COLUMN "metadata" JSONB;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
-- AlterTable
2+
ALTER TABLE
3+
"TaskRun"
4+
ADD
5+
COLUMN "lockedQueueId" TEXT;

internal-packages/database/prisma/schema.prisma

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ model Organization {
164164
organizationIntegrations OrganizationIntegration[]
165165
workerGroups WorkerInstanceGroup[]
166166
workerInstances WorkerInstance[]
167+
executionSnapshots TaskRunExecutionSnapshot[]
167168
}
168169

169170
model ExternalAccount {
@@ -504,6 +505,7 @@ model Project {
504505
waitpoints Waitpoint[]
505506
taskRunWaitpoints TaskRunWaitpoint[]
506507
taskRunCheckpoints TaskRunCheckpoint[]
508+
executionSnapshots TaskRunExecutionSnapshot[]
507509
}
508510

509511
enum ProjectVersion {
@@ -1724,7 +1726,9 @@ model TaskRun {
17241726
projectId String
17251727
17261728
// The specific queue this run is in
1727-
queue String
1729+
queue String
1730+
// The queueId is set when the run is locked to a specific queue
1731+
lockedQueueId String?
17281732
17291733
/// The main queue that this run is part of
17301734
masterQueue String @default("main")
@@ -1985,6 +1989,12 @@ model TaskRunExecutionSnapshot {
19851989
environment RuntimeEnvironment @relation(fields: [environmentId], references: [id])
19861990
environmentType RuntimeEnvironmentType
19871991
1992+
projectId String
1993+
project Project @relation(fields: [projectId], references: [id])
1994+
1995+
organizationId String
1996+
organization Organization @relation(fields: [organizationId], references: [id])
1997+
19881998
/// Waitpoints that have been completed for this execution
19891999
completedWaitpoints Waitpoint[] @relation("completedWaitpoints")
19902000
@@ -2006,6 +2016,9 @@ model TaskRunExecutionSnapshot {
20062016
20072017
lastHeartbeatAt DateTime?
20082018
2019+
/// Metadata used by various systems in the run engine
2020+
metadata Json?
2021+
20092022
/// Used to get the latest valid snapshot quickly
20102023
@@index([runId, isValid, createdAt(sort: Desc)])
20112024
}
@@ -2531,6 +2544,9 @@ model TaskQueue {
25312544
25322545
paused Boolean @default(false)
25332546
2547+
/// If true, when a run is paused and waiting for waitpoints to be completed, the run will release the concurrency capacity.
2548+
releaseConcurrencyOnWaitpoint Boolean @default(false)
2549+
25342550
createdAt DateTime @default(now())
25352551
updatedAt DateTime @updatedAt
25362552

internal-packages/run-engine/src/engine/index.ts

Lines changed: 19 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ import {
3636
import { EventBus, EventBusEvents } from "./eventBus.js";
3737
import { RunLocker } from "./locking.js";
3838
import { ReleaseConcurrencyTokenBucketQueue } from "./releaseConcurrencyTokenBucketQueue.js";
39-
import { canReleaseConcurrency } from "./statuses.js";
4039
import { BatchSystem } from "./systems/batchSystem.js";
4140
import { CheckpointSystem } from "./systems/checkpointSystem.js";
4241
import { DelayedRunSystem } from "./systems/delayedRunSystem.js";
@@ -46,13 +45,14 @@ import {
4645
ExecutionSnapshotSystem,
4746
getLatestExecutionSnapshot,
4847
} from "./systems/executionSnapshotSystem.js";
48+
import { ReleaseConcurrencySystem } from "./systems/releaseConcurrencySystem.js";
4949
import { RunAttemptSystem } from "./systems/runAttemptSystem.js";
5050
import { SystemResources } from "./systems/systems.js";
5151
import { TtlSystem } from "./systems/ttlSystem.js";
52+
import { WaitingForWorkerSystem } from "./systems/waitingForWorkerSystem.js";
5253
import { WaitpointSystem } from "./systems/waitpointSystem.js";
5354
import { EngineWorker, HeartbeatTimeouts, RunEngineOptions, TriggerParams } from "./types.js";
5455
import { workerCatalog } from "./workerCatalog.js";
55-
import { WaitingForWorkerSystem } from "./systems/waitingForWorkerSystem.js";
5656

5757
export class RunEngine {
5858
private runLockRedis: Redis;
@@ -63,7 +63,7 @@ export class RunEngine {
6363
private logger = new Logger("RunEngine", "debug");
6464
private tracer: Tracer;
6565
private heartbeatTimeouts: HeartbeatTimeouts;
66-
private releaseConcurrencyQueue: ReleaseConcurrencyTokenBucketQueue<{
66+
releaseConcurrencyQueue: ReleaseConcurrencyTokenBucketQueue<{
6767
orgId: string;
6868
projectId: string;
6969
envId: string;
@@ -79,6 +79,7 @@ export class RunEngine {
7979
delayedRunSystem: DelayedRunSystem;
8080
ttlSystem: TtlSystem;
8181
waitingForWorkerSystem: WaitingForWorkerSystem;
82+
releaseConcurrencySystem: ReleaseConcurrencySystem;
8283

8384
constructor(private readonly options: RunEngineOptions) {
8485
this.prisma = options.prisma;
@@ -188,7 +189,7 @@ export class RunEngine {
188189
redis: {
189190
...options.queue.redis, // Use base queue redis options
190191
...options.releaseConcurrency?.redis, // Allow overrides
191-
keyPrefix: `${options.queue.redis.keyPrefix}release-concurrency:`,
192+
keyPrefix: `${options.queue.redis.keyPrefix ?? ""}release-concurrency:`,
192193
},
193194
retry: {
194195
maxRetries: options.releaseConcurrency?.maxRetries ?? 5,
@@ -201,8 +202,8 @@ export class RunEngine {
201202
consumersCount: options.releaseConcurrency?.consumersCount ?? 1,
202203
pollInterval: options.releaseConcurrency?.pollInterval ?? 1000,
203204
batchSize: options.releaseConcurrency?.batchSize ?? 10,
204-
executor: async (descriptor, runId) => {
205-
await this.#executeReleasedConcurrencyFromQueue(descriptor, runId);
205+
executor: async (descriptor, snapshotId) => {
206+
await this.releaseConcurrencySystem.executeReleaseConcurrencyForSnapshot(snapshotId);
206207
},
207208
maxTokens: async (descriptor) => {
208209
const environment = await this.prisma.runtimeEnvironment.findFirstOrThrow({
@@ -239,6 +240,10 @@ export class RunEngine {
239240
releaseConcurrencyQueue: this.releaseConcurrencyQueue,
240241
};
241242

243+
this.releaseConcurrencySystem = new ReleaseConcurrencySystem({
244+
resources,
245+
});
246+
242247
this.executionSnapshotSystem = new ExecutionSnapshotSystem({
243248
resources,
244249
heartbeatTimeouts: this.heartbeatTimeouts,
@@ -251,6 +256,7 @@ export class RunEngine {
251256

252257
this.checkpointSystem = new CheckpointSystem({
253258
resources,
259+
releaseConcurrencySystem: this.releaseConcurrencySystem,
254260
executionSnapshotSystem: this.executionSnapshotSystem,
255261
enqueueSystem: this.enqueueSystem,
256262
});
@@ -269,6 +275,7 @@ export class RunEngine {
269275
resources,
270276
executionSnapshotSystem: this.executionSnapshotSystem,
271277
enqueueSystem: this.enqueueSystem,
278+
releaseConcurrencySystem: this.releaseConcurrencySystem,
272279
});
273280

274281
this.ttlSystem = new TtlSystem({
@@ -344,6 +351,7 @@ export class RunEngine {
344351
machine,
345352
workerId,
346353
runnerId,
354+
releaseConcurrency,
347355
}: TriggerParams,
348356
tx?: PrismaClientOrTransaction
349357
): Promise<TaskRun> {
@@ -435,6 +443,8 @@ export class RunEngine {
435443
runStatus: status,
436444
environmentId: environment.id,
437445
environmentType: environment.type,
446+
projectId: environment.project.id,
447+
organizationId: environment.organization.id,
438448
workerId,
439449
runnerId,
440450
},
@@ -490,12 +500,11 @@ export class RunEngine {
490500
runId: parentTaskRunId,
491501
waitpoints: associatedWaitpoint.id,
492502
projectId: associatedWaitpoint.projectId,
493-
organizationId: environment.organization.id,
494503
batch,
495504
workerId,
496505
runnerId,
497506
tx: prisma,
498-
releaseConcurrency: true, // TODO: This needs to use the release concurrency system
507+
releaseConcurrency,
499508
});
500509
}
501510

@@ -1015,7 +1024,6 @@ export class RunEngine {
10151024
runId,
10161025
waitpoints,
10171026
projectId,
1018-
organizationId,
10191027
releaseConcurrency,
10201028
timeout,
10211029
spanIdToComplete,
@@ -1040,7 +1048,6 @@ export class RunEngine {
10401048
runId,
10411049
waitpoints,
10421050
projectId,
1043-
organizationId,
10441051
releaseConcurrency,
10451052
timeout,
10461053
spanIdToComplete,
@@ -1051,35 +1058,6 @@ export class RunEngine {
10511058
});
10521059
}
10531060

1054-
async #executeReleasedConcurrencyFromQueue(
1055-
descriptor: { orgId: string; projectId: string; envId: string },
1056-
runId: string
1057-
) {
1058-
this.logger.debug("Executing released concurrency", {
1059-
descriptor,
1060-
runId,
1061-
});
1062-
1063-
// - Runlock the run
1064-
// - Get latest snapshot
1065-
// - If the run is non suspended or going to be, then bail
1066-
// - If the run is suspended or going to be, then release the concurrency
1067-
await this.runLock.lock([runId], 5_000, async () => {
1068-
const snapshot = await getLatestExecutionSnapshot(this.prisma, runId);
1069-
1070-
if (!canReleaseConcurrency(snapshot.executionStatus)) {
1071-
this.logger.debug("Run is not in a state to release concurrency", {
1072-
runId,
1073-
snapshot,
1074-
});
1075-
1076-
return;
1077-
}
1078-
1079-
return await this.runQueue.releaseConcurrency(descriptor.orgId, snapshot.runId);
1080-
});
1081-
}
1082-
10831061
/** This completes a waitpoint and updates all entries so the run isn't blocked,
10841062
* if they're no longer blocked. This doesn't suffer from race conditions. */
10851063
async completeWaitpoint({
@@ -1340,7 +1318,8 @@ export class RunEngine {
13401318
id: latestSnapshot.environmentId,
13411319
type: latestSnapshot.environmentType,
13421320
},
1343-
orgId: run.runtimeEnvironment.organizationId,
1321+
orgId: latestSnapshot.organizationId,
1322+
projectId: latestSnapshot.projectId,
13441323
error: {
13451324
type: "INTERNAL_ERROR",
13461325
code: "TASK_RUN_DEQUEUED_MAX_RETRIES",

0 commit comments

Comments
 (0)