Skip to content

v3: task run status and canceling runs #941

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions apps/coordinator/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
"version": "0.0.1",
"description": "",
"main": "dist/index.cjs",
"type": "module",
"scripts": {
"build": "npm run build:bundle",
"build:bundle": "esbuild src/index.ts --bundle --outfile=dist/index.mjs --platform=node --format=esm --target=esnext --banner:js=\"const require = createRequire(import.meta.url);\"",
Expand All @@ -31,4 +30,4 @@
"tsx": "^4.7.0",
"typescript": "^5.3.3"
}
}
}
20 changes: 20 additions & 0 deletions apps/coordinator/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,16 @@ class TaskCoordinator {

taskSocket.emit("RESUME_AFTER_DURATION", message);
},
REQUEST_ATTEMPT_CANCELLATION: async (message) => {
const taskSocket = await this.#getAttemptSocket(message.attemptId);

if (!taskSocket) {
logger.log("Socket for attempt not found", { attemptId: message.attemptId });
return;
}

taskSocket.emit("REQUEST_ATTEMPT_CANCELLATION", message);
},
},
});

Expand Down Expand Up @@ -385,11 +395,21 @@ class TaskCoordinator {

if (!executionAck) {
logger.error("no execution ack", { attemptId: socket.data.attemptId });

socket.emit("REQUEST_EXIT", {
version: "v1",
});

return;
}

if (!executionAck.success) {
logger.error("execution unsuccessful", { attemptId: socket.data.attemptId });

socket.emit("REQUEST_EXIT", {
version: "v1",
});

return;
}

Expand Down
8 changes: 4 additions & 4 deletions apps/coordinator/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
"strict": true,
"skipLibCheck": true,
"paths": {
"@trigger.dev/core/v3": ["../core/src/v3"],
"@trigger.dev/core/v3/*": ["../core/src/v3/*"],
"@trigger.dev/core-apps": ["../core-apps/src"],
"@trigger.dev/core-apps/*": ["../core-apps/src/*"]
"@trigger.dev/core/v3": ["../../packages/core/src/v3"],
"@trigger.dev/core/v3/*": ["../../packages/core/src/v3/*"],
"@trigger.dev/core-apps": ["../../packages/core-apps/src"],
"@trigger.dev/core-apps/*": ["../../packages/core-apps/src/*"]
}
}
}
3 changes: 1 addition & 2 deletions apps/docker-provider/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
"version": "0.0.1",
"description": "",
"main": "dist/index.cjs",
"type": "module",
"scripts": {
"build": "npm run build:bundle",
"build:bundle": "esbuild src/index.ts --bundle --outfile=dist/index.mjs --platform=node --format=esm --target=esnext --banner:js=\"const require = createRequire(import.meta.url);\"",
Expand All @@ -29,4 +28,4 @@
"tsx": "^4.7.0",
"typescript": "^5.3.3"
}
}
}
8 changes: 4 additions & 4 deletions apps/docker-provider/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
"strict": true,
"skipLibCheck": true,
"paths": {
"@trigger.dev/core/v3": ["../core/src/v3"],
"@trigger.dev/core/v3/*": ["../core/src/v3/*"],
"@trigger.dev/core-apps": ["../core-apps/src"],
"@trigger.dev/core-apps/*": ["../core-apps/src/*"]
"@trigger.dev/core/v3": ["../../packages/core/src/v3"],
"@trigger.dev/core/v3/*": ["../../packages/core/src/v3/*"],
"@trigger.dev/core-apps": ["../../packages/core-apps/src"],
"@trigger.dev/core-apps/*": ["../../packages/core-apps/src/*"]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,6 @@ function TasksTreeView({
onClick={(e) => {
e.stopPropagation();
toggleExpandNode(node.id);
selectNode(node.id);
scrollToNode(node.id);
}}
>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ export default function Page() {
<div className={cn("grid h-full max-h-full grid-cols-1")}>
<ResizablePanelGroup direction="horizontal" className="h-full max-h-full">
<ResizablePanel order={1} minSize={20} defaultSize={30}>
<div className="flex flex-col px-3">
<div className="flex h-full max-h-full flex-col overflow-hidden px-3">
{tasks.length === 0 ? (
<NoTaskInstructions />
) : (
Expand Down Expand Up @@ -81,7 +81,7 @@ function TaskSelector({ tasks }: { tasks: TaskListItem[] }) {
const project = useProject();

return (
<div className="flex flex-col divide-y divide-charcoal-800">
<div className="flex flex-col divide-y divide-charcoal-800 overflow-y-auto">
{tasks.map((t) => (
<NavLink key={t.id} to={v3TestTaskPath(organization, project, t)}>
{({ isActive, isPending }) => (
Expand Down
52 changes: 52 additions & 0 deletions apps/webapp/app/routes/api.v2.runs.$runParam.cancel.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import { z } from "zod";
import { prisma } from "~/db.server";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { CancelTaskRunService } from "~/v3/services/cancelTaskRun.server";

const ParamsSchema = z.object({
runParam: z.string(),
});

export async function action({ request, params }: ActionFunctionArgs) {
// Ensure this is a POST request
if (request.method.toUpperCase() !== "POST") {
return { status: 405, body: "Method Not Allowed" };
}

// Authenticate the request
const authenticationResult = await authenticateApiRequest(request);

if (!authenticationResult) {
return json({ error: "Invalid or Missing API Key" }, { status: 401 });
}

const parsed = ParamsSchema.safeParse(params);

if (!parsed.success) {
return json({ error: "Invalid or Missing runId" }, { status: 400 });
}

const { runParam } = parsed.data;

const taskRun = await prisma.taskRun.findUnique({
where: {
friendlyId: runParam,
},
});

if (!taskRun) {
return json({ error: "Run not found" }, { status: 404 });
}

const service = new CancelTaskRunService();

try {
await service.call(taskRun);
} catch (error) {
return json({ error: "Internal Server Error" }, { status: 500 });
}

return json({ message: "Run cancelled" }, { status: 200 });
}
41 changes: 41 additions & 0 deletions apps/webapp/app/services/worker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ import { DeliverWebhookRequestService } from "./sources/deliverWebhookRequest.se
import { PerformTaskOperationService } from "./tasks/performTaskOperation.server";
import { ProcessCallbackTimeoutService } from "./tasks/processCallbackTimeout.server";
import { ResumeTaskService } from "./tasks/resumeTask.server";
import { ResumeTaskRunDependenciesService } from "~/v3/services/resumeTaskRunDependencies.server";
import { ResumeBatchRunService } from "~/v3/services/resumeBatchRun.server";
import { ResumeTaskDependencyService } from "~/v3/services/resumeTaskDependency.server";

const workerCatalog = {
indexEndpoint: z.object({
Expand Down Expand Up @@ -107,6 +110,17 @@ const workerCatalog = {
"v3.indexDeployment": z.object({
id: z.string(),
}),
"v3.resumeTaskRunDependencies": z.object({
attemptId: z.string(),
}),
"v3.resumeBatchRun": z.object({
batchRunId: z.string(),
sourceTaskAttemptId: z.string(),
}),
"v3.resumeTaskDependency": z.object({
dependencyId: z.string(),
sourceTaskAttemptId: z.string(),
}),
};

const executionWorkerCatalog = {
Expand Down Expand Up @@ -443,6 +457,33 @@ function getWorkerQueue() {
return await service.call(payload.id);
},
},
"v3.resumeTaskRunDependencies": {
priority: 0,
maxAttempts: 5,
handler: async (payload, job) => {
const service = new ResumeTaskRunDependenciesService();

return await service.call(payload.attemptId);
},
},
"v3.resumeBatchRun": {
priority: 0,
maxAttempts: 5,
handler: async (payload, job) => {
const service = new ResumeBatchRunService();

return await service.call(payload.batchRunId, payload.sourceTaskAttemptId);
},
},
"v3.resumeTaskDependency": {
priority: 0,
maxAttempts: 5,
handler: async (payload, job) => {
const service = new ResumeTaskDependencyService();

return await service.call(payload.dependencyId, payload.sourceTaskAttemptId);
},
},
},
});
}
Expand Down
7 changes: 7 additions & 0 deletions apps/webapp/app/v3/authenticatedSocketConnection.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { DevQueueConsumer } from "./marqs/devQueueConsumer.server";
import type { WebSocket, MessageEvent, CloseEvent, ErrorEvent } from "ws";
import { env } from "~/env.server";

export class AuthenticatedSocketConnection {
public id: string;
Expand All @@ -26,6 +27,10 @@ export class AuthenticatedSocketConnection {
schema: serverWebsocketMessages,
sender: async (message) => {
return new Promise((resolve, reject) => {
if (!ws.OPEN) {
return reject(new Error("Websocket is not open"));
}

ws.send(JSON.stringify(message), {}, (err) => {
if (err) {
reject(err);
Expand Down Expand Up @@ -84,6 +89,8 @@ export class AuthenticatedSocketConnection {
}

async #handleClose(ev: CloseEvent) {
logger.debug("[AuthenticatedSocketConnection] Websocket closed", { ev });

await this._consumer.stop();

this.onClose.post(ev);
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/v3/eventRepository.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export type TraceAttributes = Partial<
CreatableEvent,
| "attemptId"
| "isError"
| "isCancelled"
| "runId"
| "runIsTest"
| "output"
Expand Down
38 changes: 38 additions & 0 deletions apps/webapp/app/v3/marqs/devPubSub.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { z } from "zod";
import { singleton } from "~/utils/singleton";
import { ZodPubSub, ZodSubscriber } from "../utils/zodPubSub.server";
import { env } from "~/env.server";

const messageCatalog = {
CANCEL_ATTEMPT: z.object({
version: z.literal("v1").default("v1"),
backgroundWorkerId: z.string(),
attemptId: z.string(),
taskRunId: z.string(),
}),
};

export type DevSubscriber = ZodSubscriber<typeof messageCatalog>;

export const devPubSub = singleton("devPubSub", initializeDevPubSub);

function initializeDevPubSub() {
return new ZodPubSub({
redis: {
port: env.REDIS_PORT,
host: env.REDIS_HOST,
username: env.REDIS_USERNAME,
password: env.REDIS_PASSWORD,
enableAutoPipelining: true,
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
},
schema: {
CANCEL_ATTEMPT: z.object({
version: z.literal("v1").default("v1"),
backgroundWorkerId: z.string(),
attemptId: z.string(),
taskRunId: z.string(),
}),
},
});
}
Loading