Skip to content

Commit bbfa96c

Browse files
committed
A few improvements
1 parent cce0fb2 commit bbfa96c

File tree

6 files changed

+30
-61
lines changed

6 files changed

+30
-61
lines changed

packages/cli-v3/src/entryPoints/dev-run-worker.ts

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -409,27 +409,11 @@ const zodIpc = new ZodIpcConnection({
409409

410410
_executionMeasurement = usage.start();
411411

412-
// This lives outside of the executor because this will eventually be moved to the controller level
413412
const timeoutController = timeout.abortAfterTimeout(execution.run.maxDuration);
414413

415-
timeoutController.signal.addEventListener("abort", () => {
416-
if (_isCancelled) {
417-
return;
418-
}
419-
420-
if (cancelController.signal.aborted) {
421-
return;
422-
}
414+
const signal = AbortSignal.any([cancelController.signal, timeoutController.signal]);
423415

424-
cancelController.abort(timeoutController.signal.reason);
425-
});
426-
427-
const { result } = await executor.execute(
428-
execution,
429-
metadata,
430-
traceContext,
431-
cancelController.signal
432-
);
416+
const { result } = await executor.execute(execution, metadata, traceContext, signal);
433417

434418
if (_isRunning && !_isCancelled) {
435419
const usageSample = usage.stop(_executionMeasurement);

packages/cli-v3/src/entryPoints/managed-run-worker.ts

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -404,27 +404,11 @@ const zodIpc = new ZodIpcConnection({
404404

405405
_executionMeasurement = usage.start();
406406

407-
// This lives outside of the executor because this will eventually be moved to the controller level
408407
const timeoutController = timeout.abortAfterTimeout(execution.run.maxDuration);
409408

410-
timeoutController.signal.addEventListener("abort", () => {
411-
if (_isCancelled) {
412-
return;
413-
}
414-
415-
if (cancelController.signal.aborted) {
416-
return;
417-
}
409+
const signal = AbortSignal.any([cancelController.signal, timeoutController.signal]);
418410

419-
cancelController.abort(timeoutController.signal.reason);
420-
});
421-
422-
const { result } = await executor.execute(
423-
execution,
424-
metadata,
425-
traceContext,
426-
cancelController.signal
427-
);
411+
const { result } = await executor.execute(execution, metadata, traceContext, signal);
428412

429413
if (_isRunning && !_isCancelled) {
430414
const usageSample = usage.stop(_executionMeasurement);

packages/core/src/v3/timeout/usageTimeoutManager.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ export class UsageTimeoutManager implements TimeoutManager {
2121
return this._abortController;
2222
}
2323

24+
if (this._intervalId) {
25+
clearInterval(this._intervalId);
26+
}
27+
2428
// Now we need to start an interval that will measure usage and abort the signal if the usage is too high
2529
this._intervalId = setInterval(() => {
2630
const sample = this.usageManager.sample();

packages/core/src/v3/types/tasks.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ export type CancelFnParams = Prettify<{
117117
ctx: Context;
118118
/** Abort signal that is aborted when a task run exceeds it's maxDuration or if the task run is cancelled. Can be used to automatically cancel downstream requests */
119119
signal: AbortSignal;
120+
runPromise: Promise<unknown>;
120121
init?: InitOutput;
121122
}>;
122123

references/d3-chat/src/trigger/chat.ts

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,7 @@ export const interruptibleChat = schemaTask({
231231

232232
// 👇 This is a global onCancel hook, but it's inside of the run function
233233
tasks.onCancel(async () => {
234+
// We have access to the chunks here
234235
logger.info("interruptible-chat: task cancelled with chunks", { chunks });
235236
});
236237

@@ -246,16 +247,6 @@ export const interruptibleChat = schemaTask({
246247
onChunk: ({ chunk }) => {
247248
chunks.push(chunk);
248249
},
249-
onError: ({ error }) => {
250-
if (error instanceof Error && error.name === "AbortError") {
251-
logger.info("interruptible-chat: streamText aborted", { error });
252-
} else {
253-
logger.error("interruptible-chat: streamText error", { error });
254-
}
255-
},
256-
onFinish: ({ finishReason }) => {
257-
logger.info("interruptible-chat: streamText finished", { finishReason });
258-
},
259250
});
260251

261252
const textParts = [];
@@ -267,16 +258,10 @@ export const interruptibleChat = schemaTask({
267258
return textParts.join("");
268259
} catch (error) {
269260
if (error instanceof Error && error.name === "AbortError") {
270-
logger.info("interruptible-chat: streamText aborted (inside catch)", { error });
261+
// streamText will throw an AbortError if the signal is aborted, so we can handle it here
271262
} else {
272-
logger.error("interruptible-chat: streamText error (inside catch)", { error });
263+
throw error;
273264
}
274265
}
275266
},
276-
onCancel: async ({ runPromise }) => {
277-
// 👇 output is typed as `string` because that's the return type of the run function
278-
const output = await runPromise;
279-
280-
logger.info("interruptible-chat: task cancelled with output", { output });
281-
},
282267
});

references/hello-world/src/trigger/example.ts

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -212,25 +212,31 @@ export const hooksTask = task({
212212
},
213213
});
214214

215-
export const cancelTask = task({
216-
id: "cancel",
217-
run: async (payload: { message: string }, { ctx, signal }) => {
218-
logger.info("Hello, world from the cancel task", { message: payload.message });
215+
export const cancelExampleTask = task({
216+
id: "cancel-example",
217+
// Signal will be aborted when the task is cancelled 👇
218+
run: async (payload: { timeoutInSeconds: number }, { signal }) => {
219+
logger.info("Hello, world from the cancel task", {
220+
timeoutInSeconds: payload.timeoutInSeconds,
221+
});
219222

223+
// This is a global hook that will be called if the task is cancelled
220224
tasks.onCancel(async () => {
221225
logger.info("global task onCancel hook but inside of the run function baby!");
222226
});
223227

224-
await logger.trace("10s timeout", async (span) => {
228+
await logger.trace("timeout", async (span) => {
225229
try {
226-
await setTimeout(10_000, undefined, { signal });
230+
// We pass the signal to setTimeout to abort the timeout if the task is cancelled
231+
await setTimeout(payload.timeoutInSeconds * 1000, undefined, { signal });
227232
} catch (error) {
233+
// If the timeout is aborted, this error will be thrown, we can handle it here
228234
logger.error("Timeout error", { error });
229235
}
230236
});
231237

232238
logger.info("Hello, world from the cancel task after the timeout", {
233-
message: payload.message,
239+
timeoutInSeconds: payload.timeoutInSeconds,
234240
});
235241

236242
return {
@@ -239,9 +245,14 @@ export const cancelTask = task({
239245
},
240246
onCancel: async ({ payload, runPromise }) => {
241247
logger.info("Hello, world from the onCancel hook", { payload });
248+
// You can await the runPromise to get the output of the task
242249
const output = await runPromise;
250+
243251
logger.info("Hello, world from the onCancel hook after the run", { payload, output });
252+
253+
// You can do work inside the onCancel hook, up to 30 seconds
244254
await setTimeout(10_000);
255+
245256
logger.info("Hello, world from the onCancel hook after the timeout", { payload });
246257
},
247258
});

0 commit comments

Comments
 (0)