Skip to content

Commit 8f378b2

Browse files
committed
improve prod worker logging
1 parent fed79e7 commit 8f378b2

File tree

3 files changed

+56
-9
lines changed

3 files changed

+56
-9
lines changed

packages/cli-v3/src/workers/prod/backgroundWorker.ts

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ export class ProdBackgroundWorker {
9090
) {}
9191

9292
async close(gracefulExitTimeoutElapsed = false) {
93+
console.log("Closing worker", { gracefulExitTimeoutElapsed, closed: this._closed });
94+
9395
if (this._closed) {
9496
return;
9597
}
@@ -104,6 +106,8 @@ export class ProdBackgroundWorker {
104106
}
105107

106108
async #killTaskRunProcess(flush = true, initialSignal: number | NodeJS.Signals = "SIGTERM") {
109+
console.log("Killing task run process", { flush, initialSignal, closed: this._closed });
110+
107111
if (this._closed || !this._taskRunProcess) {
108112
return;
109113
}
@@ -119,6 +123,10 @@ export class ProdBackgroundWorker {
119123
console.error("Error while trying graceful exit", error);
120124
});
121125

126+
console.log("Killed task run process, setting closed to true", {
127+
closed: this._closed,
128+
pid: currentTaskRunProcess.pid,
129+
});
122130
this._closed = true;
123131
}
124132

@@ -232,6 +240,9 @@ export class ProdBackgroundWorker {
232240
payload.execution.worker.version
233241
);
234242

243+
console.log("Getting fresh task run process, setting closed to false", {
244+
closed: this._closed,
245+
});
235246
this._closed = false;
236247

237248
await this.#killCurrentTaskRunProcessBeforeAttempt();
@@ -250,6 +261,8 @@ export class ProdBackgroundWorker {
250261
);
251262

252263
taskRunProcess.onExit.attach(({ pid }) => {
264+
console.log("Task run process exited", { pid });
265+
253266
// Only delete the task run process if the pid matches
254267
if (this._taskRunProcess?.pid === pid) {
255268
this._taskRunProcess = undefined;
@@ -320,12 +333,21 @@ export class ProdBackgroundWorker {
320333
}
321334

322335
async #killCurrentTaskRunProcessBeforeAttempt() {
336+
console.log("killCurrentTaskRunProcessBeforeAttempt()", {
337+
hasTaskRunProcess: !!this._taskRunProcess,
338+
});
339+
323340
if (!this._taskRunProcess) {
324341
return;
325342
}
326343

327344
const currentTaskRunProcess = this._taskRunProcess;
328345

346+
console.log("Killing current task run process", {
347+
isBeingKilled: currentTaskRunProcess?.isBeingKilled,
348+
totalBeingKilled: this._taskRunProcessesBeingKilled.size,
349+
});
350+
329351
if (currentTaskRunProcess.isBeingKilled) {
330352
if (this._taskRunProcessesBeingKilled.size > 1) {
331353
await this.#tryGracefulExit(currentTaskRunProcess);
@@ -382,6 +404,11 @@ export class ProdBackgroundWorker {
382404
try {
383405
const taskRunProcess = await this.#getFreshTaskRunProcess(payload, messageId);
384406

407+
console.log("executing task run", {
408+
attempt: payload.execution.attempt.id,
409+
taskRunPid: taskRunProcess.pid,
410+
});
411+
385412
const result = await taskRunProcess.executeTaskRun(payload);
386413

387414
if (result.ok) {
@@ -477,7 +504,12 @@ export class ProdBackgroundWorker {
477504
}
478505

479506
async cancelAttempt(attemptId: string) {
480-
await this._taskRunProcess?.cancel();
507+
if (!this._taskRunProcess) {
508+
console.error("No task run process to cancel attempt", { attemptId });
509+
return;
510+
}
511+
512+
await this._taskRunProcess.cancel();
481513
}
482514

483515
async executeTaskRunLazyAttempt(payload: TaskRunExecutionLazyAttemptPayload) {
@@ -701,6 +733,8 @@ class TaskRunProcess {
701733
}
702734

703735
async cleanup(kill = false, gracefulExitTimeoutElapsed = false) {
736+
console.log("cleanup()", { kill, gracefulExitTimeoutElapsed });
737+
704738
if (kill && this._isBeingKilled) {
705739
return;
706740
}
@@ -715,6 +749,11 @@ class TaskRunProcess {
715749
// Kill parent unless graceful exit timeout has elapsed and we're in the middle of an execution
716750
const killParentProcess = kill && !killChildProcess;
717751

752+
console.log("Cleaning up task run process", {
753+
killChildProcess,
754+
killParentProcess,
755+
});
756+
718757
await this._ipc?.sendWithAck("CLEANUP", {
719758
flush: true,
720759
kill: killParentProcess,
@@ -780,9 +819,13 @@ class TaskRunProcess {
780819
}
781820

782821
async #handleExit(code: number | null, signal: NodeJS.Signals | null) {
822+
console.log("handling child exit", { code, signal });
823+
783824
// Go through all the attempts currently pending and reject them
784825
for (const [id, status] of this._attemptStatuses.entries()) {
785826
if (status === "PENDING") {
827+
console.log("found pending attempt", { id });
828+
786829
this._attemptStatuses.set(id, "REJECTED");
787830

788831
const attemptPromise = this._attemptPromises.get(id);

packages/cli-v3/src/workers/prod/entry-point.ts

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,9 @@ class ProdWorker {
134134
});
135135
}
136136
} catch (error) {
137-
logger.error("taskinfo read error during reconnect", { error });
137+
logger.error("taskinfo read error during reconnect", {
138+
error: error instanceof Error ? error.message : error,
139+
});
138140
} finally {
139141
this.#coordinatorSocket = this.#createCoordinatorSocket(coordinatorHost);
140142
}
@@ -181,6 +183,8 @@ class ProdWorker {
181183
}
182184
);
183185

186+
logger.log("onCancelCheckpoint coordinator response", { checkpointCanceled });
187+
184188
if (checkpointCanceled) {
185189
if (message.reason === "WAIT_FOR_DURATION") {
186190
// Worker will resume immediately
@@ -385,11 +389,8 @@ class ProdWorker {
385389
extraHeaders["x-trigger-attempt-friendly-id"] = this.attemptFriendlyId;
386390
}
387391

388-
logger.log("connecting to coordinator", {
389-
host,
390-
port: COORDINATOR_PORT,
391-
extraHeaders,
392-
});
392+
logger.log(`connecting to coordinator: ${host}:${COORDINATOR_PORT}`);
393+
logger.debug(`connecting with extra headers`, { extraHeaders });
393394

394395
const coordinatorConnection = new ZodSocketConnection({
395396
namespace: "prod-worker",
@@ -574,14 +575,16 @@ class ProdWorker {
574575
},
575576
},
576577
onConnection: async (socket, handler, sender, logger) => {
578+
logger.log("connected to coordinator", { status: this.#status });
579+
577580
if (this.waitForPostStart) {
578581
logger.log("skip connection handler, waiting for post start hook");
579582
return;
580583
}
581584

582585
if (this.paused) {
583586
if (!this.nextResumeAfter) {
584-
logger.error("Missing next resume reason");
587+
logger.error("Missing next resume reason", { status: this.#status });
585588

586589
this.#emitUnrecoverableError(
587590
"NoNextResume",
@@ -592,7 +595,7 @@ class ProdWorker {
592595
}
593596

594597
if (!this.attemptFriendlyId) {
595-
logger.error("Missing friendly ID");
598+
logger.error("Missing friendly ID", { status: this.#status });
596599

597600
this.#emitUnrecoverableError(
598601
"NoAttemptId",

packages/core/src/v3/schemas/messages.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -856,6 +856,7 @@ export const ProdWorkerToCoordinatorMessages = {
856856
},
857857
};
858858

859+
// TODO: The coordinator can only safely use v1 worker messages, higher versions will need a new flag, e.g. SUPPORTS_VERSIONED_MESSAGES
859860
export const CoordinatorToProdWorkerMessages = {
860861
RESUME_AFTER_DEPENDENCY: {
861862
message: z.object({

0 commit comments

Comments
 (0)