|
1 |
| -import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; |
| 1 | +import { $transaction, type PrismaClientOrTransaction, prisma } from "~/db.server"; |
| 2 | +import { type AuthenticatedEnvironment } from "~/services/apiAuth.server"; |
2 | 3 | import { logger } from "~/services/logger.server";
|
3 |
| -import { marqs } from "~/v3/marqs/index.server"; |
4 | 4 | import { eventRepository } from "../eventRepository.server";
|
5 |
| -import { BaseService } from "./baseService.server"; |
6 |
| - |
7 |
| -import { PrismaClientOrTransaction, prisma } from "~/db.server"; |
8 | 5 | import { isCancellableRunStatus } from "../taskStatus";
|
| 6 | +import { BaseService } from "./baseService.server"; |
| 7 | +import { FinalizeTaskRunService } from "./finalizeTaskRun.server"; |
9 | 8 | import { ResumeTaskRunDependenciesService } from "./resumeTaskRunDependencies.server";
|
10 | 9 |
|
11 | 10 | export class CancelAttemptService extends BaseService {
|
@@ -51,45 +50,27 @@ export class CancelAttemptService extends BaseService {
|
51 | 50 | return;
|
52 | 51 | }
|
53 | 52 |
|
54 |
| - /* |
55 |
| - "INTERRUPTED" (or leave it as is) |
56 |
| - |
57 |
| - Steps: |
58 |
| - 1. marqs ack |
59 |
| - 2. Updates the run *attempt* to canceled AND potentially the run to INTERRUPTED |
60 |
| - 3. Cancels all incomplete OTEL events |
61 |
| - 4. Enqueues resuming task run dependencies |
62 |
| -
|
63 |
| - Inputs: |
64 |
| - - taskRun: id, status, friendlyId |
65 |
| - - taskRunAttempt: friendlyId |
66 |
| - - cancelledAt |
67 |
| - - reason |
68 |
| - - Prisma client/transaction |
69 |
| - */ |
70 |
| - |
71 |
| - await marqs?.acknowledgeMessage(taskRunId); |
72 |
| - |
73 |
| - await this._prisma.taskRunAttempt.update({ |
74 |
| - where: { |
75 |
| - friendlyId: attemptId, |
76 |
| - }, |
77 |
| - data: { |
78 |
| - status: "CANCELED", |
79 |
| - completedAt: cancelledAt, |
80 |
| - taskRun: { |
81 |
| - update: { |
82 |
| - data: { |
83 |
| - status: isCancellableRunStatus(taskRunAttempt.taskRun.status) |
84 |
| - ? "INTERRUPTED" |
85 |
| - : undefined, |
86 |
| - completedAt: isCancellableRunStatus(taskRunAttempt.taskRun.status) |
87 |
| - ? cancelledAt |
88 |
| - : undefined, |
89 |
| - }, |
90 |
| - }, |
| 53 | + const finalizeService = new FinalizeTaskRunService(); |
| 54 | + |
| 55 | + await $transaction(this._prisma, async (tx) => { |
| 56 | + await tx.taskRunAttempt.update({ |
| 57 | + where: { |
| 58 | + friendlyId: attemptId, |
91 | 59 | },
|
92 |
| - }, |
| 60 | + data: { |
| 61 | + status: "CANCELED", |
| 62 | + completedAt: cancelledAt, |
| 63 | + }, |
| 64 | + }); |
| 65 | + |
| 66 | + await finalizeService.call({ |
| 67 | + tx, |
| 68 | + id: taskRunId, |
| 69 | + status: isCancellableRunStatus(taskRunAttempt.taskRun.status) ? "INTERRUPTED" : undefined, |
| 70 | + completedAt: isCancellableRunStatus(taskRunAttempt.taskRun.status) |
| 71 | + ? cancelledAt |
| 72 | + : undefined, |
| 73 | + }); |
93 | 74 | });
|
94 | 75 |
|
95 | 76 | const inProgressEvents = await eventRepository.queryIncompleteEvents({
|
|
0 commit comments