Skip to content

Commit 2c80e9a

Browse files
committed
Concurrency page and more accurate tracking (#1252)
* Initial TaskRunConcurrencyTracker implementation * MARQS calls a subscriber to events * When enqueuing add the extra required metadata * Track concurrency per environment for tasks too * Admin page for global concurrency * Use the new concurrency tracker on the tasks page * Useful performance test task * getAllTaskIdentifiers() * New page for concurrency * BackgroundWorkerTask index for quick lookup of task identifiers * Added a way to get concurrency for environments * Added upgrade/request more concurrency button * Queued task column working * Use defer and suspense * Added queue column to the concurrency environments table * Some comments added for clarity * Fixed bad log message * Sidemenu: move lower and rename to “Concurrency limits” * Only show the environments, not tasks. Renamed to “Concurrency limits”
1 parent 37fd088 commit 2c80e9a

File tree

23 files changed

+667
-67
lines changed

23 files changed

+667
-67
lines changed

apps/webapp/app/components/admin/debugTooltip.tsx

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,16 @@
1-
import * as Property from "~/components/primitives/PropertyTable";
21
import { ShieldCheckIcon } from "@heroicons/react/20/solid";
2+
import * as Property from "~/components/primitives/PropertyTable";
33
import {
44
Tooltip,
55
TooltipContent,
66
TooltipProvider,
77
TooltipTrigger,
88
} from "~/components/primitives/Tooltip";
9-
import {
10-
useIsImpersonating,
11-
useOptionalOrganization,
12-
useOrganization,
13-
} from "~/hooks/useOrganizations";
14-
import { useOptionalProject, useProject } from "~/hooks/useProject";
9+
import { useIsImpersonating, useOptionalOrganization } from "~/hooks/useOrganizations";
10+
import { useOptionalProject } from "~/hooks/useProject";
1511
import { useHasAdminAccess, useUser } from "~/hooks/useUser";
1612

17-
export function AdminDebugTooltip({ children }: { children: React.ReactNode }) {
13+
export function AdminDebugTooltip({ children }: { children?: React.ReactNode }) {
1814
const hasAdminAccess = useHasAdminAccess();
1915
const isImpersonating = useIsImpersonating();
2016

apps/webapp/app/components/navigation/SideMenu.tsx

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
CursorArrowRaysIcon,
1010
IdentificationIcon,
1111
KeyIcon,
12+
RectangleStackIcon,
1213
ServerStackIcon,
1314
ShieldCheckIcon,
1415
SignalIcon,
@@ -46,6 +47,7 @@ import {
4647
projectTriggersPath,
4748
v3ApiKeysPath,
4849
v3BillingPath,
50+
v3ConcurrencyPath,
4951
v3DeploymentsPath,
5052
v3EnvironmentVariablesPath,
5153
v3ProjectAlertsPath,
@@ -605,6 +607,7 @@ function V3ProjectSideMenu({
605607
to={v3EnvironmentVariablesPath(organization, project)}
606608
data-action="environment variables"
607609
/>
610+
608611
<SideMenuItem
609612
name="Deployments"
610613
icon={ServerStackIcon}
@@ -621,6 +624,13 @@ function V3ProjectSideMenu({
621624
data-action="alerts"
622625
/>
623626
)}
627+
<SideMenuItem
628+
name="Concurrency limits"
629+
icon={RectangleStackIcon}
630+
iconColor="text-indigo-500"
631+
to={v3ConcurrencyPath(organization, project)}
632+
data-action="concurrency"
633+
/>
624634
<SideMenuItem
625635
name="Project settings"
626636
icon="settings"

apps/webapp/app/models/runtimeEnvironment.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ export async function disconnectSession(environmentId: string) {
116116
return session;
117117
}
118118

119-
type DisplayableInputEnvironment = Prisma.RuntimeEnvironmentGetPayload<{
119+
export type DisplayableInputEnvironment = Prisma.RuntimeEnvironmentGetPayload<{
120120
select: {
121121
id: true;
122122
type: true;

apps/webapp/app/models/task.server.ts

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
import type { JobRun, Task, TaskAttempt } from "@trigger.dev/database";
1+
import type { JobRun, Task, TaskAttempt, TaskTriggerSource } from "@trigger.dev/database";
22
import { CachedTask, ServerTask } from "@trigger.dev/core";
3+
import { PrismaClientOrTransaction, sqlDatabaseSchema } from "~/db.server";
34

45
export type TaskWithAttempts = Task & {
56
attempts: TaskAttempt[];
@@ -116,3 +117,22 @@ function prepareTaskForCaching(task: TaskForCaching): CachedTask {
116117
function calculateCachedTaskSize(task: CachedTask): number {
117118
return JSON.stringify(task).length;
118119
}
120+
121+
/**
122+
*
123+
* @param prisma An efficient query to get all task identifiers for a project.
124+
* It has indexes for fast performance.
125+
* It does NOT care about versions, so includes all tasks ever created.
126+
*/
127+
export function getAllTaskIdentifiers(prisma: PrismaClientOrTransaction, projectId: string) {
128+
return prisma.$queryRaw<
129+
{
130+
slug: string;
131+
triggerSource: TaskTriggerSource;
132+
}[]
133+
>`
134+
SELECT DISTINCT(slug), "triggerSource"
135+
FROM ${sqlDatabaseSchema}."BackgroundWorkerTask"
136+
WHERE "projectId" = ${projectId}
137+
ORDER BY slug ASC;`;
138+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
import { QUEUED_STATUSES } from "~/components/runs/v3/TaskRunStatus";
2+
import { Prisma, sqlDatabaseSchema } from "~/db.server";
3+
import { type Project } from "~/models/project.server";
4+
import {
5+
displayableEnvironment,
6+
type DisplayableInputEnvironment,
7+
} from "~/models/runtimeEnvironment.server";
8+
import { type User } from "~/models/user.server";
9+
import { getLimit } from "~/services/platform.v3.server";
10+
import { sortEnvironments } from "~/utils/environmentSort";
11+
import { concurrencyTracker } from "~/v3/services/taskRunConcurrencyTracker.server";
12+
import { BasePresenter } from "./basePresenter.server";
13+
14+
export type Environment = Awaited<
15+
ReturnType<ConcurrencyPresenter["environmentConcurrency"]>
16+
>[number];
17+
18+
export class ConcurrencyPresenter extends BasePresenter {
19+
public async call({ userId, projectSlug }: { userId: User["id"]; projectSlug: Project["slug"] }) {
20+
const project = await this._replica.project.findFirst({
21+
select: {
22+
id: true,
23+
organizationId: true,
24+
environments: {
25+
select: {
26+
id: true,
27+
apiKey: true,
28+
pkApiKey: true,
29+
type: true,
30+
slug: true,
31+
updatedAt: true,
32+
orgMember: {
33+
select: {
34+
user: { select: { id: true, name: true, displayName: true } },
35+
},
36+
},
37+
maximumConcurrencyLimit: true,
38+
},
39+
},
40+
},
41+
where: {
42+
slug: projectSlug,
43+
organization: {
44+
members: {
45+
some: {
46+
userId,
47+
},
48+
},
49+
},
50+
},
51+
});
52+
53+
if (!project) {
54+
throw new Error(`Project not found: ${projectSlug}`);
55+
}
56+
57+
return {
58+
environments: this.environmentConcurrency(project.id, userId, project.environments),
59+
};
60+
}
61+
62+
async environmentConcurrency(
63+
projectId: string,
64+
userId: string,
65+
environments: (DisplayableInputEnvironment & { maximumConcurrencyLimit: number })[]
66+
) {
67+
const environmentConcurrency = await concurrencyTracker.environmentConcurrentRunCounts(
68+
projectId,
69+
environments.map((env) => env.id)
70+
);
71+
72+
const queued = await this._replica.$queryRaw<
73+
{
74+
runtimeEnvironmentId: string;
75+
count: BigInt;
76+
}[]
77+
>`
78+
SELECT
79+
"runtimeEnvironmentId",
80+
COUNT(*)
81+
FROM
82+
${sqlDatabaseSchema}."TaskRun" as tr
83+
WHERE
84+
tr."projectId" = ${projectId}
85+
AND tr."status" = ANY(ARRAY[${Prisma.join(QUEUED_STATUSES)}]::\"TaskRunStatus\"[])
86+
GROUP BY
87+
tr."runtimeEnvironmentId";`;
88+
89+
const sortedEnvironments = sortEnvironments(environments).map((environment) => ({
90+
...displayableEnvironment(environment, userId),
91+
concurrencyLimit: environment.maximumConcurrencyLimit,
92+
concurrency: environmentConcurrency[environment.id] ?? 0,
93+
queued: Number(queued.find((q) => q.runtimeEnvironmentId === environment.id)?.count ?? 0),
94+
}));
95+
96+
return sortedEnvironments;
97+
}
98+
}

apps/webapp/app/presenters/v3/RunListPresenter.server.ts

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { sqlDatabaseSchema } from "~/db.server";
66
import { displayableEnvironment } from "~/models/runtimeEnvironment.server";
77
import { isCancellableRunStatus } from "~/v3/taskStatus";
88
import { BasePresenter } from "./basePresenter.server";
9+
import { getAllTaskIdentifiers } from "~/models/task.server";
910

1011
export type RunListOptions = {
1112
userId?: string;
@@ -97,16 +98,7 @@ export class RunListPresenter extends BasePresenter {
9798
});
9899

99100
//get all possible tasks
100-
const possibleTasksAsync = this._replica.$queryRaw<
101-
{
102-
slug: string;
103-
triggerSource: TaskTriggerSource;
104-
}[]
105-
>`
106-
SELECT DISTINCT(slug), "triggerSource"
107-
FROM ${sqlDatabaseSchema}."BackgroundWorkerTask"
108-
WHERE "projectId" = ${project.id}
109-
ORDER BY slug ASC;`;
101+
const possibleTasksAsync = getAllTaskIdentifiers(this._replica, project.id);
110102

111103
//get possible bulk actions
112104
const bulkActionsAsync = this._replica.bulkActionGroup.findMany({

apps/webapp/app/presenters/v3/TaskListPresenter.server.ts

Lines changed: 23 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1+
import { Prisma } from "@trigger.dev/database";
12
import type {
23
RuntimeEnvironmentType,
34
TaskTriggerSource,
45
TaskRunStatus as TaskRunStatusType,
56
} from "@trigger.dev/database";
6-
import { Prisma } from "@trigger.dev/database";
77
import { QUEUED_STATUSES, RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus";
88
import { sqlDatabaseSchema } from "~/db.server";
99
import type { Organization } from "~/models/organization.server";
@@ -20,6 +20,7 @@ import { logger } from "~/services/logger.server";
2020
import { BasePresenter } from "./basePresenter.server";
2121
import { TaskRunStatus } from "~/database-types";
2222
import { CURRENT_DEPLOYMENT_LABEL } from "~/consts";
23+
import { concurrencyTracker } from "~/v3/services/taskRunConcurrencyTracker.server";
2324

2425
export type Task = {
2526
slug: string;
@@ -114,7 +115,7 @@ export class TaskListPresenter extends BasePresenter {
114115
JOIN ${sqlDatabaseSchema}."BackgroundWorkerTask" tasks ON tasks."workerId" = workers.id
115116
ORDER BY slug ASC;`;
116117

117-
//group by the task identifier (task.slug). Add the latestRun and add all the environments.
118+
//group by the task identifier (task.slug).
118119
const outputTasks = tasks.reduce((acc, task) => {
119120
const environment = project.environments.find((env) => env.id === task.runtimeEnvironmentId);
120121
if (!environment) {
@@ -251,51 +252,40 @@ export class TaskListPresenter extends BasePresenter {
251252
return {};
252253
}
253254

254-
const statuses = await this._replica.$queryRaw<
255+
const concurrencies = await concurrencyTracker.taskConcurrentRunCounts(projectId, tasks);
256+
257+
const queued = await this._replica.$queryRaw<
255258
{
256259
taskIdentifier: string;
257-
status: TaskRunStatusType;
258260
count: BigInt;
259261
}[]
260262
>`
261263
SELECT
262-
tr."taskIdentifier",
263-
tr."status",
264+
tr."taskIdentifier",
264265
COUNT(*)
265266
FROM
266267
${sqlDatabaseSchema}."TaskRun" as tr
267268
WHERE
268269
tr."taskIdentifier" IN (${Prisma.join(tasks)})
269270
AND tr."projectId" = ${projectId}
270-
AND tr."status" IN ('PENDING', 'WAITING_FOR_DEPLOY', 'EXECUTING', 'RETRYING_AFTER_FAILURE', 'WAITING_TO_RESUME')
271+
AND tr."status" = ANY(ARRAY[${Prisma.join(QUEUED_STATUSES)}]::\"TaskRunStatus\"[])
271272
GROUP BY
272-
tr."taskIdentifier",
273-
tr."status"
273+
tr."taskIdentifier"
274274
ORDER BY
275-
tr."taskIdentifier" ASC,
276-
tr."status" ASC;`;
277-
278-
return statuses.reduce((acc, a) => {
279-
let existingTask = acc[a.taskIdentifier];
280-
281-
if (!existingTask) {
282-
existingTask = {
283-
queued: 0,
284-
running: 0,
285-
};
286-
287-
acc[a.taskIdentifier] = existingTask;
288-
}
289-
290-
if (QUEUED_STATUSES.includes(a.status)) {
291-
existingTask.queued += Number(a.count);
292-
}
293-
if (RUNNING_STATUSES.includes(a.status)) {
294-
existingTask.running += Number(a.count);
295-
}
296-
297-
return acc;
298-
}, {} as Record<string, { queued: number; running: number }>);
275+
tr."taskIdentifier" ASC`;
276+
277+
//create an object combining the queued and concurrency counts
278+
const result: Record<string, { queued: number; running: number }> = {};
279+
for (const task of tasks) {
280+
const concurrency = concurrencies[task] ?? 0;
281+
const queuedCount = queued.find((q) => q.taskIdentifier === task)?.count ?? 0;
282+
283+
result[task] = {
284+
queued: Number(queuedCount),
285+
running: concurrency,
286+
};
287+
}
288+
return result;
299289
}
300290

301291
async #getAverageDurations(tasks: string[], projectId: string) {

0 commit comments

Comments
 (0)