Skip to content

Commit 2ee3b7c

Browse files
committed
When enqueuing a message into the queue clear the concurrency
1 parent 7633972 commit 2ee3b7c

File tree

1 file changed

+22
-1
lines changed

1 file changed

+22
-1
lines changed

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -986,15 +986,25 @@ export class MarQS {
986986
}
987987

988988
async #callEnqueueMessage(message: MessagePayload) {
989+
const concurrencyKey = this.keys.currentConcurrencyKeyFromQueue(message.queue);
990+
const envConcurrencyKey = this.keys.envCurrentConcurrencyKeyFromQueue(message.queue);
991+
const orgConcurrencyKey = this.keys.orgCurrentConcurrencyKeyFromQueue(message.queue);
992+
989993
logger.debug("Calling enqueueMessage", {
990994
messagePayload: message,
995+
concurrencyKey,
996+
envConcurrencyKey,
997+
orgConcurrencyKey,
991998
service: this.name,
992999
});
9931000

9941001
return this.redis.enqueueMessage(
9951002
message.queue,
9961003
message.parentQueue,
9971004
this.keys.messageKey(message.messageId),
1005+
concurrencyKey,
1006+
envConcurrencyKey,
1007+
orgConcurrencyKey,
9981008
message.queue,
9991009
message.messageId,
10001010
JSON.stringify(message),
@@ -1268,11 +1278,14 @@ export class MarQS {
12681278

12691279
#registerCommands() {
12701280
this.redis.defineCommand("enqueueMessage", {
1271-
numberOfKeys: 3,
1281+
numberOfKeys: 6,
12721282
lua: `
12731283
local queue = KEYS[1]
12741284
local parentQueue = KEYS[2]
12751285
local messageKey = KEYS[3]
1286+
local concurrencyKey = KEYS[4]
1287+
local envCurrentConcurrencyKey = KEYS[5]
1288+
local orgCurrentConcurrencyKey = KEYS[6]
12761289
12771290
local queueName = ARGV[1]
12781291
local messageId = ARGV[2]
@@ -1292,6 +1305,11 @@ if #earliestMessage == 0 then
12921305
else
12931306
redis.call('ZADD', parentQueue, earliestMessage[2], queueName)
12941307
end
1308+
1309+
-- Update the concurrency keys
1310+
redis.call('SREM', concurrencyKey, messageId)
1311+
redis.call('SREM', envCurrentConcurrencyKey, messageId)
1312+
redis.call('SREM', orgCurrentConcurrencyKey, messageId)
12951313
`,
12961314
});
12971315

@@ -1621,6 +1639,9 @@ declare module "ioredis" {
16211639
queue: string,
16221640
parentQueue: string,
16231641
messageKey: string,
1642+
concurrencyKey: string,
1643+
envConcurrencyKey: string,
1644+
orgConcurrencyKey: string,
16241645
queueName: string,
16251646
messageId: string,
16261647
messageData: string,

0 commit comments

Comments
 (0)