Skip to content

Commit 7e411ac

Browse files
committed
New PENDING_VERSION system which now requires queues to exist at dequeue time
1 parent 7a58439 commit 7e411ac

File tree

40 files changed

+789
-355
lines changed

40 files changed

+789
-355
lines changed

apps/webapp/app/components/runs/v3/TaskRunStatus.tsx

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import { cn } from "~/utils/cn";
2121
export const allTaskRunStatuses = [
2222
"DELAYED",
2323
"WAITING_FOR_DEPLOY",
24+
"PENDING_VERSION",
2425
"PENDING",
2526
"EXECUTING",
2627
"RETRYING_AFTER_FAILURE",
@@ -37,7 +38,7 @@ export const allTaskRunStatuses = [
3738
] as const satisfies Readonly<Array<TaskRunStatus>>;
3839

3940
export const filterableTaskRunStatuses = [
40-
"WAITING_FOR_DEPLOY",
41+
"PENDING_VERSION",
4142
"DELAYED",
4243
"PENDING",
4344
"WAITING_TO_RESUME",
@@ -56,7 +57,10 @@ export const filterableTaskRunStatuses = [
5657
const taskRunStatusDescriptions: Record<TaskRunStatus, string> = {
5758
DELAYED: "Task has been delayed and is waiting to be executed.",
5859
PENDING: "Task is waiting to be executed.",
59-
WAITING_FOR_DEPLOY: "Task needs to be deployed first to start executing.",
60+
PENDING_VERSION:
61+
"Task is waiting for a version update because it cannot execute without additional information (task, queue, etc.).",
62+
WAITING_FOR_DEPLOY:
63+
"Task is waiting for a version update because it cannot execute without additional information (task, queue, etc.).",
6064
EXECUTING: "Task is currently being executed.",
6165
RETRYING_AFTER_FAILURE: "Task is being reattempted after a failure.",
6266
WAITING_TO_RESUME: `You have used a "wait" function. When the wait is complete, the task will resume execution.`,
@@ -73,6 +77,7 @@ const taskRunStatusDescriptions: Record<TaskRunStatus, string> = {
7377

7478
export const QUEUED_STATUSES = [
7579
"PENDING",
80+
"PENDING_VERSION",
7681
"WAITING_FOR_DEPLOY",
7782
"DELAYED",
7883
] satisfies TaskRunStatus[];
@@ -120,6 +125,7 @@ export function TaskRunStatusIcon({
120125
return <ClockIcon className={cn(runStatusClassNameColor(status), className)} />;
121126
case "PENDING":
122127
return <RectangleStackIcon className={cn(runStatusClassNameColor(status), className)} />;
128+
case "PENDING_VERSION":
123129
case "WAITING_FOR_DEPLOY":
124130
return <RectangleStackIcon className={cn(runStatusClassNameColor(status), className)} />;
125131
case "EXECUTING":
@@ -158,6 +164,7 @@ export function runStatusClassNameColor(status: TaskRunStatus): string {
158164
case "PENDING":
159165
case "DELAYED":
160166
return "text-charcoal-500";
167+
case "PENDING_VERSION":
161168
case "WAITING_FOR_DEPLOY":
162169
return "text-amber-500";
163170
case "EXECUTING":
@@ -194,8 +201,9 @@ export function runStatusTitle(status: TaskRunStatus): string {
194201
return "Delayed";
195202
case "PENDING":
196203
return "Queued";
204+
case "PENDING_VERSION":
197205
case "WAITING_FOR_DEPLOY":
198-
return "Waiting for deploy";
206+
return "Pending version";
199207
case "EXECUTING":
200208
return "Executing";
201209
case "WAITING_TO_RESUME":

apps/webapp/app/database-types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ export const TaskRunAttemptStatus = {
2929

3030
export const TaskRunStatus = {
3131
PENDING: "PENDING",
32+
PENDING_VERSION: "PENDING_VERSION",
3233
WAITING_FOR_DEPLOY: "WAITING_FOR_DEPLOY",
3334
EXECUTING: "EXECUTING",
3435
WAITING_TO_RESUME: "WAITING_TO_RESUME",

apps/webapp/app/hooks/useFilterTasks.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ type Task = {
44
id: string;
55
friendlyId: string;
66
taskIdentifier: string;
7-
exportName: string;
87
filePath: string;
98
triggerSource: string;
109
};
@@ -17,10 +16,6 @@ export function useFilterTasks<T extends Task>({ tasks }: { tasks: T[] }) {
1716
return true;
1817
}
1918

20-
if (task.exportName.toLowerCase().includes(text.toLowerCase())) {
21-
return true;
22-
}
23-
2419
if (task.filePath.toLowerCase().includes(text.toLowerCase())) {
2520
return true;
2621
}

apps/webapp/app/models/taskQueue.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { QueueOptions } from "@trigger.dev/core/v3/schemas";
1+
import { QueueManifest } from "@trigger.dev/core/v3/schemas";
22
import { TaskQueue } from "@trigger.dev/database";
33
import { prisma } from "~/db.server";
44

@@ -35,7 +35,7 @@ export async function findQueueInEnvironment(
3535
return;
3636
}
3737

38-
const queueConfig = QueueOptions.safeParse(task.queueConfig);
38+
const queueConfig = QueueManifest.safeParse(task.queueConfig);
3939

4040
if (queueConfig.success) {
4141
const taskQueueName = queueConfig.data.name

apps/webapp/app/models/taskRun.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ export function batchTaskRunItemStatusForRunStatus(
125125
case TaskRunStatus.TIMED_OUT:
126126
return BatchTaskRunItemStatus.FAILED;
127127
case TaskRunStatus.PENDING:
128+
case TaskRunStatus.PENDING_VERSION:
128129
case TaskRunStatus.WAITING_FOR_DEPLOY:
129130
case TaskRunStatus.WAITING_TO_RESUME:
130131
case TaskRunStatus.RETRYING_AFTER_FAILURE:

apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,9 @@ export class ApiRetrieveRunPresenter extends BasePresenter {
204204
case "DELAYED": {
205205
return "DELAYED";
206206
}
207+
case "PENDING_VERSION": {
208+
return "PENDING_VERSION";
209+
}
207210
case "WAITING_FOR_DEPLOY": {
208211
return "WAITING_FOR_DEPLOY";
209212
}
@@ -257,7 +260,11 @@ export class ApiRetrieveRunPresenter extends BasePresenter {
257260
}
258261

259262
static apiBooleanHelpersFromRunStatus(status: RunStatus) {
260-
const isQueued = status === "QUEUED" || status === "WAITING_FOR_DEPLOY" || status === "DELAYED";
263+
const isQueued =
264+
status === "QUEUED" ||
265+
status === "WAITING_FOR_DEPLOY" ||
266+
status === "DELAYED" ||
267+
status === "PENDING_VERSION";
261268
const isExecuting = status === "EXECUTING" || status === "REATTEMPTING" || status === "FROZEN";
262269
const isCompleted =
263270
status === "COMPLETED" ||

apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,9 @@ export class ApiRunListPresenter extends BasePresenter {
280280
switch (status) {
281281
case "DELAYED":
282282
return "DELAYED";
283+
case "PENDING_VERSION": {
284+
return "PENDING_VERSION";
285+
}
283286
case "WAITING_FOR_DEPLOY": {
284287
return "WAITING_FOR_DEPLOY";
285288
}

apps/webapp/app/presenters/v3/TestTaskPresenter.server.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ type Task = {
2121
id: string;
2222
taskIdentifier: string;
2323
filePath: string;
24-
exportName: string;
24+
exportName?: string;
2525
friendlyId: string;
2626
};
2727

@@ -151,7 +151,6 @@ export class TestTaskPresenter {
151151
id: task.id,
152152
taskIdentifier: task.slug,
153153
filePath: task.filePath,
154-
exportName: task.exportName,
155154
friendlyId: task.friendlyId,
156155
};
157156

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam._index/route.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -507,7 +507,7 @@ function TaskActivityGraph({ activity }: { activity: TaskActivity }) {
507507
isAnimationActive={false}
508508
/>
509509
<Bar dataKey="PENDING" fill="#5F6570" stackId="a" strokeWidth={0} barSize={10} />
510-
<Bar dataKey="WAITING_FOR_DEPLOY" fill="#F59E0B" stackId="a" strokeWidth={0} barSize={10} />
510+
<Bar dataKey="PENDING_VERSION" fill="#F59E0B" stackId="a" strokeWidth={0} barSize={10} />
511511
<Bar dataKey="EXECUTING" fill="#3B82F6" stackId="a" strokeWidth={0} barSize={10} />
512512
<Bar
513513
dataKey="RETRYING_AFTER_FAILURE"

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.deployments.$deploymentParam/route.tsx

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -236,11 +236,6 @@ export default function Page() {
236236
<TableRow key={t.slug}>
237237
<TableCell>
238238
<div className="inline-flex flex-col gap-0.5">
239-
<TaskFunctionName
240-
variant="extra-small"
241-
functionName={t.exportName}
242-
className="-ml-1 inline-flex"
243-
/>
244239
<Paragraph variant="extra-small" className="text-text-dimmed">
245240
{t.slug}
246241
</Paragraph>

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1688,7 +1688,7 @@ class SharedQueueTasks {
16881688
task: {
16891689
id: backgroundWorkerTask.slug,
16901690
filePath: backgroundWorkerTask.filePath,
1691-
exportName: backgroundWorkerTask.exportName,
1691+
exportName: backgroundWorkerTask.exportName ?? backgroundWorkerTask.slug,
16921692
},
16931693
attempt: {
16941694
id: attempt.friendlyId,

apps/webapp/app/v3/services/alerts/deliverAlert.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,7 @@ export class DeliverAlertService extends BaseService {
573573
alert.workerDeployment.worker?.tasks.map((task) => ({
574574
id: task.slug,
575575
filePath: task.filePath,
576-
exportName: task.exportName,
576+
exportName: task.exportName ?? "@deprecated",
577577
triggerSource: task.triggerSource,
578578
})) ?? [],
579579
environment: {

apps/webapp/app/v3/services/createBackgroundWorker.server.ts

Lines changed: 102 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@ import {
2020
} from "../runQueue.server";
2121
import { calculateNextBuildVersion } from "../utils/calculateNextBuildVersion";
2222
import { clampMaxDuration } from "../utils/maxDuration";
23-
import { BaseService } from "./baseService.server";
23+
import { BaseService, ServiceValidationError } from "./baseService.server";
2424
import { CheckScheduleService } from "./checkSchedule.server";
2525
import { projectPubSub } from "./projectPubSub.server";
2626
import { RegisterNextTaskScheduleInstanceService } from "./registerNextTaskScheduleInstance.server";
27+
import { tryCatch } from "@trigger.dev/core/v3";
28+
import { engine } from "../runEngine.server";
2729

2830
export class CreateBackgroundWorkerService extends BaseService {
2931
public async call(
@@ -96,58 +98,97 @@ export class CreateBackgroundWorkerService extends BaseService {
9698
});
9799
}
98100

99-
const tasksToBackgroundFiles = await createBackgroundFiles(
100-
body.metadata.sourceFiles,
101-
backgroundWorker,
102-
environment,
103-
this._prisma
101+
const [filesError, tasksToBackgroundFiles] = await tryCatch(
102+
createBackgroundFiles(
103+
body.metadata.sourceFiles,
104+
backgroundWorker,
105+
environment,
106+
this._prisma
107+
)
104108
);
105-
await createWorkerResources(
106-
body.metadata,
107-
backgroundWorker,
108-
environment,
109-
this._prisma,
110-
tasksToBackgroundFiles
109+
110+
if (filesError) {
111+
logger.error("Error creating background worker files", {
112+
error: filesError,
113+
backgroundWorker,
114+
environment,
115+
});
116+
117+
throw new ServiceValidationError("Error creating background worker files");
118+
}
119+
120+
const [resourcesError] = await tryCatch(
121+
createWorkerResources(
122+
body.metadata,
123+
backgroundWorker,
124+
environment,
125+
this._prisma,
126+
tasksToBackgroundFiles
127+
)
111128
);
112-
await syncDeclarativeSchedules(
113-
body.metadata.tasks,
114-
backgroundWorker,
115-
environment,
116-
this._prisma
129+
130+
if (resourcesError) {
131+
logger.error("Error creating worker resources", {
132+
error: resourcesError,
133+
backgroundWorker,
134+
environment,
135+
});
136+
throw new ServiceValidationError("Error creating worker resources");
137+
}
138+
139+
const [schedulesError] = await tryCatch(
140+
syncDeclarativeSchedules(body.metadata.tasks, backgroundWorker, environment, this._prisma)
117141
);
118142

119-
try {
120-
//send a notification that a new worker has been created
121-
await projectPubSub.publish(
122-
`project:${project.id}:env:${environment.id}`,
123-
"WORKER_CREATED",
124-
{
125-
environmentId: environment.id,
126-
environmentType: environment.type,
127-
createdAt: backgroundWorker.createdAt,
128-
taskCount: body.metadata.tasks.length,
129-
type: "local",
130-
}
131-
);
143+
if (schedulesError) {
144+
logger.error("Error syncing declarative schedules", {
145+
error: schedulesError,
146+
backgroundWorker,
147+
environment,
148+
});
149+
throw new ServiceValidationError("Error syncing declarative schedules");
150+
}
132151

133-
await updateEnvConcurrencyLimits(environment);
134-
} catch (err) {
135-
logger.error(
136-
"Error publishing WORKER_CREATED event or updating global concurrency limits",
137-
{
138-
error:
139-
err instanceof Error
140-
? {
141-
name: err.name,
142-
message: err.message,
143-
stack: err.stack,
144-
}
145-
: err,
146-
project,
147-
environment,
148-
backgroundWorker,
149-
}
152+
const [updateConcurrencyLimitsError] = await tryCatch(
153+
updateEnvConcurrencyLimits(environment)
154+
);
155+
156+
if (updateConcurrencyLimitsError) {
157+
logger.error("Error updating environment concurrency limits", {
158+
error: updateConcurrencyLimitsError,
159+
backgroundWorker,
160+
environment,
161+
});
162+
}
163+
164+
const [publishError] = await tryCatch(
165+
projectPubSub.publish(`project:${project.id}:env:${environment.id}`, "WORKER_CREATED", {
166+
environmentId: environment.id,
167+
environmentType: environment.type,
168+
createdAt: backgroundWorker.createdAt,
169+
taskCount: body.metadata.tasks.length,
170+
type: "local",
171+
})
172+
);
173+
174+
if (publishError) {
175+
logger.error("Error publishing WORKER_CREATED event", {
176+
error: publishError,
177+
backgroundWorker,
178+
environment,
179+
});
180+
}
181+
182+
if (backgroundWorker.engine === "V2") {
183+
const [schedulePendingVersionsError] = await tryCatch(
184+
engine.scheduleEnqueueRunsForBackgroundWorker(backgroundWorker.id)
150185
);
186+
187+
if (schedulePendingVersionsError) {
188+
logger.error("Error scheduling pending versions", {
189+
error: schedulePendingVersionsError,
190+
});
191+
}
151192
}
152193

153194
return backgroundWorker;
@@ -338,6 +379,20 @@ async function createWorkerQueue(
338379
runtimeEnvironmentId: worker.runtimeEnvironmentId,
339380
projectId: worker.projectId,
340381
type: queueType,
382+
workers: {
383+
connect: {
384+
id: worker.id,
385+
},
386+
},
387+
},
388+
});
389+
} else {
390+
await prisma.taskQueue.update({
391+
where: {
392+
id: taskQueue.id,
393+
},
394+
data: {
395+
workers: { connect: { id: worker.id } },
341396
},
342397
});
343398
}

apps/webapp/app/v3/services/createDeployedBackgroundWorker.server.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,7 @@ export class CreateDeployedBackgroundWorkerService extends BaseService {
6464
}
6565

6666
try {
67-
await createWorkerResources(
68-
body.metadata.tasks,
69-
backgroundWorker,
70-
environment,
71-
this._prisma
72-
);
67+
await createWorkerResources(body.metadata, backgroundWorker, environment, this._prisma);
7368
await syncDeclarativeSchedules(
7469
body.metadata.tasks,
7570
backgroundWorker,

0 commit comments

Comments
 (0)