Skip to content

Commit 2dd5d46

Browse files
committed
If a checkpoint has been created, the coordinator won’t continue the run with RESUME_AFTER_DEPENDENCY_WITH_ACK
1 parent 7256244 commit 2dd5d46

File tree

2 files changed

+29
-0
lines changed

2 files changed

+29
-0
lines changed

apps/coordinator/src/index.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,22 @@ class TaskCoordinator {
178178
};
179179
}
180180

181+
//if this is set, we want to kill the process because it will be resumed with the checkpoint from the queue
182+
if (taskSocket.data.requiresCheckpointResumeWithMessage) {
183+
logger.log("RESUME_AFTER_DEPENDENCY_WITH_ACK: Checkpoint is set so going to nack", {
184+
socketData: taskSocket.data,
185+
});
186+
187+
return {
188+
success: false,
189+
error: {
190+
name: "CheckpointMessagePresentError",
191+
message:
192+
"Checkpoint message is present, so we need to kill the process and resume from the queue.",
193+
},
194+
};
195+
}
196+
181197
await chaosMonkey.call();
182198

183199
// In case the task resumed faster than we could checkpoint
@@ -819,6 +835,12 @@ class TaskCoordinator {
819835
return;
820836
}
821837

838+
//setting this means we can only resume from a checkpoint
839+
socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`;
840+
logger.log("WAIT_FOR_TASK set requiresCheckpointResumeWithMessage", {
841+
requiresCheckpointResumeWithMessage: socket.data.requiresCheckpointResumeWithMessage,
842+
});
843+
822844
const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
823845
version: "v1",
824846
attemptFriendlyId: message.attemptFriendlyId,
@@ -889,6 +911,12 @@ class TaskCoordinator {
889911
return;
890912
}
891913

914+
//setting this means we can only resume from a checkpoint
915+
socket.data.requiresCheckpointResumeWithMessage = `location:${checkpoint.location}-docker:${checkpoint.docker}`;
916+
logger.log("WAIT_FOR_BATCH set requiresCheckpointResumeWithMessage", {
917+
requiresCheckpointResumeWithMessage: socket.data.requiresCheckpointResumeWithMessage,
918+
});
919+
892920
const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
893921
version: "v1",
894922
attemptFriendlyId: message.attemptFriendlyId,

packages/core/src/v3/schemas/messages.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -861,6 +861,7 @@ export const ProdWorkerSocketData = z.object({
861861
podName: z.string(),
862862
deploymentId: z.string(),
863863
deploymentVersion: z.string(),
864+
requiresCheckpointResumeWithMessage: z.string().optional(),
864865
});
865866

866867
export const CoordinatorSocketData = z.object({

0 commit comments

Comments
 (0)