Skip to content

Commit 643d0b8

Browse files
committed
adding agent-loops reference, adding useWaitToken react hook, adding a way to throw an error to output from the run, adding an icon option to logger.trace
1 parent 3fc142a commit 643d0b8

38 files changed

+1923
-406
lines changed

internal-packages/run-engine/src/engine/errors.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,6 @@ export function runStatusFromError(error: TaskRunError): TaskRunStatus {
4949
case "TASK_EXECUTION_ABORTED":
5050
case "TASK_EXECUTION_FAILED":
5151
case "TASK_PROCESS_SIGTERM":
52-
case "TASK_DEQUEUED_INVALID_RETRY_CONFIG":
53-
case "TASK_DEQUEUED_NO_RETRY_CONFIG":
5452
case "TASK_DID_CONCURRENT_WAIT":
5553
return "SYSTEM_FAILURE";
5654
default:

internal-packages/run-engine/src/engine/systems/dequeueSystem.ts

Lines changed: 1 addition & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -307,41 +307,9 @@ export class DequeueSystem {
307307
task: result.task.id,
308308
rawRetryConfig: retryConfig,
309309
});
310-
311-
await this.runAttemptSystem.systemFailure({
312-
runId,
313-
error: {
314-
type: "INTERNAL_ERROR",
315-
code: "TASK_DEQUEUED_INVALID_RETRY_CONFIG",
316-
message: `Invalid retry config: ${retryConfig}`,
317-
},
318-
tx: prisma,
319-
});
320-
321-
return null;
322-
}
323-
324-
if (!parsedConfig.data) {
325-
this.$.logger.error("RunEngine.dequeueFromMasterQueue(): No retry config", {
326-
runId,
327-
task: result.task.id,
328-
rawRetryConfig: retryConfig,
329-
});
330-
331-
await this.runAttemptSystem.systemFailure({
332-
runId,
333-
error: {
334-
type: "INTERNAL_ERROR",
335-
code: "TASK_DEQUEUED_NO_RETRY_CONFIG",
336-
message: `No retry config found`,
337-
},
338-
tx: prisma,
339-
});
340-
341-
return null;
342310
}
343311

344-
maxAttempts = parsedConfig.data.maxAttempts;
312+
maxAttempts = parsedConfig.data?.maxAttempts;
345313
}
346314
//update the run
347315
const lockedTaskRun = await prisma.taskRun.update({

packages/core/src/v3/apiClient/core.ts

Lines changed: 11 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,14 @@ export const defaultRetryOptions = {
2727
randomize: false,
2828
} satisfies RetryOptions;
2929

30-
export type ZodFetchOptions<TInput = unknown, TOutput = TInput> = {
30+
export type ZodFetchOptions<TData = any> = {
3131
retry?: RetryOptions;
3232
tracer?: TriggerTracer;
3333
name?: string;
3434
attributes?: Attributes;
3535
icon?: string;
36-
onResponseBody?: (body: TInput, span: Span) => void;
37-
prepareData?: (data: TInput, response: Response) => Promise<TOutput> | TOutput;
36+
onResponseBody?: (body: TData, span: Span) => void;
37+
prepareData?: (data: TData, response: Response) => Promise<TData> | TData;
3838
};
3939

4040
export type AnyZodFetchOptions = ZodFetchOptions<any>;
@@ -67,15 +67,12 @@ interface FetchOffsetLimitPageParams extends OffsetLimitPageParams {
6767
query?: URLSearchParams;
6868
}
6969

70-
export function zodfetch<
71-
TResponseBodySchema extends z.ZodTypeAny,
72-
TOutput = z.output<TResponseBodySchema>,
73-
>(
70+
export function zodfetch<TResponseBodySchema extends z.ZodTypeAny>(
7471
schema: TResponseBodySchema,
7572
url: string,
7673
requestInit?: RequestInit,
77-
options?: ZodFetchOptions<z.output<TResponseBodySchema>, TOutput>
78-
): ApiPromise<TOutput> {
74+
options?: ZodFetchOptions<z.output<TResponseBodySchema>>
75+
): ApiPromise<z.output<TResponseBodySchema>> {
7976
return new ApiPromise(_doZodFetch(schema, url, requestInit, options));
8077
}
8178

@@ -113,14 +110,7 @@ export function zodfetchCursorPage<TItemSchema extends z.ZodTypeAny>(
113110

114111
const fetchResult = _doZodFetch(cursorPageSchema, $url.href, requestInit, options);
115112

116-
return new CursorPagePromise(
117-
fetchResult as Promise<ZodFetchResult<CursorPageResponse<z.output<TItemSchema>>>>,
118-
schema,
119-
url,
120-
params,
121-
requestInit,
122-
options
123-
);
113+
return new CursorPagePromise(fetchResult, schema, url, params, requestInit, options);
124114
}
125115

126116
export function zodfetchOffsetLimitPage<TItemSchema extends z.ZodTypeAny>(
@@ -201,15 +191,12 @@ async function traceZodFetch<T>(
201191
);
202192
}
203193

204-
async function _doZodFetch<
205-
TResponseBodySchema extends z.ZodTypeAny,
206-
TOutput = z.output<TResponseBodySchema>,
207-
>(
194+
async function _doZodFetch<TResponseBodySchema extends z.ZodTypeAny>(
208195
schema: TResponseBodySchema,
209196
url: string,
210197
requestInit?: PromiseOrValue<RequestInit>,
211-
options?: ZodFetchOptions<z.output<TResponseBodySchema>, TOutput>
212-
): Promise<ZodFetchResult<TOutput>> {
198+
options?: ZodFetchOptions<z.output<TResponseBodySchema>>
199+
): Promise<ZodFetchResult<z.output<TResponseBodySchema>>> {
213200
let $requestInit = await requestInit;
214201

215202
return traceZodFetch({ url, requestInit: $requestInit, options }, async (span) => {
@@ -727,7 +714,7 @@ export async function wrapZodFetch<T extends z.ZodTypeAny>(
727714
schema: T,
728715
url: string,
729716
requestInit?: RequestInit,
730-
options?: ZodFetchOptions<z.output<T>, z.infer<T>>
717+
options?: ZodFetchOptions<z.output<T>>
731718
): Promise<ApiResult<z.infer<T>>> {
732719
try {
733720
const response = await zodfetch(schema, url, requestInit, {

packages/core/src/v3/apiClient/index.ts

Lines changed: 58 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,13 @@ import {
7676
UpdateEnvironmentVariableParams,
7777
} from "./types.js";
7878
import { AsyncIterableStream } from "../streams/asyncIterableStream.js";
79+
import { Prettify } from "../types/utils.js";
80+
81+
export type CreateWaitpointTokenResponse = Prettify<
82+
CreateWaitpointTokenResponseBody & {
83+
publicAccessToken: string;
84+
}
85+
>;
7986

8087
export type {
8188
CreateEnvironmentVariableParams,
@@ -214,37 +221,36 @@ export class ApiClient {
214221
headers: this.#getHeaders(clientOptions?.spanParentAsLink ?? false),
215222
body: JSON.stringify(body),
216223
},
217-
{
218-
...mergeRequestOptions(this.defaultRequestOptions, requestOptions),
219-
prepareData: async (data, response) => {
220-
const jwtHeader = response.headers.get("x-trigger-jwt");
221-
222-
if (typeof jwtHeader === "string") {
223-
return {
224-
...data,
225-
publicAccessToken: jwtHeader,
226-
};
227-
}
228-
229-
const claimsHeader = response.headers.get("x-trigger-jwt-claims");
230-
const claims = claimsHeader ? JSON.parse(claimsHeader) : undefined;
231-
232-
const jwt = await generateJWT({
233-
secretKey: this.accessToken,
234-
payload: {
235-
...claims,
236-
scopes: [`read:runs:${data.id}`],
237-
},
238-
expirationTime: requestOptions?.publicAccessToken?.expirationTime ?? "1h",
239-
});
224+
mergeRequestOptions(this.defaultRequestOptions, requestOptions)
225+
)
226+
.withResponse()
227+
.then(async ({ data, response }) => {
228+
const jwtHeader = response.headers.get("x-trigger-jwt");
240229

230+
if (typeof jwtHeader === "string") {
241231
return {
242232
...data,
243-
publicAccessToken: jwt,
233+
publicAccessToken: jwtHeader,
244234
};
245-
},
246-
}
247-
);
235+
}
236+
237+
const claimsHeader = response.headers.get("x-trigger-jwt-claims");
238+
const claims = claimsHeader ? JSON.parse(claimsHeader) : undefined;
239+
240+
const jwt = await generateJWT({
241+
secretKey: this.accessToken,
242+
payload: {
243+
...claims,
244+
scopes: [`read:runs:${data.id}`],
245+
},
246+
expirationTime: requestOptions?.publicAccessToken?.expirationTime ?? "1h",
247+
});
248+
249+
return {
250+
...data,
251+
publicAccessToken: jwt,
252+
};
253+
});
248254
}
249255

250256
batchTriggerV3(
@@ -262,28 +268,27 @@ export class ApiClient {
262268
}),
263269
body: JSON.stringify(body),
264270
},
265-
{
266-
...mergeRequestOptions(this.defaultRequestOptions, requestOptions),
267-
prepareData: async (data, response) => {
268-
const claimsHeader = response.headers.get("x-trigger-jwt-claims");
269-
const claims = claimsHeader ? JSON.parse(claimsHeader) : undefined;
270-
271-
const jwt = await generateJWT({
272-
secretKey: this.accessToken,
273-
payload: {
274-
...claims,
275-
scopes: [`read:batch:${data.id}`],
276-
},
277-
expirationTime: requestOptions?.publicAccessToken?.expirationTime ?? "1h",
278-
});
279-
280-
return {
281-
...data,
282-
publicAccessToken: jwt,
283-
};
284-
},
285-
}
286-
);
271+
mergeRequestOptions(this.defaultRequestOptions, requestOptions)
272+
)
273+
.withResponse()
274+
.then(async ({ data, response }) => {
275+
const claimsHeader = response.headers.get("x-trigger-jwt-claims");
276+
const claims = claimsHeader ? JSON.parse(claimsHeader) : undefined;
277+
278+
const jwt = await generateJWT({
279+
secretKey: this.accessToken,
280+
payload: {
281+
...claims,
282+
scopes: [`read:batch:${data.id}`],
283+
},
284+
expirationTime: requestOptions?.publicAccessToken?.expirationTime ?? "1h",
285+
});
286+
287+
return {
288+
...data,
289+
publicAccessToken: jwt,
290+
};
291+
});
287292
}
288293

289294
createUploadPayloadUrl(filename: string, requestOptions?: ZodFetchOptions) {
@@ -701,7 +706,7 @@ export class ApiClient {
701706
};
702707
},
703708
}
704-
);
709+
) as ApiPromise<CreateWaitpointTokenResponse>;
705710
}
706711

707712
listWaitpointTokens(
@@ -752,7 +757,9 @@ export class ApiClient {
752757
headers: this.#getHeaders(false),
753758
body: JSON.stringify(options),
754759
},
755-
mergeRequestOptions(this.defaultRequestOptions, requestOptions)
760+
{
761+
...mergeRequestOptions(this.defaultRequestOptions, requestOptions),
762+
}
756763
);
757764
}
758765

packages/core/src/v3/errors.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,20 @@ export class TaskPayloadParsedError extends Error {
140140
}
141141
}
142142

143+
export class CompleteTaskWithOutput extends Error {
144+
public readonly output: unknown;
145+
146+
constructor(output?: unknown) {
147+
super("Complete task with output");
148+
this.name = "CompleteTaskWithOutput";
149+
this.output = output;
150+
}
151+
}
152+
153+
export function isCompleteTaskWithOutput(error: unknown): error is CompleteTaskWithOutput {
154+
return error instanceof Error && error.name === "CompleteTaskWithOutput";
155+
}
156+
143157
export function parseError(error: unknown): TaskRunError {
144158
if (isInternalError(error)) {
145159
return {
@@ -291,8 +305,6 @@ export function shouldRetryError(error: TaskRunError): boolean {
291305
// run engine errors
292306
case "TASK_DEQUEUED_INVALID_STATE":
293307
case "TASK_DEQUEUED_QUEUE_NOT_FOUND":
294-
case "TASK_DEQUEUED_INVALID_RETRY_CONFIG":
295-
case "TASK_DEQUEUED_NO_RETRY_CONFIG":
296308
case "TASK_HAS_N0_EXECUTION_SNAPSHOT":
297309
case "TASK_RUN_DEQUEUED_MAX_RETRIES":
298310
return false;

packages/core/src/v3/logger/index.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
import { NoopTaskLogger, TaskLogger } from "./taskLogger.js";
1+
import { NoopTaskLogger, TaskLogger, TraceOptions } from "./taskLogger.js";
22
import { getGlobal, registerGlobal, unregisterGlobal } from "../utils/globals.js";
3-
import { Span, SpanOptions } from "@opentelemetry/api";
3+
import { Span } from "@opentelemetry/api";
44

55
const API_NAME = "logger";
66

@@ -47,11 +47,15 @@ export class LoggerAPI implements TaskLogger {
4747
this.#getTaskLogger().error(message, metadata);
4848
}
4949

50-
public trace<T>(name: string, fn: (span: Span) => Promise<T>, options?: SpanOptions): Promise<T> {
50+
public trace<T>(
51+
name: string,
52+
fn: (span: Span) => Promise<T>,
53+
options?: TraceOptions
54+
): Promise<T> {
5155
return this.#getTaskLogger().trace(name, fn, options);
5256
}
5357

54-
public startSpan(name: string, options?: SpanOptions): Span {
58+
public startSpan(name: string, options?: TraceOptions): Span {
5559
return this.#getTaskLogger().startSpan(name, options);
5660
}
5761

0 commit comments

Comments
 (0)