Skip to content

Commit 422f028

Browse files
committed
Fixes an issue with the shared queue consumer relying on the queue name to get the env id
1 parent 0010375 commit 422f028

File tree

1 file changed

+8
-34
lines changed

1 file changed

+8
-34
lines changed

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 8 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -251,35 +251,12 @@ export class SharedQueueConsumer {
251251

252252
logger.log("dequeueMessageInSharedQueue()", { queueMessage: message });
253253

254-
const envId = this.#envIdFromQueue(message.queue);
255-
256-
const environment = await prisma.runtimeEnvironment.findUnique({
257-
include: {
258-
organization: true,
259-
project: true,
260-
},
261-
where: {
262-
id: envId,
263-
},
264-
});
265-
266-
if (!environment) {
267-
logger.error("Environment not found", {
268-
queueMessage: message.data,
269-
envId,
270-
});
271-
272-
this.#ackAndDoMoreWork(message.messageId);
273-
return;
274-
}
275-
276254
const messageBody = MessageBody.safeParse(message.data);
277255

278256
if (!messageBody.success) {
279257
logger.error("Failed to parse message", {
280258
queueMessage: message.data,
281259
error: messageBody.error,
282-
env: environment,
283260
});
284261

285262
this.#ackAndDoMoreWork(message.messageId);
@@ -381,6 +358,7 @@ export class SharedQueueConsumer {
381358
lockedById: backgroundTask.id,
382359
},
383360
include: {
361+
runtimeEnvironment: true,
384362
attempts: {
385363
take: 1,
386364
orderBy: { number: "desc" },
@@ -411,7 +389,7 @@ export class SharedQueueConsumer {
411389
const queue = await prisma.taskQueue.findUnique({
412390
where: {
413391
runtimeEnvironmentId_name: {
414-
runtimeEnvironmentId: environment.id,
392+
runtimeEnvironmentId: lockedTaskRun.runtimeEnvironmentId,
415393
name: lockedTaskRun.queue,
416394
},
417395
},
@@ -437,7 +415,7 @@ export class SharedQueueConsumer {
437415
backgroundWorkerTaskId: backgroundTask.id,
438416
status: "PENDING" as const,
439417
queueId: queue.id,
440-
runtimeEnvironmentId: environment.id,
418+
runtimeEnvironmentId: lockedTaskRun.runtimeEnvironmentId,
441419
},
442420
include: {
443421
backgroundWorkerTask: true,
@@ -493,10 +471,10 @@ export class SharedQueueConsumer {
493471
machine: machine.data,
494472
// identifiers
495473
id: taskRunAttempt.id,
496-
envId: environment.id,
497-
envType: environment.type,
498-
orgId: environment.organizationId,
499-
projectId: environment.projectId,
474+
envId: lockedTaskRun.runtimeEnvironment.id,
475+
envType: lockedTaskRun.runtimeEnvironment.type,
476+
orgId: lockedTaskRun.runtimeEnvironment.organizationId,
477+
projectId: lockedTaskRun.runtimeEnvironment.projectId,
500478
runId: taskRunAttempt.taskRunId,
501479
},
502480
});
@@ -625,7 +603,7 @@ export class SharedQueueConsumer {
625603
const queue = await prisma.taskQueue.findUnique({
626604
where: {
627605
runtimeEnvironmentId_name: {
628-
runtimeEnvironmentId: environment.id,
606+
runtimeEnvironmentId: resumableAttempt.runtimeEnvironmentId,
629607
name: resumableRun.queue,
630608
},
631609
},
@@ -754,10 +732,6 @@ export class SharedQueueConsumer {
754732
return;
755733
}
756734

757-
#envIdFromQueue(queueName: string) {
758-
return queueName.split(":")[1];
759-
}
760-
761735
#doMoreWork(intervalInMs = this._options.interval) {
762736
setTimeout(() => this.#doWork(), intervalInMs);
763737
}

0 commit comments

Comments
 (0)