Skip to content

Commit 25522a3

Browse files
committed
handle dependency resume edge case
1 parent e877b4d commit 25522a3

File tree

3 files changed

+20
-21
lines changed

3 files changed

+20
-21
lines changed

apps/webapp/app/v3/services/createCheckpoint.server.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import { generateFriendlyId } from "../friendlyIdentifiers";
66
import { marqs } from "~/v3/marqs/index.server";
77
import { CreateCheckpointRestoreEventService } from "./createCheckpointRestoreEvent.server";
88
import { BaseService } from "./baseService.server";
9-
import { CrashTaskRunService } from "./crashTaskRun.server";
109
import { isFinalRunStatus, isFreezableAttemptStatus, isFreezableRunStatus } from "../taskStatus";
1110

1211
export class CreateCheckpointService extends BaseService {

apps/webapp/app/v3/services/resumeBatchRun.server.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,16 +61,7 @@ export class ResumeBatchRunService extends BaseService {
6161

6262
const dependentRun = batchRun.dependentTaskAttempt.taskRun;
6363

64-
if (batchRun.dependentTaskAttempt.status === "PAUSED") {
65-
if (!batchRun.checkpointEventId) {
66-
logger.error("Can't resume paused attempt without checkpoint event", {
67-
batchRunId: batchRun.id,
68-
});
69-
70-
await marqs?.acknowledgeMessage(dependentRun.id);
71-
return;
72-
}
73-
64+
if (batchRun.dependentTaskAttempt.status === "PAUSED" && batchRun.checkpointEventId) {
7465
await marqs?.enqueueMessage(
7566
environment,
7667
dependentRun.queue,
@@ -84,6 +75,15 @@ export class ResumeBatchRunService extends BaseService {
8475
dependentRun.concurrencyKey ?? undefined
8576
);
8677
} else {
78+
if (batchRun.dependentTaskAttempt.status === "PAUSED" && !batchRun.checkpointEventId) {
79+
// Due to race conditions or other bugs, the attempt status can be PAUSED even though there is no checkpoint event
80+
// The worker may still be up, so we still try to resume the dependent attempt by sending a RESUME message to the worker (on dequeue)
81+
logger.error("Batch run resume: Attempt is paused but there's no checkpoint event", {
82+
batchRunId: batchRun.id,
83+
dependentTaskAttemptId: batchRun.dependentTaskAttempt.id,
84+
});
85+
}
86+
8787
await marqs?.replaceMessage(dependentRun.id, {
8888
type: "RESUME",
8989
completedAttemptIds: batchRun.items.map((item) => item.taskRunAttemptId).filter(Boolean),

apps/webapp/app/v3/services/resumeTaskDependency.server.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,7 @@ export class ResumeTaskDependencyService extends BaseService {
3838

3939
const dependentRun = dependency.dependentAttempt.taskRun;
4040

41-
if (dependency.dependentAttempt.status === "PAUSED") {
42-
if (!dependency.checkpointEventId) {
43-
logger.error("Can't resume paused attempt without checkpoint event", {
44-
attemptId: dependency.id,
45-
});
46-
47-
await marqs?.acknowledgeMessage(dependentRun.id);
48-
return;
49-
}
50-
41+
if (dependency.dependentAttempt.status === "PAUSED" && dependency.checkpointEventId) {
5142
await marqs?.enqueueMessage(
5243
dependency.taskRun.runtimeEnvironment,
5344
dependentRun.queue,
@@ -61,6 +52,15 @@ export class ResumeTaskDependencyService extends BaseService {
6152
dependentRun.concurrencyKey ?? undefined
6253
);
6354
} else {
55+
if (dependency.dependentAttempt.status === "PAUSED" && !dependency.checkpointEventId) {
56+
// Due to race conditions or other bugs, the attempt status can be PAUSED even though there is no checkpoint event
57+
// The worker may still be up, so we still try to resume the dependent attempt by sending a RESUME message to the worker (on dequeue)
58+
logger.warn("Task dependency resume: Attempt is paused but there's no checkpoint event", {
59+
attemptId: dependency.id,
60+
dependentAttemptId: dependency.dependentAttempt.id,
61+
});
62+
}
63+
6464
await marqs?.replaceMessage(dependentRun.id, {
6565
type: "RESUME",
6666
completedAttemptIds: [sourceTaskAttemptId],

0 commit comments

Comments
 (0)