Skip to content

Commit 9e0ecf7

Browse files
committed
Refill release concurrency tokens when a run is cancelled
1 parent e08e493 commit 9e0ecf7

File tree

4 files changed

+196
-30
lines changed

4 files changed

+196
-30
lines changed

apps/webapp/app/components/admin/debugRun.tsx

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,15 @@ function DebugRunContent({ friendlyId }: { friendlyId: string }) {
6666
);
6767
}
6868

69-
function DebugRunData({
69+
function DebugRunData(props: UseDataFunctionReturn<typeof loader>) {
70+
if (props.engine === "V1") {
71+
return <DebugRunDataEngineV1 {...props} />;
72+
}
73+
74+
return <DebugRunDataEngineV2 {...props} />;
75+
}
76+
77+
function DebugRunDataEngineV1({
7078
run,
7179
queueConcurrencyLimit,
7280
queueCurrentConcurrency,
@@ -338,3 +346,55 @@ function DebugRunData({
338346
</Property.Table>
339347
);
340348
}
349+
350+
function DebugRunDataEngineV2({
351+
run,
352+
queueConcurrencyLimit,
353+
queueCurrentConcurrency,
354+
envConcurrencyLimit,
355+
envCurrentConcurrency,
356+
keys,
357+
}: UseDataFunctionReturn<typeof loader>) {
358+
return (
359+
<Property.Table>
360+
<Property.Item>
361+
<Property.Label>ID</Property.Label>
362+
<Property.Value className="flex items-center gap-2">
363+
<ClipboardField value={run.id} variant="tertiary/small" iconButton />
364+
</Property.Value>
365+
</Property.Item>
366+
<Property.Item>
367+
<Property.Label>Queue current concurrency</Property.Label>
368+
<Property.Value className="flex items-center gap-2">
369+
<span>{queueCurrentConcurrency ?? "0"}</span>
370+
</Property.Value>
371+
</Property.Item>
372+
<Property.Item>
373+
<Property.Label>Queue concurrency limit</Property.Label>
374+
<Property.Value className="flex items-center gap-2">
375+
<span>{queueConcurrencyLimit ?? "Not set"}</span>
376+
</Property.Value>
377+
</Property.Item>
378+
<Property.Item>
379+
<Property.Label>Env current concurrency</Property.Label>
380+
<Property.Value className="flex items-center gap-2">
381+
<span>{envCurrentConcurrency ?? "0"}</span>
382+
</Property.Value>
383+
</Property.Item>
384+
<Property.Item>
385+
<Property.Label>Env concurrency limit</Property.Label>
386+
<Property.Value className="flex items-center gap-2">
387+
<span>{envConcurrencyLimit ?? "Not set"}</span>
388+
</Property.Value>
389+
</Property.Item>
390+
{keys.map((key) => (
391+
<Property.Item>
392+
<Property.Label>{key.label}</Property.Label>
393+
<Property.Value className="flex items-center gap-2">
394+
<ClipboardField value={key.key} variant="tertiary/small" iconButton />
395+
</Property.Value>
396+
</Property.Item>
397+
))}
398+
</Property.Table>
399+
);
400+
}

apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts

Lines changed: 128 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { z } from "zod";
44
import { $replica } from "~/db.server";
55
import { requireUserId } from "~/services/session.server";
66
import { marqs } from "~/v3/marqs/index.server";
7+
import { engine } from "~/v3/runEngine.server";
78

89
const ParamSchema = z.object({
910
runParam: z.string(),
@@ -17,6 +18,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
1718
where: { friendlyId: runParam, project: { organization: { members: { some: { userId } } } } },
1819
select: {
1920
id: true,
21+
engine: true,
2022
friendlyId: true,
2123
queue: true,
2224
concurrencyKey: true,
@@ -27,6 +29,8 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
2729
type: true,
2830
slug: true,
2931
organizationId: true,
32+
project: true,
33+
maximumConcurrencyLimit: true,
3034
organization: {
3135
select: {
3236
id: true,
@@ -41,33 +45,128 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
4145
throw new Response("Not Found", { status: 404 });
4246
}
4347

44-
const queueConcurrencyLimit = await marqs.getQueueConcurrencyLimit(
45-
run.runtimeEnvironment,
46-
run.queue
47-
);
48-
const envConcurrencyLimit = await marqs.getEnvConcurrencyLimit(run.runtimeEnvironment);
49-
const queueCurrentConcurrency = await marqs.currentConcurrencyOfQueue(
50-
run.runtimeEnvironment,
51-
run.queue,
52-
run.concurrencyKey ?? undefined
53-
);
54-
const envCurrentConcurrency = await marqs.currentConcurrencyOfEnvironment(run.runtimeEnvironment);
55-
56-
const queueReserveConcurrency = await marqs.reserveConcurrencyOfQueue(
57-
run.runtimeEnvironment,
58-
run.queue,
59-
run.concurrencyKey ?? undefined
60-
);
61-
62-
const envReserveConcurrency = await marqs.reserveConcurrencyOfEnvironment(run.runtimeEnvironment);
63-
64-
return typedjson({
65-
run,
66-
queueConcurrencyLimit,
67-
envConcurrencyLimit,
68-
queueCurrentConcurrency,
69-
envCurrentConcurrency,
70-
queueReserveConcurrency,
71-
envReserveConcurrency,
72-
});
48+
if (run.engine === "V1") {
49+
const queueConcurrencyLimit = await marqs.getQueueConcurrencyLimit(
50+
run.runtimeEnvironment,
51+
run.queue
52+
);
53+
const envConcurrencyLimit = await marqs.getEnvConcurrencyLimit(run.runtimeEnvironment);
54+
const queueCurrentConcurrency = await marqs.currentConcurrencyOfQueue(
55+
run.runtimeEnvironment,
56+
run.queue,
57+
run.concurrencyKey ?? undefined
58+
);
59+
const envCurrentConcurrency = await marqs.currentConcurrencyOfEnvironment(
60+
run.runtimeEnvironment
61+
);
62+
63+
const queueReserveConcurrency = await marqs.reserveConcurrencyOfQueue(
64+
run.runtimeEnvironment,
65+
run.queue,
66+
run.concurrencyKey ?? undefined
67+
);
68+
69+
const envReserveConcurrency = await marqs.reserveConcurrencyOfEnvironment(
70+
run.runtimeEnvironment
71+
);
72+
73+
return typedjson({
74+
engine: "V1",
75+
run,
76+
queueConcurrencyLimit,
77+
envConcurrencyLimit,
78+
queueCurrentConcurrency,
79+
envCurrentConcurrency,
80+
queueReserveConcurrency,
81+
envReserveConcurrency,
82+
keys: [],
83+
});
84+
} else {
85+
const queueConcurrencyLimit = await engine.runQueue.getQueueConcurrencyLimit(
86+
run.runtimeEnvironment,
87+
run.queue
88+
);
89+
90+
const envConcurrencyLimit = await engine.runQueue.getEnvConcurrencyLimit(
91+
run.runtimeEnvironment
92+
);
93+
94+
const queueCurrentConcurrency = await engine.runQueue.currentConcurrencyOfQueue(
95+
run.runtimeEnvironment,
96+
run.queue,
97+
run.concurrencyKey ?? undefined
98+
);
99+
100+
const envCurrentConcurrency = await engine.runQueue.currentConcurrencyOfEnvironment(
101+
run.runtimeEnvironment
102+
);
103+
104+
const queueCurrentConcurrencyKey = engine.runQueue.keys.currentConcurrencyKey(
105+
run.runtimeEnvironment,
106+
run.queue,
107+
run.concurrencyKey ?? undefined
108+
);
109+
110+
const envCurrentConcurrencyKey = engine.runQueue.keys.envCurrentConcurrencyKey(
111+
run.runtimeEnvironment
112+
);
113+
114+
const queueConcurrencyLimitKey = engine.runQueue.keys.queueConcurrencyLimitKey(
115+
run.runtimeEnvironment,
116+
run.queue
117+
);
118+
119+
const envConcurrencyLimitKey = engine.runQueue.keys.envConcurrencyLimitKey(
120+
run.runtimeEnvironment
121+
);
122+
123+
const releaseConcurrencyBucketKey = `engine:release-concurrency:org:${run.runtimeEnvironment.organizationId}:proj:${run.runtimeEnvironment.project.id}:env:${run.runtimeEnvironment.id}:bucket`;
124+
const releaseConcurrencyQueueKey = `engine:release-concurrency:org:${run.runtimeEnvironment.organizationId}:proj:${run.runtimeEnvironment.project.id}:env:${run.runtimeEnvironment.id}:queue`;
125+
const releaseConcurrencyMetadataKey = `engine:release-concurrency:org:${run.runtimeEnvironment.organizationId}:proj:${run.runtimeEnvironment.project.id}:env:${run.runtimeEnvironment.id}:metadata`;
126+
127+
const withPrefix = (key: string) => `engine:runqueue:${key}`;
128+
129+
const keys = [
130+
{
131+
label: "Queue current concurrency set",
132+
key: withPrefix(queueCurrentConcurrencyKey),
133+
},
134+
{
135+
label: "Env current concurrency set",
136+
key: withPrefix(envCurrentConcurrencyKey),
137+
},
138+
{
139+
label: "Queue concurrency limit",
140+
key: withPrefix(queueConcurrencyLimitKey),
141+
},
142+
{
143+
label: "Env concurrency limit",
144+
key: withPrefix(envConcurrencyLimitKey),
145+
},
146+
{
147+
label: "Release concurrency bucket",
148+
key: releaseConcurrencyBucketKey,
149+
},
150+
{
151+
label: "Release concurrency queue",
152+
key: releaseConcurrencyQueueKey,
153+
},
154+
{
155+
label: "Release concurrency metadata",
156+
key: releaseConcurrencyMetadataKey,
157+
},
158+
];
159+
160+
return typedjson({
161+
engine: "V2",
162+
run,
163+
queueConcurrencyLimit,
164+
envConcurrencyLimit,
165+
queueCurrentConcurrency,
166+
envCurrentConcurrency,
167+
queueReserveConcurrency: undefined,
168+
envReserveConcurrency: undefined,
169+
keys,
170+
});
171+
}
73172
}

internal-packages/run-engine/src/engine/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,7 @@ export class RunEngine {
306306
delayedRunSystem: this.delayedRunSystem,
307307
machines: this.options.machines,
308308
retryWarmStartThresholdMs: this.options.retryWarmStartThresholdMs,
309+
releaseConcurrencySystem: this.releaseConcurrencySystem,
309310
});
310311

311312
this.dequeueSystem = new DequeueSystem({

internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,15 @@ import {
3333
import { SystemResources } from "./systems.js";
3434
import { WaitpointSystem } from "./waitpointSystem.js";
3535
import { DelayedRunSystem } from "./delayedRunSystem.js";
36+
import { ReleaseConcurrencySystem } from "./releaseConcurrencySystem.js";
3637

3738
export type RunAttemptSystemOptions = {
3839
resources: SystemResources;
3940
executionSnapshotSystem: ExecutionSnapshotSystem;
4041
batchSystem: BatchSystem;
4142
waitpointSystem: WaitpointSystem;
4243
delayedRunSystem: DelayedRunSystem;
44+
releaseConcurrencySystem: ReleaseConcurrencySystem;
4345
retryWarmStartThresholdMs?: number;
4446
machines: RunEngineOptions["machines"];
4547
};
@@ -50,13 +52,15 @@ export class RunAttemptSystem {
5052
private readonly batchSystem: BatchSystem;
5153
private readonly waitpointSystem: WaitpointSystem;
5254
private readonly delayedRunSystem: DelayedRunSystem;
55+
private readonly releaseConcurrencySystem: ReleaseConcurrencySystem;
5356

5457
constructor(private readonly options: RunAttemptSystemOptions) {
5558
this.$ = options.resources;
5659
this.executionSnapshotSystem = options.executionSnapshotSystem;
5760
this.batchSystem = options.batchSystem;
5861
this.waitpointSystem = options.waitpointSystem;
5962
this.delayedRunSystem = options.delayedRunSystem;
63+
this.releaseConcurrencySystem = options.releaseConcurrencySystem;
6064
}
6165

6266
public async startRunAttempt({
@@ -1037,6 +1041,8 @@ export class RunAttemptSystem {
10371041
//remove it from the queue and release concurrency
10381042
await this.$.runQueue.acknowledgeMessage(run.runtimeEnvironment.organizationId, runId);
10391043

1044+
await this.releaseConcurrencySystem.refillTokensForSnapshot(latestSnapshot);
1045+
10401046
//if executing, we need to message the worker to cancel the run and put it into `PENDING_CANCEL` status
10411047
if (isExecuting(latestSnapshot.executionStatus)) {
10421048
const newSnapshot = await this.executionSnapshotSystem.createExecutionSnapshot(prisma, {

0 commit comments

Comments
 (0)