Skip to content

Commit 3ecfa22

Browse files
committed
detect and execute tasks waiting for deploy
1 parent 4e84eee commit 3ecfa22

File tree

4 files changed

+191
-16
lines changed

4 files changed

+191
-16
lines changed

apps/webapp/app/services/worker.server.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import { ResumeBatchRunService } from "~/v3/services/resumeBatchRun.server";
3535
import { ResumeTaskDependencyService } from "~/v3/services/resumeTaskDependency.server";
3636
import { TimeoutDeploymentService } from "~/v3/services/timeoutDeployment.server";
3737
import { eventRepository } from "~/v3/eventRepository.server";
38+
import { ExecuteTasksWaitingForDeployService } from "~/v3/services/executeTasksWaitingForDeploy";
3839

3940
const workerCatalog = {
4041
indexEndpoint: z.object({
@@ -128,6 +129,9 @@ const workerCatalog = {
128129
fromStatus: z.string(),
129130
errorMessage: z.string(),
130131
}),
132+
"v3.executeTasksWaitingForDeploy": z.object({
133+
backgroundWorkerId: z.string(),
134+
}),
131135
};
132136

133137
const executionWorkerCatalog = {
@@ -507,6 +511,15 @@ function getWorkerQueue() {
507511
return await service.call(payload.deploymentId, payload.fromStatus, payload.errorMessage);
508512
},
509513
},
514+
"v3.executeTasksWaitingForDeploy": {
515+
priority: 0,
516+
maxAttempts: 5,
517+
handler: async (payload, job) => {
518+
const service = new ExecuteTasksWaitingForDeployService();
519+
520+
return await service.call(payload.backgroundWorkerId);
521+
},
522+
},
510523
},
511524
});
512525
}

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 60 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -275,10 +275,12 @@ export class SharedQueueConsumer {
275275
error: messageBody.error,
276276
});
277277

278-
this.#ackAndDoMoreWork(message.messageId);
278+
await this.#ackAndDoMoreWork(message.messageId);
279279
return;
280280
}
281281

282+
// TODO: For every ACK, decide what should be done with the existing run and attempts. Make sure to check the current statuses first.
283+
282284
switch (messageBody.data.type) {
283285
case "EXECUTE": {
284286
const existingTaskRun = await prisma.taskRun.findUnique({
@@ -293,7 +295,7 @@ export class SharedQueueConsumer {
293295
messageId: message.messageId,
294296
});
295297

296-
this.#ackAndDoMoreWork(message.messageId);
298+
await this.#ackAndDoMoreWork(message.messageId);
297299
return;
298300
}
299301

@@ -321,7 +323,7 @@ export class SharedQueueConsumer {
321323
retryingFromCheckpoint,
322324
});
323325

324-
this.#ackAndDoMoreWork(message.messageId);
326+
await this.#ackAndDoMoreWork(message.messageId);
325327
return;
326328
}
327329

@@ -333,7 +335,9 @@ export class SharedQueueConsumer {
333335
messageId: message.messageId,
334336
});
335337

336-
this.#ackAndDoMoreWork(message.messageId);
338+
await this.#markRunAsWaitingForDeploy(existingTaskRun.id);
339+
340+
await this.#ackAndDoMoreWork(message.messageId);
337341
return;
338342
}
339343

@@ -344,7 +348,9 @@ export class SharedQueueConsumer {
344348
deployment: deployment.id,
345349
});
346350

347-
this.#ackAndDoMoreWork(message.messageId);
351+
await this.#markRunAsWaitingForDeploy(existingTaskRun.id);
352+
353+
await this.#ackAndDoMoreWork(message.messageId);
348354
return;
349355
}
350356

@@ -353,15 +359,39 @@ export class SharedQueueConsumer {
353359
);
354360

355361
if (!backgroundTask) {
356-
logger.warn("No matching background task found for task run", {
357-
taskRun: existingTaskRun.id,
358-
taskIdentifier: existingTaskRun.taskIdentifier,
359-
deployment: deployment.id,
360-
backgroundWorker: deployment.worker.id,
361-
taskSlugs: deployment.worker.tasks.map((task) => task.slug),
362+
const nonCurrentTask = await prisma.backgroundWorkerTask.findFirst({
363+
where: {
364+
slug: existingTaskRun.taskIdentifier,
365+
projectId: existingTaskRun.projectId,
366+
runtimeEnvironmentId: existingTaskRun.runtimeEnvironmentId,
367+
},
368+
include: {
369+
worker: {
370+
include: {
371+
deployment: {
372+
include: {},
373+
},
374+
},
375+
},
376+
},
362377
});
363378

364-
this.#ackAndDoMoreWork(message.messageId);
379+
if (nonCurrentTask) {
380+
logger.warn("Task for this run exists but is not part of the current deploy", {
381+
taskRun: existingTaskRun.id,
382+
taskIdentifier: existingTaskRun.taskIdentifier,
383+
});
384+
} else {
385+
logger.warn("Task for this run has never been deployed", {
386+
taskRun: existingTaskRun.id,
387+
taskIdentifier: existingTaskRun.taskIdentifier,
388+
});
389+
}
390+
391+
await this.#markRunAsWaitingForDeploy(existingTaskRun.id);
392+
393+
// If this task is ever deployed, a new message will be enqueued after successful indexing
394+
await this.#ackAndDoMoreWork(message.messageId);
365395
return;
366396
}
367397

@@ -398,7 +428,7 @@ export class SharedQueueConsumer {
398428
messageId: message.messageId,
399429
});
400430

401-
this.#ackAndDoMoreWork(message.messageId);
431+
await this.#ackAndDoMoreWork(message.messageId);
402432
return;
403433
}
404434

@@ -757,9 +787,23 @@ export class SharedQueueConsumer {
757787
this.#doMoreWork(intervalInMs);
758788
}
759789

760-
async #nackAndDoMoreWork(messageId: string, intervalInMs?: number) {
761-
await marqs?.nackMessage(messageId);
762-
this.#doMoreWork(intervalInMs);
790+
/**
 * Negatively acknowledges a message and then schedules the next unit of work.
 *
 * @param messageId - Id of the message to nack.
 * @param queueIntervalInMs - Forwarded unchanged to `#doMoreWork`.
 * @param nackRetryInMs - When set to a non-zero value, the message is nacked
 *   with a timestamp that many milliseconds from now; otherwise no timestamp
 *   is passed to `nackMessage`.
 */
async #nackAndDoMoreWork(messageId: string, queueIntervalInMs?: number, nackRetryInMs?: number) {
  // Only compute a timestamp when a (truthy) retry delay was supplied.
  let retryTimestamp: number | undefined;
  if (nackRetryInMs) {
    retryTimestamp = Date.now() + nackRetryInMs;
  }

  // `marqs` may be undefined, in which case the nack is skipped entirely.
  await marqs?.nackMessage(messageId, retryTimestamp);

  this.#doMoreWork(queueIntervalInMs);
}
795+
796+
/**
 * Sets the status of the given task run to `WAITING_FOR_DEPLOY`.
 *
 * @param runId - Id of the task run to update.
 * @returns The result of the `prisma.taskRun.update` call.
 */
async #markRunAsWaitingForDeploy(runId: string) {
  logger.debug("Marking run as waiting for deploy", { runId });

  const updatedRun = await prisma.taskRun.update({
    where: { id: runId },
    data: { status: "WAITING_FOR_DEPLOY" },
  });

  return updatedRun;
}
764808
}
765809

apps/webapp/app/v3/services/createDeployedBackgroundWorker.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { CURRENT_DEPLOYMENT_LABEL } from "~/consts";
88
import { projectPubSub } from "./projectPubSub.server";
99
import { marqs } from "~/v3/marqs/index.server";
1010
import { logger } from "~/services/logger.server";
11+
import { ExecuteTasksWaitingForDeployService } from "./executeTasksWaitingForDeploy";
1112

1213
export class CreateDeployedBackgroundWorkerService extends BaseService {
1314
public async call(
@@ -96,6 +97,8 @@ export class CreateDeployedBackgroundWorkerService extends BaseService {
9697
logger.error("Failed to publish WORKER_CREATED event", { err });
9798
}
9899

100+
await ExecuteTasksWaitingForDeployService.enqueue(backgroundWorker.id, this._prisma);
101+
99102
return backgroundWorker;
100103
});
101104
}
apps/webapp/app/v3/services/executeTasksWaitingForDeploy.ts

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
import { PrismaClientOrTransaction } from "~/db.server";
2+
import { workerQueue } from "~/services/worker.server";
3+
import { marqs } from "~/v3/marqs/index.server";
4+
import { BaseService } from "./baseService.server";
5+
import { logger } from "~/services/logger.server";
6+
7+
/**
 * Releases task runs that were parked with status `WAITING_FOR_DEPLOY` once a
 * background worker that contains their task exists.
 *
 * Use {@link ExecuteTasksWaitingForDeployService.enqueue} to schedule the work
 * on the worker queue, and {@link ExecuteTasksWaitingForDeployService.call} to
 * perform it.
 */
export class ExecuteTasksWaitingForDeployService extends BaseService {
  /**
   * Finds every run in the worker's environment and project that is waiting
   * for deploy and whose task identifier matches one of the worker's task
   * slugs, marks those runs as `PENDING`, and enqueues an `EXECUTE` message
   * for each of them.
   *
   * Nothing is changed when the worker cannot be found, when no run is
   * waiting, or when the message queue (`marqs`) is unavailable.
   *
   * Enqueue failures are logged, not thrown.
   *
   * @param backgroundWorkerId - Id of the background worker that was just created.
   */
  public async call(backgroundWorkerId: string) {
    const backgroundWorker = await this._prisma.backgroundWorker.findFirst({
      where: {
        id: backgroundWorkerId,
      },
      include: {
        runtimeEnvironment: {
          include: {
            project: true,
            organization: true,
          },
        },
        tasks: true,
      },
    });

    if (!backgroundWorker) {
      logger.error("Background worker not found", { id: backgroundWorkerId });
      return;
    }

    const runsWaitingForDeploy = await this._prisma.taskRun.findMany({
      where: {
        runtimeEnvironmentId: backgroundWorker.runtimeEnvironmentId,
        projectId: backgroundWorker.projectId,
        status: "WAITING_FOR_DEPLOY",
        taskIdentifier: {
          in: backgroundWorker.tasks.map((task) => task.slug),
        },
      },
    });

    if (!runsWaitingForDeploy.length) {
      return;
    }

    // Check for the queue BEFORE touching run statuses: flipping runs to
    // PENDING without being able to enqueue them would strand them, because a
    // later invocation only looks for runs that are still WAITING_FOR_DEPLOY.
    const queue = marqs;

    if (!queue) {
      return;
    }

    // Clear any runs awaiting deployment for execution.
    // The status guard ensures a run whose status changed since the lookup
    // above (e.g. it was cancelled) is not reset to PENDING.
    const pendingRuns = await this._prisma.taskRun.updateMany({
      where: {
        id: {
          in: runsWaitingForDeploy.map((run) => run.id),
        },
        status: "WAITING_FOR_DEPLOY",
      },
      data: {
        status: "PENDING",
      },
    });

    if (pendingRuns.count) {
      logger.debug("Task runs waiting for deploy are now ready for execution", {
        tasks: runsWaitingForDeploy.map((run) => run.id),
        total: pendingRuns.count,
      });
    }

    // Enqueue all runs concurrently; one failure must not prevent the others.
    const settled = await Promise.allSettled(
      runsWaitingForDeploy.map((run) =>
        queue.enqueueMessage(
          backgroundWorker.runtimeEnvironment,
          run.queue,
          run.id,
          {
            type: "EXECUTE",
            taskIdentifier: run.taskIdentifier,
          },
          run.concurrencyKey ?? undefined
        )
      )
    );

    // `settled` is index-aligned with `runsWaitingForDeploy`.
    const rejectedRuns: { id: string; reason: unknown }[] = [];

    settled.forEach((result, index) => {
      const run = runsWaitingForDeploy[index];

      if (run && result.status === "rejected") {
        rejectedRuns.push({ id: run.id, reason: result.reason });
      }
    });

    if (rejectedRuns.length) {
      logger.error("Failed to requeue task runs for immediate execution", {
        rejectedRuns,
      });
    }
  }

  /**
   * Schedules a `v3.executeTasksWaitingForDeploy` job on the worker queue.
   *
   * @param backgroundWorkerId - Id of the background worker to process.
   * @param tx - Prisma client or transaction passed through to the worker queue.
   * @param runAt - Optional time passed through to the worker queue as `runAt`.
   * @returns Whatever `workerQueue.enqueue` resolves to.
   */
  static async enqueue(backgroundWorkerId: string, tx: PrismaClientOrTransaction, runAt?: Date) {
    return await workerQueue.enqueue(
      "v3.executeTasksWaitingForDeploy",
      {
        backgroundWorkerId,
      },
      {
        tx,
        runAt,
      }
    );
  }
}

0 commit comments

Comments
 (0)