Skip to content

Commit 584c7da

Browse files
authored
v3: prod worker graceful shutdown (#1034)
* graceful exit with timeout * handle and display graceful timeout errors * fix for very long waits * changeset * increase termination grace period to an hour
1 parent c9e1a3e commit 584c7da

File tree

10 files changed

+150
-20
lines changed

10 files changed

+150
-20
lines changed

.changeset/tiny-elephants-scream.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"trigger.dev": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
- Add graceful exit for prod workers
7+
- Prevent overflow in long waits

apps/kubernetes-provider/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ class KubernetesTaskOperations implements TaskOperations {
133133
},
134134
spec: {
135135
...this.#defaultPodSpec,
136+
terminationGracePeriodSeconds: 60 * 60,
136137
containers: [
137138
{
138139
name: this.#getRunContainerName(opts.runId),

apps/webapp/app/v3/services/completeAttempt.server.ts

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -248,14 +248,52 @@ export class CompleteAttemptService extends BaseService {
248248
},
249249
});
250250

251-
await this._prisma.taskRun.update({
252-
where: {
253-
id: taskRunAttempt.taskRunId,
254-
},
255-
data: {
256-
status: "COMPLETED_WITH_ERRORS",
257-
},
258-
});
251+
if (
252+
completion.error.type === "INTERNAL_ERROR" &&
253+
completion.error.code === "GRACEFUL_EXIT_TIMEOUT"
254+
) {
255+
// We need to fail all incomplete spans
256+
const inProgressEvents = await eventRepository.queryIncompleteEvents({
257+
attemptId: execution.attempt.id,
258+
});
259+
260+
logger.debug("Failing in-progress events", {
261+
inProgressEvents: inProgressEvents.map((event) => event.id),
262+
});
263+
264+
const exception = {
265+
type: "Graceful exit timeout",
266+
message: completion.error.message,
267+
};
268+
269+
await Promise.all(
270+
inProgressEvents.map((event) => {
271+
return eventRepository.crashEvent({
272+
event: event,
273+
crashedAt: new Date(),
274+
exception,
275+
});
276+
})
277+
);
278+
279+
await this._prisma.taskRun.update({
280+
where: {
281+
id: taskRunAttempt.taskRunId,
282+
},
283+
data: {
284+
status: "SYSTEM_FAILURE",
285+
},
286+
});
287+
} else {
288+
await this._prisma.taskRun.update({
289+
where: {
290+
id: taskRunAttempt.taskRunId,
291+
},
292+
data: {
293+
status: "COMPLETED_WITH_ERRORS",
294+
},
295+
});
296+
}
259297

260298
if (!env || env.type !== "DEVELOPMENT") {
261299
await ResumeTaskRunDependenciesService.enqueue(taskRunAttempt.id, this._prisma);

packages/cli-v3/src/workers/prod/entry-point.ts

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import {
1111
import { HttpReply, SimpleLogger, getRandomPortNumber } from "@trigger.dev/core-apps";
1212
import { readFile } from "node:fs/promises";
1313
import { createServer } from "node:http";
14-
import { z } from "zod";
1514
import { ProdBackgroundWorker } from "./backgroundWorker";
1615
import { TaskMetadataParseError, UncaughtExceptionError } from "../common/errors";
1716
import { setTimeout } from "node:timers/promises";
@@ -58,6 +57,8 @@ class ProdWorker {
5857
port: number,
5958
private host = "0.0.0.0"
6059
) {
60+
process.on("SIGTERM", this.#handleSignal.bind(this, "SIGTERM"));
61+
6162
this.#coordinatorSocket = this.#createCoordinatorSocket(COORDINATOR_HOST);
6263

6364
this.#backgroundWorker = new ProdBackgroundWorker("worker.js", {
@@ -150,6 +151,36 @@ class ProdWorker {
150151
this.#httpServer = this.#createHttpServer();
151152
}
152153

154+
async #handleSignal(signal: NodeJS.Signals) {
155+
logger.log("Received signal", { signal });
156+
157+
if (signal === "SIGTERM") {
158+
if (this.executing) {
159+
const terminationGracePeriodSeconds = 60 * 60;
160+
161+
logger.log("Waiting for attempt to complete before exiting", {
162+
terminationGracePeriodSeconds,
163+
});
164+
165+
// Wait for termination grace period minus 5s to give cleanup a chance to complete
166+
await setTimeout(terminationGracePeriodSeconds * 1000 - 5000);
167+
168+
logger.log("Termination timeout reached, exiting gracefully.");
169+
} else {
170+
logger.log("Not executing, exiting immediately.");
171+
}
172+
173+
await this.#exitGracefully();
174+
}
175+
176+
logger.log("Unhandled signal", { signal });
177+
}
178+
179+
async #exitGracefully() {
180+
await this.#backgroundWorker.close();
181+
process.exit(0);
182+
}
183+
153184
async #reconnect(isPostStart = false, reconnectImmediately = false) {
154185
if (isPostStart) {
155186
this.waitForPostStart = false;
@@ -206,8 +237,7 @@ class ProdWorker {
206237
logger.log("WARNING: Will checkpoint but also requested exit. This won't end well.");
207238
}
208239

209-
await this.#backgroundWorker.close();
210-
process.exit(0);
240+
await this.#exitGracefully();
211241
}
212242

213243
this.executing = false;
@@ -605,7 +635,6 @@ class ProdWorker {
605635
break;
606636
}
607637
}
608-
logger.log("preStop", { url: req.url });
609638

610639
return reply.text("preStop ok");
611640
}

packages/cli-v3/src/workers/prod/worker-facade.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,23 @@ const zodIpc = new ZodIpcConnection({
175175
CLEANUP: async ({ flush, kill }, sender) => {
176176
if (kill) {
177177
await tracingSDK.flush();
178+
179+
if (_execution) {
180+
// Fail currently executing attempt
181+
await sender.send("TASK_RUN_COMPLETED", {
182+
execution: _execution,
183+
result: {
184+
ok: false,
185+
id: _execution.attempt.id,
186+
error: {
187+
type: "INTERNAL_ERROR",
188+
code: TaskRunErrorCodes.GRACEFUL_EXIT_TIMEOUT,
189+
message: "Worker process killed while attempt in progress.",
190+
},
191+
},
192+
});
193+
}
194+
178195
// Now we need to exit the process
179196
await sender.send("READY_TO_DISPOSE", undefined);
180197
} else {
@@ -186,6 +203,9 @@ const zodIpc = new ZodIpcConnection({
186203
},
187204
});
188205

206+
// Ignore SIGTERM, handled by entry point
207+
process.on("SIGTERM", async () => {});
208+
189209
const prodRuntimeManager = new ProdRuntimeManager(zodIpc, {
190210
waitThresholdInMs: parseInt(process.env.TRIGGER_RUNTIME_WAIT_THRESHOLD_IN_MS ?? "30000", 10),
191211
});

packages/core/src/v3/runtime/devRuntimeManager.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import {
55
TaskRunExecutionResult,
66
} from "../schemas";
77
import { RuntimeManager } from "./manager";
8+
import { unboundedTimeout } from "../utils/timers";
89

910
export class DevRuntimeManager implements RuntimeManager {
1011
_taskWaits: Map<
@@ -24,15 +25,11 @@ export class DevRuntimeManager implements RuntimeManager {
2425
}
2526

2627
async waitForDuration(ms: number): Promise<void> {
27-
return new Promise((resolve) => {
28-
setTimeout(resolve, ms);
29-
});
28+
await unboundedTimeout(ms);
3029
}
3130

3231
async waitUntil(date: Date): Promise<void> {
33-
return new Promise((resolve) => {
34-
setTimeout(resolve, date.getTime() - Date.now());
35-
});
32+
return this.waitForDuration(date.getTime() - Date.now());
3633
}
3734

3835
async waitForTask(params: { id: string; ctx: TaskRunContext }): Promise<TaskRunExecutionResult> {

packages/core/src/v3/runtime/prodRuntimeManager.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
} from "../schemas";
1111
import { ZodIpcConnection } from "../zodIpc";
1212
import { RuntimeManager } from "./manager";
13+
import { unboundedTimeout } from "../utils/timers";
1314

1415
export type ProdRuntimeManagerOptions = {
1516
waitThresholdInMs?: number;
@@ -43,7 +44,7 @@ export class ProdRuntimeManager implements RuntimeManager {
4344
async waitForDuration(ms: number): Promise<void> {
4445
const now = Date.now();
4546

46-
const resolveAfterDuration = setTimeout(ms, "duration" as const);
47+
const resolveAfterDuration = unboundedTimeout(ms, "duration" as const);
4748

4849
if (ms <= this.waitThresholdInMs) {
4950
await resolveAfterDuration;

packages/core/src/v3/schemas/common.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ export const TaskRunErrorCodes = {
3434
TASK_RUN_CANCELLED: "TASK_RUN_CANCELLED",
3535
TASK_OUTPUT_ERROR: "TASK_OUTPUT_ERROR",
3636
HANDLE_ERROR_ERROR: "HANDLE_ERROR_ERROR",
37+
GRACEFUL_EXIT_TIMEOUT: "GRACEFUL_EXIT_TIMEOUT",
3738
} as const;
3839

3940
export const TaskRunInternalError = z.object({
@@ -49,6 +50,7 @@ export const TaskRunInternalError = z.object({
4950
"TASK_RUN_CANCELLED",
5051
"TASK_OUTPUT_ERROR",
5152
"HANDLE_ERROR_ERROR",
53+
"GRACEFUL_EXIT_TIMEOUT"
5254
]),
5355
message: z.string().optional(),
5456
});

packages/core/src/v3/utils/timers.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import { TimerOptions } from "node:timers";
2+
import { setTimeout } from "node:timers/promises";
3+
4+
export async function unboundedTimeout<T = void>(
5+
delay: number = 0,
6+
value?: T,
7+
options?: TimerOptions
8+
): Promise<T> {
9+
const maxDelay = 2147483647; // Highest value that will fit in a 32-bit signed integer
10+
11+
const fullTimeouts = Math.floor(delay / maxDelay);
12+
const remainingDelay = delay % maxDelay;
13+
14+
let lastTimeoutResult = await setTimeout(remainingDelay, value, options);
15+
16+
for (let i = 0; i < fullTimeouts; i++) {
17+
lastTimeoutResult = await setTimeout(maxDelay, value, options);
18+
}
19+
20+
return lastTimeoutResult;
21+
}
Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,22 @@
1-
import { task } from "@trigger.dev/sdk/v3";
1+
import { logger, task, wait } from "@trigger.dev/sdk/v3";
2+
import { setTimeout } from "node:timers/promises";
23

34
export const loggingTask = task({
45
id: "logging-task-2",
56
run: async () => {
67
console.log("Hello world");
78
},
89
});
10+
11+
export const waitForever = task({
12+
id: "wait-forever",
13+
run: async (payload: { freeze?: boolean }) => {
14+
if (payload.freeze) {
15+
await wait.for({ years: 9999 });
16+
} else {
17+
await logger.trace("Waiting..", async () => {
18+
await setTimeout(2147483647);
19+
});
20+
}
21+
},
22+
});

0 commit comments

Comments
 (0)