Skip to content

v3: fix triggering tasks with custom queues #1242

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/few-poems-vanish.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/sdk": patch
---

Fix trigger functions for custom queues
8 changes: 0 additions & 8 deletions apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -274,14 +274,6 @@ export class SharedQueueConsumer {
where: {
id: message.messageId,
},
include: {
lockedToVersion: {
include: {
deployment: true,
tasks: true,
},
},
},
});

if (!existingTaskRun) {
Expand Down
21 changes: 21 additions & 0 deletions apps/webapp/app/v3/models/workerDeployment.server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import type { Prettify } from "@trigger.dev/core";
import { BackgroundWorker } from "@trigger.dev/database";
import { CURRENT_DEPLOYMENT_LABEL } from "~/consts";
import { Prisma, prisma } from "~/db.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";

export type CurrentWorkerDeployment = Prettify<
NonNullable<Awaited<ReturnType<typeof findCurrentWorkerDeployment>>>
Expand Down Expand Up @@ -42,6 +44,25 @@ export async function findCurrentWorkerDeployment(
return promotion?.deployment;
}

export async function findCurrentWorkerFromEnvironment(
environment: Pick<AuthenticatedEnvironment, "id" | "type">
): Promise<BackgroundWorker | null> {
if (environment.type === "DEVELOPMENT") {
const latestDevWorker = await prisma.backgroundWorker.findFirst({
where: {
runtimeEnvironmentId: environment.id,
},
orderBy: {
createdAt: "desc",
},
});
return latestDevWorker;
} else {
const deployment = await findCurrentWorkerDeployment(environment.id);
return deployment?.worker ?? null;
}
}

export async function getWorkerDeploymentFromWorker(
workerId: string
): Promise<WorkerDeploymentWithWorkerTasks | undefined> {
Expand Down
57 changes: 56 additions & 1 deletion apps/webapp/app/v3/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
IOPacket,
QueueOptions,
SemanticInternalAttributes,
TriggerTaskRequestBody,
packetRequiresOffloading,
Expand All @@ -18,6 +19,7 @@ import { BaseService, ServiceValidationError } from "./baseService.server";
import { logger } from "~/services/logger.server";
import { isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus";
import { createTag, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
import { findCurrentWorkerFromEnvironment } from "../models/workerDeployment.server";

export type TriggerTaskServiceOptions = {
idempotencyKey?: string;
Expand Down Expand Up @@ -211,7 +213,9 @@ export class TriggerTaskService extends BaseService {
})
: undefined;

let queueName = sanitizeQueueName(body.options?.queue?.name ?? `task/${taskId}`);
let queueName = sanitizeQueueName(
await this.#getQueueName(taskId, environment, body.options?.queue?.name)
);

// Check that the queuename is not an empty string
if (!queueName) {
Expand Down Expand Up @@ -399,6 +403,57 @@ export class TriggerTaskService extends BaseService {
});
}

async #getQueueName(taskId: string, environment: AuthenticatedEnvironment, queueName?: string) {
if (queueName) {
return queueName;
}

const defaultQueueName = `task/${taskId}`;

const worker = await findCurrentWorkerFromEnvironment(environment);

if (!worker) {
logger.debug("Failed to get queue name: No worker found", {
taskId,
environmentId: environment.id,
});

return defaultQueueName;
}

const task = await this._prisma.backgroundWorkerTask.findUnique({
where: {
workerId_slug: {
workerId: worker.id,
slug: taskId,
},
},
});

if (!task) {
console.log("Failed to get queue name: No task found", {
taskId,
environmentId: environment.id,
});

return defaultQueueName;
}

const queueConfig = QueueOptions.optional().safeParse(task.queueConfig);

if (!queueConfig.success) {
console.log("Failed to get queue name: Invalid queue config", {
taskId,
environmentId: environment.id,
queueConfig: task.queueConfig,
});

return defaultQueueName;
}

return queueConfig.data?.name ?? defaultQueueName;
}

async #handlePayloadPacket(
payload: any,
payloadType: string,
Expand Down
35 changes: 27 additions & 8 deletions packages/trigger-sdk/src/v3/shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,13 @@ export function createTask<
>(
params: TaskOptions<TIdentifier, TInput, TOutput, TInitOutput>
): Task<TIdentifier, TInput, TOutput> {
const customQueue = params.queue
? queue({
name: params.queue?.name ?? `task/${params.id}`,
...params.queue,
})
: undefined;

const task: Task<TIdentifier, TInput, TOutput> = {
id: params.id,
trigger: async (payload, options) => {
Expand All @@ -487,7 +494,10 @@ export function createTask<
: `trigger()`,
params.id,
payload,
options
{
queue: customQueue,
...options,
}
);
},
batchTrigger: async (items) => {
Expand All @@ -498,7 +508,9 @@ export function createTask<
? `${taskMetadata.exportName}.batchTrigger()`
: `batchTrigger()`,
params.id,
items
items,
undefined,
customQueue
);
},
triggerAndWait: async (payload, options) => {
Expand All @@ -510,7 +522,10 @@ export function createTask<
: `triggerAndWait()`,
params.id,
payload,
options
{
queue: customQueue,
...options,
}
);
},
batchTriggerAndWait: async (items) => {
Expand All @@ -521,7 +536,9 @@ export function createTask<
? `${taskMetadata.exportName}.batchTriggerAndWait()`
: `batchTriggerAndWait()`,
params.id,
items
items,
undefined,
customQueue
);
},
};
Expand Down Expand Up @@ -758,7 +775,8 @@ async function batchTrigger_internal<TPayload, TOutput>(
name: string,
id: string,
items: Array<BatchItem<TPayload>>,
requestOptions?: ApiRequestOptions
requestOptions?: ApiRequestOptions,
queue?: QueueOptions
): Promise<BatchRunHandle<TOutput>> {
const apiClient = apiClientManager.client;

Expand All @@ -776,7 +794,7 @@ async function batchTrigger_internal<TPayload, TOutput>(
return {
payload: payloadPacket.data,
options: {
queue: item.options?.queue,
queue: item.options?.queue ?? queue,
concurrencyKey: item.options?.concurrencyKey,
test: taskContext.ctx?.run.isTest,
payloadType: payloadPacket.dataType,
Expand Down Expand Up @@ -919,7 +937,8 @@ async function batchTriggerAndWait_internal<TPayload, TOutput>(
name: string,
id: string,
items: Array<BatchItem<TPayload>>,
requestOptions?: ApiRequestOptions
requestOptions?: ApiRequestOptions,
queue?: QueueOptions
): Promise<BatchResult<TOutput>> {
const ctx = taskContext.ctx;

Expand Down Expand Up @@ -947,7 +966,7 @@ async function batchTriggerAndWait_internal<TPayload, TOutput>(
payload: payloadPacket.data,
options: {
lockToVersion: taskContext.worker?.version,
queue: item.options?.queue,
queue: item.options?.queue ?? queue,
concurrencyKey: item.options?.concurrencyKey,
test: taskContext.ctx?.run.isTest,
payloadType: payloadPacket.dataType,
Expand Down
4 changes: 2 additions & 2 deletions references/v3-catalog/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
},
"scripts": {
"dev:trigger": "triggerdev dev",
"management": "ts-node -r tsconfig-paths/register ./src/management.ts",
"queues": "ts-node -r tsconfig-paths/register ./src/queues.ts",
"management": "ts-node -r dotenv/config -r tsconfig-paths/register ./src/management.ts",
"queues": "ts-node -r dotenv/config -r tsconfig-paths/register ./src/queues.ts",
"build:client": "tsup-node ./src/clientUsage.ts --format esm,cjs",
"client": "ts-node -r dotenv/config -r tsconfig-paths/register ./src/clientUsage.ts",
"triggerWithLargePayload": "ts-node -r dotenv/config -r tsconfig-paths/register ./src/triggerWithLargePayload.ts",
Expand Down
Loading