Skip to content

Commit 169cd5b

Browse files
committed
WIP
1 parent 8f43aec commit 169cd5b

File tree

7 files changed

+151
-12
lines changed

7 files changed

+151
-12
lines changed

apps/webapp/app/entry.server.tsx

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import {
1616
} from "./components/primitives/OperatingSystemProvider";
1717
import { getSharedSqsEventConsumer } from "./services/events/sqsEventConsumer";
1818
import { singleton } from "./utils/singleton";
19-
import { logger } from "./services/logger.server";
2019

2120
const ABORT_DELAY = 30000;
2221

apps/webapp/app/services/worker.server.ts

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,18 @@ import { prisma } from "~/db.server";
55
import { env } from "~/env.server";
66
import { ZodWorker } from "~/platform/zodWorker.server";
77
import { eventRepository } from "~/v3/eventRepository.server";
8+
import { MarqsConcurrencyMonitor } from "~/v3/marqs/concurrencyMonitor.server";
9+
import { RequeueV2Message } from "~/v3/marqs/requeueV2Message.server";
10+
import { reportUsageEvent } from "~/v3/openMeter.server";
811
import { RequeueTaskRunService } from "~/v3/requeueTaskRun.server";
912
import { DeliverAlertService } from "~/v3/services/alerts/deliverAlert.server";
1013
import { PerformDeploymentAlertsService } from "~/v3/services/alerts/performDeploymentAlerts.server";
1114
import { PerformTaskAttemptAlertsService } from "~/v3/services/alerts/performTaskAttemptAlerts.server";
1215
import { PerformBulkActionService } from "~/v3/services/bulk/performBulkAction.server";
16+
import { CancelTaskAttemptDependenciesService } from "~/v3/services/cancelTaskAttemptDependencies.server";
17+
import { EnqueueDelayedRunService } from "~/v3/services/enqueueDelayedRun.server";
1318
import { ExecuteTasksWaitingForDeployService } from "~/v3/services/executeTasksWaitingForDeploy";
19+
import { ExpireEnqueuedRunService } from "~/v3/services/expireEnqueuedRun.server";
1420
import { IndexDeploymentService } from "~/v3/services/indexDeployment.server";
1521
import { ResumeBatchRunService } from "~/v3/services/resumeBatchRun.server";
1622
import { ResumeTaskDependencyService } from "~/v3/services/resumeTaskDependency.server";
@@ -44,11 +50,6 @@ import { DeliverWebhookRequestService } from "./sources/deliverWebhookRequest.se
4450
import { PerformTaskOperationService } from "./tasks/performTaskOperation.server";
4551
import { ProcessCallbackTimeoutService } from "./tasks/processCallbackTimeout.server";
4652
import { ResumeTaskService } from "./tasks/resumeTask.server";
47-
import { RequeueV2Message } from "~/v3/marqs/requeueV2Message.server";
48-
import { MarqsConcurrencyMonitor } from "~/v3/marqs/concurrencyMonitor.server";
49-
import { reportUsageEvent } from "~/v3/openMeter.server";
50-
import { EnqueueDelayedRunService } from "~/v3/services/enqueueDelayedRun.server";
51-
import { ExpireEnqueuedRunService } from "~/v3/services/expireEnqueuedRun.server";
5253

5354
const workerCatalog = {
5455
indexEndpoint: z.object({
@@ -185,6 +186,9 @@ const workerCatalog = {
185186
"v3.expireRun": z.object({
186187
runId: z.string(),
187188
}),
189+
"v3.cancelTaskAttemptDependencies": z.object({
190+
attemptId: z.string(),
191+
}),
188192
};
189193

190194
const executionWorkerCatalog = {
@@ -698,6 +702,15 @@ function getWorkerQueue() {
698702
return await service.call(payload.runId);
699703
},
700704
},
705+
"v3.cancelTaskAttemptDependencies": {
706+
priority: 0,
707+
maxAttempts: 8,
708+
handler: async (payload, job) => {
709+
const service = new CancelTaskAttemptDependenciesService();
710+
711+
return await service.call(payload.attemptId);
712+
},
713+
},
701714
},
702715
});
703716
}

apps/webapp/app/v3/eventRepository.server.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1411,7 +1411,9 @@ function filteredAttributes(attributes: Attributes, prefix: string): Attributes
14111411
}
14121412

14131413
function calculateDurationFromStart(startTime: bigint, endTime: Date = new Date()) {
1414-
return Number(BigInt(endTime.getTime() * 1_000_000) - startTime);
1414+
const $endtime = typeof endTime === "string" ? new Date(endTime) : endTime;
1415+
1416+
return Number(BigInt($endtime.getTime() * 1_000_000) - startTime);
14151417
}
14161418

14171419
function getNowInNanoseconds(): bigint {

apps/webapp/app/v3/services/cancelAttempt.server.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
2-
import { eventRepository } from "../eventRepository.server";
2+
import { logger } from "~/services/logger.server";
33
import { marqs } from "~/v3/marqs/index.server";
4+
import { eventRepository } from "../eventRepository.server";
45
import { BaseService } from "./baseService.server";
5-
import { logger } from "~/services/logger.server";
66

77
import { PrismaClientOrTransaction, prisma } from "~/db.server";
8-
import { ResumeTaskRunDependenciesService } from "./resumeTaskRunDependencies.server";
98
import { isCancellableRunStatus } from "../taskStatus";
9+
import { ResumeTaskRunDependenciesService } from "./resumeTaskRunDependencies.server";
1010

1111
export class CancelAttemptService extends BaseService {
1212
public async call(
@@ -43,6 +43,14 @@ export class CancelAttemptService extends BaseService {
4343
return;
4444
}
4545

46+
if (taskRunAttempt.status === "CANCELED") {
47+
logger.warn("Task run attempt is already cancelled", {
48+
attemptId,
49+
});
50+
51+
return;
52+
}
53+
4654
await marqs?.acknowledgeMessage(taskRunId);
4755

4856
await this._prisma.taskRunAttempt.update({
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import { PrismaClientOrTransaction } from "~/db.server";
2+
import { workerQueue } from "~/services/worker.server";
3+
import { BaseService } from "./baseService.server";
4+
import { logger } from "~/services/logger.server";
5+
import { CancelTaskRunService } from "./cancelTaskRun.server";
6+
7+
export class CancelTaskAttemptDependenciesService extends BaseService {
8+
public async call(attemptId: string) {
9+
const taskAttempt = await this._prisma.taskRunAttempt.findUnique({
10+
where: { id: attemptId },
11+
include: {
12+
dependencies: {
13+
include: {
14+
taskRun: true,
15+
},
16+
},
17+
},
18+
});
19+
20+
if (!taskAttempt) {
21+
return;
22+
}
23+
24+
if (!taskAttempt.dependencies.length) {
25+
return;
26+
}
27+
28+
if (taskAttempt.status !== "CANCELED") {
29+
logger.debug("Task attempt is not cancelled, continuing anyway", {
30+
attemptId,
31+
status: taskAttempt.status,
32+
});
33+
}
34+
35+
const cancelRunService = new CancelTaskRunService();
36+
37+
for (const dependency of taskAttempt.dependencies) {
38+
await cancelRunService.call(dependency.taskRun);
39+
}
40+
}
41+
42+
static async enqueue(attemptId: string, tx: PrismaClientOrTransaction, runAt?: Date) {
43+
return await workerQueue.enqueue(
44+
"v3.cancelTaskAttemptDependencies",
45+
{
46+
attemptId,
47+
},
48+
{
49+
tx,
50+
runAt,
51+
jobKey: `cancelTaskAttemptDependencies:${attemptId}`,
52+
}
53+
);
54+
}
55+
}

apps/webapp/app/v3/services/cancelTaskRun.server.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { devPubSub } from "../marqs/devPubSub.server";
88
import { BaseService } from "./baseService.server";
99
import { CancelAttemptService } from "./cancelAttempt.server";
1010
import { CANCELLABLE_ATTEMPT_STATUSES, isCancellableRunStatus } from "../taskStatus";
11+
import { CancelTaskAttemptDependenciesService } from "./cancelTaskAttemptDependencies.server";
1112

1213
type ExtendedTaskRun = Prisma.TaskRunGetPayload<{
1314
include: {
@@ -66,6 +67,16 @@ export class CancelTaskRunService extends BaseService {
6667
},
6768
include: {
6869
backgroundWorker: true,
70+
dependencies: {
71+
include: {
72+
taskRun: true,
73+
},
74+
},
75+
batchTaskRunItems: {
76+
include: {
77+
taskRun: true,
78+
},
79+
},
6980
},
7081
},
7182
runtimeEnvironment: true,
@@ -93,6 +104,18 @@ export class CancelTaskRunService extends BaseService {
93104
await this.#cancelRemainingRunWorkers(cancelledTaskRun);
94105
}
95106

107+
const cancelService = new CancelTaskRunService();
108+
109+
// Cancel any dependent task runs
110+
for (const attempt of cancelledTaskRun.attempts) {
111+
for (const dependency of attempt.dependencies) {
112+
await cancelService.call(dependency.taskRun, {
113+
...opts,
114+
reason: `Parent task run was cancelled`,
115+
});
116+
}
117+
}
118+
96119
return {
97120
id: cancelledTaskRun.id,
98121
};
@@ -103,6 +126,8 @@ export class CancelTaskRunService extends BaseService {
103126
attempts: ExtendedTaskRunAttempt[]
104127
) {
105128
for (const attempt of attempts) {
129+
await CancelTaskAttemptDependenciesService.enqueue(attempt.id, this._prisma);
130+
106131
if (run.runtimeEnvironment.type === "DEVELOPMENT") {
107132
// Signal the task run attempt to stop
108133
await devPubSub.publish(

references/v3-catalog/src/trigger/subtasks.ts

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ export const simpleChildTask = task({
1919
run: async (payload: { message: string }, { ctx }) => {
2020
logger.log("Simple child task payload", { payload, ctx });
2121

22-
await wait.for({ seconds: 6 });
22+
await wait.for({ seconds: 30 });
2323
},
2424
});
2525

@@ -162,7 +162,9 @@ export const deeplyNestedTaskChild = task({
162162
export const deeplyNestedTaskGrandchild = task({
163163
id: "deeply-nested-task-grandchild",
164164
run: async ({ message = "test" }: { message?: string }) => {
165-
await deeplyNestedTaskGreatGrandchild.triggerAndWait({ message: `${message} - 3` });
165+
await deeplyNestedTaskGreatGrandchild.batchTriggerAndWait(
166+
Array.from({ length: 100 }, (_, i) => ({ payload: { message: `${message} - ${i}` } }))
167+
);
166168

167169
return {
168170
hello: "world",
@@ -173,6 +175,41 @@ export const deeplyNestedTaskGrandchild = task({
173175
export const deeplyNestedTaskGreatGrandchild = task({
174176
id: "deeply-nested-task-great-grandchild",
175177
run: async ({ message = "test" }: { message?: string }) => {
178+
await new Promise((resolve) => setTimeout(resolve, 10000));
179+
180+
return {
181+
hello: "world",
182+
};
183+
},
184+
});
185+
186+
export const dependencyCancellationParent = task({
187+
id: "dependency-cancellation-parent",
188+
run: async ({ message = "test" }: { message?: string }) => {
189+
const handle = await dependencyCancellationChild.triggerAndWait({ message: `${message} - 1` });
190+
191+
return {
192+
hello: "world",
193+
};
194+
},
195+
});
196+
197+
export const dependencyCancellationChild = task({
198+
id: "dependency-cancellation-child",
199+
run: async ({ message = "test" }: { message?: string }) => {
200+
await dependencyCancellationGrandchild.triggerAndWait({ message: `${message} - 2` });
201+
202+
return {
203+
hello: "world",
204+
};
205+
},
206+
});
207+
208+
export const dependencyCancellationGrandchild = task({
209+
id: "dependency-cancellation-grandchild",
210+
run: async ({ message = "test" }: { message?: string }) => {
211+
await wait.for({ seconds: 30 });
212+
176213
return {
177214
hello: "world",
178215
};

0 commit comments

Comments
 (0)