Skip to content

Commit 478ce00

Browse files
authored
v3: task run status and canceling runs (#941)
* WIP task run status, revamped resuming task dependencies * Don’t select a span when toggling collapsing * Cancel runs and attempts, in prod and dev
1 parent fe9435f commit 478ce00

File tree

42 files changed

+1325
-364
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1325
-364
lines changed

apps/coordinator/package.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
"version": "0.0.1",
55
"description": "",
66
"main": "dist/index.cjs",
7-
"type": "module",
87
"scripts": {
98
"build": "npm run build:bundle",
109
"build:bundle": "esbuild src/index.ts --bundle --outfile=dist/index.mjs --platform=node --format=esm --target=esnext --banner:js=\"const require = createRequire(import.meta.url);\"",
@@ -31,4 +30,4 @@
3130
"tsx": "^4.7.0",
3231
"typescript": "^5.3.3"
3332
}
34-
}
33+
}

apps/coordinator/src/index.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,16 @@ class TaskCoordinator {
301301

302302
taskSocket.emit("RESUME_AFTER_DURATION", message);
303303
},
304+
REQUEST_ATTEMPT_CANCELLATION: async (message) => {
305+
const taskSocket = await this.#getAttemptSocket(message.attemptId);
306+
307+
if (!taskSocket) {
308+
logger.log("Socket for attempt not found", { attemptId: message.attemptId });
309+
return;
310+
}
311+
312+
taskSocket.emit("REQUEST_ATTEMPT_CANCELLATION", message);
313+
},
304314
},
305315
});
306316

@@ -385,11 +395,21 @@ class TaskCoordinator {
385395

386396
if (!executionAck) {
387397
logger.error("no execution ack", { attemptId: socket.data.attemptId });
398+
399+
socket.emit("REQUEST_EXIT", {
400+
version: "v1",
401+
});
402+
388403
return;
389404
}
390405

391406
if (!executionAck.success) {
392407
logger.error("execution unsuccessful", { attemptId: socket.data.attemptId });
408+
409+
socket.emit("REQUEST_EXIT", {
410+
version: "v1",
411+
});
412+
393413
return;
394414
}
395415

apps/coordinator/tsconfig.json

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@
99
"strict": true,
1010
"skipLibCheck": true,
1111
"paths": {
12-
"@trigger.dev/core/v3": ["../core/src/v3"],
13-
"@trigger.dev/core/v3/*": ["../core/src/v3/*"],
14-
"@trigger.dev/core-apps": ["../core-apps/src"],
15-
"@trigger.dev/core-apps/*": ["../core-apps/src/*"]
12+
"@trigger.dev/core/v3": ["../../packages/core/src/v3"],
13+
"@trigger.dev/core/v3/*": ["../../packages/core/src/v3/*"],
14+
"@trigger.dev/core-apps": ["../../packages/core-apps/src"],
15+
"@trigger.dev/core-apps/*": ["../../packages/core-apps/src/*"]
1616
}
1717
}
1818
}

apps/docker-provider/package.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
"version": "0.0.1",
55
"description": "",
66
"main": "dist/index.cjs",
7-
"type": "module",
87
"scripts": {
98
"build": "npm run build:bundle",
109
"build:bundle": "esbuild src/index.ts --bundle --outfile=dist/index.mjs --platform=node --format=esm --target=esnext --banner:js=\"const require = createRequire(import.meta.url);\"",
@@ -29,4 +28,4 @@
2928
"tsx": "^4.7.0",
3029
"typescript": "^5.3.3"
3130
}
32-
}
31+
}

apps/docker-provider/tsconfig.json

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@
77
"strict": true,
88
"skipLibCheck": true,
99
"paths": {
10-
"@trigger.dev/core/v3": ["../core/src/v3"],
11-
"@trigger.dev/core/v3/*": ["../core/src/v3/*"],
12-
"@trigger.dev/core-apps": ["../core-apps/src"],
13-
"@trigger.dev/core-apps/*": ["../core-apps/src/*"]
10+
"@trigger.dev/core/v3": ["../../packages/core/src/v3"],
11+
"@trigger.dev/core/v3/*": ["../../packages/core/src/v3/*"],
12+
"@trigger.dev/core-apps": ["../../packages/core-apps/src"],
13+
"@trigger.dev/core-apps/*": ["../../packages/core-apps/src/*"]
1414
}
1515
}
1616
}

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam/route.tsx

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,6 @@ function TasksTreeView({
334334
onClick={(e) => {
335335
e.stopPropagation();
336336
toggleExpandNode(node.id);
337-
selectNode(node.id);
338337
scrollToNode(node.id);
339338
}}
340339
>

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.v3.$projectParam.test/route.tsx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ export default function Page() {
5252
<div className={cn("grid h-full max-h-full grid-cols-1")}>
5353
<ResizablePanelGroup direction="horizontal" className="h-full max-h-full">
5454
<ResizablePanel order={1} minSize={20} defaultSize={30}>
55-
<div className="flex flex-col px-3">
55+
<div className="flex h-full max-h-full flex-col overflow-hidden px-3">
5656
{tasks.length === 0 ? (
5757
<NoTaskInstructions />
5858
) : (
@@ -81,7 +81,7 @@ function TaskSelector({ tasks }: { tasks: TaskListItem[] }) {
8181
const project = useProject();
8282

8383
return (
84-
<div className="flex flex-col divide-y divide-charcoal-800">
84+
<div className="flex flex-col divide-y divide-charcoal-800 overflow-y-auto">
8585
{tasks.map((t) => (
8686
<NavLink key={t.id} to={v3TestTaskPath(organization, project, t)}>
8787
{({ isActive, isPending }) => (
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
2+
import { json } from "@remix-run/server-runtime";
3+
import { z } from "zod";
4+
import { prisma } from "~/db.server";
5+
import { authenticateApiRequest } from "~/services/apiAuth.server";
6+
import { CancelTaskRunService } from "~/v3/services/cancelTaskRun.server";
7+
8+
const ParamsSchema = z.object({
9+
runParam: z.string(),
10+
});
11+
12+
export async function action({ request, params }: ActionFunctionArgs) {
13+
// Ensure this is a POST request
14+
if (request.method.toUpperCase() !== "POST") {
15+
return { status: 405, body: "Method Not Allowed" };
16+
}
17+
18+
// Authenticate the request
19+
const authenticationResult = await authenticateApiRequest(request);
20+
21+
if (!authenticationResult) {
22+
return json({ error: "Invalid or Missing API Key" }, { status: 401 });
23+
}
24+
25+
const parsed = ParamsSchema.safeParse(params);
26+
27+
if (!parsed.success) {
28+
return json({ error: "Invalid or Missing runId" }, { status: 400 });
29+
}
30+
31+
const { runParam } = parsed.data;
32+
33+
const taskRun = await prisma.taskRun.findUnique({
34+
where: {
35+
friendlyId: runParam,
36+
},
37+
});
38+
39+
if (!taskRun) {
40+
return json({ error: "Run not found" }, { status: 404 });
41+
}
42+
43+
const service = new CancelTaskRunService();
44+
45+
try {
46+
await service.call(taskRun);
47+
} catch (error) {
48+
return json({ error: "Internal Server Error" }, { status: 500 });
49+
}
50+
51+
return json({ message: "Run cancelled" }, { status: 200 });
52+
}

apps/webapp/app/services/worker.server.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ import { DeliverWebhookRequestService } from "./sources/deliverWebhookRequest.se
3030
import { PerformTaskOperationService } from "./tasks/performTaskOperation.server";
3131
import { ProcessCallbackTimeoutService } from "./tasks/processCallbackTimeout.server";
3232
import { ResumeTaskService } from "./tasks/resumeTask.server";
33+
import { ResumeTaskRunDependenciesService } from "~/v3/services/resumeTaskRunDependencies.server";
34+
import { ResumeBatchRunService } from "~/v3/services/resumeBatchRun.server";
35+
import { ResumeTaskDependencyService } from "~/v3/services/resumeTaskDependency.server";
3336

3437
const workerCatalog = {
3538
indexEndpoint: z.object({
@@ -107,6 +110,17 @@ const workerCatalog = {
107110
"v3.indexDeployment": z.object({
108111
id: z.string(),
109112
}),
113+
"v3.resumeTaskRunDependencies": z.object({
114+
attemptId: z.string(),
115+
}),
116+
"v3.resumeBatchRun": z.object({
117+
batchRunId: z.string(),
118+
sourceTaskAttemptId: z.string(),
119+
}),
120+
"v3.resumeTaskDependency": z.object({
121+
dependencyId: z.string(),
122+
sourceTaskAttemptId: z.string(),
123+
}),
110124
};
111125

112126
const executionWorkerCatalog = {
@@ -443,6 +457,33 @@ function getWorkerQueue() {
443457
return await service.call(payload.id);
444458
},
445459
},
460+
"v3.resumeTaskRunDependencies": {
461+
priority: 0,
462+
maxAttempts: 5,
463+
handler: async (payload, job) => {
464+
const service = new ResumeTaskRunDependenciesService();
465+
466+
return await service.call(payload.attemptId);
467+
},
468+
},
469+
"v3.resumeBatchRun": {
470+
priority: 0,
471+
maxAttempts: 5,
472+
handler: async (payload, job) => {
473+
const service = new ResumeBatchRunService();
474+
475+
return await service.call(payload.batchRunId, payload.sourceTaskAttemptId);
476+
},
477+
},
478+
"v3.resumeTaskDependency": {
479+
priority: 0,
480+
maxAttempts: 5,
481+
handler: async (payload, job) => {
482+
const service = new ResumeTaskDependencyService();
483+
484+
return await service.call(payload.dependencyId, payload.sourceTaskAttemptId);
485+
},
486+
},
446487
},
447488
});
448489
}

apps/webapp/app/v3/authenticatedSocketConnection.server.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
1010
import { logger } from "~/services/logger.server";
1111
import { DevQueueConsumer } from "./marqs/devQueueConsumer.server";
1212
import type { WebSocket, MessageEvent, CloseEvent, ErrorEvent } from "ws";
13+
import { env } from "~/env.server";
1314

1415
export class AuthenticatedSocketConnection {
1516
public id: string;
@@ -26,6 +27,10 @@ export class AuthenticatedSocketConnection {
2627
schema: serverWebsocketMessages,
2728
sender: async (message) => {
2829
return new Promise((resolve, reject) => {
30+
if (!ws.OPEN) {
31+
return reject(new Error("Websocket is not open"));
32+
}
33+
2934
ws.send(JSON.stringify(message), {}, (err) => {
3035
if (err) {
3136
reject(err);
@@ -84,6 +89,8 @@ export class AuthenticatedSocketConnection {
8489
}
8590

8691
async #handleClose(ev: CloseEvent) {
92+
logger.debug("[AuthenticatedSocketConnection] Websocket closed", { ev });
93+
8794
await this._consumer.stop();
8895

8996
this.onClose.post(ev);

apps/webapp/app/v3/eventRepository.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ export type TraceAttributes = Partial<
4444
CreatableEvent,
4545
| "attemptId"
4646
| "isError"
47+
| "isCancelled"
4748
| "runId"
4849
| "runIsTest"
4950
| "output"
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import { z } from "zod";
2+
import { singleton } from "~/utils/singleton";
3+
import { ZodPubSub, ZodSubscriber } from "../utils/zodPubSub.server";
4+
import { env } from "~/env.server";
5+
6+
const messageCatalog = {
7+
CANCEL_ATTEMPT: z.object({
8+
version: z.literal("v1").default("v1"),
9+
backgroundWorkerId: z.string(),
10+
attemptId: z.string(),
11+
taskRunId: z.string(),
12+
}),
13+
};
14+
15+
export type DevSubscriber = ZodSubscriber<typeof messageCatalog>;
16+
17+
export const devPubSub = singleton("devPubSub", initializeDevPubSub);
18+
19+
function initializeDevPubSub() {
20+
return new ZodPubSub({
21+
redis: {
22+
port: env.REDIS_PORT,
23+
host: env.REDIS_HOST,
24+
username: env.REDIS_USERNAME,
25+
password: env.REDIS_PASSWORD,
26+
enableAutoPipelining: true,
27+
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
28+
},
29+
schema: {
30+
CANCEL_ATTEMPT: z.object({
31+
version: z.literal("v1").default("v1"),
32+
backgroundWorkerId: z.string(),
33+
attemptId: z.string(),
34+
taskRunId: z.string(),
35+
}),
36+
},
37+
});
38+
}

0 commit comments

Comments
 (0)