Skip to content

Commit 2b44a1b

Browse files
committed
v4: implement onCancel callbacks
1 parent 15816e9 commit 2b44a1b

File tree

25 files changed

+543
-109
lines changed

25 files changed

+543
-109
lines changed

apps/webapp/app/components/runs/v3/RunIcon.tsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ export function RunIcon({ name, className, spanName }: TaskIconProps) {
9797
case "task-hook-onResume":
9898
case "task-hook-onComplete":
9999
case "task-hook-cleanup":
100+
case "task-hook-onCancel":
100101
return <FunctionIcon className={cn(className, "text-text-dimmed")} />;
101102
case "task-hook-onFailure":
102103
case "task-hook-catchError":

apps/webapp/app/v3/services/cancelTaskRun.server.ts

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -47,29 +47,6 @@ export class CancelTaskRunService extends BaseService {
4747
tx: this._prisma,
4848
});
4949

50-
const inProgressEvents = await eventRepository.queryIncompleteEvents(
51-
getTaskEventStoreTableForRun(taskRun),
52-
{
53-
runId: taskRun.friendlyId,
54-
},
55-
taskRun.createdAt,
56-
taskRun.completedAt ?? undefined
57-
);
58-
59-
logger.debug("Cancelling in-progress events", {
60-
inProgressEvents: inProgressEvents.map((event) => event.id),
61-
});
62-
63-
await Promise.all(
64-
inProgressEvents.map((event) => {
65-
return eventRepository.cancelEvent(
66-
event,
67-
options?.cancelledAt ?? new Date(),
68-
options?.reason ?? "Run cancelled"
69-
);
70-
})
71-
);
72-
7350
return {
7451
id: result.run.id,
7552
};

packages/cli-v3/src/entryPoints/dev-run-worker.ts

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import {
2323
TaskRunExecution,
2424
timeout,
2525
TriggerConfig,
26+
UsageMeasurement,
2627
waitUntil,
2728
WorkerManifest,
2829
WorkerToExecutorMessageCatalog,
@@ -232,7 +233,10 @@ async function bootstrap() {
232233

233234
let _execution: TaskRunExecution | undefined;
234235
let _isRunning = false;
236+
let _isCancelled = false;
235237
let _tracingSDK: TracingSDK | undefined;
238+
let _executionMeasurement: UsageMeasurement | undefined;
239+
const cancelController = new AbortController();
236240

237241
const zodIpc = new ZodIpcConnection({
238242
listenSchema: WorkerToExecutorMessageCatalog,
@@ -403,18 +407,33 @@ const zodIpc = new ZodIpcConnection({
403407
getNumberEnvVar("TRIGGER_RUN_METADATA_FLUSH_INTERVAL", 1000)
404408
);
405409

406-
const measurement = usage.start();
410+
_executionMeasurement = usage.start();
407411

408412
// This lives outside of the executor because this will eventually be moved to the controller level
409-
const signal = execution.run.maxDuration
410-
? timeout.abortAfterTimeout(execution.run.maxDuration)
411-
: undefined;
413+
const timeoutController = timeout.abortAfterTimeout(execution.run.maxDuration);
412414

413-
const { result } = await executor.execute(execution, metadata, traceContext, signal);
415+
timeoutController.signal.addEventListener("abort", () => {
416+
if (_isCancelled) {
417+
return;
418+
}
414419

415-
const usageSample = usage.stop(measurement);
420+
if (cancelController.signal.aborted) {
421+
return;
422+
}
423+
424+
cancelController.abort(timeoutController.signal.reason);
425+
});
426+
427+
const { result } = await executor.execute(
428+
execution,
429+
metadata,
430+
traceContext,
431+
cancelController.signal
432+
);
433+
434+
if (_isRunning && !_isCancelled) {
435+
const usageSample = usage.stop(_executionMeasurement);
416436

417-
if (_isRunning) {
418437
return sender.send("TASK_RUN_COMPLETED", {
419438
execution,
420439
result: {
@@ -458,7 +477,16 @@ const zodIpc = new ZodIpcConnection({
458477
WAIT_COMPLETED_NOTIFICATION: async () => {
459478
await managedWorkerRuntime.completeWaitpoints([]);
460479
},
461-
FLUSH: async ({ timeoutInMs }, sender) => {
480+
CANCEL: async ({ timeoutInMs }) => {
481+
_isCancelled = true;
482+
cancelController.abort("run cancelled");
483+
await callCancelHooks(timeoutInMs);
484+
if (_executionMeasurement) {
485+
usage.stop(_executionMeasurement);
486+
}
487+
await flushAll(timeoutInMs);
488+
},
489+
FLUSH: async ({ timeoutInMs }) => {
462490
await flushAll(timeoutInMs);
463491
},
464492
WAITPOINT_CREATED: async ({ wait, waitpoint }) => {
@@ -470,6 +498,18 @@ const zodIpc = new ZodIpcConnection({
470498
},
471499
});
472500

501+
async function callCancelHooks(timeoutInMs: number = 10_000) {
502+
const now = performance.now();
503+
504+
try {
505+
await Promise.race([lifecycleHooks.callOnCancelHookListeners(), setTimeout(timeoutInMs)]);
506+
} finally {
507+
const duration = performance.now() - now;
508+
509+
log(`Called cancel hooks in ${duration}ms`);
510+
}
511+
}
512+
473513
async function flushAll(timeoutInMs: number = 10_000) {
474514
const now = performance.now();
475515

packages/cli-v3/src/entryPoints/managed-run-worker.ts

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import {
2222
TaskRunExecution,
2323
timeout,
2424
TriggerConfig,
25+
UsageMeasurement,
2526
waitUntil,
2627
WorkerManifest,
2728
WorkerToExecutorMessageCatalog,
@@ -229,7 +230,10 @@ async function bootstrap() {
229230

230231
let _execution: TaskRunExecution | undefined;
231232
let _isRunning = false;
233+
let _isCancelled = false;
232234
let _tracingSDK: TracingSDK | undefined;
235+
let _executionMeasurement: UsageMeasurement | undefined;
236+
const cancelController = new AbortController();
233237

234238
const zodIpc = new ZodIpcConnection({
235239
listenSchema: WorkerToExecutorMessageCatalog,
@@ -398,18 +402,33 @@ const zodIpc = new ZodIpcConnection({
398402
getNumberEnvVar("TRIGGER_RUN_METADATA_FLUSH_INTERVAL", 1000)
399403
);
400404

401-
const measurement = usage.start();
405+
_executionMeasurement = usage.start();
402406

403407
// This lives outside of the executor because this will eventually be moved to the controller level
404-
const signal = execution.run.maxDuration
405-
? timeout.abortAfterTimeout(execution.run.maxDuration)
406-
: undefined;
408+
const timeoutController = timeout.abortAfterTimeout(execution.run.maxDuration);
407409

408-
const { result } = await executor.execute(execution, metadata, traceContext, signal);
410+
timeoutController.signal.addEventListener("abort", () => {
411+
if (_isCancelled) {
412+
return;
413+
}
409414

410-
const usageSample = usage.stop(measurement);
415+
if (cancelController.signal.aborted) {
416+
return;
417+
}
418+
419+
cancelController.abort(timeoutController.signal.reason);
420+
});
421+
422+
const { result } = await executor.execute(
423+
execution,
424+
metadata,
425+
traceContext,
426+
cancelController.signal
427+
);
428+
429+
if (_isRunning && !_isCancelled) {
430+
const usageSample = usage.stop(_executionMeasurement);
411431

412-
if (_isRunning) {
413432
return sender.send("TASK_RUN_COMPLETED", {
414433
execution,
415434
result: {
@@ -454,6 +473,15 @@ const zodIpc = new ZodIpcConnection({
454473
FLUSH: async ({ timeoutInMs }, sender) => {
455474
await flushAll(timeoutInMs);
456475
},
476+
CANCEL: async ({ timeoutInMs }, sender) => {
477+
_isCancelled = true;
478+
cancelController.abort("run cancelled");
479+
await callCancelHooks(timeoutInMs);
480+
if (_executionMeasurement) {
481+
usage.stop(_executionMeasurement);
482+
}
483+
await flushAll(timeoutInMs);
484+
},
457485
WAITPOINT_CREATED: async ({ wait, waitpoint }) => {
458486
managedWorkerRuntime.associateWaitWithWaitpoint(wait.id, waitpoint.id);
459487
},
@@ -463,6 +491,18 @@ const zodIpc = new ZodIpcConnection({
463491
},
464492
});
465493

494+
async function callCancelHooks(timeoutInMs: number = 10_000) {
495+
const now = performance.now();
496+
497+
try {
498+
await Promise.race([lifecycleHooks.callOnCancelHookListeners(), setTimeout(timeoutInMs)]);
499+
} finally {
500+
const duration = performance.now() - now;
501+
502+
console.log(`Called cancel hooks in ${duration}ms`);
503+
}
504+
}
505+
466506
async function flushAll(timeoutInMs: number = 10_000) {
467507
const now = performance.now();
468508

packages/cli-v3/src/executions/taskRunProcess.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,9 @@ export class TaskRunProcess {
109109
this._isBeingCancelled = true;
110110

111111
try {
112-
await this.#flush();
112+
await this.#cancel();
113113
} catch (err) {
114-
console.error("Error flushing task run process", { err });
114+
console.error("Error cancelling task run process", { err });
115115
}
116116

117117
await this.kill();
@@ -120,6 +120,10 @@ export class TaskRunProcess {
120120
async cleanup(kill = true) {
121121
this._isPreparedForNextRun = false;
122122

123+
if (this._isBeingCancelled) {
124+
return;
125+
}
126+
123127
try {
124128
await this.#flush();
125129
} catch (err) {
@@ -224,10 +228,17 @@ export class TaskRunProcess {
224228
await this._ipc?.sendWithAck("FLUSH", { timeoutInMs }, timeoutInMs + 1_000);
225229
}
226230

231+
async #cancel(timeoutInMs: number = 30_000) {
232+
logger.debug("sending cancel message to task run process", { pid: this.pid, timeoutInMs });
233+
234+
await this._ipc?.sendWithAck("CANCEL", { timeoutInMs }, timeoutInMs + 1_000);
235+
}
236+
227237
async execute(
228238
params: TaskRunProcessExecuteParams,
229239
isWarmStart?: boolean
230240
): Promise<TaskRunExecutionResult> {
241+
this._isBeingCancelled = false;
231242
this._isPreparedForNextRun = false;
232243
this._isPreparedForNextAttempt = false;
233244

packages/core/src/utils.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,25 @@ export async function tryCatch<T, E = Error>(
1616
return [error as E, null];
1717
}
1818
}
19+
20+
export type Deferred<T> = {
21+
promise: Promise<T>;
22+
resolve: (value: T) => void;
23+
reject: (reason?: any) => void;
24+
};
25+
26+
export function promiseWithResolvers<T>(): Deferred<T> {
27+
let resolve!: (value: T) => void;
28+
let reject!: (reason?: any) => void;
29+
30+
const promise = new Promise<T>((_resolve, _reject) => {
31+
resolve = _resolve;
32+
reject = _reject;
33+
});
34+
35+
return {
36+
promise,
37+
resolve,
38+
reject,
39+
};
40+
}

packages/core/src/v3/lifecycle-hooks-api.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,7 @@ export type {
3232
AnyOnCleanupHookFunction,
3333
TaskCleanupHookParams,
3434
TaskWait,
35+
TaskCancelHookParams,
36+
OnCancelHookFunction,
37+
AnyOnCancelHookFunction,
3538
} from "./lifecycleHooks/types.js";

packages/core/src/v3/lifecycleHooks/index.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import {
1313
AnyOnStartHookFunction,
1414
AnyOnSuccessHookFunction,
1515
AnyOnWaitHookFunction,
16+
AnyOnCancelHookFunction,
1617
RegisteredHookFunction,
1718
RegisterHookFunctionParams,
1819
TaskWait,
@@ -260,6 +261,33 @@ export class LifecycleHooksAPI {
260261
this.#getManager().registerOnResumeHookListener(listener);
261262
}
262263

264+
public registerGlobalCancelHook(hook: RegisterHookFunctionParams<AnyOnCancelHookFunction>): void {
265+
this.#getManager().registerGlobalCancelHook(hook);
266+
}
267+
268+
public registerTaskCancelHook(
269+
taskId: string,
270+
hook: RegisterHookFunctionParams<AnyOnCancelHookFunction>
271+
): void {
272+
this.#getManager().registerTaskCancelHook(taskId, hook);
273+
}
274+
275+
public getTaskCancelHook(taskId: string): AnyOnCancelHookFunction | undefined {
276+
return this.#getManager().getTaskCancelHook(taskId);
277+
}
278+
279+
public getGlobalCancelHooks(): RegisteredHookFunction<AnyOnCancelHookFunction>[] {
280+
return this.#getManager().getGlobalCancelHooks();
281+
}
282+
283+
public callOnCancelHookListeners(): Promise<void> {
284+
return this.#getManager().callOnCancelHookListeners();
285+
}
286+
287+
public registerOnCancelHookListener(listener: () => Promise<void>): void {
288+
this.#getManager().registerOnCancelHookListener(listener);
289+
}
290+
263291
#getManager(): LifecycleHooksManager {
264292
return getGlobal(API_NAME) ?? NOOP_LIFECYCLE_HOOKS_MANAGER;
265293
}

0 commit comments

Comments
 (0)