Skip to content

Commit 317135e

Browse files
committed
fix: release concurrency system only consumes tokens when releasings are executed successfully
1 parent 36159be commit 317135e

File tree

5 files changed

+115
-23
lines changed

5 files changed

+115
-23
lines changed

.vscode/launch.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@
138138
"type": "node-terminal",
139139
"request": "launch",
140140
"name": "Debug RunEngine tests",
141-
"command": "pnpm run test ./src/engine/tests/releaseConcurrencyQueue.test.ts -t 'Should manage token bucket and queue correctly'",
141+
"command": "pnpm run test ./src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts -t 'Should provide metrics about queues via getQueueMetrics'",
142142
"cwd": "${workspaceFolder}/internal-packages/run-engine",
143143
"sourceMaps": true
144144
},

internal-packages/run-engine/src/engine/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ export class RunEngine {
219219
pollInterval: options.releaseConcurrency?.pollInterval ?? 1000,
220220
batchSize: options.releaseConcurrency?.batchSize ?? 10,
221221
executor: async (descriptor, snapshotId) => {
222-
await this.releaseConcurrencySystem.executeReleaseConcurrencyForSnapshot(
222+
return await this.releaseConcurrencySystem.executeReleaseConcurrencyForSnapshot(
223223
snapshotId
224224
);
225225
},

internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@ export type ReleaseConcurrencyQueueRetryOptions = {
1515

1616
export type ReleaseConcurrencyQueueOptions<T> = {
1717
redis: RedisOptions;
18-
executor: (releaseQueue: T, releaserId: string) => Promise<void>;
18+
/**
19+
* @returns true if the run was successful, false if the token should be returned to the bucket
20+
*/
21+
executor: (releaseQueue: T, releaserId: string) => Promise<boolean>;
1922
keys: {
2023
fromDescriptor: (releaseQueue: T) => string;
2124
toDescriptor: (releaseQueue: string) => T;
@@ -119,7 +122,7 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
119122
String(Date.now())
120123
);
121124

122-
this.logger.debug("Consumed token in attemptToRelease", {
125+
this.logger.info("Consumed token in attemptToRelease", {
123126
releaseQueueDescriptor,
124127
releaserId,
125128
maxTokens,
@@ -270,7 +273,7 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
270273
return false;
271274
}
272275

273-
await Promise.all(
276+
await Promise.allSettled(
274277
result.map(([queue, releaserId, metadata]) => {
275278
const itemMetadata = QueueItemMetadata.parse(JSON.parse(metadata));
276279
const releaseQueueDescriptor = this.keys.toDescriptor(queue);
@@ -283,9 +286,29 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
283286

284287
async #callExecutor(releaseQueueDescriptor: T, releaserId: string, metadata: QueueItemMetadata) {
285288
try {
286-
this.logger.info("Executing run:", { releaseQueueDescriptor, releaserId });
289+
this.logger.info("Calling executor for release", { releaseQueueDescriptor, releaserId });
290+
291+
const released = await this.options.executor(releaseQueueDescriptor, releaserId);
292+
293+
if (released) {
294+
this.logger.info("Executor released concurrency", { releaseQueueDescriptor, releaserId });
295+
} else {
296+
this.logger.info("Executor did not release concurrency", {
297+
releaseQueueDescriptor,
298+
releaserId,
299+
});
287300

288-
await this.options.executor(releaseQueueDescriptor, releaserId);
301+
// Return the token but don't requeue
302+
const releaseQueue = this.keys.fromDescriptor(releaseQueueDescriptor);
303+
await this.redis.returnTokenOnly(
304+
this.masterQueuesKey,
305+
this.#bucketKey(releaseQueue),
306+
this.#queueKey(releaseQueue),
307+
this.#metadataKey(releaseQueue),
308+
releaseQueue,
309+
releaserId
310+
);
311+
}
289312
} catch (error) {
290313
this.logger.error("Error executing run:", { error });
291314

@@ -401,7 +424,9 @@ local currentTokens = tonumber(redis.call("GET", bucketKey) or maxTokens)
401424
402425
-- If we have enough tokens, then consume them
403426
if currentTokens >= 1 then
404-
redis.call("SET", bucketKey, currentTokens - 1)
427+
newCurrentTokens = currentTokens - 1
428+
429+
redis.call("SET", bucketKey, newCurrentTokens)
405430
redis.call("ZREM", queueKey, releaserId)
406431
407432
-- Clean up metadata when successfully consuming
@@ -411,8 +436,8 @@ if currentTokens >= 1 then
411436
local queueLength = redis.call("ZCARD", queueKey)
412437
413438
-- If we still have tokens and items in queue, update available queues
414-
if currentTokens > 0 and queueLength > 0 then
415-
redis.call("ZADD", masterQueuesKey, currentTokens, releaseQueue)
439+
if newCurrentTokens > 0 and queueLength > 0 then
440+
redis.call("ZADD", masterQueuesKey, newCurrentTokens, releaseQueue)
416441
else
417442
redis.call("ZREM", masterQueuesKey, releaseQueue)
418443
end

internal-packages/run-engine/src/engine/systems/releaseConcurrencySystem.ts

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,9 @@ export class ReleaseConcurrencySystem {
104104
);
105105
}
106106

107-
public async executeReleaseConcurrencyForSnapshot(snapshotId: string) {
107+
public async executeReleaseConcurrencyForSnapshot(snapshotId: string): Promise<boolean> {
108108
if (!this.releaseConcurrencyQueue) {
109-
return;
109+
return false;
110110
}
111111

112112
this.$.logger.debug("Executing released concurrency", {
@@ -136,14 +136,14 @@ export class ReleaseConcurrencySystem {
136136
snapshotId,
137137
});
138138

139-
return;
139+
return false;
140140
}
141141

142142
// - Runlock the run
143143
// - Get latest snapshot
144144
// - If the run is non suspended or going to be, then bail
145145
// - If the run is suspended or going to be, then release the concurrency
146-
await this.$.runLock.lock([snapshot.runId], 5_000, async () => {
146+
return await this.$.runLock.lock([snapshot.runId], 5_000, async () => {
147147
const latestSnapshot = await getLatestExecutionSnapshot(this.$.prisma, snapshot.runId);
148148

149149
const isValidSnapshot =
@@ -159,7 +159,7 @@ export class ReleaseConcurrencySystem {
159159
snapshot,
160160
});
161161

162-
return;
162+
return false;
163163
}
164164

165165
if (!canReleaseConcurrency(latestSnapshot.executionStatus)) {
@@ -168,20 +168,21 @@ export class ReleaseConcurrencySystem {
168168
snapshot: latestSnapshot,
169169
});
170170

171-
return;
171+
return false;
172172
}
173173

174174
const metadata = this.#parseMetadata(snapshot.metadata);
175175

176176
if (typeof metadata.releaseConcurrency === "boolean") {
177177
if (metadata.releaseConcurrency) {
178-
return await this.$.runQueue.releaseAllConcurrency(
179-
snapshot.organizationId,
180-
snapshot.runId
181-
);
178+
await this.$.runQueue.releaseAllConcurrency(snapshot.organizationId, snapshot.runId);
179+
180+
return true;
182181
}
183182

184-
return await this.$.runQueue.releaseEnvConcurrency(snapshot.organizationId, snapshot.runId);
183+
await this.$.runQueue.releaseEnvConcurrency(snapshot.organizationId, snapshot.runId);
184+
185+
return true;
185186
}
186187

187188
// Get the locked queue
@@ -198,10 +199,14 @@ export class ReleaseConcurrencySystem {
198199
(typeof taskQueue.concurrencyLimit === "undefined" ||
199200
taskQueue.releaseConcurrencyOnWaitpoint)
200201
) {
201-
return await this.$.runQueue.releaseAllConcurrency(snapshot.organizationId, snapshot.runId);
202+
await this.$.runQueue.releaseAllConcurrency(snapshot.organizationId, snapshot.runId);
203+
204+
return true;
202205
}
203206

204-
return await this.$.runQueue.releaseEnvConcurrency(snapshot.organizationId, snapshot.runId);
207+
await this.$.runQueue.releaseEnvConcurrency(snapshot.organizationId, snapshot.runId);
208+
209+
return true;
205210
});
206211
}
207212

internal-packages/run-engine/src/engine/tests/releaseConcurrencyTokenBucketQueue.test.ts

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { redisTest, StartedRedisContainer } from "@internal/testcontainers";
22
import { ReleaseConcurrencyTokenBucketQueue } from "../releaseConcurrencyTokenBucketQueue.js";
33
import { setTimeout } from "node:timers/promises";
4+
import { createRedisClient, Redis } from "@internal/redis";
45

56
type TestQueueDescriptor = {
67
name: string;
@@ -20,6 +21,7 @@ function createReleaseConcurrencyQueue(
2021
},
2122
executor: async (releaseQueue, runId) => {
2223
executedRuns.push({ releaseQueue: releaseQueue.name, runId });
24+
return true;
2325
},
2426
maxTokens: async (_) => maxTokens,
2527
keys: {
@@ -221,6 +223,7 @@ describe("ReleaseConcurrencyQueue", () => {
221223
throw new Error("Executor failed");
222224
}
223225
executedRuns.push({ releaseQueue, runId });
226+
return true;
224227
},
225228
maxTokens: async (_) => 2,
226229
keys: {
@@ -299,6 +302,7 @@ describe("ReleaseConcurrencyQueue", () => {
299302
// Add small delay to simulate work
300303
await setTimeout(10);
301304
executedRuns.push({ releaseQueue, runId });
305+
return true;
302306
},
303307
keys: {
304308
fromDescriptor: (descriptor) => descriptor,
@@ -419,6 +423,7 @@ describe("ReleaseConcurrencyQueue", () => {
419423
},
420424
executor: async (releaseQueue, runId) => {
421425
secondRunAttempted = true;
426+
return true;
422427
},
423428
keys: {
424429
fromDescriptor: (descriptor) => descriptor,
@@ -516,6 +521,63 @@ describe("ReleaseConcurrencyQueue", () => {
516521
}
517522
});
518523

524+
redisTest(
525+
"Should return token but not requeue when executor returns false",
526+
async ({ redisContainer }) => {
527+
const executedRuns: { releaseQueue: string; runId: string }[] = [];
528+
const runResults: Record<string, boolean> = {
529+
run1: true, // This will succeed
530+
run2: false, // This will return false, returning the token without requeuing
531+
run3: true, // This should execute immediately when run2's token is returned
532+
};
533+
534+
const queue = new ReleaseConcurrencyTokenBucketQueue<string>({
535+
redis: {
536+
keyPrefix: "release-queue:test:",
537+
host: redisContainer.getHost(),
538+
port: redisContainer.getPort(),
539+
},
540+
executor: async (releaseQueue, runId) => {
541+
const success = runResults[runId];
542+
543+
executedRuns.push({ releaseQueue, runId });
544+
545+
return success;
546+
},
547+
keys: {
548+
fromDescriptor: (descriptor) => descriptor,
549+
toDescriptor: (name) => name,
550+
},
551+
maxTokens: async (_) => 2, // Only 2 tokens available at a time
552+
pollInterval: 100,
553+
});
554+
555+
try {
556+
// First run should execute and succeed
557+
await queue.attemptToRelease("test-queue", "run1");
558+
expect(executedRuns).toHaveLength(1);
559+
expect(executedRuns[0]).toEqual({ releaseQueue: "test-queue", runId: "run1" });
560+
561+
// Second run should execute but return false, returning the token
562+
await queue.attemptToRelease("test-queue", "run2");
563+
expect(executedRuns).toHaveLength(2);
564+
expect(executedRuns[1]).toEqual({ releaseQueue: "test-queue", runId: "run2" });
565+
566+
// Third run should be able to execute immediately since run2 returned its token
567+
await queue.attemptToRelease("test-queue", "run3");
568+
569+
expect(executedRuns).toHaveLength(3);
570+
expect(executedRuns[2]).toEqual({ releaseQueue: "test-queue", runId: "run3" });
571+
572+
// Verify that run2 was not retried (it should have been skipped)
573+
const run2Attempts = executedRuns.filter((r) => r.runId === "run2");
574+
expect(run2Attempts).toHaveLength(1); // Only executed once, not retried
575+
} finally {
576+
await queue.quit();
577+
}
578+
}
579+
);
580+
519581
redisTest("Should implement exponential backoff between retries", async ({ redisContainer }) => {
520582
const executionTimes: number[] = [];
521583
let startTime: number;

0 commit comments

Comments
 (0)