Skip to content

Commit 068c888

Browse files
committed
Remove the concurrencyLimit from a queue if the limit is null
1 parent c34d025 commit 068c888

File tree

4 files changed

+21
-0
lines changed

4 files changed

+21
-0
lines changed

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ export class MarQS {
8080
return this.redis.set(this.keys.queueConcurrencyLimitKey(env, queue), concurrency);
8181
}
8282

83+
public async removeQueueConcurrencyLimits(env: AuthenticatedEnvironment, queue: string) {
84+
return this.redis.del(this.keys.queueConcurrencyLimitKey(env, queue));
85+
}
86+
8387
public async updateEnvConcurrencyLimits(env: AuthenticatedEnvironment) {
8488
await this.#callUpdateGlobalConcurrencyLimits({
8589
envConcurrencyLimitKey: this.keys.envConcurrencyLimitKey(env),

apps/webapp/app/v3/services/createBackgroundWorker.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,8 @@ export async function createBackgroundTasks(
178178
taskQueue.name,
179179
taskQueue.concurrencyLimit
180180
);
181+
} else {
182+
await marqs?.removeQueueConcurrencyLimits(environment, taskQueue.name);
181183
}
182184
} catch (error) {
183185
if (error instanceof Prisma.PrismaClientKnownRequestError) {

apps/webapp/app/v3/services/triggerTask.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,8 @@ export class TriggerTaskService extends BaseService {
219219
taskQueue.name,
220220
taskQueue.concurrencyLimit
221221
);
222+
} else {
223+
await marqs?.removeQueueConcurrencyLimits(environment, taskQueue.name);
222224
}
223225
}
224226

references/v3-catalog/src/queues.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import dotenv from "dotenv";
22
import { simpleChildTask } from "./trigger/subtasks";
3+
import { wait } from "@trigger.dev/sdk/v3";
34

45
dotenv.config();
56

@@ -54,6 +55,18 @@ export async function run() {
5455
},
5556
},
5657
]);
58+
59+
await wait.for({ seconds: 5 });
60+
61+
//this should set the concurrencyLimit back to none
62+
await simpleChildTask.trigger(
63+
{ message: "Simple alt queue 2" },
64+
{
65+
queue: {
66+
name: "queue-concurrency-3",
67+
},
68+
}
69+
);
5770
}
5871

5972
run().catch(console.error);

0 commit comments

Comments
 (0)