Skip to content

Commit 7eaf81a

Browse files
committed
Use releaserId in case we don't end up using run IDs
1 parent f9c7e95 commit 7eaf81a

File tree

1 file changed

+31
-31
lines changed

1 file changed

+31
-31
lines changed

internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ export type ReleaseConcurrencyQueueRetryOptions = {
1515

1616
export type ReleaseConcurrencyQueueOptions<T> = {
1717
redis: RedisOptions;
18-
executor: (releaseQueue: T, runId: string) => Promise<void>;
18+
executor: (releaseQueue: T, releaserId: string) => Promise<void>;
1919
keys: {
2020
fromDescriptor: (releaseQueue: T) => string;
2121
toDescriptor: (releaseQueue: string) => T;
@@ -89,7 +89,7 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
8989
* If there is no token available, then we'll add the operation to a queue
9090
* and wait until the token is available.
9191
*/
92-
public async attemptToRelease(releaseQueueDescriptor: T, runId: string) {
92+
public async attemptToRelease(releaseQueueDescriptor: T, releaserId: string) {
9393
const maxTokens = await this.#callMaxTokens(releaseQueueDescriptor);
9494

9595
if (maxTokens === 0) {
@@ -104,13 +104,13 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
104104
this.#queueKey(releaseQueue),
105105
this.#metadataKey(releaseQueue),
106106
releaseQueue,
107-
runId,
107+
releaserId,
108108
String(maxTokens),
109109
String(Date.now())
110110
);
111111

112112
if (!!result) {
113-
await this.#callExecutor(releaseQueueDescriptor, runId, {
113+
await this.#callExecutor(releaseQueueDescriptor, releaserId, {
114114
retryCount: 0,
115115
lastAttempt: Date.now(),
116116
});
@@ -161,28 +161,28 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
161161
}
162162

163163
await Promise.all(
164-
result.map(([queue, runId, metadata]) => {
164+
result.map(([queue, releaserId, metadata]) => {
165165
const itemMetadata = QueueItemMetadata.parse(JSON.parse(metadata));
166166
const releaseQueueDescriptor = this.keys.toDescriptor(queue);
167-
return this.#callExecutor(releaseQueueDescriptor, runId, itemMetadata);
167+
return this.#callExecutor(releaseQueueDescriptor, releaserId, itemMetadata);
168168
})
169169
);
170170

171171
return true;
172172
}
173173

174-
async #callExecutor(releaseQueueDescriptor: T, runId: string, metadata: QueueItemMetadata) {
174+
async #callExecutor(releaseQueueDescriptor: T, releaserId: string, metadata: QueueItemMetadata) {
175175
try {
176-
this.logger.info("Executing run:", { releaseQueueDescriptor, runId });
176+
this.logger.info("Executing run:", { releaseQueueDescriptor, releaserId });
177177

178-
await this.options.executor(releaseQueueDescriptor, runId);
178+
await this.options.executor(releaseQueueDescriptor, releaserId);
179179
} catch (error) {
180180
this.logger.error("Error executing run:", { error });
181181

182182
if (metadata.retryCount >= this.maxRetries) {
183183
this.logger.error("Max retries reached:", {
184184
releaseQueueDescriptor,
185-
runId,
185+
releaserId,
186186
retryCount: metadata.retryCount,
187187
});
188188

@@ -194,10 +194,10 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
194194
this.#queueKey(releaseQueue),
195195
this.#metadataKey(releaseQueue),
196196
releaseQueue,
197-
runId
197+
releaserId
198198
);
199199

200-
this.logger.info("Returned token:", { releaseQueueDescriptor, runId });
200+
this.logger.info("Returned token:", { releaseQueueDescriptor, releaserId });
201201

202202
return;
203203
}
@@ -216,7 +216,7 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
216216
this.#queueKey(releaseQueue),
217217
this.#metadataKey(releaseQueue),
218218
releaseQueue,
219-
runId,
219+
releaserId,
220220
JSON.stringify(updatedMetadata),
221221
this.#calculateBackoffScore(updatedMetadata)
222222
);
@@ -282,7 +282,7 @@ local queueKey = KEYS[3]
282282
local metadataKey = KEYS[4]
283283
284284
local releaseQueue = ARGV[1]
285-
local runId = ARGV[2]
285+
local releaserId = ARGV[2]
286286
local maxTokens = tonumber(ARGV[3])
287287
local score = ARGV[4]
288288
@@ -292,10 +292,10 @@ local currentTokens = tonumber(redis.call("GET", bucketKey) or maxTokens)
292292
-- If we have enough tokens, then consume them
293293
if currentTokens >= 1 then
294294
redis.call("SET", bucketKey, currentTokens - 1)
295-
redis.call("ZREM", queueKey, runId)
295+
redis.call("ZREM", queueKey, releaserId)
296296
297297
-- Clean up metadata when successfully consuming
298-
redis.call("HDEL", metadataKey, runId)
298+
redis.call("HDEL", metadataKey, releaserId)
299299
300300
-- Get queue length after removing the item
301301
local queueLength = redis.call("ZCARD", queueKey)
@@ -311,14 +311,14 @@ if currentTokens >= 1 then
311311
end
312312
313313
-- If we don't have enough tokens, then we need to add the operation to the queue
314-
redis.call("ZADD", queueKey, score, runId)
314+
redis.call("ZADD", queueKey, score, releaserId)
315315
316316
-- Initialize or update metadata
317317
local metadata = cjson.encode({
318318
retryCount = 0,
319319
lastAttempt = tonumber(score)
320320
})
321-
redis.call("HSET", metadataKey, runId, metadata)
321+
redis.call("HSET", metadataKey, releaserId, metadata)
322322
323323
-- Remove from the master queue
324324
redis.call("ZREM", masterQueuesKey, releaseQueue)
@@ -400,14 +400,14 @@ redis.call("SET", bucketKey, currentTokens - itemsToProcess)
400400
401401
-- Remove the items from the queue and add to results
402402
for i = 1, itemsToProcess do
403-
local runId = items[i]
404-
redis.call("ZREM", queueKey, runId)
403+
local releaserId = items[i]
404+
redis.call("ZREM", queueKey, releaserId)
405405
406406
-- Get metadata before removing it
407-
local metadata = redis.call("HGET", metadataKey, runId)
408-
redis.call("HDEL", metadataKey, runId)
407+
local metadata = redis.call("HGET", metadataKey, releaserId)
408+
redis.call("HDEL", metadataKey, releaserId)
409409
410-
table.insert(results, { queueName, runId, metadata })
410+
table.insert(results, { queueName, releaserId, metadata })
411411
end
412412
413413
-- Get remaining queue length
@@ -434,7 +434,7 @@ local queueKey = KEYS[3]
434434
local metadataKey = KEYS[4]
435435
436436
local releaseQueue = ARGV[1]
437-
local runId = ARGV[2]
437+
local releaserId = ARGV[2]
438438
local metadata = ARGV[3]
439439
local score = ARGV[4]
440440
@@ -444,10 +444,10 @@ local remainingTokens = currentTokens + 1
444444
redis.call("SET", bucketKey, remainingTokens)
445445
446446
-- Add the item back to the queue
447-
redis.call("ZADD", queueKey, score, runId)
447+
redis.call("ZADD", queueKey, score, releaserId)
448448
449449
-- Add the metadata back to the item
450-
redis.call("HSET", metadataKey, runId, metadata)
450+
redis.call("HSET", metadataKey, releaserId, metadata)
451451
452452
-- Update the master queue
453453
local queueLength = redis.call("ZCARD", queueKey)
@@ -470,15 +470,15 @@ local queueKey = KEYS[3]
470470
local metadataKey = KEYS[4]
471471
472472
local releaseQueue = ARGV[1]
473-
local runId = ARGV[2]
473+
local releaserId = ARGV[2]
474474
475475
-- Return the token to the bucket
476476
local currentTokens = tonumber(redis.call("GET", bucketKey))
477477
local remainingTokens = currentTokens + 1
478478
redis.call("SET", bucketKey, remainingTokens)
479479
480480
-- Clean up metadata
481-
redis.call("HDEL", metadataKey, runId)
481+
redis.call("HDEL", metadataKey, releaserId)
482482
483483
-- Update the master queue based on remaining queue length
484484
local queueLength = redis.call("ZCARD", queueKey)
@@ -502,7 +502,7 @@ declare module "@internal/redis" {
502502
queueKey: string,
503503
metadataKey: string,
504504
releaseQueue: string,
505-
runId: string,
505+
releaserId: string,
506506
maxTokens: string,
507507
score: string,
508508
callback?: Callback<string>
@@ -532,7 +532,7 @@ declare module "@internal/redis" {
532532
queueKey: string,
533533
metadataKey: string,
534534
releaseQueue: string,
535-
runId: string,
535+
releaserId: string,
536536
metadata: string,
537537
score: string,
538538
callback?: Callback<void>
@@ -544,7 +544,7 @@ declare module "@internal/redis" {
544544
queueKey: string,
545545
metadataKey: string,
546546
releaseQueue: string,
547-
runId: string,
547+
releaserId: string,
548548
callback?: Callback<void>
549549
): Result<void, Context>;
550550
}

0 commit comments

Comments
 (0)