Skip to content

Commit 41ec450

Browse files
committed
Revert "Remove the RequeueTaskRunService"
This reverts commit 3421f5e.
1 parent 9a9bdc2 commit 41ec450

File tree

1 file changed

+84
-3
lines changed

1 file changed

+84
-3
lines changed

apps/webapp/app/v3/requeueTaskRun.server.ts

Lines changed: 84 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,90 @@ import { PrismaClientOrTransaction } from "~/db.server";
88
import { workerQueue } from "~/services/worker.server";
99

1010
export class RequeueTaskRunService extends BaseService {
11-
public async call(runId: string) {}
11+
public async call(runId: string) {
12+
const taskRun = await this._prisma.taskRun.findUnique({
13+
where: { id: runId },
14+
});
1215

13-
public static async enqueue(runId: string, runAt?: Date, tx?: PrismaClientOrTransaction) {}
16+
if (!taskRun) {
17+
logger.error("[RequeueTaskRunService] Task run not found", {
18+
runId,
19+
});
1420

15-
public static async dequeue(runId: string, tx?: PrismaClientOrTransaction) {}
21+
return;
22+
}
23+
24+
switch (taskRun.status) {
25+
case "PENDING": {
26+
logger.debug("[RequeueTaskRunService] Requeueing task run", { taskRun });
27+
28+
await marqs?.nackMessage(taskRun.id);
29+
30+
break;
31+
}
32+
case "EXECUTING":
33+
case "RETRYING_AFTER_FAILURE": {
34+
logger.debug("[RequeueTaskRunService] Failing task run", { taskRun });
35+
36+
const service = new FailedTaskRunService();
37+
38+
await service.call(taskRun.friendlyId, {
39+
ok: false,
40+
id: taskRun.friendlyId,
41+
retry: undefined,
42+
error: {
43+
type: "INTERNAL_ERROR",
44+
code: "TASK_RUN_HEARTBEAT_TIMEOUT",
45+
message: "Did not receive a heartbeat from the worker in time",
46+
},
47+
});
48+
49+
break;
50+
}
51+
case "DELAYED":
52+
case "WAITING_FOR_DEPLOY": {
53+
logger.debug("[RequeueTaskRunService] Removing task run from queue", { taskRun });
54+
55+
await marqs?.acknowledgeMessage(taskRun.id);
56+
57+
break;
58+
}
59+
case "WAITING_TO_RESUME":
60+
case "PAUSED": {
61+
logger.debug("[RequeueTaskRunService] Requeueing task run", { taskRun });
62+
63+
await marqs?.nackMessage(taskRun.id);
64+
65+
break;
66+
}
67+
case "SYSTEM_FAILURE":
68+
case "INTERRUPTED":
69+
case "CRASHED":
70+
case "COMPLETED_WITH_ERRORS":
71+
case "COMPLETED_SUCCESSFULLY":
72+
case "EXPIRED":
73+
case "CANCELED": {
74+
logger.debug("[RequeueTaskRunService] Task run is completed", { taskRun });
75+
76+
await marqs?.acknowledgeMessage(taskRun.id);
77+
78+
break;
79+
}
80+
default: {
81+
assertNever(taskRun.status);
82+
}
83+
}
84+
}
85+
86+
public static async enqueue(runId: string, runAt?: Date, tx?: PrismaClientOrTransaction) {
87+
return await workerQueue.enqueue(
88+
"v3.requeueTaskRun",
89+
{ runId },
90+
{ runAt, jobKey: `requeueTaskRun:${runId}` }
91+
);
92+
}
93+
94+
public static async dequeue(runId: string, tx?: PrismaClientOrTransaction) {
95+
return await workerQueue.dequeue(`requeueTaskRun:${runId}`, { tx });
96+
}
1697
}

0 commit comments

Comments
 (0)