Skip to content

Commit 38fcc85

Browse files
committed
Add reason to MarQS acknowledgeMessage to track why it's called
1 parent 0a18e1d commit 38fcc85

File tree

7 files changed

+52
-15
lines changed

7 files changed

+52
-15
lines changed

apps/webapp/app/v3/marqs/devQueueConsumer.server.ts

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -329,13 +329,16 @@ export class DevQueueConsumer {
329329
env: this.env,
330330
});
331331

332-
await marqs?.acknowledgeMessage(message.messageId);
332+
await marqs?.acknowledgeMessage(
333+
message.messageId,
334+
"Failed to parse message.data with MessageBody schema in DevQueueConsumer"
335+
);
333336

334337
setTimeout(() => this.#doWork(), 100);
335338
return;
336339
}
337340

338-
const existingTaskRun = await prisma.taskRun.findUnique({
341+
const existingTaskRun = await prisma.taskRun.findFirst({
339342
where: {
340343
id: message.messageId,
341344
},
@@ -346,7 +349,10 @@ export class DevQueueConsumer {
346349
messageId: message.messageId,
347350
});
348351

349-
await marqs?.acknowledgeMessage(message.messageId);
352+
await marqs?.acknowledgeMessage(
353+
message.messageId,
354+
"Failed to find task run in DevQueueConsumer"
355+
);
350356
setTimeout(() => this.#doWork(), 100);
351357
return;
352358
}
@@ -365,7 +371,10 @@ export class DevQueueConsumer {
365371
latestWorker: this.#getLatestBackgroundWorker(),
366372
});
367373

368-
await marqs?.acknowledgeMessage(message.messageId);
374+
await marqs?.acknowledgeMessage(
375+
message.messageId,
376+
"Failed to find background worker in DevQueueConsumer"
377+
);
369378
setTimeout(() => this.#doWork(), 100);
370379
return;
371380
}
@@ -382,7 +391,10 @@ export class DevQueueConsumer {
382391
taskSlugs: backgroundWorker.tasks.map((task) => task.slug),
383392
});
384393

385-
await marqs?.acknowledgeMessage(message.messageId);
394+
await marqs?.acknowledgeMessage(
395+
message.messageId,
396+
"No matching background task found in DevQueueConsumer"
397+
);
386398

387399
setTimeout(() => this.#doWork(), 100);
388400
return;
@@ -416,7 +428,10 @@ export class DevQueueConsumer {
416428
messageId: message.messageId,
417429
});
418430

419-
await marqs?.acknowledgeMessage(message.messageId);
431+
await marqs?.acknowledgeMessage(
432+
message.messageId,
433+
"Failed to lock task run in DevQueueConsumer"
434+
);
420435

421436
setTimeout(() => this.#doWork(), 100);
422437
return;
@@ -515,7 +530,10 @@ export class DevQueueConsumer {
515530
messageId: message.messageId,
516531
backgroundWorker,
517532
});
518-
await marqs?.acknowledgeMessage(message.messageId);
533+
await marqs?.acknowledgeMessage(
534+
message.messageId,
535+
"Non-lazy attempts are no longer supported in DevQueueConsumer"
536+
);
519537

520538
setTimeout(() => this.#doWork(), 100);
521539
}

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,8 @@ export class MarQS {
291291
orgConcurrencyKey: this.keys.orgCurrentConcurrencyKeyFromQueue(messageQueue),
292292
messageId: messageData.messageId,
293293
});
294+
295+
return;
294296
}
295297

296298
await this.options.visibilityTimeoutStrategy.heartbeat(
@@ -411,7 +413,7 @@ export class MarQS {
411413
);
412414
}
413415

414-
public async acknowledgeMessage(messageId: string) {
416+
public async acknowledgeMessage(messageId: string, reason: string = "unknown") {
415417
return this.#trace(
416418
"acknowledgeMessage",
417419
async (span) => {
@@ -421,6 +423,7 @@ export class MarQS {
421423
logger.log(`[${this.name}].acknowledgeMessage() message not found`, {
422424
messageId,
423425
service: this.name,
426+
reason,
424427
});
425428
return;
426429
}
@@ -430,6 +433,7 @@ export class MarQS {
430433
[SemanticAttributes.MESSAGE_ID]: message.messageId,
431434
[SemanticAttributes.CONCURRENCY_KEY]: message.concurrencyKey,
432435
[SemanticAttributes.PARENT_QUEUE]: message.parentQueue,
436+
["marqs.reason"]: reason,
433437
});
434438

435439
await this.options.visibilityTimeoutStrategy.cancelHeartbeat(messageId);

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -975,7 +975,7 @@ export class SharedQueueConsumer {
975975
}
976976

977977
async #ackAndDoMoreWork(messageId: string, intervalInMs?: number) {
978-
await marqs?.acknowledgeMessage(messageId);
978+
await marqs?.acknowledgeMessage(messageId, "Acking and doing more work in SharedQueueConsumer");
979979
this.#doMoreWork(intervalInMs);
980980
}
981981

apps/webapp/app/v3/requeueTaskRun.server.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,10 @@ export class RequeueTaskRunService extends BaseService {
7171
case "WAITING_FOR_DEPLOY": {
7272
logger.debug("[RequeueTaskRunService] Removing task run from queue", { taskRun });
7373

74-
await marqs?.acknowledgeMessage(taskRun.id);
74+
await marqs?.acknowledgeMessage(
75+
taskRun.id,
76+
"Run is either DELAYED or WAITING_FOR_DEPLOY so we cannot requeue it in RequeueTaskRunService"
77+
);
7578

7679
break;
7780
}
@@ -93,7 +96,10 @@ export class RequeueTaskRunService extends BaseService {
9396
case "CANCELED": {
9497
logger.debug("[RequeueTaskRunService] Task run is completed", { taskRun });
9598

96-
await marqs?.acknowledgeMessage(taskRun.id);
99+
await marqs?.acknowledgeMessage(
100+
taskRun.id,
101+
"Task run is already completed in RequeueTaskRunService"
102+
);
97103

98104
try {
99105
if (taskRun.runtimeEnvironment.type === "DEVELOPMENT") {

apps/webapp/app/v3/services/createCheckpoint.server.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,10 @@ export class CreateCheckpointService extends BaseService {
307307
checkpointId: checkpoint.id,
308308
params,
309309
});
310-
await marqs?.acknowledgeMessage(attempt.taskRunId);
310+
await marqs?.acknowledgeMessage(
311+
attempt.taskRunId,
312+
"No checkpoint event in CreateCheckpointService"
313+
);
311314

312315
return {
313316
success: false,

apps/webapp/app/v3/services/finalizeTaskRun.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ export class FinalizeTaskRunService extends BaseService {
6161
expiredAt,
6262
completedAt,
6363
});
64-
await marqs?.acknowledgeMessage(id);
64+
await marqs?.acknowledgeMessage(id, "FinalTaskRunService call");
6565

6666
logger.debug("Finalizing run updating run status", {
6767
id,

apps/webapp/app/v3/services/resumeAttempt.server.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,10 @@ export class ResumeAttemptService extends BaseService {
172172

173173
if (!completedAttempt) {
174174
this._logger.error("Completed attempt not found", { completedAttemptId });
175-
await marqs?.acknowledgeMessage(attempt.taskRunId);
175+
await marqs?.acknowledgeMessage(
176+
attempt.taskRunId,
177+
"Cannot find completed attempt in ResumeAttemptService"
178+
);
176179
return;
177180
}
178181

@@ -186,7 +189,10 @@ export class ResumeAttemptService extends BaseService {
186189

187190
if (!resumePayload) {
188191
logger.error("Failed to get resume payload");
189-
await marqs?.acknowledgeMessage(attempt.taskRunId);
192+
await marqs?.acknowledgeMessage(
193+
attempt.taskRunId,
194+
"Failed to get resume payload in ResumeAttemptService"
195+
);
190196
return;
191197
}
192198

0 commit comments

Comments
 (0)