Skip to content

v3: fix dependency trigger and wait #1030

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/smart-needles-move.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/sdk": patch
---

Remove unimplemented batchOptions
1 change: 1 addition & 0 deletions apps/docker-provider/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ class DockerTaskOperations implements TaskOperations {
return await execa("docker", [
"exec",
containerName,
"busybox",
"wget",
"-q",
"-O-",
Expand Down
2 changes: 1 addition & 1 deletion apps/kubernetes-provider/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ class KubernetesTaskOperations implements TaskOperations {
`for i in $(seq ${retries}); do sleep 1; busybox wget -q -O- 127.0.0.1:8000/${type}?cause=${cause} && break; done`,
];

logger.log("getLifecycleCommand()", { exec });
logger.debug("getLifecycleCommand()", { exec });

return exec;
}
Expand Down
4 changes: 2 additions & 2 deletions apps/webapp/app/components/runs/v3/RunFilters.tsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { TrashIcon } from "@heroicons/react/20/solid";
import { XMarkIcon } from "@heroicons/react/20/solid";
import { useNavigate } from "@remix-run/react";
import type { TaskRunStatus as TaskRunStatusType } from "@trigger.dev/database";
import { RuntimeEnvironment, TaskRunAttemptStatus, TaskRunStatus } from "@trigger.dev/database";
Expand Down Expand Up @@ -247,7 +247,7 @@ export function RunsFilters({ possibleEnvironments, possibleTasks }: RunFiltersP

<TimeFrameFilter from={from} to={to} onRangeChanged={handleTimeFrameChange} />

<Button variant="minimal/small" onClick={() => clearFilters()} LeadingIcon={TrashIcon} />
<Button variant="minimal/small" onClick={() => clearFilters()} LeadingIcon={XMarkIcon} />
</div>
);
}
235 changes: 119 additions & 116 deletions apps/webapp/app/v3/services/resumeAttempt.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import {
TaskRunExecution,
TaskRunExecutionResult,
} from "@trigger.dev/core/v3";
import { $transaction } from "~/db.server";
import { $transaction, PrismaClientOrTransaction } from "~/db.server";
import { logger } from "~/services/logger.server";
import { marqs } from "~/v3/marqs/index.server";
import { socketIo } from "../handleSocketIo.server";
import { sharedQueueTasks } from "../marqs/sharedQueueConsumer.server";
import { BaseService } from "./baseService.server";
import { TaskRunAttempt } from "@trigger.dev/database";

export class ResumeAttemptService extends BaseService {
public async call(
Expand All @@ -24,7 +25,7 @@ export class ResumeAttemptService extends BaseService {
},
include: {
taskRun: true,
taskRunDependency: {
dependencies: {
include: {
taskRun: {
include: {
Expand All @@ -40,8 +41,12 @@ export class ResumeAttemptService extends BaseService {
},
},
},
orderBy: {
createdAt: "desc",
},
take: 1,
},
batchTaskRunDependency: {
batchDependencies: {
include: {
items: {
include: {
Expand All @@ -61,6 +66,10 @@ export class ResumeAttemptService extends BaseService {
},
},
},
orderBy: {
createdAt: "desc",
},
take: 1,
},
},
});
Expand All @@ -78,6 +87,8 @@ export class ResumeAttemptService extends BaseService {
return;
}

let completedAttemptIds: string[] = [];

switch (params.type) {
case "WAIT_FOR_DURATION": {
logger.error(
Expand All @@ -93,148 +104,140 @@ export class ResumeAttemptService extends BaseService {
});
break;
}
case "WAIT_FOR_TASK":
case "WAIT_FOR_BATCH": {
let completedAttemptIds: string[] = [];

if (attempt.taskRunDependency) {
const dependentAttempt = attempt.taskRunDependency.taskRun.attempts[0];
case "WAIT_FOR_TASK": {
if (attempt.dependencies.length) {
// We only care about the latest dependency
const dependentAttempt = attempt.dependencies[0].taskRun.attempts[0];

if (!dependentAttempt) {
logger.error("No dependent attempt", { attemptId: attempt.id });
return;
}

completedAttemptIds = [dependentAttempt.id];

await tx.taskRunAttempt.update({
where: {
id: attempt.id,
},
data: {
taskRunDependency: {
disconnect: true,
},
},
});
} else if (attempt.batchTaskRunDependency) {
const dependentBatchItems = attempt.batchTaskRunDependency.items;
} else {
logger.error("No task dependency", { attemptId: attempt.id });
return;
}
break;
}
case "WAIT_FOR_BATCH": {
if (attempt.batchDependencies) {
// We only care about the latest batch dependency
const dependentBatchItems = attempt.batchDependencies[0].items;

if (!dependentBatchItems) {
logger.error("No dependent batch items", { attemptId: attempt.id });
return;
}

completedAttemptIds = dependentBatchItems.map((item) => item.taskRun.attempts[0]?.id);

await tx.taskRunAttempt.update({
where: {
id: attempt.id,
},
data: {
batchTaskRunDependency: {
disconnect: true,
},
},
});
} else {
logger.error("No dependencies", { attemptId: attempt.id });
return;
}

if (completedAttemptIds.length === 0) {
logger.error("No completed attempt IDs", { attemptId: attempt.id });
logger.error("No batch dependency", { attemptId: attempt.id });
return;
}
break;
}
default: {
break;
}
}

const completions: TaskRunExecutionResult[] = [];
const executions: TaskRunExecution[] = [];
await this.#handleDependencyResume(attempt, completedAttemptIds, tx);
});
}

for (const completedAttemptId of completedAttemptIds) {
const completedAttempt = await tx.taskRunAttempt.findUnique({
where: {
id: completedAttemptId,
taskRun: {
lockedAt: {
not: null,
},
lockedById: {
not: null,
},
},
},
});

if (!completedAttempt) {
logger.error("Completed attempt not found", {
attemptId: attempt.id,
completedAttemptId,
});
await marqs?.acknowledgeMessage(attempt.taskRunId);
return;
}
async #handleDependencyResume(
attempt: TaskRunAttempt,
completedAttemptIds: string[],
tx: PrismaClientOrTransaction
) {
if (completedAttemptIds.length === 0) {
logger.error("No completed attempt IDs", { attemptId: attempt.id });
return;
}

const completions: TaskRunExecutionResult[] = [];
const executions: TaskRunExecution[] = [];

for (const completedAttemptId of completedAttemptIds) {
const completedAttempt = await tx.taskRunAttempt.findUnique({
where: {
id: completedAttemptId,
taskRun: {
lockedAt: {
not: null,
},
lockedById: {
not: null,
},
},
},
});

const completion = await sharedQueueTasks.getCompletionPayloadFromAttempt(
completedAttempt.id
);
if (!completedAttempt) {
logger.error("Completed attempt not found", {
attemptId: attempt.id,
completedAttemptId,
});
await marqs?.acknowledgeMessage(attempt.taskRunId);
return;
}

if (!completion) {
logger.error("Failed to get completion payload", {
attemptId: attempt.id,
completedAttemptId,
});
await marqs?.acknowledgeMessage(attempt.taskRunId);
return;
}
const completion = await sharedQueueTasks.getCompletionPayloadFromAttempt(
completedAttempt.id
);

completions.push(completion);
if (!completion) {
logger.error("Failed to get completion payload", {
attemptId: attempt.id,
completedAttemptId,
});
await marqs?.acknowledgeMessage(attempt.taskRunId);
return;
}

const executionPayload = await sharedQueueTasks.getExecutionPayloadFromAttempt(
completedAttempt.id
);
completions.push(completion);

if (!executionPayload) {
logger.error("Failed to get execution payload", {
attemptId: attempt.id,
completedAttemptId,
});
await marqs?.acknowledgeMessage(attempt.taskRunId);
return;
}
const executionPayload = await sharedQueueTasks.getExecutionPayloadFromAttempt(
completedAttempt.id
);

executions.push(executionPayload.execution);
}
if (!executionPayload) {
logger.error("Failed to get execution payload", {
attemptId: attempt.id,
completedAttemptId,
});
await marqs?.acknowledgeMessage(attempt.taskRunId);
return;
}

const updated = await tx.taskRunAttempt.update({
where: {
id: attempt.id,
},
executions.push(executionPayload.execution);
}

const updated = await tx.taskRunAttempt.update({
where: {
id: attempt.id,
},
data: {
status: "EXECUTING",
taskRun: {
update: {
data: {
status: "EXECUTING",
taskRun: {
update: {
data: {
status: attempt.number > 1 ? "RETRYING_AFTER_FAILURE" : "EXECUTING",
},
},
},
status: attempt.number > 1 ? "RETRYING_AFTER_FAILURE" : "EXECUTING",
},
});
},
},
},
});

socketIo.coordinatorNamespace.emit("RESUME_AFTER_DEPENDENCY", {
version: "v1",
runId: attempt.taskRunId,
attemptId: attempt.id,
attemptFriendlyId: attempt.friendlyId,
completions,
executions,
});
break;
}
default: {
break;
}
}
socketIo.coordinatorNamespace.emit("RESUME_AFTER_DEPENDENCY", {
version: "v1",
runId: attempt.taskRunId,
attemptId: attempt.id,
attemptFriendlyId: attempt.friendlyId,
completions,
executions,
});
}
}
2 changes: 1 addition & 1 deletion docs/mint.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
},
{
"name": "v2",
"url": "https://trigger.dev/docs",
"url": "https://trigger.dev/docs/documentation",
"version": "v3 (Developer Preview)"
},
{
Expand Down
Loading