Skip to content

Commit 2bd533d

Browse files
committed
Improved release concurrency accounting system + a sweeper to auto-refill tokens for snapshots that are no longer the latest snapshot on a run (e.g. the run has moved to a new snapshot state)
1 parent 9e0ecf7 commit 2bd533d

File tree

10 files changed

+452
-92
lines changed

10 files changed

+452
-92
lines changed

apps/webapp/app/env.server.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,11 @@ const EnvironmentSchema = z.object({
605605
RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED: z.string().default("0"),
606606
RUN_ENGINE_RELEASE_CONCURRENCY_DISABLE_CONSUMERS: z.string().default("0"),
607607
RUN_ENGINE_RELEASE_CONCURRENCY_MAX_TOKENS_RATIO: z.coerce.number().default(1),
608+
RUN_ENGINE_RELEASE_CONCURRENCY_RELEASINGS_MAX_AGE: z.coerce
609+
.number()
610+
.int()
611+
.default(60_000 * 30),
612+
RUN_ENGINE_RELEASE_CONCURRENCY_RELEASINGS_POLL_INTERVAL: z.coerce.number().int().default(60_000),
608613
RUN_ENGINE_RELEASE_CONCURRENCY_MAX_RETRIES: z.coerce.number().int().default(3),
609614
RUN_ENGINE_RELEASE_CONCURRENCY_CONSUMERS_COUNT: z.coerce.number().int().default(1),
610615
RUN_ENGINE_RELEASE_CONCURRENCY_POLL_INTERVAL: z.coerce.number().int().default(500),

apps/webapp/app/routes/resources.taskruns.$runParam.debug.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,10 @@ export async function loader({ request, params }: LoaderFunctionArgs) {
155155
label: "Release concurrency metadata",
156156
key: releaseConcurrencyMetadataKey,
157157
},
158+
{
159+
label: "Release concurrency releasings",
160+
key: "engine:release-concurrency:releasings",
161+
},
158162
];
159163

160164
return typedjson({

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ function createRunEngine() {
8181
disabled: env.RUN_ENGINE_RELEASE_CONCURRENCY_ENABLED === "0",
8282
disableConsumers: env.RUN_ENGINE_RELEASE_CONCURRENCY_DISABLE_CONSUMERS === "1",
8383
maxTokensRatio: env.RUN_ENGINE_RELEASE_CONCURRENCY_MAX_TOKENS_RATIO,
84+
releasingsMaxAge: env.RUN_ENGINE_RELEASE_CONCURRENCY_RELEASINGS_MAX_AGE,
85+
releasingsPollInterval: env.RUN_ENGINE_RELEASE_CONCURRENCY_RELEASINGS_POLL_INTERVAL,
8486
maxRetries: env.RUN_ENGINE_RELEASE_CONCURRENCY_MAX_RETRIES,
8587
consumersCount: env.RUN_ENGINE_RELEASE_CONCURRENCY_CONSUMERS_COUNT,
8688
pollInterval: env.RUN_ENGINE_RELEASE_CONCURRENCY_POLL_INTERVAL,

internal-packages/run-engine/src/engine/index.ts

Lines changed: 3 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,9 @@ export class RunEngine {
201201

202202
this.releaseConcurrencySystem = new ReleaseConcurrencySystem({
203203
resources,
204+
maxTokensRatio: options.releaseConcurrency?.maxTokensRatio,
205+
releasingsMaxAge: options.releaseConcurrency?.releasingsMaxAge,
206+
releasingsPollInterval: options.releaseConcurrency?.releasingsPollInterval,
204207
queueOptions:
205208
typeof options.releaseConcurrency?.disabled === "boolean" &&
206209
options.releaseConcurrency.disabled
@@ -223,33 +226,6 @@ export class RunEngine {
223226
consumersCount: options.releaseConcurrency?.consumersCount ?? 1,
224227
pollInterval: options.releaseConcurrency?.pollInterval ?? 1000,
225228
batchSize: options.releaseConcurrency?.batchSize ?? 10,
226-
executor: async (descriptor, snapshotId) => {
227-
return await this.releaseConcurrencySystem.executeReleaseConcurrencyForSnapshot(
228-
snapshotId
229-
);
230-
},
231-
maxTokens: async (descriptor) => {
232-
const environment = await this.prisma.runtimeEnvironment.findFirstOrThrow({
233-
where: { id: descriptor.envId },
234-
select: {
235-
maximumConcurrencyLimit: true,
236-
},
237-
});
238-
239-
return (
240-
environment.maximumConcurrencyLimit *
241-
(options.releaseConcurrency?.maxTokensRatio ?? 1.0)
242-
);
243-
},
244-
keys: {
245-
fromDescriptor: (descriptor) =>
246-
`org:${descriptor.orgId}:proj:${descriptor.projectId}:env:${descriptor.envId}`,
247-
toDescriptor: (name) => ({
248-
orgId: name.split(":")[1],
249-
projectId: name.split(":")[3],
250-
envId: name.split(":")[5],
251-
}),
252-
},
253229
tracer: this.tracer,
254230
},
255231
});

0 commit comments

Comments
 (0)