Skip to content

Commit bbf397e

Browse files
authored
Complete batch waitpoints when we complete the BatchTaskRun (#1945)
* Complete batch waitpoints when we complete the BatchTaskRun
* Try to complete the batch faster now that it’s being used operationally
* Fix tests that were using the old engine.unblockRunForCreatedBatch() function
1 parent 4a83032 commit bbf397e

File tree

5 files changed

+30
-72
lines changed

5 files changed

+30
-72
lines changed

apps/webapp/app/runEngine/services/batchTrigger.server.ts

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -571,17 +571,6 @@ export class RunEngineBatchTriggerService extends WithRunEngine {
571571

572572
//triggered all the runs
573573
if (updatedBatch.runIds.length === updatedBatch.runCount) {
574-
//unblock the parent run from the batch
575-
//this prevents the parent continuing before all the runs are created
576-
if (parentRunId && resumeParentOnCompletion) {
577-
await this._engine.unblockRunForCreatedBatch({
578-
runId: RunId.fromFriendlyId(parentRunId),
579-
batchId: batch.id,
580-
environmentId: environment.id,
581-
projectId: environment.projectId,
582-
});
583-
}
584-
585574
//if all the runs were idempotent, it's possible the batch is already completed
586575
await this._engine.tryCompleteBatch({ batchId: batch.id });
587576
}

internal-packages/run-engine/src/engine/index.ts

Lines changed: 1 addition & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,7 @@ export class RunEngine {
290290

291291
this.batchSystem = new BatchSystem({
292292
resources,
293+
waitpointSystem: this.waitpointSystem,
293294
});
294295

295296
this.runAttemptSystem = new RunAttemptSystem({
@@ -905,43 +906,6 @@ export class RunEngine {
905906
}
906907
}
907908

908-
/**
909-
* This is called when all the runs for a batch have been created.
910-
* This does NOT mean that all the runs for the batch are completed.
911-
*/
912-
async unblockRunForCreatedBatch({
913-
runId,
914-
batchId,
915-
tx,
916-
}: {
917-
runId: string;
918-
batchId: string;
919-
environmentId: string;
920-
projectId: string;
921-
tx?: PrismaClientOrTransaction;
922-
}): Promise<void> {
923-
const prisma = tx ?? this.prisma;
924-
925-
const waitpoint = await prisma.waitpoint.findFirst({
926-
where: {
927-
completedByBatchId: batchId,
928-
},
929-
});
930-
931-
if (!waitpoint) {
932-
this.logger.error("RunEngine.unblockRunForBatch(): Waitpoint not found", {
933-
runId,
934-
batchId,
935-
});
936-
throw new ServiceValidationError("Waitpoint not found for batch", 404);
937-
}
938-
939-
await this.completeWaitpoint({
940-
id: waitpoint.id,
941-
output: { value: "Batch waitpoint completed", isError: false },
942-
});
943-
}
944-
945909
async tryCompleteBatch({ batchId }: { batchId: string }): Promise<void> {
946910
return this.batchSystem.scheduleCompleteBatch({ batchId });
947911
}

internal-packages/run-engine/src/engine/systems/batchSystem.ts

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,20 @@
11
import { startSpan } from "@internal/tracing";
22
import { isFinalRunStatus } from "../statuses.js";
33
import { SystemResources } from "./systems.js";
4+
import { WaitpointSystem } from "./waitpointSystem.js";
45

56
export type BatchSystemOptions = {
67
resources: SystemResources;
8+
waitpointSystem: WaitpointSystem;
79
};
810

911
export class BatchSystem {
1012
private readonly $: SystemResources;
13+
private readonly waitpointSystem: WaitpointSystem;
1114

1215
constructor(private readonly options: BatchSystemOptions) {
1316
this.$ = options.resources;
17+
this.waitpointSystem = options.waitpointSystem;
1418
}
1519

1620
public async scheduleCompleteBatch({ batchId }: { batchId: string }): Promise<void> {
@@ -19,8 +23,8 @@ export class BatchSystem {
1923
id: `tryCompleteBatch:${batchId}`,
2024
job: "tryCompleteBatch",
2125
payload: { batchId: batchId },
22-
//2s in the future
23-
availableAt: new Date(Date.now() + 2_000),
26+
//200ms in the future
27+
availableAt: new Date(Date.now() + 200),
2428
});
2529
}
2630

@@ -75,6 +79,28 @@ export class BatchSystem {
7579
status: "COMPLETED",
7680
},
7781
});
82+
83+
//get waitpoint (if there is one)
84+
const waitpoint = await this.$.prisma.waitpoint.findFirst({
85+
where: {
86+
completedByBatchId: batchId,
87+
},
88+
});
89+
90+
if (!waitpoint) {
91+
this.$.logger.debug(
92+
"RunEngine.unblockRunForBatch(): Waitpoint not found. This is ok, because only batchTriggerAndWait has waitpoints",
93+
{
94+
batchId,
95+
}
96+
);
97+
return;
98+
}
99+
100+
await this.waitpointSystem.completeWaitpoint({
101+
id: waitpoint.id,
102+
output: { value: "Batch waitpoint completed", isError: false },
103+
});
78104
} else {
79105
this.$.logger.debug("#tryCompleteBatch: Not all runs are completed", { batchId });
80106
}

internal-packages/run-engine/src/engine/tests/batchTriggerAndWait.test.ts

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -191,13 +191,6 @@ describe("RunEngine batchTriggerAndWait", () => {
191191
expect(batchWaitpoint?.waitpoint.type).toBe("BATCH");
192192
expect(batchWaitpoint?.waitpoint.completedByBatchId).toBe(batch.id);
193193

194-
await engine.unblockRunForCreatedBatch({
195-
runId: parentRun.id,
196-
batchId: batch.id,
197-
environmentId: authenticatedEnvironment.id,
198-
projectId: authenticatedEnvironment.projectId,
199-
});
200-
201194
//dequeue and start the 1st child
202195
const dequeuedChild = await engine.dequeueFromMasterQueue({
203196
consumerId: "test_12345",
@@ -303,7 +296,7 @@ describe("RunEngine batchTriggerAndWait", () => {
303296
expect(child2WaitpointAfter?.status).toBe("COMPLETED");
304297
expect(child2WaitpointAfter?.output).toBe('{"baz":"qux"}');
305298

306-
await setTimeout(500);
299+
await setTimeout(1_000);
307300

308301
const runWaitpointsAfterSecondChild = await prisma.taskRunWaitpoint.findMany({
309302
where: {
@@ -497,13 +490,6 @@ describe("RunEngine batchTriggerAndWait", () => {
497490
expect(parentAfterBatchChild.snapshot.executionStatus).toBe("EXECUTING_WITH_WAITPOINTS");
498491
expect(parentAfterBatchChild.batch?.id).toBe(batch.id);
499492

500-
await engine.unblockRunForCreatedBatch({
501-
runId: parentRun.id,
502-
batchId: batch.id,
503-
environmentId: authenticatedEnvironment.id,
504-
projectId: authenticatedEnvironment.projectId,
505-
});
506-
507493
//dequeue and start the batch child
508494
const dequeuedBatchChild = await engine.dequeueFromMasterQueue({
509495
consumerId: "test_12345",

internal-packages/run-engine/src/engine/tests/checkpoints.test.ts

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1166,13 +1166,6 @@ describe("RunEngine checkpoints", () => {
11661166
expect(batchWaitpoint?.waitpoint.type).toBe("BATCH");
11671167
expect(batchWaitpoint?.waitpoint.completedByBatchId).toBe(batch.id);
11681168

1169-
await engine.unblockRunForCreatedBatch({
1170-
runId: parentRun.id,
1171-
batchId: batch.id,
1172-
environmentId: authenticatedEnvironment.id,
1173-
projectId: authenticatedEnvironment.projectId,
1174-
});
1175-
11761169
// Create a checkpoint
11771170
const checkpointResult = await engine.createCheckpoint({
11781171
runId: parentRun.id,

0 commit comments

Comments
 (0)