Skip to content

v3: superjson dynamic import and deploy fixes #982

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/tall-bees-wave.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@trigger.dev/sdk": patch
"trigger.dev": patch
"@trigger.dev/core": patch
---

Dynamically import superjson and fix some bundling issues
14 changes: 11 additions & 3 deletions apps/webapp/app/entry.server.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import {
OperatingSystemContextProvider,
OperatingSystemPlatform,
} from "./components/primitives/OperatingSystemProvider";
import { env } from "./env.server";
import { getSharedSqsEventConsumer } from "./services/events/sqsEventConsumer";
import { singleton } from "./utils/singleton";
import { logger } from "./services/logger.server";

const ABORT_DELAY = 30000;

Expand Down Expand Up @@ -178,7 +178,15 @@ function logError(error: unknown, request?: Request) {

const sqsEventConsumer = singleton("sqsEventConsumer", getSharedSqsEventConsumer);

export { wss } from "./v3/handleWebsockets.server";
export { apiRateLimiter } from "./services/apiRateLimit.server";
export { socketIo } from "./v3/handleSocketIo.server";
export { wss } from "./v3/handleWebsockets.server";
export { registryProxy } from "./v3/registryProxy.server";
export { apiRateLimiter } from "./services/apiRateLimit.server";

process.on("uncaughtException", (error, origin) => {
logger.error("Uncaught Exception", { error, origin });
});

process.on("unhandledRejection", (reason, promise) => {
logger.error("Unhandled Rejection", { reason });
});
4 changes: 2 additions & 2 deletions apps/webapp/app/presenters/v3/SpanPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ export class SpanPresenter {
span.outputType === "application/store"
? `/resources/packets/${span.environmentId}/${span.output}`
: typeof span.output !== "undefined" && span.output !== null
? prettyPrintPacket(span.output, span.outputType ?? undefined)
? await prettyPrintPacket(span.output, span.outputType ?? undefined)
: undefined;

const payload =
span.payloadType === "application/store"
? `/resources/packets/${span.environmentId}/${span.payload}`
: typeof span.payload !== "undefined" && span.payload !== null
? prettyPrintPacket(span.payload, span.payloadType ?? undefined)
? await prettyPrintPacket(span.payload, span.payloadType ?? undefined)
: undefined;

return {
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/v3/eventRepository.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ export class EventRepository {
const event = events[0];

const output = options?.attributes.output
? createPackageAttributesAsJson(
? await createPackageAttributesAsJson(
options?.attributes.output,
options?.attributes.outputType ?? "application/json"
)
Expand Down
32 changes: 26 additions & 6 deletions apps/webapp/app/v3/registryProxy.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,18 @@ export class RegistryProxy {
proxyRes.pipe(response, { end: true });
});

request.on("close", () => {
logger.debug("Client closed the connection");
proxyReq.destroy();
cleanupTempFile();
});

request.on("abort", () => {
logger.debug("Client aborted the connection");
proxyReq.destroy(); // Abort the proxied request
cleanupTempFile(); // Clean up the temporary file if necessary
});

if (tempFilePath) {
const readStream = createReadStream(tempFilePath);

Expand Down Expand Up @@ -427,14 +439,22 @@ function initializeProxy() {
});
}

async function streamRequestBodyToTempFile(request: IncomingMessage): Promise<string> {
const tempDir = await mkdtemp(`${tmpdir()}/`);
const tempFilePath = `${tempDir}/requestBody.tmp`;
const writeStream = createWriteStream(tempFilePath);
async function streamRequestBodyToTempFile(request: IncomingMessage): Promise<string | undefined> {
try {
const tempDir = await mkdtemp(`${tmpdir()}/`);
const tempFilePath = `${tempDir}/requestBody.tmp`;
const writeStream = createWriteStream(tempFilePath);

await pipeline(request, writeStream);
await pipeline(request, writeStream);

return tempFilePath;
return tempFilePath;
} catch (error) {
logger.error("Failed to stream request body to temp file", {
error: error instanceof Error ? error.message : error,
});

return;
}
}

type DockerImageParts = {
Expand Down
25 changes: 22 additions & 3 deletions packages/cli-v3/src/commands/deploy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -889,7 +889,14 @@ async function compileProject(
TRIGGER_API_URL: `"${config.triggerUrl}"`,
__PROJECT_CONFIG__: JSON.stringify(config),
},
plugins: [bundleDependenciesPlugin(config), workerSetupImportConfigPlugin(configPath)],
plugins: [
bundleDependenciesPlugin(
"workerFacade",
config.dependenciesToBundle,
config.tsconfigPath
),
workerSetupImportConfigPlugin(configPath),
],
});

if (result.errors.length > 0) {
Expand Down Expand Up @@ -927,15 +934,22 @@ async function compileProject(
write: false,
minify: false,
sourcemap: false,
packages: "external", // https://esbuild.github.io/api/#packages
logLevel: "error",
platform: "node",
packages: "external",
format: "cjs", // This is needed to support opentelemetry instrumentation that uses module patching
target: ["node18", "es2020"],
outdir: "out",
define: {
__PROJECT_CONFIG__: JSON.stringify(config),
},
plugins: [
bundleDependenciesPlugin(
"entryPoint.ts",
config.dependenciesToBundle,
config.tsconfigPath
),
],
});

if (entryPointResult.errors.length > 0) {
Expand Down Expand Up @@ -1003,6 +1017,11 @@ async function compileProject(
// Save the entryPoint outputFile to /tmp/dir/index.js
await writeFile(join(tempDir, "index.js"), entryPointOutputFile.text);

logger.debug("Getting the imports for the worker and entryPoint builds", {
workerImports: metaOutput.imports,
entryPointImports: entryPointMetaOutput.imports,
});

// Get all the required dependencies from the metaOutputs and save them to /tmp/dir/package.json
const allImports = [...metaOutput.imports, ...entryPointMetaOutput.imports];

Expand Down Expand Up @@ -1246,7 +1265,7 @@ async function gatherRequiredDependencies(
const dependencies: Record<string, string> = {};

for (const file of imports) {
if (file.kind !== "require-call" || !file.external) {
if ((file.kind !== "require-call" && file.kind !== "dynamic-import") || !file.external) {
continue;
}

Expand Down
12 changes: 10 additions & 2 deletions packages/cli-v3/src/commands/dev.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,11 @@ function useDev({
__PROJECT_CONFIG__: JSON.stringify(config),
},
plugins: [
bundleDependenciesPlugin(config),
bundleDependenciesPlugin(
"workerFacade",
(config.dependenciesToBundle ?? []).concat([/^@trigger.dev/]),
config.tsconfigPath
),
workerSetupImportConfigPlugin(configPath),
{
name: "trigger.dev v3",
Expand Down Expand Up @@ -631,8 +635,12 @@ async function gatherRequiredDependencies(
) {
const dependencies: Record<string, string> = {};

logger.debug("Gathering required dependencies from imports", {
imports: outputMeta.imports,
});

for (const file of outputMeta.imports) {
if (file.kind !== "require-call" || !file.external) {
if ((file.kind !== "require-call" && file.kind !== "dynamic-import") || !file.external) {
continue;
}

Expand Down
42 changes: 10 additions & 32 deletions packages/cli-v3/src/utilities/build.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { ResolvedConfig } from "@trigger.dev/core/v3";
import type * as esbuild from "esbuild";
import type { Plugin } from "esbuild";
import { readFileSync } from "node:fs";
import { extname, isAbsolute } from "node:path";
import tsConfigPaths from "tsconfig-paths";
import { logger } from "./logger";
import { readFileSync } from "node:fs";

export function workerSetupImportConfigPlugin(configPath?: string): Plugin {
return {
Expand Down Expand Up @@ -37,8 +36,12 @@ export function workerSetupImportConfigPlugin(configPath?: string): Plugin {
};
}

export function bundleDependenciesPlugin(config: ResolvedConfig): Plugin {
const matchPath = config.tsconfigPath ? createMatchPath(config.tsconfigPath) : undefined;
export function bundleDependenciesPlugin(
buildIdentifier: string,
dependenciesToBundle?: Array<string | RegExp>,
tsconfigPath?: string
): Plugin {
const matchPath = tsconfigPath ? createMatchPath(tsconfigPath) : undefined;

function resolvePath(id: string) {
if (!matchPath) {
Expand All @@ -53,33 +56,8 @@ export function bundleDependenciesPlugin(config: ResolvedConfig): Plugin {
build.onResolve({ filter: /.*/ }, (args) => {
const resolvedPath = resolvePath(args.path);

logger.ignore(`Checking if ${args.path} should be bundled or external`, {
...args,
resolvedPath,
});

if (!isBareModuleId(resolvedPath)) {
logger.ignore(`Bundling ${args.path} because its not a bareModuleId`, {
...args,
});

return undefined; // let esbuild bundle it
}

if (args.path.startsWith("@trigger.dev/")) {
logger.ignore(`Bundling ${args.path} because its a trigger.dev package`, {
...args,
});

return undefined; // let esbuild bundle it
}

if (args.path === "superjson" || args.path === "copy-anything" || args.path === "is-what") {
logger.debug(`Bundling ${args.path} because its superjson/copy-anything/is-what`, {
...args,
});

return undefined; // let esbuild bundle it
return undefined; // let esbuild handle it
}

// Skip assets that are treated as files (.css, .svg, .png, etc.).
Expand All @@ -97,13 +75,13 @@ export function bundleDependenciesPlugin(config: ResolvedConfig): Plugin {
return undefined;
}

for (let pattern of config.dependenciesToBundle ?? []) {
for (let pattern of dependenciesToBundle ?? []) {
if (typeof pattern === "string" ? args.path === pattern : pattern.test(args.path)) {
return undefined; // let esbuild bundle it
}
}

logger.ignore(`Externalizing ${args.path}`, {
logger.ignore(`[${buildIdentifier}] Externalizing ${args.path}`, {
...args,
});

Expand Down
32 changes: 24 additions & 8 deletions packages/core/src/v3/utils/ioSerialization.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Attributes, Span } from "@opentelemetry/api";
import { deserialize, parse, stringify } from "superjson";
import { apiClientManager } from "../apiClient";
import { OFFLOAD_IO_PACKET_LENGTH_LIMIT, imposeAttributeLimits } from "../limits";
import { SemanticInternalAttributes } from "../semanticInternalAttributes";
Expand All @@ -11,7 +10,7 @@ export type IOPacket = {
dataType: string;
};

export function parsePacket(value: IOPacket): any {
export async function parsePacket(value: IOPacket): Promise<any> {
if (!value.data) {
return undefined;
}
Expand All @@ -20,6 +19,8 @@ export function parsePacket(value: IOPacket): any {
case "application/json":
return JSON.parse(value.data);
case "application/super+json":
const { parse } = await loadSuperJSON();

return parse(value.data);
case "text/plain":
return value.data;
Expand All @@ -32,7 +33,7 @@ export function parsePacket(value: IOPacket): any {
}
}

export function stringifyIO(value: any): IOPacket {
export async function stringifyIO(value: any): Promise<IOPacket> {
if (value === undefined) {
return { dataType: "application/json" };
}
Expand All @@ -41,6 +42,8 @@ export function stringifyIO(value: any): IOPacket {
return { data: value, dataType: "text/plain" };
}

const { stringify } = await loadSuperJSON();

return { data: stringify(value), dataType: "application/super+json" };
}

Expand Down Expand Up @@ -186,11 +189,11 @@ async function importPacket(packet: IOPacket, span?: Span): Promise<IOPacket> {
return packet;
}

export function createPacketAttributes(
export async function createPacketAttributes(
packet: IOPacket,
dataKey: string,
dataTypeKey: string
): Attributes {
): Promise<Attributes> {
if (!packet.data) {
return {};
}
Expand All @@ -202,6 +205,8 @@ export function createPacketAttributes(
[dataTypeKey]: packet.dataType,
};
case "application/super+json":
const { parse } = await loadSuperJSON();

const parsed = parse(packet.data) as any;
const jsonified = JSON.parse(JSON.stringify(parsed, safeReplacer));

Expand All @@ -224,7 +229,10 @@ export function createPacketAttributes(
}
}

export function createPackageAttributesAsJson(data: any, dataType: string): Attributes {
export async function createPackageAttributesAsJson(
data: any,
dataType: string
): Promise<Attributes> {
if (
typeof data === "string" ||
typeof data === "number" ||
Expand All @@ -239,6 +247,8 @@ export function createPackageAttributesAsJson(data: any, dataType: string): Attr
case "application/json":
return imposeAttributeLimits(flattenAttributes(data, undefined));
case "application/super+json":
const { deserialize } = await loadSuperJSON();

const deserialized = deserialize(data) as any;
const jsonify = JSON.parse(JSON.stringify(deserialized, safeReplacer));

Expand All @@ -250,13 +260,15 @@ export function createPackageAttributesAsJson(data: any, dataType: string): Attr
}
}

export function prettyPrintPacket(rawData: any, dataType?: string): string {
export async function prettyPrintPacket(rawData: any, dataType?: string): Promise<string> {
if (rawData === undefined) {
return "";
}

if (dataType === "application/super+json") {
return prettyPrintPacket(deserialize(rawData), "application/json");
const { deserialize } = await loadSuperJSON();

return await prettyPrintPacket(deserialize(rawData), "application/json");
}

if (dataType === "application/json") {
Expand Down Expand Up @@ -310,3 +322,7 @@ function getPacketExtension(outputType: string): string {
return "txt";
}
}

async function loadSuperJSON(): Promise<typeof import("superjson")> {
return await import("superjson");
}
Loading