Skip to content

RunEngine 2.0 batch trigger support #1581

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
5526dab
Make it clear when BatchTriggerV2Service is used
matt-aitken Dec 17, 2024
a469365
Copy of BatchTriggerV2Service
matt-aitken Dec 17, 2024
64c4a99
WIP batch triggering
matt-aitken Dec 19, 2024
c1306eb
Merge remote-tracking branch 'origin/run-engine-2' into run-engine-ba…
matt-aitken Dec 19, 2024
6259b3d
Allow blocking a run with multiple waitpoints at once. Made it atomic
matt-aitken Dec 29, 2024
adbe83e
Removed unused param
matt-aitken Dec 29, 2024
fb08da1
New batch service
matt-aitken Dec 29, 2024
6aad704
Merge remote-tracking branch 'origin/run-engine-2' into run-engine-ba…
matt-aitken Dec 29, 2024
33b7607
Pass through the parentRunId and resumeParentOnCompletion
matt-aitken Dec 29, 2024
44d954a
Use the new batch service, and correct trigger task version
matt-aitken Dec 29, 2024
31a354a
Force V1 engine if using BatchTriggerV2Service, we’ve already done th…
matt-aitken Dec 30, 2024
45ada5c
Removed the $transaction and early exit if nothing changed
matt-aitken Dec 31, 2024
f0c250f
Merge remote-tracking branch 'origin/run-engine-2' into run-engine-ba…
matt-aitken Dec 31, 2024
98fad73
Adedd a simple batch task to the hello world reference catalog
matt-aitken Dec 31, 2024
8b33031
Fix for batch waits not working
matt-aitken Jan 2, 2025
c308ad2
Added parentRunId in a couple more places
matt-aitken Jan 2, 2025
f0c9fa4
Removed waitForBatch log
matt-aitken Jan 2, 2025
7de5540
Added another parentRunId
matt-aitken Jan 2, 2025
7096239
Expanded the example to include all the different triggers
matt-aitken Jan 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions apps/webapp/app/routes/api.v1.tasks.batch.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
import { json } from "@remix-run/server-runtime";
import {
BatchTriggerTaskResponse,
BatchTriggerTaskV2RequestBody,
BatchTriggerTaskV2Response,
generateJWT,
} from "@trigger.dev/core/v3";
import { env } from "~/env.server";
import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
import { determineEngineVersion } from "~/v3/engineVersion.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import {
BatchProcessingStrategy,
BatchTriggerV2Service,
} from "~/v3/services/batchTriggerV2.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { BatchTriggerV3Service } from "~/v3/services/batchTriggerV3.server";
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { z } from "zod";
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";

const { action, loader } = createActionApiRoute(
{
Expand Down Expand Up @@ -87,7 +87,11 @@ const { action, loader } = createActionApiRoute(
resolveIdempotencyKeyTTL(idempotencyKeyTTL) ??
new Date(Date.now() + 24 * 60 * 60 * 1000 * 30);

const service = new BatchTriggerV2Service(batchProcessingStrategy ?? undefined);
const version = await determineEngineVersion({ environment: authentication.environment });
const service =
version === "V1"
? new BatchTriggerV2Service(batchProcessingStrategy ?? undefined)
: new BatchTriggerV3Service(batchProcessingStrategy ?? undefined);

try {
const batch = await service.call(authentication.environment, body, {
Expand Down
14 changes: 14 additions & 0 deletions apps/webapp/app/services/worker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ import {
} from "~/v3/services/cancelDevSessionRuns.server";
import { logger } from "./logger.server";
import { BatchProcessingOptions, BatchTriggerV2Service } from "~/v3/services/batchTriggerV2.server";
import {
BatchProcessingOptions as BatchProcessingOptionsV3,
BatchTriggerV3Service,
} from "~/v3/services/batchTriggerV3.server";

const workerCatalog = {
indexEndpoint: z.object({
Expand Down Expand Up @@ -199,6 +203,7 @@ const workerCatalog = {
}),
"v3.cancelDevSessionRuns": CancelDevSessionRunsServiceOptions,
"v3.processBatchTaskRun": BatchProcessingOptions,
"v3.processBatchTaskRunV3": BatchProcessingOptionsV3,
};

const executionWorkerCatalog = {
Expand Down Expand Up @@ -735,6 +740,15 @@ function getWorkerQueue() {
handler: async (payload, job) => {
const service = new BatchTriggerV2Service(payload.strategy);

await service.processBatchTaskRun(payload);
},
},
"v3.processBatchTaskRunV3": {
priority: 0,
maxAttempts: 5,
handler: async (payload, job) => {
const service = new BatchTriggerV3Service(payload.strategy);

await service.processBatchTaskRun(payload);
},
},
Expand Down
6 changes: 5 additions & 1 deletion apps/webapp/app/v3/services/batchTriggerV2.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ export type BatchTriggerTaskServiceOptions = {
oneTimeUseToken?: string;
};

/**
* Larger batches, used in Run Engine v1
*/
export class BatchTriggerV2Service extends BaseService {
private _batchProcessingStrategy: BatchProcessingStrategy;

Expand Down Expand Up @@ -787,7 +790,8 @@ export class BatchTriggerV2Service extends BaseService {
batchId: batch.friendlyId,
skipChecks: true,
runFriendlyId: task.runFriendlyId,
}
},
"V1"
);

if (!run) {
Expand Down
Loading