Skip to content

Commit 6270024

Browse files
authored
v3: fix consecutive waits (#1073)
* fix spacing for delete hints
* don't try to resume deps on wait resume
* sending duration wait resumes is not an error anymore
* set correct status with new wait resume flow
* cancel checkpoint schema v2
* don't mix messages and schemas
* prevent unintended case fallthrough in tree view
* completely switch to platform-led duration wait resumes
* prevent infinite restores
* some entries for the catalog
* add pg to additional packages
* add checkpoint safe timeout
* prevent duplicate spans after restore
* wait for post start
* add sdk version to deploy tab
* fail on impossible checkpoint scenarios
* remove debug logs
1 parent 6ce820c commit 6270024

File tree

22 files changed

+1113
-754
lines changed

22 files changed

+1113
-754
lines changed

apps/coordinator/src/index.ts

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -157,16 +157,18 @@ class Checkpointer {
157157
return this.#abortControllers.has(runId);
158158
}
159159

160-
cancelCheckpoint(runId: string) {
160+
cancelCheckpoint(runId: string): boolean {
161161
const controller = this.#abortControllers.get(runId);
162162

163163
if (!controller) {
164164
logger.debug("Nothing to cancel", { runId });
165-
return;
165+
return false;
166166
}
167167

168168
controller.abort("cancelCheckpointing()");
169169
this.#abortControllers.delete(runId);
170+
171+
return true;
170172
}
171173

172174
async #checkpointAndPush({
@@ -725,10 +727,18 @@ class TaskCoordinator {
725727
checkpointable.resolve();
726728
});
727729

728-
socket.on("CANCEL_CHECKPOINT", async (message) => {
730+
socket.on("CANCEL_CHECKPOINT", async (message, callback) => {
729731
logger.log("[CANCEL_CHECKPOINT]", message);
730732

731-
this.#cancelCheckpoint(socket.data.runId);
733+
if (message.version === "v1") {
734+
this.#cancelCheckpoint(socket.data.runId);
735+
// v1 has no callback
736+
return;
737+
}
738+
739+
const checkpointCanceled = this.#cancelCheckpoint(socket.data.runId);
740+
741+
callback({ version: "v2", checkpointCanceled });
732742
});
733743

734744
socket.on("WAIT_FOR_DURATION", async (message, callback) => {
@@ -933,7 +943,9 @@ class TaskCoordinator {
933943
}
934944

935945
// Cancel checkpointing procedure
936-
this.#checkpointer.cancelCheckpoint(runId);
946+
const checkpointCanceled = this.#checkpointer.cancelCheckpoint(runId);
947+
948+
return checkpointCanceled;
937949
}
938950

939951
#createHttpServer() {

apps/webapp/app/components/primitives/TreeView/reducer.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import assertNever from "assert-never";
12
import { Filter, FlatTree } from "./TreeView";
23
import {
34
applyFilterToState,
@@ -543,6 +544,9 @@ export function reducer(state: TreeState, action: Action): TreeState {
543544
});
544545
return newState;
545546
}
547+
default: {
548+
assertNever(action);
549+
}
546550
}
547551

548552
throw new Error(`Unhandled action type: ${(action as any).type}`);

apps/webapp/app/presenters/v3/DeploymentPresenter.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ export class DeploymentPresenter {
103103
exportName: "asc",
104104
},
105105
},
106+
sdkVersion: true,
106107
},
107108
},
108109
triggeredBy: {
@@ -135,6 +136,7 @@ export class DeploymentPresenter {
135136
},
136137
deployedBy: deployment.triggeredBy,
137138
errorData: this.#prepareErrorData(deployment.errorData),
139+
sdkVersion: deployment.worker?.sdkVersion,
138140
},
139141
};
140142
}

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.settings/route.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ export default function Page() {
232232
<FormError id={projectSlug.errorId}>{projectSlug.error}</FormError>
233233
<FormError>{deleteForm.error}</FormError>
234234
<Hint>
235-
This change is irreversible, so please be certain. Type in the Project slug
235+
This change is irreversible, so please be certain. Type in the Project slug{" "}
236236
<InlineCode variant="extra-small">{project.slug}</InlineCode> and then press
237237
Delete.
238238
</Hint>

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.v3.$projectParam.deployments.$deploymentParam/route.tsx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ export default function Page() {
9292
<DeploymentStatus status={deployment.status} className="text-sm" />
9393
</Property>
9494
<Property label="Tasks">{deployment.tasks ? deployment.tasks.length : "–"}</Property>
95+
<Property label="SDK Version">
96+
{deployment.sdkVersion ? deployment.sdkVersion : "–"}
97+
</Property>
9598
<Property label="Started at">
9699
<Paragraph variant="small/bright">
97100
<DateTimeAccurate date={deployment.createdAt} /> UTC

apps/webapp/app/routes/_app.orgs.$organizationSlug.settings/route.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ export default function Page() {
239239
<FormError id={organizationSlug.errorId}>{organizationSlug.error}</FormError>
240240
<FormError>{deleteForm.error}</FormError>
241241
<Hint>
242-
This change is irreversible, so please be certain. Type in the Organization slug
242+
This change is irreversible, so please be certain. Type in the Organization slug{" "}
243243
<InlineCode variant="extra-small">{organization.slug}</InlineCode> and then
244244
press Delete.
245245
</Hint>

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 65 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ export class MarQS {
231231
return;
232232
}
233233

234-
const message = await this.#readMessage(messageData.messageId);
234+
const message = await this.readMessage(messageData.messageId);
235235

236236
if (message) {
237237
span.setAttributes({
@@ -308,7 +308,7 @@ export class MarQS {
308308
return;
309309
}
310310

311-
const message = await this.#readMessage(messageData.messageId);
311+
const message = await this.readMessage(messageData.messageId);
312312

313313
if (message) {
314314
span.setAttributes({
@@ -336,7 +336,7 @@ export class MarQS {
336336
return this.#trace(
337337
"acknowledgeMessage",
338338
async (span) => {
339-
const message = await this.#readMessage(messageId);
339+
const message = await this.readMessage(messageId);
340340

341341
if (!message) {
342342
return;
@@ -374,12 +374,13 @@ export class MarQS {
374374
public async replaceMessage(
375375
messageId: string,
376376
messageData: Record<string, unknown>,
377-
timestamp?: number
377+
timestamp?: number,
378+
inplace?: boolean
378379
) {
379380
return this.#trace(
380381
"replaceMessage",
381382
async (span) => {
382-
const oldMessage = await this.#readMessage(messageId);
383+
const oldMessage = await this.readMessage(messageId);
383384

384385
if (!oldMessage) {
385386
return;
@@ -392,6 +393,27 @@ export class MarQS {
392393
[SemanticAttributes.PARENT_QUEUE]: oldMessage.parentQueue,
393394
});
394395

396+
const traceContext = {
397+
traceparent: oldMessage.data.traceparent,
398+
tracestate: oldMessage.data.tracestate,
399+
};
400+
401+
const newMessage: MessagePayload = {
402+
version: "1",
403+
// preserve original trace context
404+
data: { ...messageData, ...traceContext },
405+
queue: oldMessage.queue,
406+
concurrencyKey: oldMessage.concurrencyKey,
407+
timestamp: timestamp ?? Date.now(),
408+
messageId,
409+
parentQueue: oldMessage.parentQueue,
410+
};
411+
412+
if (inplace) {
413+
await this.#callReplaceMessage(newMessage);
414+
return;
415+
}
416+
395417
await this.#callAcknowledgeMessage({
396418
parentQueue: oldMessage.parentQueue,
397419
messageKey: this.keys.messageKey(messageId),
@@ -403,16 +425,6 @@ export class MarQS {
403425
messageId,
404426
});
405427

406-
const newMessage: MessagePayload = {
407-
version: "1",
408-
data: messageData,
409-
queue: oldMessage.queue,
410-
concurrencyKey: oldMessage.concurrencyKey,
411-
timestamp: timestamp ?? Date.now(),
412-
messageId,
413-
parentQueue: oldMessage.parentQueue,
414-
};
415-
416428
await this.#callEnqueueMessage(newMessage);
417429
},
418430
{
@@ -455,7 +467,7 @@ export class MarQS {
455467
return this.#trace(
456468
"nackMessage",
457469
async (span) => {
458-
const message = await this.#readMessage(messageId);
470+
const message = await this.readMessage(messageId);
459471

460472
if (!message) {
461473
return;
@@ -505,7 +517,7 @@ export class MarQS {
505517
return this.options.visibilityTimeoutInMs ?? 300000;
506518
}
507519

508-
async #readMessage(messageId: string) {
520+
async readMessage(messageId: string) {
509521
return this.#trace(
510522
"readMessage",
511523
async (span) => {
@@ -881,6 +893,17 @@ export class MarQS {
881893
};
882894
}
883895

896+
/**
 * Invokes the custom `replaceMessage` Redis command for the given message.
 *
 * Logs the full payload at debug level, then passes the message's Redis key
 * and its JSON-serialized form to the command.
 *
 * @param message - The payload to store under the message's key.
 * @returns Whatever the `replaceMessage` Redis command call returns.
 */
async #callReplaceMessage(message: MessagePayload) {
  logger.debug("Calling replaceMessage", { messagePayload: message });

  const messageKey = this.keys.messageKey(message.messageId);
  const serializedMessage = JSON.stringify(message);

  return this.redis.replaceMessage(messageKey, serializedMessage);
}
906+
884907
async #callAcknowledgeMessage({
885908
parentQueue,
886909
messageKey,
@@ -1185,6 +1208,25 @@ return {messageId, messageScore} -- Return message details
11851208
`,
11861209
});
11871210

1211+
// Registers the custom `replaceMessage` Redis command.
// KEYS[1] is the message key and ARGV[1] is the new serialized message.
// The script overwrites the stored value only when the key already exists,
// and returns nil without writing anything otherwise.
this.redis.defineCommand("replaceMessage", {
  numberOfKeys: 1,
  lua: `
local messageKey = KEYS[1]
local messageData = ARGV[1]

-- Check if message exists
-- (a missing key makes GET return the Lua boolean false, not nil,
-- so test truthiness instead of taking the length of the result)
local existingMessage = redis.call('GET', messageKey)

-- Do nothing if it doesn't
if not existingMessage then
    return nil
end

-- Replace the message
redis.call('SET', messageKey, messageData, 'GET')
  `,
});
1229+
11881230
this.redis.defineCommand("acknowledgeMessage", {
11891231
numberOfKeys: 7,
11901232
lua: `
@@ -1406,6 +1448,12 @@ declare module "ioredis" {
14061448
callback?: Callback<[string, string]>
14071449
): Result<[string, string] | null, Context>;
14081450

1451+
replaceMessage(
1452+
messageKey: string,
1453+
messageData: string,
1454+
callback?: Callback<void>
1455+
): Result<void, Context>;
1456+
14091457
acknowledgeMessage(
14101458
parentQueue: string,
14111459
messageKey: string,

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,14 @@ import { socketIo } from "../handleSocketIo.server";
2828
import { findCurrentWorkerDeployment } from "../models/workerDeployment.server";
2929
import { RestoreCheckpointService } from "../services/restoreCheckpoint.server";
3030
import { tracer } from "../tracer.server";
31+
import { CrashTaskRunService } from "../services/crashTaskRun.server";
3132

3233
const WithTraceContext = z.object({
3334
traceparent: z.string().optional(),
3435
tracestate: z.string().optional(),
3536
});
3637

37-
const MessageBody = z.discriminatedUnion("type", [
38+
export const SharedQueueMessageBody = z.discriminatedUnion("type", [
3839
WithTraceContext.extend({
3940
type: z.literal("EXECUTE"),
4041
taskIdentifier: z.string(),
@@ -51,8 +52,14 @@ const MessageBody = z.discriminatedUnion("type", [
5152
resumableAttemptId: z.string(),
5253
checkpointEventId: z.string(),
5354
}),
55+
WithTraceContext.extend({
56+
type: z.literal("FAIL"),
57+
reason: z.string(),
58+
}),
5459
]);
5560

61+
export type SharedQueueMessageBody = z.infer<typeof SharedQueueMessageBody>;
62+
5663
type BackgroundWorkerWithTasks = BackgroundWorker & { tasks: BackgroundWorkerTask[] };
5764

5865
export type SharedQueueConsumerOptions = {
@@ -233,7 +240,7 @@ export class SharedQueueConsumer {
233240

234241
logger.log("dequeueMessageInSharedQueue()", { queueMessage: message });
235242

236-
const messageBody = MessageBody.safeParse(message.data);
243+
const messageBody = SharedQueueMessageBody.safeParse(message.data);
237244

238245
if (!messageBody.success) {
239246
logger.error("Failed to parse message", {
@@ -754,6 +761,34 @@ export class SharedQueueConsumer {
754761

755762
break;
756763
}
764+
// Fail for whatever reason, usually runs that have been resumed but stopped heartbeating
765+
case "FAIL": {
766+
const existingTaskRun = await prisma.taskRun.findUnique({
767+
where: {
768+
id: message.messageId,
769+
},
770+
});
771+
772+
if (!existingTaskRun) {
773+
logger.error("No existing task run to fail", {
774+
queueMessage: messageBody,
775+
messageId: message.messageId,
776+
});
777+
778+
await this.#ackAndDoMoreWork(message.messageId);
779+
return;
780+
}
781+
782+
// TODO: Consider failing the attempt and retrying instead. This may not be a good idea, as dequeued FAIL messages tend to point towards critical, persistent errors.
783+
const service = new CrashTaskRunService();
784+
await service.call(existingTaskRun.id, {
785+
crashAttempts: true,
786+
reason: messageBody.data.reason,
787+
});
788+
789+
await this.#ackAndDoMoreWork(message.messageId);
790+
return;
791+
}
757792
}
758793

759794
this.#doMoreWork();

apps/webapp/app/v3/services/createCheckpoint.server.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { generateFriendlyId } from "../friendlyIdentifiers";
1010
import { marqs } from "~/v3/marqs/index.server";
1111
import { CreateCheckpointRestoreEventService } from "./createCheckpointRestoreEvent.server";
1212
import { BaseService } from "./baseService.server";
13+
import { CrashTaskRunService } from "./crashTaskRun.server";
1314

1415
const FREEZABLE_RUN_STATUSES: TaskRunStatus[] = ["EXECUTING", "RETRYING_AFTER_FAILURE"];
1516
const FREEZABLE_ATTEMPT_STATUSES: TaskRunAttemptStatus[] = ["EXECUTING", "FAILED"];
@@ -61,6 +62,14 @@ export class CreateCheckpointService extends BaseService {
6162
status: attempt.taskRun.status,
6263
},
6364
});
65+
66+
// This should only affect CLIs < beta.24, in very limited scenarios
67+
const service = new CrashTaskRunService(this._prisma);
68+
await service.call(attempt.taskRunId, {
69+
crashAttempts: true,
70+
reason: "Unfreezable state: Please upgrade your CLI",
71+
});
72+
6473
return;
6574
}
6675

0 commit comments

Comments
 (0)