|
8 | 8 | flattenAttributes,
|
9 | 9 | sanitizeError,
|
10 | 10 | } from "@trigger.dev/core/v3";
|
11 |
| -import { PrismaClientOrTransaction } from "~/db.server"; |
| 11 | +import { $transaction, PrismaClientOrTransaction } from "~/db.server"; |
12 | 12 | import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
|
13 | 13 | import { logger } from "~/services/logger.server";
|
14 | 14 | import { safeJsonParse } from "~/utils/json";
|
@@ -111,49 +111,27 @@ export class CompleteAttemptService extends BaseService {
|
111 | 111 | taskRunAttempt: NonNullable<FoundAttempt>,
|
112 | 112 | env?: AuthenticatedEnvironment
|
113 | 113 | ): Promise<"COMPLETED"> {
|
114 |
| - /* |
115 |
| - "COMPLETED" |
116 |
| -
|
117 |
| - Steps: |
118 |
| - 1. Updates the run *attempt* to completed AND the run to completed successfully |
119 |
| - 2. marqs ack |
120 |
| - 3. Complete the run span OTEL event |
121 |
| - 4. Resume the task dependencies |
122 |
| -
|
123 |
| - Inputs: |
124 |
| - - taskRunAttempt: id |
125 |
| - - taskRun: id, spanId |
126 |
| - - output |
127 |
| - - outputType |
128 |
| - - usage |
129 |
| - - Prisma client/transaction |
130 |
| -
|
131 |
| - Questions: |
132 |
| - - Why do we ack after the db update? |
133 |
| - */ |
| 114 | + const finalizeService = new FinalizeTaskRunService(); |
134 | 115 |
|
135 |
| - await this._prisma.taskRunAttempt.update({ |
136 |
| - where: { id: taskRunAttempt.id }, |
137 |
| - data: { |
138 |
| - status: "COMPLETED", |
139 |
| - completedAt: new Date(), |
140 |
| - output: completion.output, |
141 |
| - outputType: completion.outputType, |
142 |
| - usageDurationMs: completion.usage?.durationMs, |
143 |
| - taskRun: { |
144 |
| - update: { |
145 |
| - data: { |
146 |
| - status: "COMPLETED_SUCCESSFULLY", |
147 |
| - completedAt: new Date(), |
148 |
| - }, |
149 |
| - }, |
| 116 | + await $transaction(this._prisma, async (tx) => { |
| 117 | + await tx.taskRunAttempt.update({ |
| 118 | + where: { id: taskRunAttempt.id }, |
| 119 | + data: { |
| 120 | + status: "COMPLETED", |
| 121 | + completedAt: new Date(), |
| 122 | + output: completion.output, |
| 123 | + outputType: completion.outputType, |
| 124 | + usageDurationMs: completion.usage?.durationMs, |
150 | 125 | },
|
151 |
| - }, |
152 |
| - }); |
153 |
| - |
154 |
| - logger.debug("Completed attempt successfully, ACKing message"); |
| 126 | + }); |
155 | 127 |
|
156 |
| - await marqs?.acknowledgeMessage(taskRunAttempt.taskRunId); |
| 128 | + await finalizeService.call({ |
| 129 | + tx, |
| 130 | + id: taskRunAttempt.taskRun.id, |
| 131 | + status: "COMPLETED_SUCCESSFULLY", |
| 132 | + completedAt: new Date(), |
| 133 | + }); |
| 134 | + }); |
157 | 135 |
|
158 | 136 | // Now we need to "complete" the task run event/span
|
159 | 137 | await eventRepository.completeEvent(taskRunAttempt.taskRun.spanId, {
|
|
0 commit comments