Skip to content

v3: fix string and non-standard outputs #983

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/many-ligers-pump.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"trigger.dev": patch
"@trigger.dev/core": patch
---

Handle string and non-stringifiable outputs like functions
8 changes: 0 additions & 8 deletions apps/webapp/app/entry.server.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,3 @@ export { apiRateLimiter } from "./services/apiRateLimit.server";
export { socketIo } from "./v3/handleSocketIo.server";
export { wss } from "./v3/handleWebsockets.server";
export { registryProxy } from "./v3/registryProxy.server";

process.on("uncaughtException", (error, origin) => {
logger.error("Uncaught Exception", { error, origin });
});

process.on("unhandledRejection", (reason, promise) => {
logger.error("Unhandled Rejection", { reason });
});
Original file line number Diff line number Diff line change
Expand Up @@ -222,19 +222,25 @@ function PacketDisplay({
dataType: string;
title: string;
}) {
if (dataType === "application/store") {
return (
<div className="flex flex-col">
<Paragraph variant="base/bright" className="w-full border-b border-grid-dimmed py-2.5">
{title}
</Paragraph>
<LinkButton LeadingIcon={CloudArrowDownIcon} to={data} variant="tertiary/medium" download>
Download
</LinkButton>
</div>
);
} else {
return <CodeBlock rowTitle={title} code={data} maxLines={20} />;
switch (dataType) {
case "application/store": {
return (
<div className="flex flex-col">
<Paragraph variant="base/bright" className="w-full border-b border-grid-dimmed py-2.5">
{title}
</Paragraph>
<LinkButton LeadingIcon={CloudArrowDownIcon} to={data} variant="tertiary/medium" download>
Download
</LinkButton>
</div>
);
}
case "text/plain": {
return <CodeBlock language="markdown" rowTitle={title} code={data} maxLines={20} />;
}
default: {
return <CodeBlock language="json" rowTitle={title} code={data} maxLines={20} />;
}
}
}

Expand Down
9 changes: 5 additions & 4 deletions apps/webapp/app/v3/eventRepository.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
SpanMessagingEvent,
TaskEventStyle,
correctErrorStackTrace,
createPackageAttributesAsJson,
createPacketAttributesAsJson,
flattenAttributes,
isExceptionSpanEvent,
omit,
Expand Down Expand Up @@ -188,7 +188,7 @@ export class EventRepository {
const event = events[0];

const output = options?.attributes.output
? await createPackageAttributesAsJson(
? await createPacketAttributesAsJson(
options?.attributes.output,
options?.attributes.outputType ?? "application/json"
)
Expand All @@ -213,8 +213,9 @@ export class EventRepository {
style: event.style as Attributes,
output: output,
outputType:
options?.attributes.outputType === "application/store"
? "application/store"
options?.attributes.outputType === "application/store" ||
options?.attributes.outputType === "text/plain"
? options?.attributes.outputType
: "application/json",
payload: event.payload as Attributes,
payloadType: event.payloadType,
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/v3/services/completeAttempt.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ export class CompleteAttemptService extends BaseService {
attributes: {
isError: false,
output:
completion.outputType === "application/store"
completion.outputType === "application/store" || completion.outputType === "text/plain"
? completion.output
: completion.output
? (safeJsonParse(completion.output) as Attributes)
Expand Down
20 changes: 12 additions & 8 deletions packages/cli-v3/src/commands/dev.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import * as packageJson from "../../package.json";
import { CliApiClient } from "../apiClient";
import { CommonCommandOptions, commonOptions, wrapCommandAction } from "../cli/common.js";
import { bundleDependenciesPlugin, workerSetupImportConfigPlugin } from "../utilities/build";
import { chalkGrey, chalkPurple, chalkWorker } from "../utilities/cliOutput";
import { chalkError, chalkGrey, chalkPurple, chalkTask, chalkWorker } from "../utilities/cliOutput";
import { readConfig } from "../utilities/configFiles";
import { readJSONFile } from "../utilities/fileSystem";
import { printDevBanner, printStandloneInitialBanner } from "../utilities/initialBanner.js";
Expand Down Expand Up @@ -77,9 +77,13 @@ export async function devCommand(dir: string, options: DevCommandOptions) {

if (!authorization.ok) {
if (authorization.error === "fetch failed") {
logger.error("Fetch failed. Platform down?");
logger.log(
`${chalkError(
"X Error:"
)} Connecting to the server failed. Please check your internet connection or contact [email protected] for help.`
);
} else {
logger.error("You must login first. Use `trigger.dev login` to login.");
logger.log(`${chalkError("X Error:")} You must login first. Use the \`login\` CLI command.`);
}
process.exitCode = 1;
return;
Expand Down Expand Up @@ -701,13 +705,13 @@ function createDuplicateTaskIdOutputErrorMessage(
.map((id) => {
const tasks = taskResources.filter((task) => task.id === id);

return `id "${chalkPurple(id)}" was found in:\n${tasks
.map((task) => `${task.filePath} -> ${task.exportName}`)
.join("\n")}`;
return `\n\n${chalkTask(id)} was found in:${tasks
.map((task) => `\n${task.filePath} -> ${task.exportName}`)
.join("")}`;
})
.join("\n\n");
.join("");

return `Duplicate task ids detected:\n\n${duplicateTable}\n\n`;
return `Duplicate ${chalkTask("task id")} detected:${duplicateTable}`;
}

function gatherProcessEnv() {
Expand Down
9 changes: 4 additions & 5 deletions packages/cli-v3/src/workers/dev/backgroundWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,11 @@ import {
formatDurationMilliseconds,
workerToChildMessages,
} from "@trigger.dev/core/v3";
import chalk from "chalk";
import dotenv from "dotenv";
import { Evt } from "evt";
import { ChildProcess, fork } from "node:child_process";
import { dirname, resolve } from "node:path";
import terminalLink from "terminal-link";
import { safeDeleteFileSync } from "../../utilities/fileSystem.js";
import { installPackages } from "../../utilities/installPackages.js";
import { logger } from "../../utilities/logger.js";
import { UncaughtExceptionError } from "../common/errors.js";
import {
chalkError,
chalkGrey,
Expand All @@ -39,6 +34,10 @@ import {
chalkWorker,
prettyPrintDate,
} from "../../utilities/cliOutput.js";
import { safeDeleteFileSync } from "../../utilities/fileSystem.js";
import { installPackages } from "../../utilities/installPackages.js";
import { logger } from "../../utilities/logger.js";
import { UncaughtExceptionError } from "../common/errors.js";

export type CurrentWorkers = BackgroundWorkerCoordinator["currentWorkers"];
export class BackgroundWorkerCoordinator {
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/v3/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export {
stringifyIO,
prettyPrintPacket,
createPacketAttributes,
createPackageAttributesAsJson,
createPacketAttributesAsJson,
conditionallyExportPacket,
conditionallyImportPacket,
packetRequiresOffloading,
Expand Down
52 changes: 37 additions & 15 deletions packages/core/src/v3/utils/ioSerialization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,14 @@ export async function stringifyIO(value: any): Promise<IOPacket> {
return { data: value, dataType: "text/plain" };
}

const { stringify } = await loadSuperJSON();
try {
const { stringify } = await loadSuperJSON();
const data = stringify(value);

return { data: stringify(value), dataType: "application/super+json" };
return { data, dataType: "application/super+json" };
} catch {
return { dataType: "application/json" };
}
}

export async function conditionallyExportPacket(
Expand Down Expand Up @@ -193,9 +198,9 @@ export async function createPacketAttributes(
packet: IOPacket,
dataKey: string,
dataTypeKey: string
): Promise<Attributes> {
): Promise<Attributes | undefined> {
if (!packet.data) {
return {};
return;
}

switch (packet.dataType) {
Expand All @@ -207,29 +212,38 @@ export async function createPacketAttributes(
case "application/super+json":
const { parse } = await loadSuperJSON();

const parsed = parse(packet.data) as any;
const jsonified = JSON.parse(JSON.stringify(parsed, safeReplacer));
if (typeof packet.data === "undefined" || packet.data === null) {
return;
}

try {
const parsed = parse(packet.data) as any;
const jsonified = JSON.parse(JSON.stringify(parsed, safeReplacer));

return {
...flattenAttributes(jsonified, dataKey),
[dataTypeKey]: "application/json",
};
} catch {
return;
}

return {
...flattenAttributes(jsonified, dataKey),
[dataTypeKey]: "application/json",
};
case "application/store":
return {
[dataKey]: packet.data,
[dataTypeKey]: packet.dataType,
};
case "text/plain":
return {
[SemanticInternalAttributes.OUTPUT]: packet.data,
[SemanticInternalAttributes.OUTPUT_TYPE]: packet.dataType,
[dataKey]: packet.data,
[dataTypeKey]: packet.dataType,
};
default:
return {};
return;
}
}

export async function createPackageAttributesAsJson(
export async function createPacketAttributesAsJson(
data: any,
dataType: string
): Promise<Attributes> {
Expand All @@ -250,7 +264,7 @@ export async function createPackageAttributesAsJson(
const { deserialize } = await loadSuperJSON();

const deserialized = deserialize(data) as any;
const jsonify = JSON.parse(JSON.stringify(deserialized, safeReplacer));
const jsonify = safeJsonParse(JSON.stringify(deserialized, safeReplacer));

return imposeAttributeLimits(flattenAttributes(jsonify, undefined));
case "application/store":
Expand Down Expand Up @@ -326,3 +340,11 @@ function getPacketExtension(outputType: string): string {
async function loadSuperJSON(): Promise<typeof import("superjson")> {
return await import("superjson");
}

function safeJsonParse(value: string): any {
try {
return JSON.parse(value);
} catch {
return;
}
}
14 changes: 8 additions & 6 deletions packages/core/src/v3/workers/taskExecutor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,16 @@ export class TaskExecutor {
this._tracer
);

span.setAttributes(
await createPacketAttributes(
finalOutput,
SemanticInternalAttributes.OUTPUT,
SemanticInternalAttributes.OUTPUT_TYPE
)
const attributes = await createPacketAttributes(
finalOutput,
SemanticInternalAttributes.OUTPUT,
SemanticInternalAttributes.OUTPUT_TYPE
);

if (attributes) {
span.setAttributes(attributes);
}

return {
ok: true,
id: execution.attempt.id,
Expand Down
33 changes: 30 additions & 3 deletions references/v3-catalog/src/trigger/superjson.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ export const superParentTask = task({
logger.log(`typeof result.error = ${typeof result.error}`);
logger.log(`typeof result.url = ${typeof result.url}`);

return {
result,
};
return "## super-parent-task completed";
},
});

Expand Down Expand Up @@ -120,6 +118,35 @@ export const superHugeOutputTask = task({
},
});

export const superStringTask = task({
id: "super-string-parent-task",
run: async () => {
const result = await superStringChildTask.triggerAndWait({
payload: {
foo: "bar",
},
});

return result;
},
});

export const superStringChildTask = task({
id: "super-string-child-task",
run: async () => {
return "## super-string-child-task completed";
},
});

export const superBadOutputTask = task({
id: "super-bad-output-task",
run: async () => {
// Returning something that cannot be serialized

return () => {};
},
});

function createLargeObject(size: number, length: number) {
return Array.from({ length }, (_, i) => [i.toString(), i.toString().padStart(size, "0")]).reduce(
(acc, [key, value]) => {
Expand Down