Skip to content

Commit 5886208

Browse files
committed
ensure we only execute after the run creation tx has completed
1 parent 3905a6f commit 5886208

File tree

2 files changed

+19
-10
lines changed

2 files changed

+19
-10
lines changed

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,9 @@ export class SharedQueueConsumer {
295295
messageId: message.messageId,
296296
});
297297

298+
// INFO: There used to be a race condition where tasks could be triggered, but execute messages could be dequeued before the run finished being created in the DB
299+
// This should not be happening anymore. In case it does, consider reqeueuing here with a brief delay while limiting total retries.
300+
298301
await this.#ackAndDoMoreWork(message.messageId);
299302
return;
300303
}

apps/webapp/app/v3/services/triggerTask.server.ts

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ export class TriggerTaskService extends BaseService {
7171
async (event, traceContext) => {
7272
const lockId = taskIdentifierToLockId(taskId);
7373

74-
return await $transaction(this._prisma, async (tx) => {
74+
const run = await $transaction(this._prisma, async (tx) => {
7575
await tx.$executeRaw`SELECT pg_advisory_xact_lock(${lockId})`;
7676

7777
const lockedToBackgroundWorker = body.options?.lockToVersion
@@ -169,17 +169,23 @@ export class TriggerTaskService extends BaseService {
169169
}
170170
}
171171

172-
// We need to enqueue the task run into the appropriate queue
173-
await marqs?.enqueueMessage(
174-
environment,
175-
queueName,
176-
taskRun.id,
177-
{ type: "EXECUTE", taskIdentifier: taskId },
178-
body.options?.concurrencyKey
179-
);
180-
181172
return taskRun;
182173
});
174+
175+
if (!run) {
176+
return;
177+
}
178+
179+
// We need to enqueue the task run into the appropriate queue. This is done after the tx completes to prevent a race condition where the task run hasn't been created yet by the time we dequeue.
180+
await marqs?.enqueueMessage(
181+
environment,
182+
run.queue,
183+
run.id,
184+
{ type: "EXECUTE", taskIdentifier: taskId },
185+
body.options?.concurrencyKey
186+
);
187+
188+
return run;
183189
}
184190
);
185191
});

0 commit comments

Comments
 (0)