Skip to content

Prevent crashes on expected checkpoint cancellations #1324

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 29 additions & 7 deletions apps/coordinator/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ const chaosMonkey = new ChaosMonkey(
!!process.env.CHAOS_MONKEY_DISABLE_DELAYS
);

class CheckpointReadinessTimeoutError extends Error {}
class CheckpointCancelError extends Error {}

class TaskCoordinator {
#httpServer: ReturnType<typeof createServer>;
#checkpointer = new Checkpointer({
Expand Down Expand Up @@ -241,7 +244,7 @@ class TaskCoordinator {
return;
}

this.#checkpointer.cancelCheckpoint(message.runId);
this.#cancelCheckpoint(message.runId);

if (message.delayInMs) {
taskSocket.emit("REQUEST_EXIT", {
Expand Down Expand Up @@ -398,9 +401,14 @@ class TaskCoordinator {

let timeout: NodeJS.Timeout | undefined = undefined;

const CHECKPOINTABLE_TIMEOUT_SECONDS = 20;

const isCheckpointable = new Promise((resolve, reject) => {
// We set a reasonable timeout to prevent waiting forever
timeout = setTimeout(() => reject("timeout"), 20_000);
timeout = setTimeout(
() => reject(new CheckpointReadinessTimeoutError()),
CHECKPOINTABLE_TIMEOUT_SECONDS * 1000
);

this.#checkpointableTasks.set(socket.data.runId, { resolve, reject });
});
Expand All @@ -415,10 +423,24 @@ class TaskCoordinator {
} catch (error) {
logger.error("Error while waiting for checkpointable state", { error });

await crashRun({
name: "ReadyForCheckpointError",
message: `Failed to become checkpointable for ${reason}`,
});
if (error instanceof CheckpointReadinessTimeoutError) {
await crashRun({
name: error.name,
message: `Failed to become checkpointable in ${CHECKPOINTABLE_TIMEOUT_SECONDS}s for ${reason}`,
});

return {
success: false,
reason: "timeout",
};
}

if (error instanceof CheckpointCancelError) {
return {
success: false,
reason: "canceled",
};
}

return {
success: false,
Expand Down Expand Up @@ -1065,7 +1087,7 @@ class TaskCoordinator {

if (checkpointWait) {
// Stop waiting for task to reach checkpointable state
checkpointWait.reject("Checkpoint cancelled");
checkpointWait.reject(new CheckpointCancelError());
}

// Cancel checkpointing procedure
Expand Down
Loading