Skip to content

v3: fix consecutive waits #1073

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
cea9452
fix spacing for delete hints
nicktrn Apr 24, 2024
f08ceb3
don't try to resume deps on wait resume
nicktrn Apr 24, 2024
d286642
sending duration wait resumes is not an error anymore
nicktrn Apr 24, 2024
00b58fd
set correct status with new wait resume flow
nicktrn Apr 24, 2024
3d2c3a8
cancel checkpoint schema v2
nicktrn Apr 25, 2024
0a6e549
don't mix messages and schemas
nicktrn Apr 25, 2024
35efb08
prevent unintended case fallthrough in tree view
nicktrn Apr 25, 2024
77a8a64
completely switch to platform-led duration wait resumes
nicktrn Apr 25, 2024
46a19fc
prevent infinite restores
nicktrn Apr 25, 2024
26e56a9
some entries for the catalog
nicktrn Apr 25, 2024
40eb9ee
Merge branch 'main' into v3/fix-consecutive-waits
nicktrn Apr 26, 2024
79d1e1d
Merge branch 'main' into v3/fix-consecutive-waits
nicktrn Apr 26, 2024
07ae47b
Merge branch 'main' into v3/fix-consecutive-waits
nicktrn Apr 28, 2024
77aac0d
Merge branch 'main' into v3/fix-consecutive-waits
nicktrn Apr 29, 2024
6480853
Merge branch 'main' into v3/fix-consecutive-waits
nicktrn Apr 30, 2024
0d13d50
add pg to additional packages
nicktrn Apr 30, 2024
6ff0952
add checkpoint safe timeout
nicktrn Apr 30, 2024
65ffd99
prevent duplicate spans after restore
nicktrn Apr 30, 2024
09564d5
wait for post start
nicktrn Apr 30, 2024
f330176
add sdk version to deploy tab
nicktrn Apr 30, 2024
d368f80
fail on impossible checkpoint scenarios
nicktrn Apr 30, 2024
354cf5f
Merge branch 'main' into v3/fix-consecutive-waits
nicktrn Apr 30, 2024
de479b9
remove debug logs
nicktrn May 1, 2024
e25a5e9
Merge branch 'main' into v3/fix-consecutive-waits
nicktrn May 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions apps/coordinator/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,18 @@ class Checkpointer {
return this.#abortControllers.has(runId);
}

cancelCheckpoint(runId: string) {
cancelCheckpoint(runId: string): boolean {
const controller = this.#abortControllers.get(runId);

if (!controller) {
logger.debug("Nothing to cancel", { runId });
return;
return false;
}

controller.abort("cancelCheckpointing()");
this.#abortControllers.delete(runId);

return true;
}

async #checkpointAndPush({
Expand Down Expand Up @@ -725,10 +727,18 @@ class TaskCoordinator {
checkpointable.resolve();
});

socket.on("CANCEL_CHECKPOINT", async (message) => {
socket.on("CANCEL_CHECKPOINT", async (message, callback) => {
logger.log("[CANCEL_CHECKPOINT]", message);

this.#cancelCheckpoint(socket.data.runId);
if (message.version === "v1") {
this.#cancelCheckpoint(socket.data.runId);
// v1 has no callback
return;
}

const checkpointCanceled = this.#cancelCheckpoint(socket.data.runId);

callback({ version: "v2", checkpointCanceled });
});

socket.on("WAIT_FOR_DURATION", async (message, callback) => {
Expand Down Expand Up @@ -933,7 +943,9 @@ class TaskCoordinator {
}

// Cancel checkpointing procedure
this.#checkpointer.cancelCheckpoint(runId);
const checkpointCanceled = this.#checkpointer.cancelCheckpoint(runId);

return checkpointCanceled;
}

#createHttpServer() {
Expand Down
4 changes: 4 additions & 0 deletions apps/webapp/app/components/primitives/TreeView/reducer.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import assertNever from "assert-never";
import { Filter, FlatTree } from "./TreeView";
import {
applyFilterToState,
Expand Down Expand Up @@ -543,6 +544,9 @@ export function reducer(state: TreeState, action: Action): TreeState {
});
return newState;
}
default: {
assertNever(action);
}
}

throw new Error(`Unhandled action type: ${(action as any).type}`);
Expand Down
2 changes: 2 additions & 0 deletions apps/webapp/app/presenters/v3/DeploymentPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ export class DeploymentPresenter {
exportName: "asc",
},
},
sdkVersion: true,
},
},
triggeredBy: {
Expand Down Expand Up @@ -135,6 +136,7 @@ export class DeploymentPresenter {
},
deployedBy: deployment.triggeredBy,
errorData: this.#prepareErrorData(deployment.errorData),
sdkVersion: deployment.worker?.sdkVersion,
},
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ export default function Page() {
<FormError id={projectSlug.errorId}>{projectSlug.error}</FormError>
<FormError>{deleteForm.error}</FormError>
<Hint>
This change is irreversible, so please be certain. Type in the Project slug
This change is irreversible, so please be certain. Type in the Project slug{" "}
<InlineCode variant="extra-small">{project.slug}</InlineCode> and then press
Delete.
</Hint>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ export default function Page() {
<DeploymentStatus status={deployment.status} className="text-sm" />
</Property>
<Property label="Tasks">{deployment.tasks ? deployment.tasks.length : "–"}</Property>
<Property label="SDK Version">
{deployment.sdkVersion ? deployment.sdkVersion : "–"}
</Property>
<Property label="Started at">
<Paragraph variant="small/bright">
<DateTimeAccurate date={deployment.createdAt} /> UTC
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ export default function Page() {
<FormError id={organizationSlug.errorId}>{organizationSlug.error}</FormError>
<FormError>{deleteForm.error}</FormError>
<Hint>
This change is irreversible, so please be certain. Type in the Organization slug
This change is irreversible, so please be certain. Type in the Organization slug{" "}
<InlineCode variant="extra-small">{organization.slug}</InlineCode> and then
press Delete.
</Hint>
Expand Down
82 changes: 65 additions & 17 deletions apps/webapp/app/v3/marqs/index.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ export class MarQS {
return;
}

const message = await this.#readMessage(messageData.messageId);
const message = await this.readMessage(messageData.messageId);

if (message) {
span.setAttributes({
Expand Down Expand Up @@ -308,7 +308,7 @@ export class MarQS {
return;
}

const message = await this.#readMessage(messageData.messageId);
const message = await this.readMessage(messageData.messageId);

if (message) {
span.setAttributes({
Expand Down Expand Up @@ -336,7 +336,7 @@ export class MarQS {
return this.#trace(
"acknowledgeMessage",
async (span) => {
const message = await this.#readMessage(messageId);
const message = await this.readMessage(messageId);

if (!message) {
return;
Expand Down Expand Up @@ -374,12 +374,13 @@ export class MarQS {
public async replaceMessage(
messageId: string,
messageData: Record<string, unknown>,
timestamp?: number
timestamp?: number,
inplace?: boolean
) {
return this.#trace(
"replaceMessage",
async (span) => {
const oldMessage = await this.#readMessage(messageId);
const oldMessage = await this.readMessage(messageId);

if (!oldMessage) {
return;
Expand All @@ -392,6 +393,27 @@ export class MarQS {
[SemanticAttributes.PARENT_QUEUE]: oldMessage.parentQueue,
});

const traceContext = {
traceparent: oldMessage.data.traceparent,
tracestate: oldMessage.data.tracestate,
};

const newMessage: MessagePayload = {
version: "1",
// preserve original trace context
data: { ...messageData, ...traceContext },
queue: oldMessage.queue,
concurrencyKey: oldMessage.concurrencyKey,
timestamp: timestamp ?? Date.now(),
messageId,
parentQueue: oldMessage.parentQueue,
};

if (inplace) {
await this.#callReplaceMessage(newMessage);
return;
}

await this.#callAcknowledgeMessage({
parentQueue: oldMessage.parentQueue,
messageKey: this.keys.messageKey(messageId),
Expand All @@ -403,16 +425,6 @@ export class MarQS {
messageId,
});

const newMessage: MessagePayload = {
version: "1",
data: messageData,
queue: oldMessage.queue,
concurrencyKey: oldMessage.concurrencyKey,
timestamp: timestamp ?? Date.now(),
messageId,
parentQueue: oldMessage.parentQueue,
};

await this.#callEnqueueMessage(newMessage);
},
{
Expand Down Expand Up @@ -455,7 +467,7 @@ export class MarQS {
return this.#trace(
"nackMessage",
async (span) => {
const message = await this.#readMessage(messageId);
const message = await this.readMessage(messageId);

if (!message) {
return;
Expand Down Expand Up @@ -505,7 +517,7 @@ export class MarQS {
return this.options.visibilityTimeoutInMs ?? 300000;
}

async #readMessage(messageId: string) {
async readMessage(messageId: string) {
return this.#trace(
"readMessage",
async (span) => {
Expand Down Expand Up @@ -881,6 +893,17 @@ export class MarQS {
};
}

async #callReplaceMessage(message: MessagePayload) {
logger.debug("Calling replaceMessage", {
messagePayload: message,
});

return this.redis.replaceMessage(
this.keys.messageKey(message.messageId),
JSON.stringify(message)
);
}

async #callAcknowledgeMessage({
parentQueue,
messageKey,
Expand Down Expand Up @@ -1185,6 +1208,25 @@ return {messageId, messageScore} -- Return message details
`,
});

this.redis.defineCommand("replaceMessage", {
numberOfKeys: 1,
lua: `
local messageKey = KEYS[1]
local messageData = ARGV[1]

-- Check if message exists
local existingMessage = redis.call('GET', messageKey)

-- Do nothing if it doesn't
if #existingMessage == nil then
return nil
end

-- Replace the message
redis.call('SET', messageKey, messageData, 'GET')
`,
});

this.redis.defineCommand("acknowledgeMessage", {
numberOfKeys: 7,
lua: `
Expand Down Expand Up @@ -1406,6 +1448,12 @@ declare module "ioredis" {
callback?: Callback<[string, string]>
): Result<[string, string] | null, Context>;

replaceMessage(
messageKey: string,
messageData: string,
callback?: Callback<void>
): Result<void, Context>;

acknowledgeMessage(
parentQueue: string,
messageKey: string,
Expand Down
39 changes: 37 additions & 2 deletions apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ import { socketIo } from "../handleSocketIo.server";
import { findCurrentWorkerDeployment } from "../models/workerDeployment.server";
import { RestoreCheckpointService } from "../services/restoreCheckpoint.server";
import { tracer } from "../tracer.server";
import { CrashTaskRunService } from "../services/crashTaskRun.server";

const WithTraceContext = z.object({
traceparent: z.string().optional(),
tracestate: z.string().optional(),
});

const MessageBody = z.discriminatedUnion("type", [
export const SharedQueueMessageBody = z.discriminatedUnion("type", [
WithTraceContext.extend({
type: z.literal("EXECUTE"),
taskIdentifier: z.string(),
Expand All @@ -51,8 +52,14 @@ const MessageBody = z.discriminatedUnion("type", [
resumableAttemptId: z.string(),
checkpointEventId: z.string(),
}),
WithTraceContext.extend({
type: z.literal("FAIL"),
reason: z.string(),
}),
]);

export type SharedQueueMessageBody = z.infer<typeof SharedQueueMessageBody>;

type BackgroundWorkerWithTasks = BackgroundWorker & { tasks: BackgroundWorkerTask[] };

export type SharedQueueConsumerOptions = {
Expand Down Expand Up @@ -233,7 +240,7 @@ export class SharedQueueConsumer {

logger.log("dequeueMessageInSharedQueue()", { queueMessage: message });

const messageBody = MessageBody.safeParse(message.data);
const messageBody = SharedQueueMessageBody.safeParse(message.data);

if (!messageBody.success) {
logger.error("Failed to parse message", {
Expand Down Expand Up @@ -754,6 +761,34 @@ export class SharedQueueConsumer {

break;
}
// Fail for whatever reason, usually runs that have been resumed but stopped heartbeating
case "FAIL": {
const existingTaskRun = await prisma.taskRun.findUnique({
where: {
id: message.messageId,
},
});

if (!existingTaskRun) {
logger.error("No existing task run to fail", {
queueMessage: messageBody,
messageId: message.messageId,
});

await this.#ackAndDoMoreWork(message.messageId);
return;
}

// TODO: Consider failing the attempt and retrying instead. This may not be a good idea, as dequeued FAIL messages tend to point towards critical, persistent errors.
const service = new CrashTaskRunService();
await service.call(existingTaskRun.id, {
crashAttempts: true,
reason: messageBody.data.reason,
});

await this.#ackAndDoMoreWork(message.messageId);
return;
}
}

this.#doMoreWork();
Expand Down
9 changes: 9 additions & 0 deletions apps/webapp/app/v3/services/createCheckpoint.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { generateFriendlyId } from "../friendlyIdentifiers";
import { marqs } from "~/v3/marqs/index.server";
import { CreateCheckpointRestoreEventService } from "./createCheckpointRestoreEvent.server";
import { BaseService } from "./baseService.server";
import { CrashTaskRunService } from "./crashTaskRun.server";

const FREEZABLE_RUN_STATUSES: TaskRunStatus[] = ["EXECUTING", "RETRYING_AFTER_FAILURE"];
const FREEZABLE_ATTEMPT_STATUSES: TaskRunAttemptStatus[] = ["EXECUTING", "FAILED"];
Expand Down Expand Up @@ -61,6 +62,14 @@ export class CreateCheckpointService extends BaseService {
status: attempt.taskRun.status,
},
});

// This should only affect CLIs < beta.24, in very limited scenarios
const service = new CrashTaskRunService(this._prisma);
await service.call(attempt.taskRunId, {
crashAttempts: true,
reason: "Unfreezable state: Please upgrade your CLI",
});

return;
}

Expand Down
Loading