Skip to content

Commit 0a18e1d

Browse files
authored
Additional dev queue consumer logging (#1606)
1 parent ca51f65 commit 0a18e1d

File tree

2 files changed

+90
-3
lines changed

2 files changed

+90
-3
lines changed

apps/webapp/app/v3/marqs/devQueueConsumer.server.ts

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ export class DevQueueConsumer {
7474
return;
7575
}
7676

77+
logger.debug("[DevQueueConsumer] Deprecating background worker", {
78+
backgroundWorker: backgroundWorker.id,
79+
env: this.env.id,
80+
});
81+
7782
this._deprecatedWorkers.set(id, backgroundWorker);
7883
this._backgroundWorkers.delete(id);
7984
}
@@ -96,9 +101,10 @@ export class DevQueueConsumer {
96101

97102
this._backgroundWorkers.set(backgroundWorker.id, backgroundWorker);
98103

99-
logger.debug("Registered background worker", {
104+
logger.debug("[DevQueueConsumer] Registered background worker", {
100105
backgroundWorker: backgroundWorker.id,
101106
inProgressRuns,
107+
env: this.env.id,
102108
});
103109

104110
const subscriber = await devPubSub.subscribe(`backgroundWorker:${backgroundWorker.id}:*`);
@@ -138,6 +144,7 @@ export class DevQueueConsumer {
138144
logger.debug("[DevQueueConsumer] taskAttemptCompleted()", {
139145
taskRunCompletion: completion,
140146
execution,
147+
env: this.env.id,
141148
});
142149

143150
const service = new CompleteAttemptService();
@@ -151,7 +158,7 @@ export class DevQueueConsumer {
151158
public async taskRunFailed(workerId: string, completion: TaskRunFailedExecutionResult) {
152159
this._taskFailures++;
153160

154-
logger.debug("[DevQueueConsumer] taskRunFailed()", { completion });
161+
logger.debug("[DevQueueConsumer] taskRunFailed()", { completion, env: this.env.id });
155162

156163
this._inProgressRuns.delete(completion.id);
157164

@@ -188,7 +195,7 @@ export class DevQueueConsumer {
188195
return;
189196
}
190197

191-
logger.debug("Stopping dev queue consumer", { env: this.env });
198+
logger.debug("[DevQueueConsumer] Stopping dev queue consumer", { env: this.env });
192199

193200
this._enabled = false;
194201

@@ -335,6 +342,10 @@ export class DevQueueConsumer {
335342
});
336343

337344
if (!existingTaskRun) {
345+
logger.debug("Failed to find existing task run, acking", {
346+
messageId: message.messageId,
347+
});
348+
338349
await marqs?.acknowledgeMessage(message.messageId);
339350
setTimeout(() => this.#doWork(), 100);
340351
return;
@@ -346,6 +357,14 @@ export class DevQueueConsumer {
346357
: this.#getLatestBackgroundWorker();
347358

348359
if (!backgroundWorker) {
360+
logger.debug("Failed to find background worker, acking", {
361+
messageId: message.messageId,
362+
lockedToVersionId: existingTaskRun.lockedToVersionId,
363+
deprecatedWorkers: Array.from(this._deprecatedWorkers.keys()),
364+
backgroundWorkers: Array.from(this._backgroundWorkers.keys()),
365+
latestWorker: this.#getLatestBackgroundWorker(),
366+
});
367+
349368
await marqs?.acknowledgeMessage(message.messageId);
350369
setTimeout(() => this.#doWork(), 100);
351370
return;

scripts/unpack-worker.js

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
const fs = require("fs");
2+
const zlib = require("zlib");
3+
const path = require("path");
4+
5+
// Get the file paths from command line arguments
6+
let [jsonFilePath, destDir] = process.argv.slice(2);
7+
8+
if (!jsonFilePath || !destDir) {
9+
console.error("Usage: node script.js <json-file-path> <destination-directory>");
10+
process.exit(1);
11+
}
12+
13+
// Function to decompress the content
14+
function decompressContent(base64Encoded) {
15+
// Decode base64 string to buffer
16+
const compressedData = Buffer.from(base64Encoded, "base64");
17+
18+
// Decompress the data
19+
const decompressedData = zlib.inflateSync(compressedData);
20+
21+
// Convert buffer to string
22+
return decompressedData.toString();
23+
}
24+
25+
try {
26+
// Read and parse the JSON file
27+
const jsonContent = fs.readFileSync(jsonFilePath, "utf8");
28+
29+
const data = JSON.parse(jsonContent)[0];
30+
31+
console.log(data);
32+
33+
const id = data.id;
34+
35+
console.log(`Extracting files for: ${id} to ${destDir}`);
36+
37+
destDir = path.join(destDir, id);
38+
39+
console.log(`Extracting files to: ${destDir}`);
40+
41+
// Create the destination directory if it doesn't exist
42+
fs.mkdirSync(destDir, { recursive: true });
43+
44+
// Process each item in the array
45+
const sourceFiles = data.metadata.sourceFiles;
46+
47+
sourceFiles.forEach((file) => {
48+
// Decompress the contents
49+
const decompressedContent = decompressContent(file.contents);
50+
51+
// Combine destination directory with file path
52+
const fullPath = path.join(destDir, file.filePath);
53+
54+
// Create directory structure if it doesn't exist
55+
const dirPath = path.dirname(fullPath);
56+
fs.mkdirSync(dirPath, { recursive: true });
57+
58+
// Write the decompressed content to the file
59+
fs.writeFileSync(fullPath, decompressedContent);
60+
61+
console.log(`Created file: ${fullPath}`);
62+
});
63+
64+
console.log(`\nAll files have been extracted to: ${destDir}`);
65+
} catch (error) {
66+
console.error(error);
67+
process.exit(1);
68+
}

0 commit comments

Comments
 (0)