Skip to content

Concurrency page and more accurate tracking #1252

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions apps/webapp/app/components/admin/debugTooltip.tsx
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
import * as Property from "~/components/primitives/PropertyTable";
import { ShieldCheckIcon } from "@heroicons/react/20/solid";
import * as Property from "~/components/primitives/PropertyTable";
import {
Tooltip,
TooltipContent,
TooltipProvider,
TooltipTrigger,
} from "~/components/primitives/Tooltip";
import {
useIsImpersonating,
useOptionalOrganization,
useOrganization,
} from "~/hooks/useOrganizations";
import { useOptionalProject, useProject } from "~/hooks/useProject";
import { useIsImpersonating, useOptionalOrganization } from "~/hooks/useOrganizations";
import { useOptionalProject } from "~/hooks/useProject";
import { useHasAdminAccess, useUser } from "~/hooks/useUser";

export function AdminDebugTooltip({ children }: { children: React.ReactNode }) {
export function AdminDebugTooltip({ children }: { children?: React.ReactNode }) {
const hasAdminAccess = useHasAdminAccess();
const isImpersonating = useIsImpersonating();

Expand Down
12 changes: 11 additions & 1 deletion apps/webapp/app/components/navigation/SideMenu.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
CursorArrowRaysIcon,
IdentificationIcon,
KeyIcon,
RectangleStackIcon,
ServerStackIcon,
ShieldCheckIcon,
SignalIcon,
Expand Down Expand Up @@ -46,6 +47,7 @@ import {
projectTriggersPath,
v3ApiKeysPath,
v3BillingPath,
v3ConcurrencyPath,
v3DeploymentsPath,
v3EnvironmentVariablesPath,
v3ProjectAlertsPath,
Expand Down Expand Up @@ -77,9 +79,9 @@ import {
PopoverSectionHeader,
} from "../primitives/Popover";
import { StepNumber } from "../primitives/StepNumber";
import { TextLink } from "../primitives/TextLink";
import { SideMenuHeader } from "./SideMenuHeader";
import { SideMenuItem } from "./SideMenuItem";
import { TextLink } from "../primitives/TextLink";

type SideMenuUser = Pick<User, "email" | "admin"> & { isImpersonating: boolean };
type SideMenuProject = Pick<
Expand Down Expand Up @@ -602,6 +604,7 @@ function V3ProjectSideMenu({
to={v3EnvironmentVariablesPath(organization, project)}
data-action="environment variables"
/>

<SideMenuItem
name="Deployments"
icon={ServerStackIcon}
Expand All @@ -618,6 +621,13 @@ function V3ProjectSideMenu({
data-action="alerts"
/>
)}
<SideMenuItem
name="Concurrency limits"
icon={RectangleStackIcon}
iconColor="text-indigo-500"
to={v3ConcurrencyPath(organization, project)}
data-action="concurrency"
/>
<SideMenuItem
name="Project settings"
icon="settings"
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/models/runtimeEnvironment.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ export async function disconnectSession(environmentId: string) {
return session;
}

type DisplayableInputEnvironment = Prisma.RuntimeEnvironmentGetPayload<{
export type DisplayableInputEnvironment = Prisma.RuntimeEnvironmentGetPayload<{
select: {
id: true;
type: true;
Expand Down
22 changes: 21 additions & 1 deletion apps/webapp/app/models/task.server.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { JobRun, Task, TaskAttempt } from "@trigger.dev/database";
import type { JobRun, Task, TaskAttempt, TaskTriggerSource } from "@trigger.dev/database";
import { CachedTask, ServerTask } from "@trigger.dev/core";
import { PrismaClientOrTransaction, sqlDatabaseSchema } from "~/db.server";

export type TaskWithAttempts = Task & {
attempts: TaskAttempt[];
Expand Down Expand Up @@ -116,3 +117,22 @@ function prepareTaskForCaching(task: TaskForCaching): CachedTask {
function calculateCachedTaskSize(task: CachedTask): number {
return JSON.stringify(task).length;
}

/**
*
* @param prisma An efficient query to get all task identifiers for a project.
* It has indexes for fast performance.
* It does NOT care about versions, so includes all tasks ever created.
*/
export function getAllTaskIdentifiers(prisma: PrismaClientOrTransaction, projectId: string) {
return prisma.$queryRaw<
{
slug: string;
triggerSource: TaskTriggerSource;
}[]
>`
SELECT DISTINCT(slug), "triggerSource"
FROM ${sqlDatabaseSchema}."BackgroundWorkerTask"
WHERE "projectId" = ${projectId}
ORDER BY slug ASC;`;
}
98 changes: 98 additions & 0 deletions apps/webapp/app/presenters/v3/ConcurrencyPresenter.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import { QUEUED_STATUSES } from "~/components/runs/v3/TaskRunStatus";
import { Prisma, sqlDatabaseSchema } from "~/db.server";
import { type Project } from "~/models/project.server";
import {
displayableEnvironment,
type DisplayableInputEnvironment,
} from "~/models/runtimeEnvironment.server";
import { type User } from "~/models/user.server";
import { getLimit } from "~/services/platform.v3.server";
import { sortEnvironments } from "~/utils/environmentSort";
import { concurrencyTracker } from "~/v3/services/taskRunConcurrencyTracker.server";
import { BasePresenter } from "./basePresenter.server";

export type Environment = Awaited<
ReturnType<ConcurrencyPresenter["environmentConcurrency"]>
>[number];

export class ConcurrencyPresenter extends BasePresenter {
public async call({ userId, projectSlug }: { userId: User["id"]; projectSlug: Project["slug"] }) {
const project = await this._replica.project.findFirst({
select: {
id: true,
organizationId: true,
environments: {
select: {
id: true,
apiKey: true,
pkApiKey: true,
type: true,
slug: true,
updatedAt: true,
orgMember: {
select: {
user: { select: { id: true, name: true, displayName: true } },
},
},
maximumConcurrencyLimit: true,
},
},
},
where: {
slug: projectSlug,
organization: {
members: {
some: {
userId,
},
},
},
},
});

if (!project) {
throw new Error(`Project not found: ${projectSlug}`);
}

return {
environments: this.environmentConcurrency(project.id, userId, project.environments),
};
}

async environmentConcurrency(
projectId: string,
userId: string,
environments: (DisplayableInputEnvironment & { maximumConcurrencyLimit: number })[]
) {
const environmentConcurrency = await concurrencyTracker.environmentConcurrentRunCounts(
projectId,
environments.map((env) => env.id)
);

const queued = await this._replica.$queryRaw<
{
runtimeEnvironmentId: string;
count: BigInt;
}[]
>`
SELECT
"runtimeEnvironmentId",
COUNT(*)
FROM
${sqlDatabaseSchema}."TaskRun" as tr
WHERE
tr."projectId" = ${projectId}
AND tr."status" = ANY(ARRAY[${Prisma.join(QUEUED_STATUSES)}]::\"TaskRunStatus\"[])
GROUP BY
tr."runtimeEnvironmentId";`;

const sortedEnvironments = sortEnvironments(environments).map((environment) => ({
...displayableEnvironment(environment, userId),
concurrencyLimit: environment.maximumConcurrencyLimit,
concurrency: environmentConcurrency[environment.id] ?? 0,
queued: Number(queued.find((q) => q.runtimeEnvironmentId === environment.id)?.count ?? 0),
}));

return sortedEnvironments;
}
}
12 changes: 2 additions & 10 deletions apps/webapp/app/presenters/v3/RunListPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { sqlDatabaseSchema } from "~/db.server";
import { displayableEnvironment } from "~/models/runtimeEnvironment.server";
import { isCancellableRunStatus } from "~/v3/taskStatus";
import { BasePresenter } from "./basePresenter.server";
import { getAllTaskIdentifiers } from "~/models/task.server";

export type RunListOptions = {
userId?: string;
Expand Down Expand Up @@ -97,16 +98,7 @@ export class RunListPresenter extends BasePresenter {
});

//get all possible tasks
const possibleTasksAsync = this._replica.$queryRaw<
{
slug: string;
triggerSource: TaskTriggerSource;
}[]
>`
SELECT DISTINCT(slug), "triggerSource"
FROM ${sqlDatabaseSchema}."BackgroundWorkerTask"
WHERE "projectId" = ${project.id}
ORDER BY slug ASC;`;
const possibleTasksAsync = getAllTaskIdentifiers(this._replica, project.id);

//get possible bulk actions
const bulkActionsAsync = this._replica.bulkActionGroup.findMany({
Expand Down
56 changes: 23 additions & 33 deletions apps/webapp/app/presenters/v3/TaskListPresenter.server.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { Prisma } from "@trigger.dev/database";
import type {
RuntimeEnvironmentType,
TaskTriggerSource,
TaskRunStatus as TaskRunStatusType,
} from "@trigger.dev/database";
import { Prisma } from "@trigger.dev/database";
import { QUEUED_STATUSES, RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus";
import { sqlDatabaseSchema } from "~/db.server";
import type { Organization } from "~/models/organization.server";
Expand All @@ -20,6 +20,7 @@ import { logger } from "~/services/logger.server";
import { BasePresenter } from "./basePresenter.server";
import { TaskRunStatus } from "~/database-types";
import { CURRENT_DEPLOYMENT_LABEL } from "~/consts";
import { concurrencyTracker } from "~/v3/services/taskRunConcurrencyTracker.server";

export type Task = {
slug: string;
Expand Down Expand Up @@ -114,7 +115,7 @@ export class TaskListPresenter extends BasePresenter {
JOIN ${sqlDatabaseSchema}."BackgroundWorkerTask" tasks ON tasks."workerId" = workers.id
ORDER BY slug ASC;`;

//group by the task identifier (task.slug). Add the latestRun and add all the environments.
//group by the task identifier (task.slug).
const outputTasks = tasks.reduce((acc, task) => {
const environment = project.environments.find((env) => env.id === task.runtimeEnvironmentId);
if (!environment) {
Expand Down Expand Up @@ -251,51 +252,40 @@ export class TaskListPresenter extends BasePresenter {
return {};
}

const statuses = await this._replica.$queryRaw<
const concurrencies = await concurrencyTracker.taskConcurrentRunCounts(projectId, tasks);

const queued = await this._replica.$queryRaw<
{
taskIdentifier: string;
status: TaskRunStatusType;
count: BigInt;
}[]
>`
SELECT
tr."taskIdentifier",
tr."status",
tr."taskIdentifier",
COUNT(*)
FROM
${sqlDatabaseSchema}."TaskRun" as tr
WHERE
tr."taskIdentifier" IN (${Prisma.join(tasks)})
AND tr."projectId" = ${projectId}
AND tr."status" IN ('PENDING', 'WAITING_FOR_DEPLOY', 'EXECUTING', 'RETRYING_AFTER_FAILURE', 'WAITING_TO_RESUME')
AND tr."status" = ANY(ARRAY[${Prisma.join(QUEUED_STATUSES)}]::\"TaskRunStatus\"[])
GROUP BY
tr."taskIdentifier",
tr."status"
tr."taskIdentifier"
ORDER BY
tr."taskIdentifier" ASC,
tr."status" ASC;`;

return statuses.reduce((acc, a) => {
let existingTask = acc[a.taskIdentifier];

if (!existingTask) {
existingTask = {
queued: 0,
running: 0,
};

acc[a.taskIdentifier] = existingTask;
}

if (QUEUED_STATUSES.includes(a.status)) {
existingTask.queued += Number(a.count);
}
if (RUNNING_STATUSES.includes(a.status)) {
existingTask.running += Number(a.count);
}

return acc;
}, {} as Record<string, { queued: number; running: number }>);
tr."taskIdentifier" ASC`;

//create an object combining the queued and concurrency counts
const result: Record<string, { queued: number; running: number }> = {};
for (const task of tasks) {
const concurrency = concurrencies[task] ?? 0;
const queuedCount = queued.find((q) => q.taskIdentifier === task)?.count ?? 0;

result[task] = {
queued: Number(queuedCount),
running: concurrency,
};
}
return result;
}

async #getAverageDurations(tasks: string[], projectId: string) {
Expand Down
Loading
Loading