Skip to content

Various perf improvements to prevent event loop lag #1186

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/thick-carrots-sneeze.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/core": patch
"@trigger.dev/sdk": patch
---

v3: Include presigned urls for downloading large payloads and outputs when using runs.retrieve
3 changes: 3 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ const EnvironmentSchema = z.object({
USAGE_OPEN_METER_BASE_URL: z.string().optional(),
EVENT_LOOP_MONITOR_ENABLED: z.string().default("1"),
MAXIMUM_LIVE_RELOADING_EVENTS: z.coerce.number().int().default(1000),
MAXIMUM_TRACE_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(25_000),
TASK_PAYLOAD_OFFLOAD_THRESHOLD: z.coerce.number().int().default(524_288), // 512KB
TASK_PAYLOAD_MAXIMUM_SIZE: z.coerce.number().int().default(3_145_728), // 3MB
});

export type Environment = z.infer<typeof EnvironmentSchema>;
Expand Down
4 changes: 0 additions & 4 deletions apps/webapp/app/presenters/RunListPresenter.server.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import { z } from "zod";
import {
Direction,
FilterableEnvironment,
FilterableStatus,
filterableStatuses,
} from "~/components/runs/RunStatuses";
import { PrismaClient, prisma } from "~/db.server";
import { getUsername } from "~/utils/username";
import { BasePresenter } from "./v3/basePresenter.server";

Expand All @@ -29,8 +27,6 @@ const DEFAULT_PAGE_SIZE = 20;
export type RunList = Awaited<ReturnType<RunListPresenter["call"]>>;

export class RunListPresenter extends BasePresenter {


public async call({
userId,
eventId,
Expand Down
98 changes: 77 additions & 21 deletions apps/webapp/app/presenters/ScheduledTriggersPresenter.server.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,52 @@
import { User } from "@trigger.dev/database";
import { ScheduleMetadataSchema } from "@trigger.dev/core";
import { PrismaClient, prisma } from "~/db.server";
import { User } from "@trigger.dev/database";
import { Organization } from "~/models/organization.server";
import { Project } from "~/models/project.server";
import { calculateNextScheduledEvent } from "~/services/schedules/nextScheduledEvent.server";
import { BasePresenter } from "./v3/basePresenter.server";

export class ScheduledTriggersPresenter {
#prismaClient: PrismaClient;

constructor(prismaClient: PrismaClient = prisma) {
this.#prismaClient = prismaClient;
}
const DEFAULT_PAGE_SIZE = 20;

export class ScheduledTriggersPresenter extends BasePresenter {
public async call({
userId,
projectSlug,
organizationSlug,
direction = "forward",
pageSize = DEFAULT_PAGE_SIZE,
cursor,
}: {
userId: User["id"];
projectSlug: Project["slug"];
organizationSlug: Organization["slug"];
direction?: "forward" | "backward";
pageSize?: number;
cursor?: string;
}) {
const scheduled = await this.#prismaClient.scheduleSource.findMany({
const organization = await this._replica.organization.findFirstOrThrow({
select: {
id: true,
},
where: {
slug: organizationSlug,
members: { some: { userId } },
},
});

// Find the project scoped to the organization
const project = await this._replica.project.findFirstOrThrow({
select: {
id: true,
},
where: {
slug: projectSlug,
organizationId: organization.id,
},
});

const directionMultiplier = direction === "forward" ? 1 : -1;

const scheduled = await this._replica.scheduleSource.findMany({
select: {
id: true,
key: true,
Expand Down Expand Up @@ -50,23 +75,50 @@ export class ScheduledTriggersPresenter {
},
},
],
organization: {
slug: organizationSlug,
members: {
some: {
userId,
},
},
},
project: {
slug: projectSlug,
},
projectId: project.id,
},
},
orderBy: [{ id: "desc" }],
//take an extra record to tell if there are more
take: directionMultiplier * (pageSize + 1),
//skip the cursor if there is one
skip: cursor ? 1 : 0,
cursor: cursor
? {
id: cursor,
}
: undefined,
});

const hasMore = scheduled.length > pageSize;

//get cursors for next and previous pages
let next: string | undefined;
let previous: string | undefined;
switch (direction) {
case "forward":
previous = cursor ? scheduled.at(0)?.id : undefined;
if (hasMore) {
next = scheduled[pageSize - 1]?.id;
}
break;
case "backward":
if (hasMore) {
previous = scheduled[1]?.id;
next = scheduled[pageSize]?.id;
} else {
next = scheduled[pageSize - 1]?.id;
}
break;
}

const scheduledToReturn =
direction === "backward" && hasMore
? scheduled.slice(1, pageSize + 1)
: scheduled.slice(0, pageSize);

return {
scheduled: scheduled.map((s) => {
scheduled: scheduledToReturn.map((s) => {
const schedule = ScheduleMetadataSchema.parse(s.schedule);
const nextEventTimestamp = s.active
? calculateNextScheduledEvent(schedule, s.lastEventTimestamp)
Expand All @@ -78,6 +130,10 @@ export class ScheduledTriggersPresenter {
nextEventTimestamp,
};
}),
pagination: {
next,
previous,
},
};
}
}
33 changes: 31 additions & 2 deletions apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
import { Prisma, TaskRunAttemptStatus, TaskRunStatus } from "@trigger.dev/database";
import assertNever from "assert-never";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { generatePresignedUrl } from "~/v3/r2.server";
import { BasePresenter } from "./basePresenter.server";

export class ApiRetrieveRunPresenter extends BasePresenter {
Expand Down Expand Up @@ -44,15 +45,29 @@ export class ApiRetrieveRunPresenter extends BasePresenter {
}

let $payload: any;
let $payloadPresignedUrl: string | undefined;
let $output: any;
let $outputPresignedUrl: string | undefined;

if (showSecretDetails) {
const payloadPacket = await conditionallyImportPacket({
data: taskRun.payload,
dataType: taskRun.payloadType,
});

$payload = await parsePacket(payloadPacket);
if (
payloadPacket.dataType === "application/store" &&
typeof payloadPacket.data === "string"
) {
$payloadPresignedUrl = await generatePresignedUrl(
env.project.externalRef,
env.slug,
payloadPacket.data,
"GET"
);
} else {
$payload = await parsePacket(payloadPacket);
}

if (taskRun.status === "COMPLETED_SUCCESSFULLY") {
const completedAttempt = taskRun.attempts.find(
Expand All @@ -65,7 +80,19 @@ export class ApiRetrieveRunPresenter extends BasePresenter {
dataType: completedAttempt.outputType,
});

$output = await parsePacket(outputPacket);
if (
outputPacket.dataType === "application/store" &&
typeof outputPacket.data === "string"
) {
$outputPresignedUrl = await generatePresignedUrl(
env.project.externalRef,
env.slug,
outputPacket.data,
"GET"
);
} else {
$output = await parsePacket(outputPacket);
}
}
}
}
Expand All @@ -85,7 +112,9 @@ export class ApiRetrieveRunPresenter extends BasePresenter {
? taskRun.updatedAt
: undefined,
payload: $payload,
payloadPresignedUrl: $payloadPresignedUrl,
output: $output,
outputPresignedUrl: $outputPresignedUrl,
isTest: taskRun.isTest,
schedule: taskRun.schedule
? {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import { NoSymbolIcon } from "@heroicons/react/20/solid";
import { CheckCircleIcon, XCircleIcon } from "@heroicons/react/24/solid";
import type { LoaderFunctionArgs } from "@remix-run/server-runtime";
import { typedjson, useTypedLoaderData } from "remix-typedjson";
import { z } from "zod";
import { ListPagination } from "~/components/ListPagination";
import { EnvironmentLabel } from "~/components/environments/EnvironmentLabel";
import { DateTime } from "~/components/primitives/DateTime";
import { LabelValueStack } from "~/components/primitives/LabelValueStack";
Expand All @@ -17,30 +19,38 @@ import {
TableRow,
} from "~/components/primitives/Table";
import { TextLink } from "~/components/primitives/TextLink";
import { useOrganization } from "~/hooks/useOrganizations";
import { useProject } from "~/hooks/useProject";
import { DirectionSchema } from "~/components/runs/RunStatuses";
import { ScheduledTriggersPresenter } from "~/presenters/ScheduledTriggersPresenter.server";
import { requireUserId } from "~/services/session.server";
import { ProjectParamSchema, docsPath } from "~/utils/pathBuilder";

const SearchSchema = z.object({
cursor: z.string().optional(),
direction: DirectionSchema.optional(),
});

export const loader = async ({ request, params }: LoaderFunctionArgs) => {
const userId = await requireUserId(request);
const { organizationSlug, projectParam } = ProjectParamSchema.parse(params);

const url = new URL(request.url);
const s = Object.fromEntries(url.searchParams.entries());
const searchParams = SearchSchema.parse(s);

const presenter = new ScheduledTriggersPresenter();
const data = await presenter.call({
userId,
organizationSlug,
projectSlug: projectParam,
direction: searchParams.direction,
cursor: searchParams.cursor,
});

return typedjson(data);
};

export default function Integrations() {
const { scheduled } = useTypedLoaderData<typeof loader>();
const organization = useOrganization();
const project = useProject();
export default function Route() {
const { scheduled, pagination } = useTypedLoaderData<typeof loader>();

return (
<>
Expand All @@ -49,6 +59,10 @@ export default function Integrations() {
expression or an interval.
</Paragraph>

{scheduled.length > 0 && (
<ListPagination list={{ pagination }} className="mt-2 justify-end" />
)}

<Table containerClassName="mt-4">
<TableHeader>
<TableRow>
Expand Down
Loading
Loading