Skip to content

Commit ad8686e

Browse files
committed
Merge remote-tracking branch 'origin/main' into feat/set-machine-at-trigger-time
2 parents c9cd1d7 + 14ce599 commit ad8686e

File tree

8 files changed

+142
-18
lines changed

8 files changed

+142
-18
lines changed

apps/webapp/app/v3/marqs/devQueueConsumer.server.ts

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ export class DevQueueConsumer {
7474
return;
7575
}
7676

77+
logger.debug("[DevQueueConsumer] Deprecating background worker", {
78+
backgroundWorker: backgroundWorker.id,
79+
env: this.env.id,
80+
});
81+
7782
this._deprecatedWorkers.set(id, backgroundWorker);
7883
this._backgroundWorkers.delete(id);
7984
}
@@ -96,9 +101,10 @@ export class DevQueueConsumer {
96101

97102
this._backgroundWorkers.set(backgroundWorker.id, backgroundWorker);
98103

99-
logger.debug("Registered background worker", {
104+
logger.debug("[DevQueueConsumer] Registered background worker", {
100105
backgroundWorker: backgroundWorker.id,
101106
inProgressRuns,
107+
env: this.env.id,
102108
});
103109

104110
const subscriber = await devPubSub.subscribe(`backgroundWorker:${backgroundWorker.id}:*`);
@@ -138,6 +144,7 @@ export class DevQueueConsumer {
138144
logger.debug("[DevQueueConsumer] taskAttemptCompleted()", {
139145
taskRunCompletion: completion,
140146
execution,
147+
env: this.env.id,
141148
});
142149

143150
const service = new CompleteAttemptService();
@@ -151,7 +158,7 @@ export class DevQueueConsumer {
151158
public async taskRunFailed(workerId: string, completion: TaskRunFailedExecutionResult) {
152159
this._taskFailures++;
153160

154-
logger.debug("[DevQueueConsumer] taskRunFailed()", { completion });
161+
logger.debug("[DevQueueConsumer] taskRunFailed()", { completion, env: this.env.id });
155162

156163
this._inProgressRuns.delete(completion.id);
157164

@@ -188,7 +195,7 @@ export class DevQueueConsumer {
188195
return;
189196
}
190197

191-
logger.debug("Stopping dev queue consumer", { env: this.env });
198+
logger.debug("[DevQueueConsumer] Stopping dev queue consumer", { env: this.env });
192199

193200
this._enabled = false;
194201

@@ -322,20 +329,30 @@ export class DevQueueConsumer {
322329
env: this.env,
323330
});
324331

325-
await marqs?.acknowledgeMessage(message.messageId);
332+
await marqs?.acknowledgeMessage(
333+
message.messageId,
334+
"Failed to parse message.data with MessageBody schema in DevQueueConsumer"
335+
);
326336

327337
setTimeout(() => this.#doWork(), 100);
328338
return;
329339
}
330340

331-
const existingTaskRun = await prisma.taskRun.findUnique({
341+
const existingTaskRun = await prisma.taskRun.findFirst({
332342
where: {
333343
id: message.messageId,
334344
},
335345
});
336346

337347
if (!existingTaskRun) {
338-
await marqs?.acknowledgeMessage(message.messageId);
348+
logger.debug("Failed to find existing task run, acking", {
349+
messageId: message.messageId,
350+
});
351+
352+
await marqs?.acknowledgeMessage(
353+
message.messageId,
354+
"Failed to find task run in DevQueueConsumer"
355+
);
339356
setTimeout(() => this.#doWork(), 100);
340357
return;
341358
}
@@ -346,7 +363,18 @@ export class DevQueueConsumer {
346363
: this.#getLatestBackgroundWorker();
347364

348365
if (!backgroundWorker) {
349-
await marqs?.acknowledgeMessage(message.messageId);
366+
logger.debug("Failed to find background worker, acking", {
367+
messageId: message.messageId,
368+
lockedToVersionId: existingTaskRun.lockedToVersionId,
369+
deprecatedWorkers: Array.from(this._deprecatedWorkers.keys()),
370+
backgroundWorkers: Array.from(this._backgroundWorkers.keys()),
371+
latestWorker: this.#getLatestBackgroundWorker(),
372+
});
373+
374+
await marqs?.acknowledgeMessage(
375+
message.messageId,
376+
"Failed to find background worker in DevQueueConsumer"
377+
);
350378
setTimeout(() => this.#doWork(), 100);
351379
return;
352380
}
@@ -363,7 +391,10 @@ export class DevQueueConsumer {
363391
taskSlugs: backgroundWorker.tasks.map((task) => task.slug),
364392
});
365393

366-
await marqs?.acknowledgeMessage(message.messageId);
394+
await marqs?.acknowledgeMessage(
395+
message.messageId,
396+
"No matching background task found in DevQueueConsumer"
397+
);
367398

368399
setTimeout(() => this.#doWork(), 100);
369400
return;
@@ -397,7 +428,10 @@ export class DevQueueConsumer {
397428
messageId: message.messageId,
398429
});
399430

400-
await marqs?.acknowledgeMessage(message.messageId);
431+
await marqs?.acknowledgeMessage(
432+
message.messageId,
433+
"Failed to lock task run in DevQueueConsumer"
434+
);
401435

402436
setTimeout(() => this.#doWork(), 100);
403437
return;
@@ -496,7 +530,10 @@ export class DevQueueConsumer {
496530
messageId: message.messageId,
497531
backgroundWorker,
498532
});
499-
await marqs?.acknowledgeMessage(message.messageId);
533+
await marqs?.acknowledgeMessage(
534+
message.messageId,
535+
"Non-lazy attempts are no longer supported in DevQueueConsumer"
536+
);
500537

501538
setTimeout(() => this.#doWork(), 100);
502539
}

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,8 @@ export class MarQS {
291291
orgConcurrencyKey: this.keys.orgCurrentConcurrencyKeyFromQueue(messageQueue),
292292
messageId: messageData.messageId,
293293
});
294+
295+
return;
294296
}
295297

296298
await this.options.visibilityTimeoutStrategy.heartbeat(
@@ -411,7 +413,7 @@ export class MarQS {
411413
);
412414
}
413415

414-
public async acknowledgeMessage(messageId: string) {
416+
public async acknowledgeMessage(messageId: string, reason: string = "unknown") {
415417
return this.#trace(
416418
"acknowledgeMessage",
417419
async (span) => {
@@ -421,6 +423,7 @@ export class MarQS {
421423
logger.log(`[${this.name}].acknowledgeMessage() message not found`, {
422424
messageId,
423425
service: this.name,
426+
reason,
424427
});
425428
return;
426429
}
@@ -430,6 +433,7 @@ export class MarQS {
430433
[SemanticAttributes.MESSAGE_ID]: message.messageId,
431434
[SemanticAttributes.CONCURRENCY_KEY]: message.concurrencyKey,
432435
[SemanticAttributes.PARENT_QUEUE]: message.parentQueue,
436+
["marqs.reason"]: reason,
433437
});
434438

435439
await this.options.visibilityTimeoutStrategy.cancelHeartbeat(messageId);

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -978,7 +978,7 @@ export class SharedQueueConsumer {
978978
}
979979

980980
async #ackAndDoMoreWork(messageId: string, intervalInMs?: number) {
981-
await marqs?.acknowledgeMessage(messageId);
981+
await marqs?.acknowledgeMessage(messageId, "Acking and doing more work in SharedQueueConsumer");
982982
this.#doMoreWork(intervalInMs);
983983
}
984984

apps/webapp/app/v3/requeueTaskRun.server.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,10 @@ export class RequeueTaskRunService extends BaseService {
7171
case "WAITING_FOR_DEPLOY": {
7272
logger.debug("[RequeueTaskRunService] Removing task run from queue", { taskRun });
7373

74-
await marqs?.acknowledgeMessage(taskRun.id);
74+
await marqs?.acknowledgeMessage(
75+
taskRun.id,
76+
"Run is either DELAYED or WAITING_FOR_DEPLOY so we cannot requeue it in RequeueTaskRunService"
77+
);
7578

7679
break;
7780
}
@@ -93,7 +96,10 @@ export class RequeueTaskRunService extends BaseService {
9396
case "CANCELED": {
9497
logger.debug("[RequeueTaskRunService] Task run is completed", { taskRun });
9598

96-
await marqs?.acknowledgeMessage(taskRun.id);
99+
await marqs?.acknowledgeMessage(
100+
taskRun.id,
101+
"Task run is already completed in RequeueTaskRunService"
102+
);
97103

98104
try {
99105
if (taskRun.runtimeEnvironment.type === "DEVELOPMENT") {

apps/webapp/app/v3/services/createCheckpoint.server.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,10 @@ export class CreateCheckpointService extends BaseService {
307307
checkpointId: checkpoint.id,
308308
params,
309309
});
310-
await marqs?.acknowledgeMessage(attempt.taskRunId);
310+
await marqs?.acknowledgeMessage(
311+
attempt.taskRunId,
312+
"No checkpoint event in CreateCheckpointService"
313+
);
311314

312315
return {
313316
success: false,

apps/webapp/app/v3/services/finalizeTaskRun.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ export class FinalizeTaskRunService extends BaseService {
6161
expiredAt,
6262
completedAt,
6363
});
64-
await marqs?.acknowledgeMessage(id);
64+
await marqs?.acknowledgeMessage(id, "FinalTaskRunService call");
6565

6666
logger.debug("Finalizing run updating run status", {
6767
id,

apps/webapp/app/v3/services/resumeAttempt.server.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,10 @@ export class ResumeAttemptService extends BaseService {
172172

173173
if (!completedAttempt) {
174174
this._logger.error("Completed attempt not found", { completedAttemptId });
175-
await marqs?.acknowledgeMessage(attempt.taskRunId);
175+
await marqs?.acknowledgeMessage(
176+
attempt.taskRunId,
177+
"Cannot find completed attempt in ResumeAttemptService"
178+
);
176179
return;
177180
}
178181

@@ -186,7 +189,10 @@ export class ResumeAttemptService extends BaseService {
186189

187190
if (!resumePayload) {
188191
logger.error("Failed to get resume payload");
189-
await marqs?.acknowledgeMessage(attempt.taskRunId);
192+
await marqs?.acknowledgeMessage(
193+
attempt.taskRunId,
194+
"Failed to get resume payload in ResumeAttemptService"
195+
);
190196
return;
191197
}
192198

scripts/unpack-worker.js

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
const fs = require("fs");
2+
const zlib = require("zlib");
3+
const path = require("path");
4+
5+
// Get the file paths from command line arguments
6+
let [jsonFilePath, destDir] = process.argv.slice(2);
7+
8+
if (!jsonFilePath || !destDir) {
9+
console.error("Usage: node script.js <json-file-path> <destination-directory>");
10+
process.exit(1);
11+
}
12+
13+
// Function to decompress the content
14+
function decompressContent(base64Encoded) {
15+
// Decode base64 string to buffer
16+
const compressedData = Buffer.from(base64Encoded, "base64");
17+
18+
// Decompress the data
19+
const decompressedData = zlib.inflateSync(compressedData);
20+
21+
// Convert buffer to string
22+
return decompressedData.toString();
23+
}
24+
25+
try {
26+
// Read and parse the JSON file
27+
const jsonContent = fs.readFileSync(jsonFilePath, "utf8");
28+
29+
const data = JSON.parse(jsonContent)[0];
30+
31+
console.log(data);
32+
33+
const id = data.id;
34+
35+
console.log(`Extracting files for: ${id} to ${destDir}`);
36+
37+
destDir = path.join(destDir, id);
38+
39+
console.log(`Extracting files to: ${destDir}`);
40+
41+
// Create the destination directory if it doesn't exist
42+
fs.mkdirSync(destDir, { recursive: true });
43+
44+
// Process each item in the array
45+
const sourceFiles = data.metadata.sourceFiles;
46+
47+
sourceFiles.forEach((file) => {
48+
// Decompress the contents
49+
const decompressedContent = decompressContent(file.contents);
50+
51+
// Combine destination directory with file path
52+
const fullPath = path.join(destDir, file.filePath);
53+
54+
// Create directory structure if it doesn't exist
55+
const dirPath = path.dirname(fullPath);
56+
fs.mkdirSync(dirPath, { recursive: true });
57+
58+
// Write the decompressed content to the file
59+
fs.writeFileSync(fullPath, decompressedContent);
60+
61+
console.log(`Created file: ${fullPath}`);
62+
});
63+
64+
console.log(`\nAll files have been extracted to: ${destDir}`);
65+
} catch (error) {
66+
console.error(error);
67+
process.exit(1);
68+
}

0 commit comments

Comments
 (0)