Skip to content

Commit 82d24d6

Browse files
committed
Merge branch 'main' into v3/fix-unfreezable-states
2 parents 67b9f84 + bb38261 commit 82d24d6

30 files changed

+408
-208
lines changed

apps/webapp/app/components/runs/v3/TaskRunStatus.tsx

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,23 +67,29 @@ const taskRunStatusDescriptions: Record<TaskRunStatus, string> = {
6767
EXPIRED: "Task has surpassed its ttl and won't be executed",
6868
};
6969

70-
export const QUEUED_STATUSES: TaskRunStatus[] = ["PENDING", "WAITING_FOR_DEPLOY", "DELAYED"];
70+
export const QUEUED_STATUSES = [
71+
"PENDING",
72+
"WAITING_FOR_DEPLOY",
73+
"DELAYED",
74+
] satisfies TaskRunStatus[];
7175

72-
export const RUNNING_STATUSES: TaskRunStatus[] = [
76+
export const RUNNING_STATUSES = [
7377
"EXECUTING",
7478
"RETRYING_AFTER_FAILURE",
7579
"WAITING_TO_RESUME",
76-
];
80+
] satisfies TaskRunStatus[];
7781

78-
export const FINISHED_STATUSES: TaskRunStatus[] = [
82+
export const FINISHED_STATUSES = [
7983
"COMPLETED_SUCCESSFULLY",
8084
"CANCELED",
8185
"COMPLETED_WITH_ERRORS",
8286
"INTERRUPTED",
8387
"SYSTEM_FAILURE",
8488
"CRASHED",
8589
"EXPIRED",
86-
];
90+
] satisfies TaskRunStatus[];
91+
92+
export type FINISHED_STATUSES = (typeof FINISHED_STATUSES)[number];
8793

8894
export function descriptionForTaskRunStatus(status: TaskRunStatus): string {
8995
return taskRunStatusDescriptions[status];

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 34 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
import { Context, MachinePresetName, prettyPrintPacket } from "@trigger.dev/core/v3";
1+
import { MachinePresetName, prettyPrintPacket, TaskRunError } from "@trigger.dev/core/v3";
22
import { FINISHED_STATUSES, RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus";
33
import { eventRepository } from "~/v3/eventRepository.server";
4-
import { BasePresenter } from "./basePresenter.server";
5-
import { machineDefinition } from "@trigger.dev/platform/v3";
64
import { machinePresetFromName } from "~/v3/machinePresets.server";
5+
import { FINAL_ATTEMPT_STATUSES } from "~/v3/taskStatus";
6+
import { BasePresenter } from "./basePresenter.server";
77

88
type Result = Awaited<ReturnType<SpanPresenter["call"]>>;
99
export type Span = NonNullable<NonNullable<Result>["span"]>;
@@ -57,6 +57,7 @@ export class SpanPresenter extends BasePresenter {
5757
async getRun(spanId: string) {
5858
const run = await this._replica.taskRun.findFirst({
5959
select: {
60+
id: true,
6061
traceId: true,
6162
//metadata
6263
number: true,
@@ -113,17 +114,6 @@ export class SpanPresenter extends BasePresenter {
113114
payload: true,
114115
payloadType: true,
115116
maxAttempts: true,
116-
//finished attempt
117-
attempts: {
118-
select: {
119-
output: true,
120-
outputType: true,
121-
error: true,
122-
},
123-
where: {
124-
status: "COMPLETED",
125-
},
126-
},
127117
project: {
128118
include: {
129119
organization: true,
@@ -145,9 +135,23 @@ export class SpanPresenter extends BasePresenter {
145135
return;
146136
}
147137

148-
const finishedAttempt = run.attempts.at(0);
138+
const finishedAttempt = await this._replica.taskRunAttempt.findFirst({
139+
select: {
140+
output: true,
141+
outputType: true,
142+
error: true,
143+
},
144+
where: {
145+
status: { in: FINAL_ATTEMPT_STATUSES },
146+
taskRunId: run.id,
147+
},
148+
orderBy: {
149+
createdAt: "desc",
150+
},
151+
});
152+
149153
const output =
150-
finishedAttempt === undefined
154+
finishedAttempt === null
151155
? undefined
152156
: finishedAttempt.outputType === "application/store"
153157
? `/resources/packets/${run.runtimeEnvironment.id}/${finishedAttempt.output}`
@@ -162,6 +166,19 @@ export class SpanPresenter extends BasePresenter {
162166
? await prettyPrintPacket(run.payload, run.payloadType ?? undefined)
163167
: undefined;
164168

169+
let error: TaskRunError | undefined = undefined;
170+
if (finishedAttempt?.error) {
171+
const result = TaskRunError.safeParse(finishedAttempt.error);
172+
if (result.success) {
173+
error = result.data;
174+
} else {
175+
error = {
176+
type: "CUSTOM_ERROR",
177+
raw: JSON.stringify(finishedAttempt.error),
178+
};
179+
}
180+
}
181+
165182
const span = await eventRepository.getSpan(spanId, run.traceId);
166183

167184
const context = {
@@ -247,8 +264,8 @@ export class SpanPresenter extends BasePresenter {
247264
payloadType: run.payloadType,
248265
output,
249266
outputType: finishedAttempt?.outputType ?? "application/json",
267+
error,
250268
links: span?.links,
251-
events: span?.events,
252269
context: JSON.stringify(context, null, 2),
253270
};
254271
}

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,17 @@ import {
55
formatDuration,
66
formatDurationMilliseconds,
77
nanosecondsToMilliseconds,
8+
TaskRunError,
89
} from "@trigger.dev/core/v3";
910
import { ReactNode, useEffect } from "react";
1011
import { typedjson, useTypedFetcher } from "remix-typedjson";
1112
import { ExitIcon } from "~/assets/icons/ExitIcon";
1213
import { CodeBlock } from "~/components/code/CodeBlock";
1314
import { EnvironmentLabel } from "~/components/environments/EnvironmentLabel";
1415
import { Button, LinkButton } from "~/components/primitives/Buttons";
16+
import { Callout } from "~/components/primitives/Callout";
1517
import { DateTime, DateTimeAccurate } from "~/components/primitives/DateTime";
16-
import { Header2 } from "~/components/primitives/Headers";
18+
import { Header2, Header3 } from "~/components/primitives/Headers";
1719
import { Paragraph } from "~/components/primitives/Paragraph";
1820
import * as Property from "~/components/primitives/PropertyTable";
1921
import { Spinner } from "~/components/primitives/Spinner";
@@ -606,8 +608,8 @@ function RunBody({
606608
{run.payload !== undefined && (
607609
<PacketDisplay data={run.payload} dataType={run.payloadType} title="Payload" />
608610
)}
609-
{run.events !== undefined && run.events.length > 0 ? (
610-
<SpanEvents spanEvents={run.events} />
611+
{run.error !== undefined ? (
612+
<RunError error={run.error} />
611613
) : run.output !== undefined ? (
612614
<PacketDisplay data={run.output} dataType={run.outputType} title="Output" />
613615
) : null}
@@ -791,6 +793,42 @@ function RunTimelineLine({ title, state }: RunTimelineLineProps) {
791793
);
792794
}
793795

796+
function RunError({ error }: { error: TaskRunError }) {
797+
switch (error.type) {
798+
case "STRING_ERROR":
799+
case "CUSTOM_ERROR": {
800+
return (
801+
<div className="flex flex-col gap-2 rounded-sm border border-rose-500/50 px-3 pb-3 pt-2">
802+
<CodeBlock
803+
showCopyButton={false}
804+
showLineNumbers={false}
805+
code={error.raw}
806+
maxLines={20}
807+
/>
808+
</div>
809+
);
810+
}
811+
case "BUILT_IN_ERROR":
812+
case "INTERNAL_ERROR": {
813+
const name = "name" in error ? error.name : error.code;
814+
return (
815+
<div className="flex flex-col gap-2 rounded-sm border border-rose-500/50 px-3 pb-3 pt-2">
816+
<Header3 className="text-rose-500">{name}</Header3>
817+
{error.message && <Callout variant="error">{error.message}</Callout>}
818+
{error.stackTrace && (
819+
<CodeBlock
820+
showCopyButton={false}
821+
showLineNumbers={false}
822+
code={error.stackTrace}
823+
maxLines={20}
824+
/>
825+
)}
826+
</div>
827+
);
828+
}
829+
}
830+
}
831+
794832
function PacketDisplay({
795833
data,
796834
dataType,

apps/webapp/app/routes/resources.orgs.$organizationSlug.select-plan.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ const pricingDefinitions = {
119119
},
120120
schedules: {
121121
title: "Schedules",
122-
content: "You can attach recurring schedules to tasks using CRON syntax.",
122+
content: "You can attach recurring schedules to tasks using cron syntax.",
123123
},
124124
alerts: {
125125
title: "Alert destination",

apps/webapp/app/v3/failedTaskRun.server.ts

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
import { TaskRunFailedExecutionResult } from "@trigger.dev/core/v3";
2-
import { logger } from "~/services/logger.server";
3-
import { marqs } from "~/v3/marqs/index.server";
4-
52
import { TaskRunStatus } from "@trigger.dev/database";
3+
import { logger } from "~/services/logger.server";
64
import { createExceptionPropertiesFromError, eventRepository } from "./eventRepository.server";
75
import { BaseService } from "./services/baseService.server";
6+
import { FinalizeTaskRunService } from "./services/finalizeTaskRun.server";
87

98
const FAILABLE_TASK_RUN_STATUSES: TaskRunStatus[] = ["EXECUTING", "PENDING", "WAITING_FOR_DEPLOY"];
109

@@ -40,7 +39,12 @@ export class FailedTaskRunService extends BaseService {
4039
// No more retries, we need to fail the task run
4140
logger.debug("[FailedTaskRunService] Failing task run", { taskRun, completion });
4241

43-
await marqs?.acknowledgeMessage(taskRun.id);
42+
const finalizeService = new FinalizeTaskRunService();
43+
await finalizeService.call({
44+
id: taskRun.id,
45+
status: "SYSTEM_FAILURE",
46+
completedAt: new Date(),
47+
});
4448

4549
// Now we need to "complete" the task run event/span
4650
await eventRepository.completeEvent(taskRun.spanId, {
@@ -58,15 +62,5 @@ export class FailedTaskRunService extends BaseService {
5862
},
5963
],
6064
});
61-
62-
await this._prisma.taskRun.update({
63-
where: {
64-
id: taskRun.id,
65-
},
66-
data: {
67-
status: "SYSTEM_FAILURE",
68-
completedAt: new Date(),
69-
},
70-
});
7165
}
7266
}

apps/webapp/app/v3/services/cancelAttempt.server.ts

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
1-
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
1+
import { $transaction, type PrismaClientOrTransaction, prisma } from "~/db.server";
2+
import { type AuthenticatedEnvironment } from "~/services/apiAuth.server";
23
import { logger } from "~/services/logger.server";
3-
import { marqs } from "~/v3/marqs/index.server";
44
import { eventRepository } from "../eventRepository.server";
5-
import { BaseService } from "./baseService.server";
6-
7-
import { PrismaClientOrTransaction, prisma } from "~/db.server";
85
import { isCancellableRunStatus } from "../taskStatus";
6+
import { BaseService } from "./baseService.server";
7+
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
98
import { ResumeTaskRunDependenciesService } from "./resumeTaskRunDependencies.server";
109

1110
export class CancelAttemptService extends BaseService {
@@ -51,28 +50,25 @@ export class CancelAttemptService extends BaseService {
5150
return;
5251
}
5352

54-
await marqs?.acknowledgeMessage(taskRunId);
55-
56-
await this._prisma.taskRunAttempt.update({
57-
where: {
58-
friendlyId: attemptId,
59-
},
60-
data: {
61-
status: "CANCELED",
62-
completedAt: cancelledAt,
63-
taskRun: {
64-
update: {
65-
data: {
66-
status: isCancellableRunStatus(taskRunAttempt.taskRun.status)
67-
? "INTERRUPTED"
68-
: undefined,
69-
completedAt: isCancellableRunStatus(taskRunAttempt.taskRun.status)
70-
? cancelledAt
71-
: undefined,
72-
},
73-
},
53+
await $transaction(this._prisma, async (tx) => {
54+
await tx.taskRunAttempt.update({
55+
where: {
56+
friendlyId: attemptId,
7457
},
75-
},
58+
data: {
59+
status: "CANCELED",
60+
completedAt: cancelledAt,
61+
},
62+
});
63+
64+
const finalizeService = new FinalizeTaskRunService(tx);
65+
await finalizeService.call({
66+
id: taskRunId,
67+
status: isCancellableRunStatus(taskRunAttempt.taskRun.status) ? "INTERRUPTED" : undefined,
68+
completedAt: isCancellableRunStatus(taskRunAttempt.taskRun.status)
69+
? cancelledAt
70+
: undefined,
71+
});
7672
});
7773

7874
const inProgressEvents = await eventRepository.queryIncompleteEvents({

apps/webapp/app/v3/services/cancelTaskRun.server.ts

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1-
import { Prisma, TaskRun } from "@trigger.dev/database";
1+
import { type Prisma, type TaskRun } from "@trigger.dev/database";
22
import assertNever from "assert-never";
33
import { logger } from "~/services/logger.server";
4-
import { marqs } from "~/v3/marqs/index.server";
54
import { eventRepository } from "../eventRepository.server";
65
import { socketIo } from "../handleSocketIo.server";
76
import { devPubSub } from "../marqs/devPubSub.server";
7+
import { CANCELLABLE_ATTEMPT_STATUSES, isCancellableRunStatus } from "../taskStatus";
88
import { BaseService } from "./baseService.server";
99
import { CancelAttemptService } from "./cancelAttempt.server";
10-
import { CANCELLABLE_ATTEMPT_STATUSES, isCancellableRunStatus } from "../taskStatus";
1110
import { CancelTaskAttemptDependenciesService } from "./cancelTaskAttemptDependencies.server";
11+
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
1212

1313
type ExtendedTaskRun = Prisma.TaskRunGetPayload<{
1414
include: {
@@ -47,18 +47,11 @@ export class CancelTaskRunService extends BaseService {
4747
return;
4848
}
4949

50-
// Remove the task run from the queue if it's there for some reason
51-
await marqs?.acknowledgeMessage(taskRun.id);
52-
53-
// Set the task run status to cancelled
54-
const cancelledTaskRun = await this._prisma.taskRun.update({
55-
where: {
56-
id: taskRun.id,
57-
},
58-
data: {
59-
status: "CANCELED",
60-
completedAt: opts.cancelledAt,
61-
},
50+
const finalizeService = new FinalizeTaskRunService();
51+
const cancelledTaskRun = await finalizeService.call({
52+
id: taskRun.id,
53+
status: "CANCELED",
54+
completedAt: opts.cancelledAt,
6255
include: {
6356
attempts: {
6457
where: {

0 commit comments

Comments
 (0)