Skip to content

Commit b5fcb9f

Browse files
committed
Add runs to an env queue, as well as the actual queue
1 parent c531a9d commit b5fcb9f

File tree

5 files changed

+41
-5
lines changed

5 files changed

+41
-5
lines changed

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1074,6 +1074,7 @@ export class MarQS {
10741074
concurrencyKey,
10751075
envConcurrencyKey,
10761076
orgConcurrencyKey,
1077+
this.keys.envQueueKeyFromQueue(message.queue),
10771078
message.queue,
10781079
message.messageId,
10791080
JSON.stringify(message),
@@ -1111,6 +1112,7 @@ export class MarQS {
11111112
currentConcurrencyKey,
11121113
envCurrentConcurrencyKey,
11131114
orgCurrentConcurrencyKey,
1115+
this.keys.envQueueKeyFromQueue(messageQueue),
11141116
messageQueue,
11151117
String(Date.now()),
11161118
String(this.options.defaultEnvConcurrency),
@@ -1187,6 +1189,7 @@ export class MarQS {
11871189
concurrencyKey,
11881190
envConcurrencyKey,
11891191
orgConcurrencyKey,
1192+
this.keys.envQueueKeyFromQueue(messageQueue),
11901193
messageId,
11911194
messageQueue
11921195
);
@@ -1234,6 +1237,7 @@ export class MarQS {
12341237
envConcurrencyKey,
12351238
orgConcurrencyKey,
12361239
visibilityQueue,
1240+
this.keys.envQueueKeyFromQueue(messageQueue),
12371241
messageQueue,
12381242
messageId,
12391243
String(Date.now()),
@@ -1347,14 +1351,15 @@ export class MarQS {
13471351

13481352
#registerCommands() {
13491353
this.redis.defineCommand("enqueueMessage", {
1350-
numberOfKeys: 6,
1354+
numberOfKeys: 7,
13511355
lua: `
13521356
local queue = KEYS[1]
13531357
local parentQueue = KEYS[2]
13541358
local messageKey = KEYS[3]
13551359
local concurrencyKey = KEYS[4]
13561360
local envCurrentConcurrencyKey = KEYS[5]
13571361
local orgCurrentConcurrencyKey = KEYS[6]
1362+
local envQueue = KEYS[7]
13581363
13591364
local queueName = ARGV[1]
13601365
local messageId = ARGV[2]
@@ -1367,6 +1372,9 @@ redis.call('SET', messageKey, messageData)
13671372
-- Add the message to the queue
13681373
redis.call('ZADD', queue, messageScore, messageId)
13691374
1375+
-- Add the message to the env queue
1376+
redis.call('ZADD', envQueue, messageScore, messageId)
1377+
13701378
-- Rebalance the parent queue
13711379
local earliestMessage = redis.call('ZRANGE', queue, 0, 0, 'WITHSCORES')
13721380
if #earliestMessage == 0 then
@@ -1383,7 +1391,7 @@ redis.call('SREM', orgCurrentConcurrencyKey, messageId)
13831391
});
13841392

13851393
this.redis.defineCommand("dequeueMessage", {
1386-
numberOfKeys: 8,
1394+
numberOfKeys: 9,
13871395
lua: `
13881396
-- Keys: childQueue, parentQueue, concurrencyLimitKey, envConcurrencyLimitKey, orgConcurrencyLimitKey, currentConcurrencyKey, envCurrentConcurrencyKey, orgCurrentConcurrencyKey
13891397
local childQueue = KEYS[1]
@@ -1394,6 +1402,7 @@ local orgConcurrencyLimitKey = KEYS[5]
13941402
local currentConcurrencyKey = KEYS[6]
13951403
local envCurrentConcurrencyKey = KEYS[7]
13961404
local orgCurrentConcurrencyKey = KEYS[8]
1405+
local envQueueKey = KEYS[9]
13971406
13981407
-- Args: childQueueName, currentTime, defaultEnvConcurrencyLimit, defaultOrgConcurrencyLimit
13991408
local childQueueName = ARGV[1]
@@ -1438,6 +1447,7 @@ local messageScore = tonumber(messages[2])
14381447
14391448
-- Move message to timeout queue and update concurrency
14401449
redis.call('ZREM', childQueue, messageId)
1450+
redis.call('ZREM', envQueueKey, messageId)
14411451
redis.call('SADD', currentConcurrencyKey, messageId)
14421452
redis.call('SADD', envCurrentConcurrencyKey, messageId)
14431453
redis.call('SADD', orgCurrentConcurrencyKey, messageId)
@@ -1474,7 +1484,7 @@ redis.call('SET', messageKey, messageData, 'GET')
14741484
});
14751485

14761486
this.redis.defineCommand("acknowledgeMessage", {
1477-
numberOfKeys: 7,
1487+
numberOfKeys: 8,
14781488
lua: `
14791489
-- Keys: parentQueue, messageKey, messageQueue, visibilityQueue, concurrencyKey, envCurrentConcurrencyKey, orgCurrentConcurrencyKey
14801490
local parentQueue = KEYS[1]
@@ -1484,6 +1494,7 @@ local visibilityQueue = KEYS[4]
14841494
local concurrencyKey = KEYS[5]
14851495
local envCurrentConcurrencyKey = KEYS[6]
14861496
local orgCurrentConcurrencyKey = KEYS[7]
1497+
local envQueueKey = KEYS[8]
14871498
14881499
-- Args: messageId, messageQueueName
14891500
local messageId = ARGV[1]
@@ -1495,6 +1506,9 @@ redis.call('DEL', messageKey)
14951506
-- Remove the message from the queue
14961507
redis.call('ZREM', messageQueue, messageId)
14971508
1509+
-- Remove the message from the env queue
1510+
redis.call('ZREM', envQueueKey, messageId)
1511+
14981512
-- Rebalance the parent queue
14991513
local earliestMessage = redis.call('ZRANGE', messageQueue, 0, 0, 'WITHSCORES')
15001514
if #earliestMessage == 0 then
@@ -1514,7 +1528,7 @@ redis.call('SREM', orgCurrentConcurrencyKey, messageId)
15141528
});
15151529

15161530
this.redis.defineCommand("nackMessage", {
1517-
numberOfKeys: 7,
1531+
numberOfKeys: 8,
15181532
lua: `
15191533
-- Keys: childQueueKey, parentQueueKey, visibilityQueue, concurrencyKey, envConcurrencyKey, orgConcurrencyKey, messageId
15201534
local messageKey = KEYS[1]
@@ -1524,6 +1538,7 @@ local concurrencyKey = KEYS[4]
15241538
local envConcurrencyKey = KEYS[5]
15251539
local orgConcurrencyKey = KEYS[6]
15261540
local visibilityQueue = KEYS[7]
1541+
local envQueueKey = KEYS[8]
15271542
15281543
-- Args: childQueueName, messageId, currentTime, messageScore
15291544
local childQueueName = ARGV[1]
@@ -1547,6 +1562,9 @@ end
15471562
-- Enqueue the message into the queue
15481563
redis.call('ZADD', childQueueKey, messageScore, messageId)
15491564
1565+
-- Enqueue the message into the env queue
1566+
redis.call('ZADD', envQueueKey, messageScore, messageId)
1567+
15501568
-- Rebalance the parent queue
15511569
local earliestMessage = redis.call('ZRANGE', childQueueKey, 0, 0, 'WITHSCORES')
15521570
if #earliestMessage == 0 then
@@ -1729,6 +1747,7 @@ declare module "ioredis" {
17291747
concurrencyKey: string,
17301748
envConcurrencyKey: string,
17311749
orgConcurrencyKey: string,
1750+
envQueue: string,
17321751
queueName: string,
17331752
messageId: string,
17341753
messageData: string,
@@ -1745,6 +1764,7 @@ declare module "ioredis" {
17451764
currentConcurrencyKey: string,
17461765
envCurrentConcurrencyKey: string,
17471766
orgCurrentConcurrencyKey: string,
1767+
envQueueKey: string,
17481768
childQueueName: string,
17491769
currentTime: string,
17501770
defaultEnvConcurrencyLimit: string,
@@ -1766,6 +1786,7 @@ declare module "ioredis" {
17661786
concurrencyKey: string,
17671787
envConcurrencyKey: string,
17681788
orgConcurrencyKey: string,
1789+
envQueueKey: string,
17691790
messageId: string,
17701791
messageQueueName: string,
17711792
callback?: Callback<void>
@@ -1779,6 +1800,7 @@ declare module "ioredis" {
17791800
envConcurrencyKey: string,
17801801
orgConcurrencyKey: string,
17811802
visibilityQueue: string,
1803+
envQueueKey: string,
17821804
childQueueName: string,
17831805
messageId: string,
17841806
currentTime: string,

apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,12 @@ export class MarQSShortKeyProducer implements MarQSKeyProducer {
128128
return [this.envKeySection(env.id), constants.CURRENT_CONCURRENCY_PART].join(":");
129129
}
130130

131+
envQueueKeyFromQueue(queue: string) {
132+
const envId = this.normalizeQueue(queue).split(":")[3];
133+
134+
return `${constants.ENV_PART}:${envId}:${constants.QUEUE_PART}`;
135+
}
136+
131137
messageKey(messageId: string) {
132138
return `${constants.MESSAGE_PART}:${messageId}`;
133139
}

apps/webapp/app/v3/marqs/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ export interface MarQSKeyProducer {
4444
envCurrentConcurrencyKeyFromQueue(queue: string): string;
4545
orgCurrentConcurrencyKey(env: AuthenticatedEnvironment): string;
4646
envCurrentConcurrencyKey(env: AuthenticatedEnvironment): string;
47+
envQueueKeyFromQueue(queue: string): string;
4748
messageKey(messageId: string): string;
4849
stripKeyPrefix(key: string): string;
4950
}

references/v3-catalog/src/trigger/simple.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,3 +208,10 @@ export const childTask = task({
208208
};
209209
},
210210
});
211+
212+
export const retryTask = task({
213+
id: "retry-task",
214+
run: async (payload: any) => {
215+
throw new Error("This task will always fail");
216+
},
217+
});

references/v3-catalog/trigger.config.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ export default defineConfig({
1818
instrumentations: [new OpenAIInstrumentation()],
1919
additionalFiles: ["wrangler/wrangler.toml"],
2020
retries: {
21-
enabledInDev: false,
21+
enabledInDev: true,
2222
default: {
2323
maxAttempts: 10,
2424
minTimeoutInMs: 5_000,

0 commit comments

Comments
 (0)