Skip to content

Commit 057020b

Browse files
committed
Using triggerAndWait or batchTriggerAndWait frees up concurrency
Normally it frees up just env and org concurrency. If it’s a recursive task then it will free up the run concurrency too (e.g. a task calling itself).
1 parent 59d3f3d commit 057020b

File tree

2 files changed

+96
-0
lines changed

2 files changed

+96
-0
lines changed

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -542,6 +542,60 @@ export class MarQS {
542542
);
543543
}
544544

545+
public async releaseConcurrency(messageId: string, releaseForRun: boolean = false) {
546+
return this.#trace(
547+
"releaseConcurrency",
548+
async (span) => {
549+
span.setAttributes({
550+
[SemanticAttributes.MESSAGE_ID]: messageId,
551+
});
552+
553+
const message = await this.readMessage(messageId);
554+
555+
if (!message) {
556+
return;
557+
}
558+
559+
span.setAttributes({
560+
[SemanticAttributes.QUEUE]: message.queue,
561+
[SemanticAttributes.MESSAGE_ID]: message.messageId,
562+
[SemanticAttributes.CONCURRENCY_KEY]: message.concurrencyKey,
563+
[SemanticAttributes.PARENT_QUEUE]: message.parentQueue,
564+
});
565+
566+
const concurrencyKey = this.keys.currentConcurrencyKeyFromQueue(message.queue);
567+
const envConcurrencyKey = this.keys.envCurrentConcurrencyKeyFromQueue(message.queue);
568+
const orgConcurrencyKey = this.keys.orgCurrentConcurrencyKeyFromQueue(message.queue);
569+
570+
logger.debug("Calling releaseConcurrency", {
571+
messageId,
572+
queue: message.queue,
573+
concurrencyKey,
574+
envConcurrencyKey,
575+
orgConcurrencyKey,
576+
service: this.name,
577+
releaseForRun,
578+
});
579+
580+
return this.redis.releaseConcurrency(
581+
//don't release the for the run, it breaks concurrencyLimits
582+
releaseForRun ? concurrencyKey : "",
583+
envConcurrencyKey,
584+
orgConcurrencyKey,
585+
message.messageId
586+
);
587+
},
588+
{
589+
kind: SpanKind.CONSUMER,
590+
attributes: {
591+
[SEMATTRS_MESSAGING_OPERATION]: "releaseConcurrency",
592+
[SEMATTRS_MESSAGE_ID]: messageId,
593+
[SEMATTRS_MESSAGING_SYSTEM]: "marqs",
594+
},
595+
}
596+
);
597+
}
598+
545599
async #trace<T>(
546600
name: string,
547601
fn: (span: Span) => Promise<T>,
@@ -1488,6 +1542,24 @@ end
14881542
`,
14891543
});
14901544

1545+
this.redis.defineCommand("releaseConcurrency", {
1546+
numberOfKeys: 3,
1547+
lua: `
1548+
local concurrencyKey = KEYS[1]
1549+
local envCurrentConcurrencyKey = KEYS[2]
1550+
local orgCurrentConcurrencyKey = KEYS[3]
1551+
1552+
local messageId = ARGV[1]
1553+
1554+
-- Update the concurrency keys
1555+
if concurrencyKey ~= "" then
1556+
redis.call('SREM', concurrencyKey, messageId)
1557+
end
1558+
redis.call('SREM', envCurrentConcurrencyKey, messageId)
1559+
redis.call('SREM', orgCurrentConcurrencyKey, messageId)
1560+
`,
1561+
});
1562+
14911563
this.redis.defineCommand("heartbeatMessage", {
14921564
numberOfKeys: 1,
14931565
lua: `
@@ -1699,6 +1771,14 @@ declare module "ioredis" {
16991771
callback?: Callback<void>
17001772
): Result<void, Context>;
17011773

1774+
releaseConcurrency(
1775+
concurrencyKey: string,
1776+
envConcurrencyKey: string,
1777+
orgConcurrencyKey: string,
1778+
messageId: string,
1779+
callback?: Callback<void>
1780+
): Result<void, Context>;
1781+
17021782
heartbeatMessage(
17031783
visibilityQueue: string,
17041784
messageId: string,

apps/webapp/app/v3/services/triggerTask.server.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ export class TriggerTaskService extends BaseService {
107107
select: {
108108
id: true,
109109
status: true,
110+
taskIdentifier: true,
110111
},
111112
},
112113
},
@@ -143,6 +144,7 @@ export class TriggerTaskService extends BaseService {
143144
select: {
144145
id: true,
145146
status: true,
147+
taskIdentifier: true,
146148
},
147149
},
148150
},
@@ -391,6 +393,20 @@ export class TriggerTaskService extends BaseService {
391393
this._prisma
392394
);
393395

396+
//release the concurrency for the env and org, if part of a (batch)triggerAndWait
397+
if (dependentAttempt) {
398+
const isSameTask = dependentAttempt.taskRun.taskIdentifier === taskId;
399+
await marqs?.releaseConcurrency(dependentAttempt.taskRun.id, isSameTask);
400+
}
401+
if (dependentBatchRun?.dependentTaskAttempt) {
402+
const isSameTask =
403+
dependentBatchRun.dependentTaskAttempt.taskRun.taskIdentifier === taskId;
404+
await marqs?.releaseConcurrency(
405+
dependentBatchRun.dependentTaskAttempt.taskRun.id,
406+
isSameTask
407+
);
408+
}
409+
394410
if (!run) {
395411
return;
396412
}

0 commit comments

Comments
 (0)