Skip to content

Commit 3d4068a

Browse files
authored
RunEngine 2.0 batch trigger support (#1581)
* Make it clear when BatchTriggerV2Service is used * Copy of BatchTriggerV2Service * WIP batch triggering * Allow blocking a run with multiple waitpoints at once. Made it atomic * Removed unused param * New batch service * Pass through the parentRunId and resumeParentOnCompletion * Use the new batch service, and correct trigger task version * Force V1 engine if using BatchTriggerV2Service, we’ve already done the check at this point * Removed the $transaction and early exit if nothing changed * Adedd a simple batch task to the hello world reference catalog * Fix for batch waits not working * Added parentRunId in a couple more places * Removed waitForBatch log * Added another parentRunId * Expanded the example to include all the different triggers
1 parent 4f954b2 commit 3d4068a

File tree

12 files changed

+1046
-55
lines changed

12 files changed

+1046
-55
lines changed

apps/webapp/app/routes/api.v1.tasks.batch.ts

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,23 @@
11
import { json } from "@remix-run/server-runtime";
22
import {
3-
BatchTriggerTaskResponse,
43
BatchTriggerTaskV2RequestBody,
54
BatchTriggerTaskV2Response,
65
generateJWT,
76
} from "@trigger.dev/core/v3";
87
import { env } from "~/env.server";
8+
import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server";
9+
import { logger } from "~/services/logger.server";
910
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
10-
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
1111
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
12+
import { determineEngineVersion } from "~/v3/engineVersion.server";
13+
import { ServiceValidationError } from "~/v3/services/baseService.server";
1214
import {
1315
BatchProcessingStrategy,
1416
BatchTriggerV2Service,
1517
} from "~/v3/services/batchTriggerV2.server";
16-
import { ServiceValidationError } from "~/v3/services/baseService.server";
18+
import { BatchTriggerV3Service } from "~/v3/services/batchTriggerV3.server";
1719
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
18-
import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server";
19-
import { logger } from "~/services/logger.server";
20-
import { z } from "zod";
20+
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
2121

2222
const { action, loader } = createActionApiRoute(
2323
{
@@ -87,7 +87,11 @@ const { action, loader } = createActionApiRoute(
8787
resolveIdempotencyKeyTTL(idempotencyKeyTTL) ??
8888
new Date(Date.now() + 24 * 60 * 60 * 1000 * 30);
8989

90-
const service = new BatchTriggerV2Service(batchProcessingStrategy ?? undefined);
90+
const version = await determineEngineVersion({ environment: authentication.environment });
91+
const service =
92+
version === "V1"
93+
? new BatchTriggerV2Service(batchProcessingStrategy ?? undefined)
94+
: new BatchTriggerV3Service(batchProcessingStrategy ?? undefined);
9195

9296
try {
9397
const batch = await service.call(authentication.environment, body, {

apps/webapp/app/services/worker.server.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ import {
5656
} from "~/v3/services/cancelDevSessionRuns.server";
5757
import { logger } from "./logger.server";
5858
import { BatchProcessingOptions, BatchTriggerV2Service } from "~/v3/services/batchTriggerV2.server";
59+
import {
60+
BatchProcessingOptions as BatchProcessingOptionsV3,
61+
BatchTriggerV3Service,
62+
} from "~/v3/services/batchTriggerV3.server";
5963

6064
const workerCatalog = {
6165
indexEndpoint: z.object({
@@ -199,6 +203,7 @@ const workerCatalog = {
199203
}),
200204
"v3.cancelDevSessionRuns": CancelDevSessionRunsServiceOptions,
201205
"v3.processBatchTaskRun": BatchProcessingOptions,
206+
"v3.processBatchTaskRunV3": BatchProcessingOptionsV3,
202207
};
203208

204209
const executionWorkerCatalog = {
@@ -735,6 +740,15 @@ function getWorkerQueue() {
735740
handler: async (payload, job) => {
736741
const service = new BatchTriggerV2Service(payload.strategy);
737742

743+
await service.processBatchTaskRun(payload);
744+
},
745+
},
746+
"v3.processBatchTaskRunV3": {
747+
priority: 0,
748+
maxAttempts: 5,
749+
handler: async (payload, job) => {
750+
const service = new BatchTriggerV3Service(payload.strategy);
751+
738752
await service.processBatchTaskRun(payload);
739753
},
740754
},

apps/webapp/app/v3/services/batchTriggerV2.server.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@ export type BatchTriggerTaskServiceOptions = {
4949
oneTimeUseToken?: string;
5050
};
5151

52+
/**
53+
* Larger batches, used in Run Engine v1
54+
*/
5255
export class BatchTriggerV2Service extends BaseService {
5356
private _batchProcessingStrategy: BatchProcessingStrategy;
5457

@@ -787,7 +790,8 @@ export class BatchTriggerV2Service extends BaseService {
787790
batchId: batch.friendlyId,
788791
skipChecks: true,
789792
runFriendlyId: task.runFriendlyId,
790-
}
793+
},
794+
"V1"
791795
);
792796

793797
if (!run) {

0 commit comments

Comments
 (0)