Skip to content

Commit f739407

Browse files
matt-aitkenericallam
authored andcommitted
Remove the concurrencyLimit from a queue if the limit is null
1 parent 4da28f7 commit f739407

File tree

4 files changed

+21
-0
lines changed

4 files changed

+21
-0
lines changed

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ export class MarQS {
8282
return this.redis.set(this.keys.queueConcurrencyLimitKey(env, queue), concurrency);
8383
}
8484

85+
public async removeQueueConcurrencyLimits(env: AuthenticatedEnvironment, queue: string) {
86+
return this.redis.del(this.keys.queueConcurrencyLimitKey(env, queue));
87+
}
88+
8589
public async updateEnvConcurrencyLimits(env: AuthenticatedEnvironment) {
8690
await this.#callUpdateGlobalConcurrencyLimits({
8791
envConcurrencyLimitKey: this.keys.envConcurrencyLimitKey(env),

apps/webapp/app/v3/services/createBackgroundWorker.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,8 @@ export async function createBackgroundTasks(
179179
taskQueue.name,
180180
taskQueue.concurrencyLimit
181181
);
182+
} else {
183+
await marqs?.removeQueueConcurrencyLimits(environment, taskQueue.name);
182184
}
183185
} catch (error) {
184186
if (error instanceof Prisma.PrismaClientKnownRequestError) {

apps/webapp/app/v3/services/triggerTask.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,8 @@ export class TriggerTaskService extends BaseService {
219219
taskQueue.name,
220220
taskQueue.concurrencyLimit
221221
);
222+
} else {
223+
await marqs?.removeQueueConcurrencyLimits(environment, taskQueue.name);
222224
}
223225
}
224226

references/v3-catalog/src/queues.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import dotenv from "dotenv";
22
import { simpleChildTask } from "./trigger/subtasks";
3+
import { wait } from "@trigger.dev/sdk/v3";
34

45
dotenv.config();
56

@@ -54,6 +55,18 @@ export async function run() {
5455
},
5556
},
5657
]);
58+
59+
await wait.for({ seconds: 5 });
60+
61+
//this should set the concurrencyLimit back to none
62+
await simpleChildTask.trigger(
63+
{ message: "Simple alt queue 2" },
64+
{
65+
queue: {
66+
name: "queue-concurrency-3",
67+
},
68+
}
69+
);
5770
}
5871

5972
run().catch(console.error);

0 commit comments

Comments
 (0)