Skip to content

Commit e2585c0

Browse files
committed
Make sourceTaskAttemptId optional on resumeBatchRun
1 parent fccc0ac commit e2585c0

File tree

4 files changed

+63
-32
lines changed

4 files changed

+63
-32
lines changed

apps/webapp/app/services/worker.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ const workerCatalog = {
131131
}),
132132
"v3.resumeBatchRun": z.object({
133133
batchRunId: z.string(),
134-
sourceTaskAttemptId: z.string(),
134+
sourceTaskAttemptId: z.string().optional(),
135135
}),
136136
"v3.resumeTaskDependency": z.object({
137137
dependencyId: z.string(),

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -617,15 +617,15 @@ export class SharedQueueConsumer {
617617
return;
618618
}
619619

620-
if (messageBody.data.completedAttemptIds.length < 1) {
621-
logger.error("No attempt IDs provided", {
622-
queueMessage: message.data,
623-
messageId: message.messageId,
624-
});
625-
626-
await this.#ackAndDoMoreWork(message.messageId);
627-
return;
628-
}
620+
// if (messageBody.data.completedAttemptIds.length < 1) {
621+
// logger.error("No attempt IDs provided", {
622+
// queueMessage: message.data,
623+
// messageId: message.messageId,
624+
// });
625+
626+
// await this.#ackAndDoMoreWork(message.messageId);
627+
// return;
628+
// }
629629

630630
const resumableRun = await prisma.taskRun.findUnique({
631631
where: {

apps/webapp/app/v3/services/createCheckpoint.server.ts

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import { marqs } from "~/v3/marqs/index.server";
77
import { CreateCheckpointRestoreEventService } from "./createCheckpointRestoreEvent.server";
88
import { BaseService } from "./baseService.server";
99
import { isFinalRunStatus, isFreezableAttemptStatus, isFreezableRunStatus } from "../taskStatus";
10+
import { ResumeBatchRunService } from "./resumeBatchRun.server";
11+
import { ResumeTaskRunDependenciesService } from "./resumeTaskRunDependencies.server";
1012

1113
export class CreateCheckpointService extends BaseService {
1214
public async call(
@@ -89,6 +91,9 @@ export class CreateCheckpointService extends BaseService {
8991
};
9092
}
9193

94+
//sleep for 20 seconds
95+
// await new Promise((resolve) => setTimeout(resolve, 10_000));
96+
9297
const checkpoint = await this._prisma.checkpoint.create({
9398
data: {
9499
friendlyId: generateFriendlyId("checkpoint"),
@@ -154,10 +159,33 @@ export class CreateCheckpointService extends BaseService {
154159
batchDependencyFriendlyId: reason.batchFriendlyId,
155160
});
156161

157-
keepRunAlive = await this.#isBatchCompleted(reason.batchFriendlyId);
162+
if (checkpointEvent) {
163+
const batchRun = await this._prisma.batchTaskRun.findFirst({
164+
select: {
165+
id: true,
166+
},
167+
where: {
168+
friendlyId: reason.batchFriendlyId,
169+
},
170+
});
158171

159-
if (!keepRunAlive) {
160-
await marqs?.acknowledgeMessage(attempt.taskRunId);
172+
if (!batchRun) {
173+
logger.error("Batch not found", { friendlyId: reason.batchFriendlyId });
174+
await marqs?.acknowledgeMessage(attempt.taskRunId);
175+
176+
return {
177+
success: false,
178+
};
179+
}
180+
181+
await ResumeBatchRunService.enqueue(batchRun.id, undefined, this._prisma);
182+
183+
return {
184+
success: true,
185+
checkpoint,
186+
event: checkpointEvent,
187+
keepRunAlive: false,
188+
};
161189
}
162190

163191
break;

apps/webapp/app/v3/services/resumeBatchRun.server.ts

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { BaseService } from "./baseService.server";
55
import { logger } from "~/services/logger.server";
66

77
export class ResumeBatchRunService extends BaseService {
8-
public async call(batchRunId: string, sourceTaskAttemptId: string) {
8+
public async call(batchRunId: string, sourceTaskAttemptId?: string) {
99
const batchRun = await this._prisma.batchTaskRun.findFirst({
1010
where: {
1111
id: batchRunId,
@@ -41,34 +41,43 @@ export class ResumeBatchRunService extends BaseService {
4141
return;
4242
}
4343

44-
// We need to update the batchRun status so we don't resume it again
45-
await this._prisma.batchTaskRun.update({
46-
where: {
47-
id: batchRun.id,
48-
},
49-
data: {
50-
status: "COMPLETED",
51-
},
52-
});
53-
5444
// This batch has a dependent attempt and just finalized, we should resume that attempt
5545
const environment = batchRun.dependentTaskAttempt.runtimeEnvironment;
5646

5747
// If we are in development, we don't need to resume the dependent task (that will happen automatically)
5848
if (environment.type === "DEVELOPMENT") {
49+
// We need to update the batchRun status so we don't resume it again
50+
await this._prisma.batchTaskRun.update({
51+
where: {
52+
id: batchRun.id,
53+
},
54+
data: {
55+
status: "COMPLETED",
56+
},
57+
});
5958
return;
6059
}
6160

6261
const dependentRun = batchRun.dependentTaskAttempt.taskRun;
6362

6463
if (batchRun.dependentTaskAttempt.status === "PAUSED" && batchRun.checkpointEventId) {
64+
// We need to update the batchRun status so we don't resume it again
65+
await this._prisma.batchTaskRun.update({
66+
where: {
67+
id: batchRun.id,
68+
},
69+
data: {
70+
status: "COMPLETED",
71+
},
72+
});
73+
6574
await marqs?.enqueueMessage(
6675
environment,
6776
dependentRun.queue,
6877
dependentRun.id,
6978
{
7079
type: "RESUME",
71-
completedAttemptIds: [sourceTaskAttemptId],
80+
completedAttemptIds: sourceTaskAttemptId ? [sourceTaskAttemptId] : [],
7281
resumableAttemptId: batchRun.dependentTaskAttempt.id,
7382
checkpointEventId: batchRun.checkpointEventId,
7483
projectId: batchRun.dependentTaskAttempt.runtimeEnvironment.projectId,
@@ -86,18 +95,12 @@ export class ResumeBatchRunService extends BaseService {
8695
dependentTaskAttemptId: batchRun.dependentTaskAttempt.id,
8796
});
8897
}
89-
90-
await marqs?.replaceMessage(dependentRun.id, {
91-
type: "RESUME",
92-
completedAttemptIds: batchRun.items.map((item) => item.taskRunAttemptId).filter(Boolean),
93-
resumableAttemptId: batchRun.dependentTaskAttempt.id,
94-
});
9598
}
9699
}
97100

98101
static async enqueue(
99102
batchRunId: string,
100-
sourceTaskAttemptId: string,
103+
sourceTaskAttemptId: string | undefined,
101104
tx: PrismaClientOrTransaction,
102105
runAt?: Date
103106
) {

0 commit comments

Comments
 (0)