Skip to content

Commit 0a33bf7

Browse files
authored
v3: handle errors and customizing retrying (#943)
* Add the ability to handle errors at the task and project level * Cancel in progress runs when disconnecting the dev CLI (in between attempts) * unify the task executor across dev & prod
1 parent 819b663 commit 0a33bf7

File tree

27 files changed

+745
-499
lines changed

27 files changed

+745
-499
lines changed

apps/webapp/app/components/primitives/TreeView/TreeView.tsx

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,9 @@
11
import { VirtualItem, Virtualizer, useVirtualizer } from "@tanstack/react-virtual";
2-
import {
3-
Fragment,
4-
MutableRefObject,
5-
RefObject,
6-
useCallback,
7-
useEffect,
8-
useImperativeHandle,
9-
useReducer,
10-
useRef,
11-
useState,
12-
} from "react";
2+
import { MutableRefObject, RefObject, useCallback, useEffect, useReducer, useRef } from "react";
133
import { UnmountClosed } from "react-collapse";
144
import { cn } from "~/utils/cn";
15-
import { Changes, NodeState, NodesState, reducer } from "./reducer";
16-
import {
17-
applyFilterToState,
18-
concreteStateFromInput,
19-
firstVisibleNode,
20-
lastVisibleNode,
21-
selectedIdFromState,
22-
} from "./utils";
5+
import { NodeState, NodesState, reducer } from "./reducer";
6+
import { applyFilterToState, concreteStateFromInput, selectedIdFromState } from "./utils";
237

248
export type TreeViewProps<TData> = {
259
tree: FlatTree<TData>;
@@ -232,6 +216,7 @@ export function useTree<TData>({
232216
index,
233217
});
234218
},
219+
overscan: 20,
235220
});
236221

237222
const scrollToNodeFn = useCallback(

apps/webapp/app/consts.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ export const RUN_CHUNK_EXECUTION_BUFFER = 350;
1010
export const MAX_RUN_CHUNK_EXECUTION_LIMIT = 120000; // 2 minutes
1111
export const VERCEL_RESPONSE_TIMEOUT_STATUS_CODES = [408, 504];
1212
export const MAX_BATCH_TRIGGER_ITEMS = 100;
13+
export const MAX_TASK_RUN_ATTEMPTS = 250;

apps/webapp/app/v3/marqs.server.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -611,14 +611,14 @@ export class MarQS {
611611
String(this.options.defaultConcurrency ?? 10)
612612
);
613613

614-
logger.debug("Dequeue message result", {
615-
result,
616-
});
617-
618614
if (!result) {
619615
return;
620616
}
621617

618+
logger.debug("Dequeue message result", {
619+
result,
620+
});
621+
622622
if (result.length !== 2) {
623623
return;
624624
}

apps/webapp/app/v3/marqs/devQueueConsumer.server.ts

Lines changed: 66 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import { CancelAttemptService } from "../services/cancelAttempt.server";
1818
import { CompleteAttemptService } from "../services/completeAttempt.server";
1919
import { attributesFromAuthenticatedEnv } from "../tracer.server";
2020
import { DevSubscriber, devPubSub } from "./devPubSub.server";
21+
import { CancelTaskRunService } from "../services/cancelTaskRun.server";
2122

2223
const tracer = trace.getTracer("devQueueConsumer");
2324

@@ -50,6 +51,7 @@ export class DevQueueConsumer {
5051
private _currentSpan: Span | undefined;
5152
private _endSpanInNextIteration = false;
5253
private _inProgressAttempts: Map<string, string> = new Map(); // Keys are task attempt friendly IDs, values are TaskRun ids/queue message ids
54+
private _inProgressRuns: Map<string, string> = new Map(); // Keys are task run friendly IDs, values are TaskRun internal ids/queue message ids
5355

5456
constructor(
5557
public env: AuthenticatedEnvironment,
@@ -123,7 +125,11 @@ export class DevQueueConsumer {
123125
logger.debug("Task run completed", { taskRunCompletion: completion, execution });
124126

125127
const service = new CompleteAttemptService();
126-
await service.call(completion, execution, this.env);
128+
const result = await service.call(completion, execution, this.env);
129+
130+
if (result === "COMPLETED") {
131+
this._inProgressRuns.delete(execution.run.id);
132+
}
127133
}
128134

129135
public async taskHeartbeat(workerId: string, id: string, seconds: number = 60) {
@@ -148,7 +154,7 @@ export class DevQueueConsumer {
148154
this._enabled = false;
149155

150156
// We need to cancel all the in progress task run attempts and ack the messages so they will stop processing
151-
await this.#cancelInProgressAttempts(reason);
157+
await this.#cancelInProgressRunsAndAttempts(reason);
152158

153159
// We need to unsubscribe from the background worker channels
154160
for (const [id, subscriber] of this._backgroundWorkerSubscriber) {
@@ -161,21 +167,44 @@ export class DevQueueConsumer {
161167
}
162168
}
163169

164-
async #cancelInProgressAttempts(reason: string) {
165-
const service = new CancelAttemptService();
170+
async #cancelInProgressRunsAndAttempts(reason: string) {
171+
const cancelAttemptService = new CancelAttemptService();
172+
const cancelTaskRunService = new CancelTaskRunService();
166173

167174
const cancelledAt = new Date();
168175

169176
const inProgressAttempts = new Map(this._inProgressAttempts);
177+
const inProgressRuns = new Map(this._inProgressRuns);
170178

171179
this._inProgressAttempts.clear();
180+
this._inProgressRuns.clear();
172181

173-
logger.debug("Cancelling in progress attempts", {
182+
const inProgressRunsWithNoInProgressAttempts: string[] = [];
183+
const inProgressAttemptRunIds = new Set(inProgressAttempts.values());
184+
185+
for (const [runId, messageId] of inProgressRuns) {
186+
if (!inProgressAttemptRunIds.has(messageId)) {
187+
inProgressRunsWithNoInProgressAttempts.push(messageId);
188+
}
189+
}
190+
191+
logger.debug("Cancelling in progress runs and attempts", {
174192
attempts: Array.from(inProgressAttempts.keys()),
193+
runs: Array.from(inProgressRuns.keys()),
175194
});
176195

177196
for (const [attemptId, messageId] of inProgressAttempts) {
178-
await this.#cancelInProgressAttempt(attemptId, messageId, service, cancelledAt, reason);
197+
await this.#cancelInProgressAttempt(
198+
attemptId,
199+
messageId,
200+
cancelAttemptService,
201+
cancelledAt,
202+
reason
203+
);
204+
}
205+
206+
for (const runId of inProgressRunsWithNoInProgressAttempts) {
207+
await this.#cancelInProgressRun(runId, cancelTaskRunService, cancelledAt, reason);
179208
}
180209
}
181210

@@ -199,6 +228,32 @@ export class DevQueueConsumer {
199228
}
200229
}
201230

231+
async #cancelInProgressRun(
232+
runId: string,
233+
service: CancelTaskRunService,
234+
cancelledAt: Date,
235+
reason: string
236+
) {
237+
logger.debug("Cancelling in progress run", { runId });
238+
239+
const taskRun = await prisma.taskRun.findUnique({
240+
where: { id: runId },
241+
});
242+
243+
if (!taskRun) {
244+
return;
245+
}
246+
247+
try {
248+
await service.call(taskRun, { reason, cancelAttempts: false, cancelledAt });
249+
} catch (e) {
250+
logger.error("Failed to cancel in progress run", {
251+
runId,
252+
error: e,
253+
});
254+
}
255+
}
256+
202257
#enable() {
203258
if (this._enabled) {
204259
return;
@@ -481,6 +536,7 @@ export class DevQueueConsumer {
481536
});
482537

483538
this._inProgressAttempts.set(taskRunAttempt.friendlyId, message.messageId);
539+
this._inProgressRuns.set(lockedTaskRun.friendlyId, message.messageId);
484540
} catch (e) {
485541
if (e instanceof Error) {
486542
this._currentSpan?.recordException(e);
@@ -499,6 +555,7 @@ export class DevQueueConsumer {
499555
data: {
500556
lockedAt: null,
501557
lockedById: null,
558+
status: "PENDING",
502559
},
503560
}),
504561
prisma.taskRunAttempt.delete({
@@ -508,6 +565,9 @@ export class DevQueueConsumer {
508565
}),
509566
]);
510567

568+
this._inProgressAttempts.delete(taskRunAttempt.friendlyId);
569+
this._inProgressRuns.delete(lockedTaskRun.friendlyId);
570+
511571
// Finally we need to nack the message so it can be retried
512572
await marqs?.nackMessage(message.messageId);
513573
} finally {

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,10 @@ export class SharedQueueConsumer {
300300
return;
301301
}
302302

303-
if (existingTaskRun.status !== "PENDING") {
303+
if (
304+
existingTaskRun.status !== "PENDING" &&
305+
existingTaskRun.status !== "RETRYING_AFTER_FAILURE"
306+
) {
304307
logger.debug("Task run is not pending, aborting", {
305308
queueMessage: message.data,
306309
messageId: message.messageId,

apps/webapp/app/v3/services/cancelTaskRun.server.ts

Lines changed: 61 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,21 @@ const CANCELLABLE_ATTEMPT_STATUSES: Array<TaskRunAttemptStatus> = [
2323
"PENDING",
2424
];
2525

26+
export type CancelTaskRunServiceOptions = {
27+
reason?: string;
28+
cancelAttempts?: boolean;
29+
cancelledAt?: Date;
30+
};
31+
2632
export class CancelTaskRunService extends BaseService {
27-
public async call(taskRun: TaskRun) {
33+
public async call(taskRun: TaskRun, options?: CancelTaskRunServiceOptions) {
34+
const opts = {
35+
reason: "Task run was cancelled by user",
36+
cancelAttempts: true,
37+
cancelledAt: new Date(),
38+
...options,
39+
};
40+
2841
// Make sure the task run is in a cancellable state
2942
if (!CANCELLABLE_STATUSES.includes(taskRun.status)) {
3043
return;
@@ -68,59 +81,61 @@ export class CancelTaskRunService extends BaseService {
6881

6982
await Promise.all(
7083
inProgressEvents.map((event) => {
71-
return eventRepository.cancelEvent(event, new Date(), "Task run was cancelled by user");
84+
return eventRepository.cancelEvent(event, opts.cancelledAt, opts.reason);
7285
})
7386
);
7487

7588
// Cancel any in progress attempts
76-
for (const attempt of cancelledTaskRun.attempts) {
77-
if (attempt.runtimeEnvironment.type === "DEVELOPMENT") {
78-
// Signal the task run attempt to stop
79-
await devPubSub.publish(
80-
`backgroundWorker:${attempt.backgroundWorkerId}:${attempt.id}`,
81-
"CANCEL_ATTEMPT",
82-
{
83-
attemptId: attempt.friendlyId,
84-
backgroundWorkerId: attempt.backgroundWorker.friendlyId,
85-
taskRunId: cancelledTaskRun.friendlyId,
86-
}
87-
);
88-
} else {
89-
switch (attempt.status) {
90-
case "EXECUTING": {
91-
// We need to send a cancel message to the coordinator
92-
socketIo.coordinatorNamespace.emit("REQUEST_ATTEMPT_CANCELLATION", {
93-
version: "v1",
94-
attemptId: attempt.id,
95-
});
89+
if (opts.cancelAttempts) {
90+
for (const attempt of cancelledTaskRun.attempts) {
91+
if (attempt.runtimeEnvironment.type === "DEVELOPMENT") {
92+
// Signal the task run attempt to stop
93+
await devPubSub.publish(
94+
`backgroundWorker:${attempt.backgroundWorkerId}:${attempt.id}`,
95+
"CANCEL_ATTEMPT",
96+
{
97+
attemptId: attempt.friendlyId,
98+
backgroundWorkerId: attempt.backgroundWorker.friendlyId,
99+
taskRunId: cancelledTaskRun.friendlyId,
100+
}
101+
);
102+
} else {
103+
switch (attempt.status) {
104+
case "EXECUTING": {
105+
// We need to send a cancel message to the coordinator
106+
socketIo.coordinatorNamespace.emit("REQUEST_ATTEMPT_CANCELLATION", {
107+
version: "v1",
108+
attemptId: attempt.id,
109+
});
96110

97-
break;
98-
}
99-
case "PENDING":
100-
case "PAUSED": {
101-
logger.debug("Cancelling pending or paused attempt", {
102-
attempt,
103-
});
111+
break;
112+
}
113+
case "PENDING":
114+
case "PAUSED": {
115+
logger.debug("Cancelling pending or paused attempt", {
116+
attempt,
117+
});
104118

105-
const service = new CancelAttemptService();
119+
const service = new CancelAttemptService();
106120

107-
await service.call(
108-
attempt.friendlyId,
109-
taskRun.id,
110-
new Date(),
111-
"Task run was cancelled by user"
112-
);
121+
await service.call(
122+
attempt.friendlyId,
123+
taskRun.id,
124+
new Date(),
125+
"Task run was cancelled by user"
126+
);
113127

114-
break;
115-
}
116-
case "CANCELED":
117-
case "COMPLETED":
118-
case "FAILED": {
119-
// Do nothing
120-
break;
121-
}
122-
default: {
123-
assertUnreachable(attempt.status);
128+
break;
129+
}
130+
case "CANCELED":
131+
case "COMPLETED":
132+
case "FAILED": {
133+
// Do nothing
134+
break;
135+
}
136+
default: {
137+
assertUnreachable(attempt.status);
138+
}
124139
}
125140
}
126141
}

0 commit comments

Comments
 (0)