Skip to content

Commit 01d2dcc

Browse files
committed
Cancel in progress runs when disconnecting the dev CLI (in between attempts)
1 parent f31b29c commit 01d2dcc

File tree

5 files changed

+140
-63
lines changed

5 files changed

+140
-63
lines changed

apps/webapp/app/v3/marqs.server.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -611,14 +611,14 @@ export class MarQS {
611611
String(this.options.defaultConcurrency ?? 10)
612612
);
613613

614-
logger.debug("Dequeue message result", {
615-
result,
616-
});
617-
618614
if (!result) {
619615
return;
620616
}
621617

618+
logger.debug("Dequeue message result", {
619+
result,
620+
});
621+
622622
if (result.length !== 2) {
623623
return;
624624
}

apps/webapp/app/v3/marqs/devQueueConsumer.server.ts

Lines changed: 66 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import { CancelAttemptService } from "../services/cancelAttempt.server";
1818
import { CompleteAttemptService } from "../services/completeAttempt.server";
1919
import { attributesFromAuthenticatedEnv } from "../tracer.server";
2020
import { DevSubscriber, devPubSub } from "./devPubSub.server";
21+
import { CancelTaskRunService } from "../services/cancelTaskRun.server";
2122

2223
const tracer = trace.getTracer("devQueueConsumer");
2324

@@ -50,6 +51,7 @@ export class DevQueueConsumer {
5051
private _currentSpan: Span | undefined;
5152
private _endSpanInNextIteration = false;
5253
private _inProgressAttempts: Map<string, string> = new Map(); // Keys are task attempt friendly IDs, values are TaskRun ids/queue message ids
54+
private _inProgressRuns: Map<string, string> = new Map(); // Keys are task run friendly IDs, values are TaskRun internal ids/queue message ids
5355

5456
constructor(
5557
public env: AuthenticatedEnvironment,
@@ -123,7 +125,11 @@ export class DevQueueConsumer {
123125
logger.debug("Task run completed", { taskRunCompletion: completion, execution });
124126

125127
const service = new CompleteAttemptService();
126-
await service.call(completion, execution, this.env);
128+
const result = await service.call(completion, execution, this.env);
129+
130+
if (result === "COMPLETED") {
131+
this._inProgressRuns.delete(execution.run.id);
132+
}
127133
}
128134

129135
public async taskHeartbeat(workerId: string, id: string, seconds: number = 60) {
@@ -148,7 +154,7 @@ export class DevQueueConsumer {
148154
this._enabled = false;
149155

150156
// We need to cancel all the in progress task run attempts and ack the messages so they will stop processing
151-
await this.#cancelInProgressAttempts(reason);
157+
await this.#cancelInProgressRunsAndAttempts(reason);
152158

153159
// We need to unsubscribe from the background worker channels
154160
for (const [id, subscriber] of this._backgroundWorkerSubscriber) {
@@ -161,21 +167,44 @@ export class DevQueueConsumer {
161167
}
162168
}
163169

164-
async #cancelInProgressAttempts(reason: string) {
165-
const service = new CancelAttemptService();
170+
async #cancelInProgressRunsAndAttempts(reason: string) {
171+
const cancelAttemptService = new CancelAttemptService();
172+
const cancelTaskRunService = new CancelTaskRunService();
166173

167174
const cancelledAt = new Date();
168175

169176
const inProgressAttempts = new Map(this._inProgressAttempts);
177+
const inProgressRuns = new Map(this._inProgressRuns);
170178

171179
this._inProgressAttempts.clear();
180+
this._inProgressRuns.clear();
172181

173-
logger.debug("Cancelling in progress attempts", {
182+
const inProgressRunsWithNoInProgressAttempts: string[] = [];
183+
const inProgressAttemptRunIds = new Set(inProgressAttempts.values());
184+
185+
for (const [runId, messageId] of inProgressRuns) {
186+
if (!inProgressAttemptRunIds.has(messageId)) {
187+
inProgressRunsWithNoInProgressAttempts.push(messageId);
188+
}
189+
}
190+
191+
logger.debug("Cancelling in progress runs and attempts", {
174192
attempts: Array.from(inProgressAttempts.keys()),
193+
runs: Array.from(inProgressRuns.keys()),
175194
});
176195

177196
for (const [attemptId, messageId] of inProgressAttempts) {
178-
await this.#cancelInProgressAttempt(attemptId, messageId, service, cancelledAt, reason);
197+
await this.#cancelInProgressAttempt(
198+
attemptId,
199+
messageId,
200+
cancelAttemptService,
201+
cancelledAt,
202+
reason
203+
);
204+
}
205+
206+
for (const runId of inProgressRunsWithNoInProgressAttempts) {
207+
await this.#cancelInProgressRun(runId, cancelTaskRunService, cancelledAt, reason);
179208
}
180209
}
181210

@@ -199,6 +228,32 @@ export class DevQueueConsumer {
199228
}
200229
}
201230

231+
async #cancelInProgressRun(
232+
runId: string,
233+
service: CancelTaskRunService,
234+
cancelledAt: Date,
235+
reason: string
236+
) {
237+
logger.debug("Cancelling in progress run", { runId });
238+
239+
const taskRun = await prisma.taskRun.findUnique({
240+
where: { id: runId },
241+
});
242+
243+
if (!taskRun) {
244+
return;
245+
}
246+
247+
try {
248+
await service.call(taskRun, { reason, cancelAttempts: false, cancelledAt });
249+
} catch (e) {
250+
logger.error("Failed to cancel in progress run", {
251+
runId,
252+
error: e,
253+
});
254+
}
255+
}
256+
202257
#enable() {
203258
if (this._enabled) {
204259
return;
@@ -481,6 +536,7 @@ export class DevQueueConsumer {
481536
});
482537

483538
this._inProgressAttempts.set(taskRunAttempt.friendlyId, message.messageId);
539+
this._inProgressRuns.set(lockedTaskRun.friendlyId, message.messageId);
484540
} catch (e) {
485541
if (e instanceof Error) {
486542
this._currentSpan?.recordException(e);
@@ -499,6 +555,7 @@ export class DevQueueConsumer {
499555
data: {
500556
lockedAt: null,
501557
lockedById: null,
558+
status: "PENDING",
502559
},
503560
}),
504561
prisma.taskRunAttempt.delete({
@@ -508,6 +565,9 @@ export class DevQueueConsumer {
508565
}),
509566
]);
510567

568+
this._inProgressAttempts.delete(taskRunAttempt.friendlyId);
569+
this._inProgressRuns.delete(lockedTaskRun.friendlyId);
570+
511571
// Finally we need to nack the message so it can be retried
512572
await marqs?.nackMessage(message.messageId);
513573
} finally {

apps/webapp/app/v3/services/cancelTaskRun.server.ts

Lines changed: 61 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,21 @@ const CANCELLABLE_ATTEMPT_STATUSES: Array<TaskRunAttemptStatus> = [
2323
"PENDING",
2424
];
2525

26+
export type CancelTaskRunServiceOptions = {
27+
reason?: string;
28+
cancelAttempts?: boolean;
29+
cancelledAt?: Date;
30+
};
31+
2632
export class CancelTaskRunService extends BaseService {
27-
public async call(taskRun: TaskRun) {
33+
public async call(taskRun: TaskRun, options?: CancelTaskRunServiceOptions) {
34+
const opts = {
35+
reason: "Task run was cancelled by user",
36+
cancelAttempts: true,
37+
cancelledAt: new Date(),
38+
...options,
39+
};
40+
2841
// Make sure the task run is in a cancellable state
2942
if (!CANCELLABLE_STATUSES.includes(taskRun.status)) {
3043
return;
@@ -68,59 +81,61 @@ export class CancelTaskRunService extends BaseService {
6881

6982
await Promise.all(
7083
inProgressEvents.map((event) => {
71-
return eventRepository.cancelEvent(event, new Date(), "Task run was cancelled by user");
84+
return eventRepository.cancelEvent(event, opts.cancelledAt, opts.reason);
7285
})
7386
);
7487

7588
// Cancel any in progress attempts
76-
for (const attempt of cancelledTaskRun.attempts) {
77-
if (attempt.runtimeEnvironment.type === "DEVELOPMENT") {
78-
// Signal the task run attempt to stop
79-
await devPubSub.publish(
80-
`backgroundWorker:${attempt.backgroundWorkerId}:${attempt.id}`,
81-
"CANCEL_ATTEMPT",
82-
{
83-
attemptId: attempt.friendlyId,
84-
backgroundWorkerId: attempt.backgroundWorker.friendlyId,
85-
taskRunId: cancelledTaskRun.friendlyId,
86-
}
87-
);
88-
} else {
89-
switch (attempt.status) {
90-
case "EXECUTING": {
91-
// We need to send a cancel message to the coordinator
92-
socketIo.coordinatorNamespace.emit("REQUEST_ATTEMPT_CANCELLATION", {
93-
version: "v1",
94-
attemptId: attempt.id,
95-
});
89+
if (opts.cancelAttempts) {
90+
for (const attempt of cancelledTaskRun.attempts) {
91+
if (attempt.runtimeEnvironment.type === "DEVELOPMENT") {
92+
// Signal the task run attempt to stop
93+
await devPubSub.publish(
94+
`backgroundWorker:${attempt.backgroundWorkerId}:${attempt.id}`,
95+
"CANCEL_ATTEMPT",
96+
{
97+
attemptId: attempt.friendlyId,
98+
backgroundWorkerId: attempt.backgroundWorker.friendlyId,
99+
taskRunId: cancelledTaskRun.friendlyId,
100+
}
101+
);
102+
} else {
103+
switch (attempt.status) {
104+
case "EXECUTING": {
105+
// We need to send a cancel message to the coordinator
106+
socketIo.coordinatorNamespace.emit("REQUEST_ATTEMPT_CANCELLATION", {
107+
version: "v1",
108+
attemptId: attempt.id,
109+
});
96110

97-
break;
98-
}
99-
case "PENDING":
100-
case "PAUSED": {
101-
logger.debug("Cancelling pending or paused attempt", {
102-
attempt,
103-
});
111+
break;
112+
}
113+
case "PENDING":
114+
case "PAUSED": {
115+
logger.debug("Cancelling pending or paused attempt", {
116+
attempt,
117+
});
104118

105-
const service = new CancelAttemptService();
119+
const service = new CancelAttemptService();
106120

107-
await service.call(
108-
attempt.friendlyId,
109-
taskRun.id,
110-
new Date(),
111-
"Task run was cancelled by user"
112-
);
121+
await service.call(
122+
attempt.friendlyId,
123+
taskRun.id,
124+
new Date(),
125+
"Task run was cancelled by user"
126+
);
113127

114-
break;
115-
}
116-
case "CANCELED":
117-
case "COMPLETED":
118-
case "FAILED": {
119-
// Do nothing
120-
break;
121-
}
122-
default: {
123-
assertUnreachable(attempt.status);
128+
break;
129+
}
130+
case "CANCELED":
131+
case "COMPLETED":
132+
case "FAILED": {
133+
// Do nothing
134+
break;
135+
}
136+
default: {
137+
assertUnreachable(attempt.status);
138+
}
124139
}
125140
}
126141
}

apps/webapp/app/v3/services/completeAttempt.server.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ export class CompleteAttemptService extends BaseService {
2525
completion: TaskRunExecutionResult,
2626
execution: TaskRunExecution,
2727
env?: AuthenticatedEnvironment
28-
) {
28+
): Promise<"COMPLETED" | "RETRIED"> {
2929
const taskRunAttempt = await findAttempt(this._prisma, completion.id);
3030

3131
if (!taskRunAttempt) {
@@ -41,7 +41,7 @@ export class CompleteAttemptService extends BaseService {
4141
},
4242
});
4343

44-
return "FAILED";
44+
return "COMPLETED";
4545
}
4646

4747
if (completion.ok) {
@@ -55,7 +55,7 @@ export class CompleteAttemptService extends BaseService {
5555
completion: TaskRunSuccessfulExecutionResult,
5656
taskRunAttempt: NonNullable<FoundAttempt>,
5757
env?: AuthenticatedEnvironment
58-
) {
58+
): Promise<"COMPLETED" | "RETRIED"> {
5959
await this._prisma.taskRunAttempt.update({
6060
where: { friendlyId: completion.id },
6161
data: {
@@ -90,7 +90,7 @@ export class CompleteAttemptService extends BaseService {
9090
await ResumeTaskRunDependenciesService.enqueue(taskRunAttempt.id, this._prisma);
9191
}
9292

93-
return "ACKNOWLEDGED";
93+
return "COMPLETED";
9494
}
9595

9696
async #completeAttemptFailed(
@@ -106,13 +106,15 @@ export class CompleteAttemptService extends BaseService {
106106
// We need to cancel the task run instead of fail it
107107
const cancelService = new CancelAttemptService();
108108

109-
return await cancelService.call(
109+
await cancelService.call(
110110
taskRunAttempt.friendlyId,
111111
taskRunAttempt.taskRunId,
112112
new Date(),
113113
"Cancelled by user",
114114
env
115115
);
116+
117+
return "COMPLETED";
116118
}
117119

118120
await this._prisma.taskRunAttempt.update({
@@ -204,7 +206,7 @@ export class CompleteAttemptService extends BaseService {
204206
await ResumeTaskRunDependenciesService.enqueue(taskRunAttempt.id, this._prisma);
205207
}
206208

207-
return "ACKNOWLEDGED";
209+
return "COMPLETED";
208210
}
209211
}
210212

references/v3-catalog/src/trigger/openai.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ export const openaiTask = task({
2525

2626
return {
2727
error: new Error("Custom OpenAI API error"),
28-
retryDelayInMs: 1000,
28+
retryDelayInMs: 10000,
2929
};
3030
}
3131
},

0 commit comments

Comments
 (0)