Skip to content

Optionally trigger batched items sequentially to preserve order #1536

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/sweet-suits-kick.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/sdk": patch
"@trigger.dev/core": patch
---

Add option to trigger batched items sequentially, and default to parallel triggering which is faster
14 changes: 11 additions & 3 deletions apps/webapp/app/routes/api.v1.tasks.batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,21 @@ import { env } from "~/env.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
import { BatchTriggerV2Service } from "~/v3/services/batchTriggerV2.server";
import {
BatchProcessingStrategy,
BatchTriggerV2Service,
} from "~/v3/services/batchTriggerV2.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { z } from "zod";

const { action, loader } = createActionApiRoute(
{
headers: HeadersSchema,
headers: HeadersSchema.extend({
"batch-processing-strategy": BatchProcessingStrategy.nullish(),
}),
body: BatchTriggerTaskV2RequestBody,
allowJWT: true,
maxContentLength: env.BATCH_TASK_PAYLOAD_MAXIMUM_SIZE,
Expand Down Expand Up @@ -52,6 +58,7 @@ const { action, loader } = createActionApiRoute(
"x-trigger-span-parent-as-link": spanParentAsLink,
"x-trigger-worker": isFromWorker,
"x-trigger-client": triggerClient,
"batch-processing-strategy": batchProcessingStrategy,
traceparent,
tracestate,
} = headers;
Expand All @@ -67,6 +74,7 @@ const { action, loader } = createActionApiRoute(
triggerClient,
traceparent,
tracestate,
batchProcessingStrategy,
});

const traceContext =
Expand All @@ -79,7 +87,7 @@ const { action, loader } = createActionApiRoute(
resolveIdempotencyKeyTTL(idempotencyKeyTTL) ??
new Date(Date.now() + 24 * 60 * 60 * 1000 * 30);

const service = new BatchTriggerV2Service();
const service = new BatchTriggerV2Service(batchProcessingStrategy ?? undefined);

try {
const batch = await service.call(authentication.environment, body, {
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/routes/api.v3.runs.$runId.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export const loader = createLoaderApiRoute(
findResource: (params, auth) => {
return ApiRetrieveRunPresenter.findRun(params.runId, auth.environment);
},
shouldRetryNotFound: true,
authorization: {
action: "read",
resource: (run) => ({
Expand Down
7 changes: 6 additions & 1 deletion apps/webapp/app/services/routeBuilders/apiBuilder.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type ApiKeyRouteBuilderOptions<
params: TParamsSchema extends z.AnyZodObject ? z.infer<TParamsSchema> : undefined,
authentication: ApiAuthenticationResultSuccess
) => Promise<TResource | undefined>;
shouldRetryNotFound?: boolean;
authorization?: {
action: AuthorizationAction;
resource: (
Expand Down Expand Up @@ -81,6 +82,7 @@ export function createLoaderApiRoute<
corsStrategy = "none",
authorization,
findResource,
shouldRetryNotFound,
} = options;

if (corsStrategy !== "none" && request.method.toUpperCase() === "OPTIONS") {
Expand Down Expand Up @@ -162,7 +164,10 @@ export function createLoaderApiRoute<
if (!resource) {
return await wrapResponse(
request,
json({ error: "Not found" }, { status: 404 }),
json(
{ error: "Not found" },
{ status: 404, headers: { "x-should-retry": shouldRetryNotFound ? "true" : "false" } }
),
corsStrategy !== "none"
);
}
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/services/worker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@ function getWorkerQueue() {
priority: 0,
maxAttempts: 5,
handler: async (payload, job) => {
const service = new BatchTriggerV2Service();
const service = new BatchTriggerV2Service(payload.strategy);

await service.processBatchTaskRun(payload);
},
Expand Down
26 changes: 17 additions & 9 deletions apps/webapp/app/v3/services/batchTriggerV2.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
parsePacket,
} from "@trigger.dev/core/v3";
import { BatchTaskRun, Prisma, TaskRunAttempt } from "@trigger.dev/database";
import { $transaction, PrismaClientOrTransaction } from "~/db.server";
import { $transaction, prisma, PrismaClientOrTransaction } from "~/db.server";
import { env } from "~/env.server";
import { batchTaskRunItemStatusForRunStatus } from "~/models/taskRun.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
Expand All @@ -26,11 +26,8 @@ import { z } from "zod";
const PROCESSING_BATCH_SIZE = 50;
const ASYNC_BATCH_PROCESS_SIZE_THRESHOLD = 20;

const BatchProcessingStrategy = z.enum(["sequential", "parallel"]);

type BatchProcessingStrategy = z.infer<typeof BatchProcessingStrategy>;

const CURRENT_STRATEGY: BatchProcessingStrategy = "parallel";
export const BatchProcessingStrategy = z.enum(["sequential", "parallel"]);
export type BatchProcessingStrategy = z.infer<typeof BatchProcessingStrategy>;

export const BatchProcessingOptions = z.object({
batchId: z.string(),
Expand All @@ -52,6 +49,17 @@ export type BatchTriggerTaskServiceOptions = {
};

export class BatchTriggerV2Service extends BaseService {
private _batchProcessingStrategy: BatchProcessingStrategy;

constructor(
batchProcessingStrategy?: BatchProcessingStrategy,
protected readonly _prisma: PrismaClientOrTransaction = prisma
) {
super(_prisma);

this._batchProcessingStrategy = batchProcessingStrategy ?? "parallel";
}

public async call(
environment: AuthenticatedEnvironment,
body: BatchTriggerTaskV2RequestBody,
Expand Down Expand Up @@ -452,14 +460,14 @@ export class BatchTriggerV2Service extends BaseService {
},
});

switch (CURRENT_STRATEGY) {
switch (this._batchProcessingStrategy) {
case "sequential": {
await this.#enqueueBatchTaskRun({
batchId: batch.id,
processingId: batchId,
range: { start: 0, count: PROCESSING_BATCH_SIZE },
attemptCount: 0,
strategy: CURRENT_STRATEGY,
strategy: this._batchProcessingStrategy,
});

break;
Expand All @@ -480,7 +488,7 @@ export class BatchTriggerV2Service extends BaseService {
processingId: `${index}`,
range,
attemptCount: 0,
strategy: CURRENT_STRATEGY,
strategy: this._batchProcessingStrategy,
},
tx
)
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/v3/apiClient/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ export type ClientTriggerOptions = {
export type ClientBatchTriggerOptions = ClientTriggerOptions & {
idempotencyKey?: string;
idempotencyKeyTTL?: string;
processingStrategy?: "parallel" | "sequential";
};

export type TriggerRequestOptions = ZodFetchOptions & {
Expand Down Expand Up @@ -239,6 +240,7 @@ export class ApiClient {
headers: this.#getHeaders(clientOptions?.spanParentAsLink ?? false, {
"idempotency-key": clientOptions?.idempotencyKey,
"idempotency-key-ttl": clientOptions?.idempotencyKeyTTL,
"batch-processing-strategy": clientOptions?.processingStrategy,
}),
body: JSON.stringify(body),
},
Expand Down
29 changes: 28 additions & 1 deletion packages/core/src/v3/types/tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,8 @@ export interface Task<TIdentifier extends string, TInput = void, TOutput = any>
* ```
*/
batchTriggerAndWait: (
items: Array<BatchTriggerAndWaitItem<TInput>>
items: Array<BatchTriggerAndWaitItem<TInput>>,
options?: BatchTriggerAndWaitOptions
) => Promise<BatchResult<TIdentifier, TOutput>>;
}

Expand Down Expand Up @@ -781,6 +782,32 @@ export type TriggerAndWaitOptions = Omit<TriggerOptions, "idempotencyKey" | "ide
export type BatchTriggerOptions = {
idempotencyKey?: IdempotencyKey | string | string[];
idempotencyKeyTTL?: string;

/**
* When true, triggers tasks sequentially in batch order. This ensures ordering but may be slower,
* especially for large batches.
*
* When false (default), triggers tasks in parallel for better performance, but order is not guaranteed.
*
* Note: This only affects the order of run creation, not the actual task execution.
*
* @default false
*/
triggerSequentially?: boolean;
};

export type BatchTriggerAndWaitOptions = {
/**
* When true, triggers tasks sequentially in batch order. This ensures ordering but may be slower,
* especially for large batches.
*
* When false (default), triggers tasks in parallel for better performance, but order is not guaranteed.
*
* Note: This only affects the order of run creation, not the actual task execution.
*
* @default false
*/
triggerSequentially?: boolean;
};

export type TaskMetadataWithFunctions = TaskMetadata & {
Expand Down
28 changes: 22 additions & 6 deletions packages/trigger-sdk/src/v3/shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ import type {
TriggerApiRequestOptions,
TriggerOptions,
AnyTaskRunResult,
BatchTriggerAndWaitOptions,
} from "@trigger.dev/core/v3";

export type {
Expand Down Expand Up @@ -181,7 +182,7 @@ export function createTask<
});
}, params.id);
},
batchTriggerAndWait: async (items) => {
batchTriggerAndWait: async (items, options) => {
const taskMetadata = taskCatalog.getTaskManifest(params.id);

return await batchTriggerAndWait_internal<TIdentifier, TInput, TOutput>(
Expand All @@ -191,6 +192,7 @@ export function createTask<
params.id,
items,
undefined,
options,
undefined,
customQueue
);
Expand Down Expand Up @@ -326,7 +328,7 @@ export function createSchemaTask<
});
}, params.id);
},
batchTriggerAndWait: async (items) => {
batchTriggerAndWait: async (items, options) => {
const taskMetadata = taskCatalog.getTaskManifest(params.id);

return await batchTriggerAndWait_internal<TIdentifier, inferSchemaIn<TSchema>, TOutput>(
Expand All @@ -336,6 +338,7 @@ export function createSchemaTask<
params.id,
items,
parsePayload,
options,
undefined,
customQueue
);
Expand Down Expand Up @@ -469,13 +472,14 @@ export function triggerAndWait<TTask extends AnyTask>(
export async function batchTriggerAndWait<TTask extends AnyTask>(
id: TaskIdentifier<TTask>,
items: Array<BatchItem<TaskPayload<TTask>>>,
options?: BatchTriggerAndWaitOptions,
requestOptions?: ApiRequestOptions
): Promise<BatchResult<TaskIdentifier<TTask>, TaskOutput<TTask>>> {
return await batchTriggerAndWait_internal<
TaskIdentifier<TTask>,
TaskPayload<TTask>,
TaskOutput<TTask>
>("tasks.batchTriggerAndWait()", id, items, undefined, requestOptions);
>("tasks.batchTriggerAndWait()", id, items, undefined, options, requestOptions);
}

/**
Expand Down Expand Up @@ -618,6 +622,7 @@ export async function batchTriggerById<TTask extends AnyTask>(
spanParentAsLink: true,
idempotencyKey: await makeIdempotencyKey(options?.idempotencyKey),
idempotencyKeyTTL: options?.idempotencyKeyTTL,
processingStrategy: options?.triggerSequentially ? "sequential" : undefined,
},
{
name: "batch.trigger()",
Expand Down Expand Up @@ -740,6 +745,7 @@ export async function batchTriggerById<TTask extends AnyTask>(
*/
export async function batchTriggerByIdAndWait<TTask extends AnyTask>(
items: Array<BatchByIdAndWaitItem<InferRunTypes<TTask>>>,
options?: BatchTriggerAndWaitOptions,
requestOptions?: TriggerApiRequestOptions
): Promise<BatchByIdResult<TTask>> {
const ctx = taskContext.ctx;
Expand Down Expand Up @@ -786,7 +792,9 @@ export async function batchTriggerByIdAndWait<TTask extends AnyTask>(
),
dependentAttempt: ctx.attempt.id,
},
{},
{
processingStrategy: options?.triggerSequentially ? "sequential" : undefined,
},
requestOptions
);

Expand Down Expand Up @@ -948,6 +956,7 @@ export async function batchTriggerTasks<TTasks extends readonly AnyTask[]>(
spanParentAsLink: true,
idempotencyKey: await makeIdempotencyKey(options?.idempotencyKey),
idempotencyKeyTTL: options?.idempotencyKeyTTL,
processingStrategy: options?.triggerSequentially ? "sequential" : undefined,
},
{
name: "batch.triggerByTask()",
Expand Down Expand Up @@ -1072,6 +1081,7 @@ export async function batchTriggerAndWaitTasks<TTasks extends readonly AnyTask[]
items: {
[K in keyof TTasks]: BatchByTaskAndWaitItem<TTasks[K]>;
},
options?: BatchTriggerAndWaitOptions,
requestOptions?: TriggerApiRequestOptions
): Promise<BatchByTaskResult<TTasks>> {
const ctx = taskContext.ctx;
Expand Down Expand Up @@ -1118,7 +1128,9 @@ export async function batchTriggerAndWaitTasks<TTasks extends readonly AnyTask[]
),
dependentAttempt: ctx.attempt.id,
},
{},
{
processingStrategy: options?.triggerSequentially ? "sequential" : undefined,
},
requestOptions
);

Expand Down Expand Up @@ -1256,6 +1268,7 @@ async function batchTrigger_internal<TRunTypes extends AnyRunTypes>(
spanParentAsLink: true,
idempotencyKey: await makeIdempotencyKey(options?.idempotencyKey),
idempotencyKeyTTL: options?.idempotencyKeyTTL,
processingStrategy: options?.triggerSequentially ? "sequential" : undefined,
},
{
name,
Expand Down Expand Up @@ -1377,6 +1390,7 @@ async function batchTriggerAndWait_internal<TIdentifier extends string, TPayload
id: TIdentifier,
items: Array<BatchTriggerAndWaitItem<TPayload>>,
parsePayload?: SchemaParseFn<TPayload>,
options?: BatchTriggerAndWaitOptions,
requestOptions?: ApiRequestOptions,
queue?: QueueOptions
): Promise<BatchResult<TIdentifier, TOutput>> {
Expand Down Expand Up @@ -1420,7 +1434,9 @@ async function batchTriggerAndWait_internal<TIdentifier extends string, TPayload
),
dependentAttempt: ctx.attempt.id,
},
{},
{
processingStrategy: options?.triggerSequentially ? "sequential" : undefined,
},
requestOptions
);

Expand Down
Loading