Skip to content

Commit 188d4d6

Browse files
committed
WIP task run status, revamped resuming task dependencies
1 parent fe9435f commit 188d4d6

File tree

17 files changed

+554
-247
lines changed

17 files changed

+554
-247
lines changed

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.v3.$projectParam.test/route.tsx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ export default function Page() {
5252
<div className={cn("grid h-full max-h-full grid-cols-1")}>
5353
<ResizablePanelGroup direction="horizontal" className="h-full max-h-full">
5454
<ResizablePanel order={1} minSize={20} defaultSize={30}>
55-
<div className="flex flex-col px-3">
55+
<div className="flex h-full max-h-full flex-col overflow-hidden px-3">
5656
{tasks.length === 0 ? (
5757
<NoTaskInstructions />
5858
) : (
@@ -81,7 +81,7 @@ function TaskSelector({ tasks }: { tasks: TaskListItem[] }) {
8181
const project = useProject();
8282

8383
return (
84-
<div className="flex flex-col divide-y divide-charcoal-800">
84+
<div className="flex flex-col divide-y divide-charcoal-800 overflow-y-auto">
8585
{tasks.map((t) => (
8686
<NavLink key={t.id} to={v3TestTaskPath(organization, project, t)}>
8787
{({ isActive, isPending }) => (

apps/webapp/app/services/worker.server.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ import { DeliverWebhookRequestService } from "./sources/deliverWebhookRequest.se
3030
import { PerformTaskOperationService } from "./tasks/performTaskOperation.server";
3131
import { ProcessCallbackTimeoutService } from "./tasks/processCallbackTimeout.server";
3232
import { ResumeTaskService } from "./tasks/resumeTask.server";
33+
import { ResumeTaskRunDependenciesService } from "~/v3/services/resumeTaskRunDependencies.server";
34+
import { ResumeBatchRunService } from "~/v3/services/resumeBatchRun.server";
35+
import { ResumeTaskDependencyService } from "~/v3/services/resumeTaskDependency.server";
3336

3437
const workerCatalog = {
3538
indexEndpoint: z.object({
@@ -107,6 +110,17 @@ const workerCatalog = {
107110
"v3.indexDeployment": z.object({
108111
id: z.string(),
109112
}),
113+
"v3.resumeTaskRunDependencies": z.object({
114+
attemptId: z.string(),
115+
}),
116+
"v3.resumeBatchRun": z.object({
117+
batchRunId: z.string(),
118+
sourceTaskAttemptId: z.string(),
119+
}),
120+
"v3.resumeTaskDependency": z.object({
121+
dependencyId: z.string(),
122+
sourceTaskAttemptId: z.string(),
123+
}),
110124
};
111125

112126
const executionWorkerCatalog = {
@@ -443,6 +457,33 @@ function getWorkerQueue() {
443457
return await service.call(payload.id);
444458
},
445459
},
460+
"v3.resumeTaskRunDependencies": {
461+
priority: 0,
462+
maxAttempts: 5,
463+
handler: async (payload, job) => {
464+
const service = new ResumeTaskRunDependenciesService();
465+
466+
return await service.call(payload.attemptId);
467+
},
468+
},
469+
"v3.resumeBatchRun": {
470+
priority: 0,
471+
maxAttempts: 5,
472+
handler: async (payload, job) => {
473+
const service = new ResumeBatchRunService();
474+
475+
return await service.call(payload.batchRunId, payload.sourceTaskAttemptId);
476+
},
477+
},
478+
"v3.resumeTaskDependency": {
479+
priority: 0,
480+
maxAttempts: 5,
481+
handler: async (payload, job) => {
482+
const service = new ResumeTaskDependencyService();
483+
484+
return await service.call(payload.dependencyId, payload.sourceTaskAttemptId);
485+
},
486+
},
446487
},
447488
});
448489
}

apps/webapp/app/v3/marqs/devQueueConsumer.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,7 @@ export class DevQueueConsumer {
309309
data: {
310310
lockedAt: new Date(),
311311
lockedById: backgroundTask.id,
312+
status: "EXECUTING",
312313
},
313314
include: {
314315
attempts: {

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 33 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,11 @@ const MessageBody = z.discriminatedUnion("type", [
3131
z.object({
3232
type: z.literal("RESUME"),
3333
completedAttemptIds: z.string().array(),
34+
resumableAttemptId: z.string(),
3435
}),
3536
z.object({
3637
type: z.literal("RESUME_AFTER_DURATION"),
38+
resumableAttemptId: z.string(),
3739
}),
3840
]);
3941

@@ -478,30 +480,36 @@ export class SharedQueueConsumer {
478480
return;
479481
}
480482

481-
const resumableRun = await prisma.taskRun.findFirst({
483+
const resumableRun = await prisma.taskRun.findUnique({
482484
where: {
483485
id: message.messageId,
484486
},
487+
});
488+
489+
if (!resumableRun) {
490+
logger.error("Resumable run not found", {
491+
queueMessage: message.data,
492+
messageId: message.messageId,
493+
});
494+
await marqs?.acknowledgeMessage(message.messageId);
495+
setTimeout(() => this.#doWork(), 100);
496+
return;
497+
}
498+
499+
const resumableAttempt = await prisma.taskRunAttempt.findUnique({
500+
where: {
501+
id: messageBody.data.resumableAttemptId,
502+
},
485503
include: {
486-
attempts: {
504+
checkpoints: {
505+
take: 1,
487506
orderBy: {
488507
createdAt: "desc",
489508
},
490-
take: 1,
491-
include: {
492-
checkpoints: {
493-
take: 1,
494-
orderBy: {
495-
createdAt: "desc",
496-
},
497-
},
498-
},
499509
},
500510
},
501511
});
502512

503-
const resumableAttempt = resumableRun?.attempts[0];
504-
505513
if (!resumableAttempt) {
506514
logger.error("Resumable attempt not found", {
507515
queueMessage: message.data,
@@ -553,6 +561,13 @@ export class SharedQueueConsumer {
553561
},
554562
data: {
555563
status: "EXECUTING",
564+
taskRun: {
565+
update: {
566+
data: {
567+
status: "EXECUTING",
568+
},
569+
},
570+
},
556571
},
557572
});
558573

@@ -647,30 +662,21 @@ export class SharedQueueConsumer {
647662
}
648663
// Resume after duration-based wait
649664
case "RESUME_AFTER_DURATION": {
650-
const resumableRun = await prisma.taskRun.findFirst({
665+
const resumableAttempt = await prisma.taskRunAttempt.findUnique({
651666
where: {
652-
id: message.messageId,
667+
id: messageBody.data.resumableAttemptId,
653668
},
654669
include: {
655-
attempts: {
670+
checkpoints: {
671+
take: 1,
656672
orderBy: {
657673
createdAt: "desc",
658674
},
659-
take: 1,
660-
include: {
661-
checkpoints: {
662-
take: 1,
663-
orderBy: {
664-
createdAt: "desc",
665-
},
666-
},
667-
},
668675
},
676+
taskRun: true,
669677
},
670678
});
671679

672-
const resumableAttempt = resumableRun?.attempts[0];
673-
674680
if (!resumableAttempt) {
675681
logger.error("Resumable attempt not found", {
676682
queueMessage: message.data,

0 commit comments

Comments
 (0)