Skip to content

Commit 99a63d4

Browse files
authored
When enqueuing clear concurrency for that run (#1271)
* Better logging if a background worker isn’t created * When enqueuing a message into the queue clear the concurrency
1 parent 6744617 commit 99a63d4

File tree

2 files changed

+23
-2
lines changed

2 files changed

+23
-2
lines changed

apps/webapp/app/routes/api.v1.projects.$projectRef.background-workers.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
5858
{ status: 200 }
5959
);
6060
} catch (e) {
61-
logger.error("Failed to create background worker", { error: e });
61+
logger.error("Failed to create background worker", { error: JSON.stringify(e) });
6262

6363
if (e instanceof ServiceValidationError) {
6464
return json({ error: e.message }, { status: 400 });

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -986,15 +986,25 @@ export class MarQS {
986986
}
987987

988988
async #callEnqueueMessage(message: MessagePayload) {
989+
const concurrencyKey = this.keys.currentConcurrencyKeyFromQueue(message.queue);
990+
const envConcurrencyKey = this.keys.envCurrentConcurrencyKeyFromQueue(message.queue);
991+
const orgConcurrencyKey = this.keys.orgCurrentConcurrencyKeyFromQueue(message.queue);
992+
989993
logger.debug("Calling enqueueMessage", {
990994
messagePayload: message,
995+
concurrencyKey,
996+
envConcurrencyKey,
997+
orgConcurrencyKey,
991998
service: this.name,
992999
});
9931000

9941001
return this.redis.enqueueMessage(
9951002
message.queue,
9961003
message.parentQueue,
9971004
this.keys.messageKey(message.messageId),
1005+
concurrencyKey,
1006+
envConcurrencyKey,
1007+
orgConcurrencyKey,
9981008
message.queue,
9991009
message.messageId,
10001010
JSON.stringify(message),
@@ -1268,11 +1278,14 @@ export class MarQS {
12681278

12691279
#registerCommands() {
12701280
this.redis.defineCommand("enqueueMessage", {
1271-
numberOfKeys: 3,
1281+
numberOfKeys: 6,
12721282
lua: `
12731283
local queue = KEYS[1]
12741284
local parentQueue = KEYS[2]
12751285
local messageKey = KEYS[3]
1286+
local concurrencyKey = KEYS[4]
1287+
local envCurrentConcurrencyKey = KEYS[5]
1288+
local orgCurrentConcurrencyKey = KEYS[6]
12761289
12771290
local queueName = ARGV[1]
12781291
local messageId = ARGV[2]
@@ -1292,6 +1305,11 @@ if #earliestMessage == 0 then
12921305
else
12931306
redis.call('ZADD', parentQueue, earliestMessage[2], queueName)
12941307
end
1308+
1309+
-- Update the concurrency keys
1310+
redis.call('SREM', concurrencyKey, messageId)
1311+
redis.call('SREM', envCurrentConcurrencyKey, messageId)
1312+
redis.call('SREM', orgCurrentConcurrencyKey, messageId)
12951313
`,
12961314
});
12971315

@@ -1621,6 +1639,9 @@ declare module "ioredis" {
16211639
queue: string,
16221640
parentQueue: string,
16231641
messageKey: string,
1642+
concurrencyKey: string,
1643+
envConcurrencyKey: string,
1644+
orgConcurrencyKey: string,
16241645
queueName: string,
16251646
messageId: string,
16261647
messageData: string,

0 commit comments

Comments
 (0)