Skip to content

Auto-resolve payload/output presigned urls when retrieving a run #1317

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/polite-tables-exercise.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/sdk": patch
"@trigger.dev/core": patch
---

Auto-resolve payload/output presigned urls when retrieving a run with runs.retrieve
25 changes: 25 additions & 0 deletions packages/core/src/v3/utils/ioSerialization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,31 @@ export async function conditionallyImportPacket(
}
}

export async function resolvePresignedPacketUrl(
url: string,
tracer?: TriggerTracer
): Promise<any | undefined> {
try {
const response = await fetch(url);

if (!response.ok) {
return;
}

const data = await response.text();
const dataType = response.headers.get("content-type") ?? "application/json";

const packet = {
data,
dataType,
};

return await parsePacket(packet);
} catch (error) {
return;
}
}

async function importPacket(packet: IOPacket, span?: Span): Promise<IOPacket> {
if (!packet.data) {
return packet;
Expand Down
32 changes: 26 additions & 6 deletions packages/trigger-sdk/src/v3/runs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type {
ListProjectRunsQueryParams,
ListRunsQueryParams,
RescheduleRunRequestBody,
TriggerTracer,
} from "@trigger.dev/core/v3";
import {
ApiPromise,
Expand All @@ -19,6 +20,7 @@ import {
} from "@trigger.dev/core/v3";
import { AnyTask, Prettify, RunHandle, Task, apiClientMissingError } from "./shared.js";
import { tracer } from "./tracer.js";
import { resolvePresignedPacketUrl } from "@trigger.dev/core/v3/utils/ioSerialization";

export type RetrieveRunResult<TRunId> = Prettify<
TRunId extends RunHandle<infer TOutput>
Expand Down Expand Up @@ -183,13 +185,31 @@ function retrieveRun<TRunId extends RunHandle<any> | AnyTask | string>(
requestOptions
);

if (typeof runId === "string") {
return apiClient.retrieveRun(runId, $requestOptions) as ApiPromise<RetrieveRunResult<TRunId>>;
} else {
return apiClient.retrieveRun(runId.id, $requestOptions) as ApiPromise<
RetrieveRunResult<TRunId>
>;
const $runId = typeof runId === "string" ? runId : runId.id;

return apiClient.retrieveRun($runId, $requestOptions).then((retrievedRun) => {
return resolvePayloadAndOutputUrls(retrievedRun);
}) as ApiPromise<RetrieveRunResult<TRunId>>;
}

async function resolvePayloadAndOutputUrls(run: RetrieveRunResult<any>) {
const resolvedRun = { ...run };

if (run.payloadPresignedUrl && run.outputPresignedUrl) {
const [payload, output] = await Promise.all([
resolvePresignedPacketUrl(run.payloadPresignedUrl, tracer),
resolvePresignedPacketUrl(run.outputPresignedUrl, tracer),
]);

resolvedRun.payload = payload;
resolvedRun.output = output;
} else if (run.payloadPresignedUrl) {
resolvedRun.payload = await resolvePresignedPacketUrl(run.payloadPresignedUrl, tracer);
} else if (run.outputPresignedUrl) {
resolvedRun.output = await resolvePresignedPacketUrl(run.outputPresignedUrl, tracer);
}

return resolvedRun;
}

function replayRun(
Expand Down
43 changes: 42 additions & 1 deletion references/v3-catalog/src/trigger/sdkUsage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,51 @@ export const sdkUsage = task({

export const sdkChild = task({
id: "sdk-child",
run: async (payload: any) => {},
run: async (payload: any) => {
return payload;
},
});

export const sdkSchedule = schedules.task({
id: "sdk-schedule",
run: async (payload: any) => {},
});

export const autoResolvePayloadAndOutput = task({
id: "auto-resolve-payload-and-output",
run: async (payload: any, { ctx }) => {
// Generate a large JSON payload (bigger than 128KB)
const childPayload = Array.from({ length: 10000 }, () => ({
key: "value",
date: new Date(),
}));

const handle = await tasks.trigger<typeof sdkChild>("sdk-child", childPayload);

const childRun = await runs.retrieve(handle.id);

if (childRun.payload) {
console.log("Child run payload exists", {
payloadPresignedUrl: childRun.payloadPresignedUrl,
});
} else {
console.log("Child run payload does not exist", {
payloadPresignedUrl: childRun.payloadPresignedUrl,
});
}

await runs.poll(handle.id);

const finishedRun = await runs.retrieve(handle.id);

if (finishedRun.output) {
console.log("Finished run output exists", {
outputPresignedUrl: finishedRun.outputPresignedUrl,
});
} else {
console.log("Finished run payload does not exist", {
outputPresignedUrl: finishedRun.outputPresignedUrl,
});
}
},
});
Loading