Skip to content

v3: correctly handle triggering tasks prior to deploy #1019

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 13 additions & 10 deletions apps/webapp/app/components/runs/v3/RunFilters.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { TaskRunStatusCombo, descriptionForTaskRunStatus } from "./TaskRunStatus

export const allTaskRunStatuses = [
"PENDING",
"WAITING_FOR_DEPLOY",
"EXECUTING",
"RETRYING_AFTER_FAILURE",
"WAITING_TO_RESUME",
Expand Down Expand Up @@ -228,16 +229,18 @@ export function RunsFilters({ possibleEnvironments, possibleTasks }: RunFiltersP
All tasks
</Paragraph>
</SelectItem>
{possibleTasks.map((task) => (
<SelectItem key={task} value={task}>
<Paragraph
variant="extra-small"
className="pl-0.5 transition group-hover:text-text-bright"
>
{task}
</Paragraph>
</SelectItem>
))}
{possibleTasks
.sort((a, b) => a.localeCompare(b)) // 🔤
.map((task) => (
<SelectItem key={task} value={task}>
<Paragraph
variant="extra-small"
className="pl-0.5 transition group-hover:text-text-bright"
>
{task}
</Paragraph>
</SelectItem>
))}
</SelectContent>
</Select>
</SelectGroup>
Expand Down
7 changes: 7 additions & 0 deletions apps/webapp/app/components/runs/v3/TaskRunStatus.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { cn } from "~/utils/cn";

const taskRunStatusDescriptions: Record<TaskRunStatus, string> = {
PENDING: "Task is waiting to be executed",
WAITING_FOR_DEPLOY: "Task needs to be deployed first to start executing",
EXECUTING: "Task is currently being executed",
RETRYING_AFTER_FAILURE: "Task is being reattempted after a failure",
WAITING_TO_RESUME: "Task has been frozen and is waiting to be resumed",
Expand Down Expand Up @@ -63,6 +64,8 @@ export function TaskRunStatusIcon({
switch (status) {
case "PENDING":
return <RectangleStackIcon className={cn(runStatusClassNameColor(status), className)} />;
case "WAITING_FOR_DEPLOY":
return <RectangleStackIcon className={cn(runStatusClassNameColor(status), className)} />;
case "EXECUTING":
return <Spinner className={cn(runStatusClassNameColor(status), className)} />;
case "WAITING_TO_RESUME":
Expand Down Expand Up @@ -95,6 +98,8 @@ export function runStatusClassNameColor(status: TaskRunStatus): string {
switch (status) {
case "PENDING":
return "text-charcoal-500";
case "WAITING_FOR_DEPLOY":
return "text-amber-500";
case "EXECUTING":
case "RETRYING_AFTER_FAILURE":
return "text-pending";
Expand Down Expand Up @@ -125,6 +130,8 @@ export function runStatusTitle(status: TaskRunStatus): string {
switch (status) {
case "PENDING":
return "Queued";
case "WAITING_FOR_DEPLOY":
return "Waiting for deploy";
case "EXECUTING":
return "Executing";
case "WAITING_TO_RESUME":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,12 @@ export default function Page() {
<PageTitle title="Runs" />
</NavBar>
<PageBody>
{list.possibleTasks.length === 0 ? (
<CreateFirstTaskInstructions />
) : list.runs.length === 0 && !list.hasFilters ? (
<RunTaskInstructions />
{list.runs.length === 0 && !list.hasFilters ? (
list.possibleTasks.length === 0 ? (
<CreateFirstTaskInstructions />
) : (
<RunTaskInstructions />
)
) : (
<div className={cn("grid h-fit grid-cols-1 gap-4")}>
<div>
Expand Down
13 changes: 13 additions & 0 deletions apps/webapp/app/services/worker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import { ResumeBatchRunService } from "~/v3/services/resumeBatchRun.server";
import { ResumeTaskDependencyService } from "~/v3/services/resumeTaskDependency.server";
import { TimeoutDeploymentService } from "~/v3/services/timeoutDeployment.server";
import { eventRepository } from "~/v3/eventRepository.server";
import { ExecuteTasksWaitingForDeployService } from "~/v3/services/executeTasksWaitingForDeploy";

const workerCatalog = {
indexEndpoint: z.object({
Expand Down Expand Up @@ -128,6 +129,9 @@ const workerCatalog = {
fromStatus: z.string(),
errorMessage: z.string(),
}),
"v3.executeTasksWaitingForDeploy": z.object({
backgroundWorkerId: z.string(),
}),
};

const executionWorkerCatalog = {
Expand Down Expand Up @@ -507,6 +511,15 @@ function getWorkerQueue() {
return await service.call(payload.deploymentId, payload.fromStatus, payload.errorMessage);
},
},
"v3.executeTasksWaitingForDeploy": {
priority: 0,
maxAttempts: 5,
handler: async (payload, job) => {
const service = new ExecuteTasksWaitingForDeployService();

return await service.call(payload.backgroundWorkerId);
},
},
},
});
}
Expand Down
7 changes: 3 additions & 4 deletions apps/webapp/app/v3/marqs/index.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,14 @@ export class MarQS {
queue: string,
messageId: string,
messageData: Record<string, unknown>,
concurrencyKey?: string
concurrencyKey?: string,
timestamp?: number
) {
return await this.#trace(
"enqueueMessage",
async (span) => {
const messageQueue = this.keys.queueKey(env, queue, concurrencyKey);

const timestamp = Date.now();

const parentQueue = this.keys.envSharedQueueKey(env);

propagation.inject(context.active(), messageData);
Expand All @@ -110,7 +109,7 @@ export class MarQS {
data: messageData,
queue: messageQueue,
concurrencyKey,
timestamp,
timestamp: timestamp ?? Date.now(),
messageId,
parentQueue,
};
Expand Down
79 changes: 63 additions & 16 deletions apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,12 @@ export class SharedQueueConsumer {
error: messageBody.error,
});

this.#ackAndDoMoreWork(message.messageId);
await this.#ackAndDoMoreWork(message.messageId);
return;
}

// TODO: For every ACK, decide what should be done with the existing run and attempts. Make sure to check the current statuses first.

switch (messageBody.data.type) {
case "EXECUTE": {
const existingTaskRun = await prisma.taskRun.findUnique({
Expand All @@ -293,7 +295,10 @@ export class SharedQueueConsumer {
messageId: message.messageId,
});

this.#ackAndDoMoreWork(message.messageId);
// INFO: There used to be a race condition where tasks could be triggered, but execute messages could be dequeued before the run finished being created in the DB
// This should not be happening anymore. In case it does, consider reqeueuing here with a brief delay while limiting total retries.

await this.#ackAndDoMoreWork(message.messageId);
return;
}

Expand Down Expand Up @@ -321,7 +326,7 @@ export class SharedQueueConsumer {
retryingFromCheckpoint,
});

this.#ackAndDoMoreWork(message.messageId);
await this.#ackAndDoMoreWork(message.messageId);
return;
}

Expand All @@ -333,7 +338,9 @@ export class SharedQueueConsumer {
messageId: message.messageId,
});

this.#ackAndDoMoreWork(message.messageId);
await this.#markRunAsWaitingForDeploy(existingTaskRun.id);

await this.#ackAndDoMoreWork(message.messageId);
return;
}

Expand All @@ -344,7 +351,9 @@ export class SharedQueueConsumer {
deployment: deployment.id,
});

this.#ackAndDoMoreWork(message.messageId);
await this.#markRunAsWaitingForDeploy(existingTaskRun.id);

await this.#ackAndDoMoreWork(message.messageId);
return;
}

Expand All @@ -353,15 +362,39 @@ export class SharedQueueConsumer {
);

if (!backgroundTask) {
logger.warn("No matching background task found for task run", {
taskRun: existingTaskRun.id,
taskIdentifier: existingTaskRun.taskIdentifier,
deployment: deployment.id,
backgroundWorker: deployment.worker.id,
taskSlugs: deployment.worker.tasks.map((task) => task.slug),
const nonCurrentTask = await prisma.backgroundWorkerTask.findFirst({
where: {
slug: existingTaskRun.taskIdentifier,
projectId: existingTaskRun.projectId,
runtimeEnvironmentId: existingTaskRun.runtimeEnvironmentId,
},
include: {
worker: {
include: {
deployment: {
include: {},
},
},
},
},
});

this.#ackAndDoMoreWork(message.messageId);
if (nonCurrentTask) {
logger.warn("Task for this run exists but is not part of the current deploy", {
taskRun: existingTaskRun.id,
taskIdentifier: existingTaskRun.taskIdentifier,
});
} else {
logger.warn("Task for this run has never been deployed", {
taskRun: existingTaskRun.id,
taskIdentifier: existingTaskRun.taskIdentifier,
});
}

await this.#markRunAsWaitingForDeploy(existingTaskRun.id);

// If this task is ever deployed, a new message will be enqueued after successful indexing
await this.#ackAndDoMoreWork(message.messageId);
return;
}

Expand Down Expand Up @@ -398,7 +431,7 @@ export class SharedQueueConsumer {
messageId: message.messageId,
});

this.#ackAndDoMoreWork(message.messageId);
await this.#ackAndDoMoreWork(message.messageId);
return;
}

Expand Down Expand Up @@ -757,9 +790,23 @@ export class SharedQueueConsumer {
this.#doMoreWork(intervalInMs);
}

async #nackAndDoMoreWork(messageId: string, intervalInMs?: number) {
await marqs?.nackMessage(messageId);
this.#doMoreWork(intervalInMs);
async #nackAndDoMoreWork(messageId: string, queueIntervalInMs?: number, nackRetryInMs?: number) {
const retryAt = nackRetryInMs ? Date.now() + nackRetryInMs : undefined;
await marqs?.nackMessage(messageId, retryAt);
this.#doMoreWork(queueIntervalInMs);
}

async #markRunAsWaitingForDeploy(runId: string) {
logger.debug("Marking run as waiting for deploy", { runId });

return await prisma.taskRun.update({
where: {
id: runId,
},
data: {
status: "WAITING_FOR_DEPLOY",
},
});
}
}

Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/v3/services/cancelTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { logger } from "~/services/logger.server";

export const CANCELLABLE_STATUSES: Array<TaskRunStatus> = [
"PENDING",
"WAITING_FOR_DEPLOY",
"EXECUTING",
"PAUSED",
"WAITING_TO_RESUME",
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/v3/services/crashTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { ResumeTaskRunDependenciesService } from "./resumeTaskRunDependencies.se

export const CRASHABLE_RUN_STATUSES: Array<TaskRunStatus> = [
"PENDING",
"WAITING_FOR_DEPLOY",
"EXECUTING",
"PAUSED",
"WAITING_TO_RESUME",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { CURRENT_DEPLOYMENT_LABEL } from "~/consts";
import { projectPubSub } from "./projectPubSub.server";
import { marqs } from "~/v3/marqs/index.server";
import { logger } from "~/services/logger.server";
import { ExecuteTasksWaitingForDeployService } from "./executeTasksWaitingForDeploy";

export class CreateDeployedBackgroundWorkerService extends BaseService {
public async call(
Expand Down Expand Up @@ -96,6 +97,8 @@ export class CreateDeployedBackgroundWorkerService extends BaseService {
logger.error("Failed to publish WORKER_CREATED event", { err });
}

await ExecuteTasksWaitingForDeployService.enqueue(backgroundWorker.id, this._prisma);

return backgroundWorker;
});
}
Expand Down
Loading