Skip to content

v3: handle errors and customizing retrying #943

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 4 additions & 19 deletions apps/webapp/app/components/primitives/TreeView/TreeView.tsx
Original file line number Diff line number Diff line change
@@ -1,25 +1,9 @@
import { VirtualItem, Virtualizer, useVirtualizer } from "@tanstack/react-virtual";
import {
Fragment,
MutableRefObject,
RefObject,
useCallback,
useEffect,
useImperativeHandle,
useReducer,
useRef,
useState,
} from "react";
import { MutableRefObject, RefObject, useCallback, useEffect, useReducer, useRef } from "react";
import { UnmountClosed } from "react-collapse";
import { cn } from "~/utils/cn";
import { Changes, NodeState, NodesState, reducer } from "./reducer";
import {
applyFilterToState,
concreteStateFromInput,
firstVisibleNode,
lastVisibleNode,
selectedIdFromState,
} from "./utils";
import { NodeState, NodesState, reducer } from "./reducer";
import { applyFilterToState, concreteStateFromInput, selectedIdFromState } from "./utils";

export type TreeViewProps<TData> = {
tree: FlatTree<TData>;
Expand Down Expand Up @@ -232,6 +216,7 @@ export function useTree<TData>({
index,
});
},
overscan: 20,
});

const scrollToNodeFn = useCallback(
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/consts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ export const RUN_CHUNK_EXECUTION_BUFFER = 350;
export const MAX_RUN_CHUNK_EXECUTION_LIMIT = 120000; // 2 minutes
export const VERCEL_RESPONSE_TIMEOUT_STATUS_CODES = [408, 504];
export const MAX_BATCH_TRIGGER_ITEMS = 100;
export const MAX_TASK_RUN_ATTEMPTS = 250;
8 changes: 4 additions & 4 deletions apps/webapp/app/v3/marqs.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -611,14 +611,14 @@ export class MarQS {
String(this.options.defaultConcurrency ?? 10)
);

logger.debug("Dequeue message result", {
result,
});

if (!result) {
return;
}

logger.debug("Dequeue message result", {
result,
});

if (result.length !== 2) {
return;
}
Expand Down
72 changes: 66 additions & 6 deletions apps/webapp/app/v3/marqs/devQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { CancelAttemptService } from "../services/cancelAttempt.server";
import { CompleteAttemptService } from "../services/completeAttempt.server";
import { attributesFromAuthenticatedEnv } from "../tracer.server";
import { DevSubscriber, devPubSub } from "./devPubSub.server";
import { CancelTaskRunService } from "../services/cancelTaskRun.server";

const tracer = trace.getTracer("devQueueConsumer");

Expand Down Expand Up @@ -50,6 +51,7 @@ export class DevQueueConsumer {
private _currentSpan: Span | undefined;
private _endSpanInNextIteration = false;
private _inProgressAttempts: Map<string, string> = new Map(); // Keys are task attempt friendly IDs, values are TaskRun ids/queue message ids
private _inProgressRuns: Map<string, string> = new Map(); // Keys are task run friendly IDs, values are TaskRun internal ids/queue message ids

constructor(
public env: AuthenticatedEnvironment,
Expand Down Expand Up @@ -123,7 +125,11 @@ export class DevQueueConsumer {
logger.debug("Task run completed", { taskRunCompletion: completion, execution });

const service = new CompleteAttemptService();
await service.call(completion, execution, this.env);
const result = await service.call(completion, execution, this.env);

if (result === "COMPLETED") {
this._inProgressRuns.delete(execution.run.id);
}
}

public async taskHeartbeat(workerId: string, id: string, seconds: number = 60) {
Expand All @@ -148,7 +154,7 @@ export class DevQueueConsumer {
this._enabled = false;

// We need to cancel all the in progress task run attempts and ack the messages so they will stop processing
await this.#cancelInProgressAttempts(reason);
await this.#cancelInProgressRunsAndAttempts(reason);

// We need to unsubscribe from the background worker channels
for (const [id, subscriber] of this._backgroundWorkerSubscriber) {
Expand All @@ -161,21 +167,44 @@ export class DevQueueConsumer {
}
}

async #cancelInProgressAttempts(reason: string) {
const service = new CancelAttemptService();
async #cancelInProgressRunsAndAttempts(reason: string) {
const cancelAttemptService = new CancelAttemptService();
const cancelTaskRunService = new CancelTaskRunService();

const cancelledAt = new Date();

const inProgressAttempts = new Map(this._inProgressAttempts);
const inProgressRuns = new Map(this._inProgressRuns);

this._inProgressAttempts.clear();
this._inProgressRuns.clear();

logger.debug("Cancelling in progress attempts", {
const inProgressRunsWithNoInProgressAttempts: string[] = [];
const inProgressAttemptRunIds = new Set(inProgressAttempts.values());

for (const [runId, messageId] of inProgressRuns) {
if (!inProgressAttemptRunIds.has(messageId)) {
inProgressRunsWithNoInProgressAttempts.push(messageId);
}
}

logger.debug("Cancelling in progress runs and attempts", {
attempts: Array.from(inProgressAttempts.keys()),
runs: Array.from(inProgressRuns.keys()),
});

for (const [attemptId, messageId] of inProgressAttempts) {
await this.#cancelInProgressAttempt(attemptId, messageId, service, cancelledAt, reason);
await this.#cancelInProgressAttempt(
attemptId,
messageId,
cancelAttemptService,
cancelledAt,
reason
);
}

for (const runId of inProgressRunsWithNoInProgressAttempts) {
await this.#cancelInProgressRun(runId, cancelTaskRunService, cancelledAt, reason);
}
}

Expand All @@ -199,6 +228,32 @@ export class DevQueueConsumer {
}
}

async #cancelInProgressRun(
runId: string,
service: CancelTaskRunService,
cancelledAt: Date,
reason: string
) {
logger.debug("Cancelling in progress run", { runId });

const taskRun = await prisma.taskRun.findUnique({
where: { id: runId },
});

if (!taskRun) {
return;
}

try {
await service.call(taskRun, { reason, cancelAttempts: false, cancelledAt });
} catch (e) {
logger.error("Failed to cancel in progress run", {
runId,
error: e,
});
}
}

#enable() {
if (this._enabled) {
return;
Expand Down Expand Up @@ -481,6 +536,7 @@ export class DevQueueConsumer {
});

this._inProgressAttempts.set(taskRunAttempt.friendlyId, message.messageId);
this._inProgressRuns.set(lockedTaskRun.friendlyId, message.messageId);
} catch (e) {
if (e instanceof Error) {
this._currentSpan?.recordException(e);
Expand All @@ -499,6 +555,7 @@ export class DevQueueConsumer {
data: {
lockedAt: null,
lockedById: null,
status: "PENDING",
},
}),
prisma.taskRunAttempt.delete({
Expand All @@ -508,6 +565,9 @@ export class DevQueueConsumer {
}),
]);

this._inProgressAttempts.delete(taskRunAttempt.friendlyId);
this._inProgressRuns.delete(lockedTaskRun.friendlyId);

// Finally we need to nack the message so it can be retried
await marqs?.nackMessage(message.messageId);
} finally {
Expand Down
5 changes: 4 additions & 1 deletion apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,10 @@ export class SharedQueueConsumer {
return;
}

if (existingTaskRun.status !== "PENDING") {
if (
existingTaskRun.status !== "PENDING" &&
existingTaskRun.status !== "RETRYING_AFTER_FAILURE"
) {
logger.debug("Task run is not pending, aborting", {
queueMessage: message.data,
messageId: message.messageId,
Expand Down
107 changes: 61 additions & 46 deletions apps/webapp/app/v3/services/cancelTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,21 @@ const CANCELLABLE_ATTEMPT_STATUSES: Array<TaskRunAttemptStatus> = [
"PENDING",
];

export type CancelTaskRunServiceOptions = {
reason?: string;
cancelAttempts?: boolean;
cancelledAt?: Date;
};

export class CancelTaskRunService extends BaseService {
public async call(taskRun: TaskRun) {
public async call(taskRun: TaskRun, options?: CancelTaskRunServiceOptions) {
const opts = {
reason: "Task run was cancelled by user",
cancelAttempts: true,
cancelledAt: new Date(),
...options,
};

// Make sure the task run is in a cancellable state
if (!CANCELLABLE_STATUSES.includes(taskRun.status)) {
return;
Expand Down Expand Up @@ -68,59 +81,61 @@ export class CancelTaskRunService extends BaseService {

await Promise.all(
inProgressEvents.map((event) => {
return eventRepository.cancelEvent(event, new Date(), "Task run was cancelled by user");
return eventRepository.cancelEvent(event, opts.cancelledAt, opts.reason);
})
);

// Cancel any in progress attempts
for (const attempt of cancelledTaskRun.attempts) {
if (attempt.runtimeEnvironment.type === "DEVELOPMENT") {
// Signal the task run attempt to stop
await devPubSub.publish(
`backgroundWorker:${attempt.backgroundWorkerId}:${attempt.id}`,
"CANCEL_ATTEMPT",
{
attemptId: attempt.friendlyId,
backgroundWorkerId: attempt.backgroundWorker.friendlyId,
taskRunId: cancelledTaskRun.friendlyId,
}
);
} else {
switch (attempt.status) {
case "EXECUTING": {
// We need to send a cancel message to the coordinator
socketIo.coordinatorNamespace.emit("REQUEST_ATTEMPT_CANCELLATION", {
version: "v1",
attemptId: attempt.id,
});
if (opts.cancelAttempts) {
for (const attempt of cancelledTaskRun.attempts) {
if (attempt.runtimeEnvironment.type === "DEVELOPMENT") {
// Signal the task run attempt to stop
await devPubSub.publish(
`backgroundWorker:${attempt.backgroundWorkerId}:${attempt.id}`,
"CANCEL_ATTEMPT",
{
attemptId: attempt.friendlyId,
backgroundWorkerId: attempt.backgroundWorker.friendlyId,
taskRunId: cancelledTaskRun.friendlyId,
}
);
} else {
switch (attempt.status) {
case "EXECUTING": {
// We need to send a cancel message to the coordinator
socketIo.coordinatorNamespace.emit("REQUEST_ATTEMPT_CANCELLATION", {
version: "v1",
attemptId: attempt.id,
});

break;
}
case "PENDING":
case "PAUSED": {
logger.debug("Cancelling pending or paused attempt", {
attempt,
});
break;
}
case "PENDING":
case "PAUSED": {
logger.debug("Cancelling pending or paused attempt", {
attempt,
});

const service = new CancelAttemptService();
const service = new CancelAttemptService();

await service.call(
attempt.friendlyId,
taskRun.id,
new Date(),
"Task run was cancelled by user"
);
await service.call(
attempt.friendlyId,
taskRun.id,
new Date(),
"Task run was cancelled by user"
);

break;
}
case "CANCELED":
case "COMPLETED":
case "FAILED": {
// Do nothing
break;
}
default: {
assertUnreachable(attempt.status);
break;
}
case "CANCELED":
case "COMPLETED":
case "FAILED": {
// Do nothing
break;
}
default: {
assertUnreachable(attempt.status);
}
}
}
}
Expand Down
Loading