Skip to content

Commit 46eed03

Browse files
committed
Added pause/resume functions to the SDK
1 parent 03fb6e2 commit 46eed03

File tree

6 files changed

+195
-4
lines changed

6 files changed

+195
-4
lines changed

apps/webapp/app/presenters/v3/QueueListPresenter.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { engine } from "~/v3/runEngine.server";
44
import { BasePresenter } from "./basePresenter.server";
55
import { toQueueItem } from "./QueueRetrievePresenter.server";
66

7-
const DEFAULT_ITEMS_PER_PAGE = 10;
7+
const DEFAULT_ITEMS_PER_PAGE = 25;
88
const MAX_ITEMS_PER_PAGE = 100;
99
export class QueueListPresenter extends BasePresenter {
1010
private readonly perPage: number;
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import { json } from "@remix-run/server-runtime";
2+
import { type QueueItem, type RetrieveQueueParam, RetrieveQueueType } from "@trigger.dev/core/v3";
3+
import { z } from "zod";
4+
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
5+
import { PauseQueueService } from "~/v3/services/pauseQueue.server";
6+
7+
const BodySchema = z.object({
8+
type: RetrieveQueueType.default("id"),
9+
action: z.enum(["pause", "resume"]),
10+
});
11+
12+
export const { action } = createActionApiRoute(
13+
{
14+
body: BodySchema,
15+
params: z.object({
16+
queueParam: z.string().transform((val) => val.replace(/%2F/g, "/")),
17+
}),
18+
},
19+
async ({ params, body, authentication }) => {
20+
const input: RetrieveQueueParam =
21+
body.type === "id"
22+
? params.queueParam
23+
: {
24+
type: body.type,
25+
name: decodeURIComponent(params.queueParam).replace(/%2F/g, "/"),
26+
};
27+
28+
const service = new PauseQueueService();
29+
const result = await service.call(
30+
authentication.environment,
31+
input,
32+
body.action === "pause" ? "paused" : "resumed"
33+
);
34+
35+
if (!result.success) {
36+
if (result.code === "queue-not-found") {
37+
return json({ error: result.code }, { status: 404 });
38+
}
39+
40+
return json({ error: result.code }, { status: 400 });
41+
}
42+
43+
const q: QueueItem = result.queue;
44+
return json(q);
45+
}
46+
);

apps/webapp/app/v3/services/pauseQueue.server.ts

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
1-
import { type RetrieveQueueParam } from "@trigger.dev/core/v3";
2-
import { getQueue } from "~/presenters/v3/QueueRetrievePresenter.server";
1+
import { QueueItem, type RetrieveQueueParam } from "@trigger.dev/core/v3";
2+
import { getQueue, toQueueItem } from "~/presenters/v3/QueueRetrievePresenter.server";
33
import { type AuthenticatedEnvironment } from "~/services/apiAuth.server";
44
import { logger } from "~/services/logger.server";
55
import { BaseService } from "./baseService.server";
66
import { determineEngineVersion } from "../engineVersion.server";
77
import { removeQueueConcurrencyLimits, updateQueueConcurrencyLimits } from "../runQueue.server";
8+
import { engine } from "../runEngine.server";
89

910
export type PauseStatus = "paused" | "resumed";
1011

1112
export type PauseQueueResult =
1213
| {
1314
success: true;
1415
state: PauseStatus;
16+
queue: QueueItem;
1517
}
1618
| {
1719
success: false;
@@ -46,7 +48,7 @@ export class PauseQueueService extends BaseService {
4648
};
4749
}
4850

49-
await this._prisma.taskQueue.update({
51+
const updatedQueue = await this._prisma.taskQueue.update({
5052
where: {
5153
id: queue.id,
5254
},
@@ -71,9 +73,23 @@ export class PauseQueueService extends BaseService {
7173
environmentId: environment.id,
7274
});
7375

76+
const results = await Promise.all([
77+
engine.lengthOfQueues(environment, [queue.name]),
78+
engine.currentConcurrencyOfQueues(environment, [queue.name]),
79+
]);
80+
7481
return {
7582
success: true,
7683
state: action,
84+
queue: toQueueItem({
85+
friendlyId: updatedQueue.friendlyId,
86+
name: updatedQueue.name,
87+
type: updatedQueue.type,
88+
running: results[1]?.[updatedQueue.name] ?? 0,
89+
queued: results[0]?.[updatedQueue.name] ?? 0,
90+
concurrencyLimit: updatedQueue.concurrencyLimit ?? null,
91+
paused: updatedQueue.paused,
92+
}),
7793
};
7894
} catch (error) {
7995
logger.error("PauseQueueService: error updating queue state", {

packages/core/src/v3/apiClient/index.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -763,6 +763,32 @@ export class ApiClient {
763763
);
764764
}
765765

766+
pauseQueue(
767+
queue: RetrieveQueueParam,
768+
action: "pause" | "resume",
769+
requestOptions?: ZodFetchOptions
770+
) {
771+
const type = typeof queue === "string" ? "id" : queue.type;
772+
const value = typeof queue === "string" ? queue : queue.name;
773+
774+
// Explicitly encode slashes before encoding the rest of the string
775+
const encodedValue = encodeURIComponent(value.replace(/\//g, "%2F"));
776+
777+
return zodfetch(
778+
QueueItem,
779+
`${this.baseUrl}/api/v1/queues/${encodedValue}/pause`,
780+
{
781+
method: "POST",
782+
headers: this.#getHeaders(false),
783+
body: JSON.stringify({
784+
type,
785+
action,
786+
}),
787+
},
788+
mergeRequestOptions(this.defaultRequestOptions, requestOptions)
789+
);
790+
}
791+
766792
subscribeToRun<TRunTypes extends AnyRunTypes>(
767793
runId: string,
768794
options?: {

packages/trigger-sdk/src/v3/queues.ts

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,3 +86,92 @@ export function retrieve(
8686

8787
return apiClient.retrieveQueue(queue, $requestOptions);
8888
}
89+
90+
/**
91+
* Pauses a queue, preventing any new runs from being started.
92+
* Runs that are currently running will continue to completion.
93+
*
94+
* @example
95+
* ```ts
96+
* // Pause using a queue id
97+
* await queues.pause("queue_12345");
98+
*
99+
* // Or pause using type and name
100+
* await queues.pause({ type: "task", name: "my-task-id"});
101+
* ```
102+
* @param queue - The ID of the queue to pause, or the type and name
103+
* @returns The updated queue state
104+
*/
105+
export function pause(
106+
queue: RetrieveQueueParam,
107+
requestOptions?: ApiRequestOptions
108+
): ApiPromise<QueueItem> {
109+
const apiClient = apiClientManager.clientOrThrow();
110+
111+
const $requestOptions = mergeRequestOptions(
112+
{
113+
tracer,
114+
name: "queues.pause()",
115+
icon: "queue",
116+
attributes: {
117+
...flattenAttributes({ queue }),
118+
...accessoryAttributes({
119+
items: [
120+
{
121+
text: typeof queue === "string" ? queue : queue.name,
122+
variant: "normal",
123+
},
124+
],
125+
style: "codepath",
126+
}),
127+
},
128+
},
129+
requestOptions
130+
);
131+
132+
return apiClient.pauseQueue(queue, "pause", $requestOptions);
133+
}
134+
135+
/**
136+
* Resumes a paused queue, allowing new runs to be started.
137+
*
138+
* @example
139+
* ```ts
140+
* // Resume using a queue id
141+
* await queues.resume("queue_12345");
142+
*
143+
* // Or resume using type and name
144+
* await queues.resume({ type: "task", name: "my-task-id"});
145+
* ```
146+
* @param queue - The ID of the queue to resume, or the type and name
147+
* @returns The updated queue state
148+
*/
149+
export function resume(
150+
queue: RetrieveQueueParam,
151+
requestOptions?: ApiRequestOptions
152+
): ApiPromise<QueueItem> {
153+
const apiClient = apiClientManager.clientOrThrow();
154+
155+
const $requestOptions = mergeRequestOptions(
156+
{
157+
tracer,
158+
name: "queues.resume()",
159+
icon: "queue",
160+
attributes: {
161+
...flattenAttributes({ queue }),
162+
...accessoryAttributes({
163+
items: [
164+
{
165+
text: typeof queue === "string" ? queue : queue.name,
166+
variant: "normal",
167+
},
168+
],
169+
style: "codepath",
170+
}),
171+
},
172+
},
173+
requestOptions
174+
);
175+
176+
return apiClient.pauseQueue(queue, "resume", $requestOptions);
177+
}

references/hello-world/src/trigger/queues.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,24 @@ export const queuesTester = task({
1818
});
1919
logger.log("Retrieved from name", { retrievedFromCtxName });
2020

21+
//pause the queue
22+
const pausedQueue = await queues.pause({
23+
type: "task",
24+
name: "queues-tester",
25+
});
26+
logger.log("Paused queue", { pausedQueue });
27+
2128
const retrievedFromName = await queues.retrieve({
2229
type: "task",
2330
name: "queues-tester",
2431
});
2532
logger.log("Retrieved from name", { retrievedFromName });
33+
34+
//resume the queue
35+
const resumedQueue = await queues.resume({
36+
type: "task",
37+
name: "queues-tester",
38+
});
39+
logger.log("Resumed queue", { resumedQueue });
2640
},
2741
});

0 commit comments

Comments
 (0)