Skip to content

Commit b2f2689

Browse files
committed
Completed batch waitpoints when we completed the BatchTaskRun
1 parent 4a83032 commit b2f2689

File tree

3 files changed

+27
-48
lines changed

3 files changed

+27
-48
lines changed

apps/webapp/app/runEngine/services/batchTrigger.server.ts

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -571,17 +571,6 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
571571

572572
//triggered all the runs
573573
if (updatedBatch.runIds.length === updatedBatch.runCount) {
574-
//unblock the parent run from the batch
575-
//this prevents the parent continuing before all the runs are created
576-
if (parentRunId && resumeParentOnCompletion) {
577-
await this._engine.unblockRunForCreatedBatch({
578-
runId: RunId.fromFriendlyId(parentRunId),
579-
batchId: batch.id,
580-
environmentId: environment.id,
581-
projectId: environment.projectId,
582-
});
583-
}
584-
585574
//if all the runs were idempotent, it's possible the batch is already completed
586575
await this._engine.tryCompleteBatch({ batchId: batch.id });
587576
}

internal-packages/run-engine/src/engine/index.ts

Lines changed: 1 addition & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,7 @@ export class RunEngine {
290290

291291
this.batchSystem = new BatchSystem({
292292
resources,
293+
waitpointSystem: this.waitpointSystem,
293294
});
294295

295296
this.runAttemptSystem = new RunAttemptSystem({
@@ -905,43 +906,6 @@ export class RunEngine {
905906
}
906907
}
907908

908-
/**
909-
* This is called when all the runs for a batch have been created.
910-
* This does NOT mean that all the runs for the batch are completed.
911-
*/
912-
async unblockRunForCreatedBatch({
913-
runId,
914-
batchId,
915-
tx,
916-
}: {
917-
runId: string;
918-
batchId: string;
919-
environmentId: string;
920-
projectId: string;
921-
tx?: PrismaClientOrTransaction;
922-
}): Promise<void> {
923-
const prisma = tx ?? this.prisma;
924-
925-
const waitpoint = await prisma.waitpoint.findFirst({
926-
where: {
927-
completedByBatchId: batchId,
928-
},
929-
});
930-
931-
if (!waitpoint) {
932-
this.logger.error("RunEngine.unblockRunForBatch(): Waitpoint not found", {
933-
runId,
934-
batchId,
935-
});
936-
throw new ServiceValidationError("Waitpoint not found for batch", 404);
937-
}
938-
939-
await this.completeWaitpoint({
940-
id: waitpoint.id,
941-
output: { value: "Batch waitpoint completed", isError: false },
942-
});
943-
}
944-
945909
async tryCompleteBatch({ batchId }: { batchId: string }): Promise<void> {
946910
return this.batchSystem.scheduleCompleteBatch({ batchId });
947911
}

internal-packages/run-engine/src/engine/systems/batchSystem.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,20 @@
11
import { startSpan } from "@internal/tracing";
22
import { isFinalRunStatus } from "../statuses.js";
33
import { SystemResources } from "./systems.js";
4+
import { WaitpointSystem } from "./waitpointSystem.js";
45

56
export type BatchSystemOptions = {
67
resources: SystemResources;
8+
waitpointSystem: WaitpointSystem;
79
};
810

911
export class BatchSystem {
1012
private readonly $: SystemResources;
13+
private readonly waitpointSystem: WaitpointSystem;
1114

1215
constructor(private readonly options: BatchSystemOptions) {
1316
this.$ = options.resources;
17+
this.waitpointSystem = options.waitpointSystem;
1418
}
1519

1620
public async scheduleCompleteBatch({ batchId }: { batchId: string }): Promise<void> {
@@ -75,6 +79,28 @@ export class BatchSystem {
7579
status: "COMPLETED",
7680
},
7781
});
82+
83+
//get waitpoint (if there is one)
84+
const waitpoint = await this.$.prisma.waitpoint.findFirst({
85+
where: {
86+
completedByBatchId: batchId,
87+
},
88+
});
89+
90+
if (!waitpoint) {
91+
this.$.logger.debug(
92+
"RunEngine.unblockRunForBatch(): Waitpoint not found. This is ok, because only batchTriggerAndWait has waitpoints",
93+
{
94+
batchId,
95+
}
96+
);
97+
return;
98+
}
99+
100+
await this.waitpointSystem.completeWaitpoint({
101+
id: waitpoint.id,
102+
output: { value: "Batch waitpoint completed", isError: false },
103+
});
78104
} else {
79105
this.$.logger.debug("#tryCompleteBatch: Not all runs are completed", { batchId });
80106
}

0 commit comments

Comments
 (0)