Skip to content

Commit 84ee4d3

Browse files
committed
Make sure to complete batches, even if they’re not andWait ones
1 parent 5ac9673 commit 84ee4d3

File tree

3 files changed

+55
-19
lines changed

3 files changed

+55
-19
lines changed

apps/webapp/app/v3/services/batchTriggerV3.server.ts

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -567,17 +567,21 @@ export class BatchTriggerV3Service extends WithRunEngine {
567567
return { status: "INCOMPLETE", workingIndex };
568568
}
569569

570-
if (
571-
parentRunId &&
572-
resumeParentOnCompletion &&
573-
updatedBatch.runIds.length === updatedBatch.runCount
574-
) {
575-
await this._engine.unblockRunForCreatedBatch({
576-
runId: RunId.fromFriendlyId(parentRunId),
577-
batchId: batch.id,
578-
environmentId: environment.id,
579-
projectId: environment.projectId,
580-
});
570+
//triggered all the runs
571+
if (updatedBatch.runIds.length === updatedBatch.runCount) {
572+
//unblock the parent run from the batch
573+
//this prevents the parent continuing before all the runs are created
574+
if (parentRunId && resumeParentOnCompletion) {
575+
await this._engine.unblockRunForCreatedBatch({
576+
runId: RunId.fromFriendlyId(parentRunId),
577+
batchId: batch.id,
578+
environmentId: environment.id,
579+
projectId: environment.projectId,
580+
});
581+
}
582+
583+
//if all the runs were idempotent, it's possible the batch is already completed
584+
await this._engine.tryCompleteBatch({ batchId: batch.id });
581585
}
582586

583587
return { status: "COMPLETE" };

internal-packages/run-engine/src/engine/index.ts

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1662,7 +1662,9 @@ export class RunEngine {
16621662
id: waitpoint.id,
16631663
output: { value: "Batch waitpoint completed", isError: false },
16641664
});
1665+
}
16651666

1667+
async tryCompleteBatch({ batchId }: { batchId: string }): Promise<void> {
16661668
await this.worker.enqueue({
16671669
//this will debounce the call
16681670
id: `tryCompleteBatch:${batchId}`,
@@ -3187,14 +3189,7 @@ export class RunEngine {
31873189
*/
31883190
async #finalizeRun({ id, batchId }: { id: string; batchId: string | null }) {
31893191
if (batchId) {
3190-
await this.worker.enqueue({
3191-
//this will debounce the call
3192-
id: `tryCompleteBatch:${batchId}`,
3193-
job: "tryCompleteBatch",
3194-
payload: { batchId: batchId },
3195-
//2s in the future
3196-
availableAt: new Date(Date.now() + 2_000),
3197-
});
3192+
await this.tryCompleteBatch({ batchId });
31983193
}
31993194
}
32003195

references/hello-world/src/trigger/idempotency.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,3 +245,40 @@ export const idempotencyBatch = task({
245245
logger.log("Results 4", { results4 });
246246
},
247247
});
248+
249+
export const idempotencyTriggerByTaskAndWait = task({
250+
id: "idempotency-trigger-by-task-and-wait",
251+
maxDuration: 60,
252+
run: async () => {
253+
const successfulKey = await idempotencyKeys.create("a", { scope: "global" });
254+
const failureKey = await idempotencyKeys.create("b", { scope: "global" });
255+
256+
const results1 = await batch.triggerByTaskAndWait([
257+
{
258+
task: childTask,
259+
payload: { message: "Hello, world !" },
260+
options: { idempotencyKey: successfulKey, idempotencyKeyTTL: "60s" },
261+
},
262+
{
263+
task: childTask,
264+
payload: { message: "Hello, world 2!" },
265+
options: { idempotencyKey: failureKey, idempotencyKeyTTL: "60s" },
266+
},
267+
]);
268+
logger.log("Results 1", { results1 });
269+
270+
const results2 = await batch.triggerByTaskAndWait([
271+
{
272+
task: childTask,
273+
payload: { message: "Hello, world !" },
274+
options: { idempotencyKey: successfulKey, idempotencyKeyTTL: "60s" },
275+
},
276+
{
277+
task: childTask,
278+
payload: { message: "Hello, world 2!" },
279+
options: { idempotencyKey: failureKey, idempotencyKeyTTL: "60s" },
280+
},
281+
]);
282+
logger.log("Results 2", { results2 });
283+
},
284+
});

0 commit comments

Comments
 (0)