Skip to content

Commit f4be73a

Browse files
committed
Test for releaseConcurrencyOnWaitpoint
1 parent 5bae166 commit f4be73a

File tree

1 file changed

+149
-0
lines changed

1 file changed

+149
-0
lines changed
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
import { batch, logger, queue, task, wait } from "@trigger.dev/sdk";
2+
import assert from "node:assert";
3+
import { setTimeout } from "node:timers/promises";
4+
5+
// Queue with concurrency limit and release enabled
6+
const releaseEnabledQueue = queue({
7+
name: "release-concurrency-test-queue-enabled",
8+
concurrencyLimit: 2,
9+
releaseConcurrencyOnWaitpoint: true,
10+
});
11+
12+
// Queue with concurrency limit but release disabled
13+
const releaseDisabledQueue = queue({
14+
name: "release-concurrency-test-queue-disabled",
15+
concurrencyLimit: 2,
16+
releaseConcurrencyOnWaitpoint: false,
17+
});
18+
19+
// Task that runs on the release-enabled queue
20+
const releaseEnabledTask = task({
21+
id: "release-concurrency-enabled-task",
22+
queue: releaseEnabledQueue,
23+
retry: {
24+
maxAttempts: 1,
25+
},
26+
run: async (payload: { id: string; waitSeconds: number }, { ctx }) => {
27+
const startedAt = Date.now();
28+
logger.info(`Run ${payload.id} started at ${startedAt}`);
29+
30+
// Wait and release concurrency
31+
await wait.for({ seconds: payload.waitSeconds, releaseConcurrency: true });
32+
33+
const resumedAt = Date.now();
34+
await setTimeout(2000); // Additional work after resuming
35+
const completedAt = Date.now();
36+
37+
return { id: payload.id, startedAt, resumedAt, completedAt };
38+
},
39+
});
40+
41+
// Task that runs on the release-disabled queue
42+
const releaseDisabledTask = task({
43+
id: "release-concurrency-disabled-task",
44+
queue: releaseDisabledQueue,
45+
retry: {
46+
maxAttempts: 1,
47+
},
48+
run: async (payload: { id: string; waitSeconds: number }, { ctx }) => {
49+
const startedAt = Date.now();
50+
logger.info(`Run ${payload.id} started ${startedAt}`);
51+
52+
// Wait without releasing concurrency
53+
await wait.for({ seconds: payload.waitSeconds });
54+
55+
const resumedAt = Date.now();
56+
await setTimeout(2000);
57+
const completedAt = Date.now();
58+
59+
return { id: payload.id, startedAt, resumedAt, completedAt };
60+
},
61+
});
62+
63+
// Main test task
64+
export const waitReleaseConcurrencyTestTask = task({
65+
id: "wait-release-concurrency-test",
66+
retry: {
67+
maxAttempts: 1,
68+
},
69+
run: async (payload, { ctx }) => {
70+
logger.info("Starting wait release concurrency test");
71+
72+
// Test 1: Queue with release enabled
73+
logger.info("Testing queue with release enabled");
74+
const enabledResults = await batch.triggerAndWait([
75+
{ id: releaseEnabledTask.id, payload: { id: "e1", waitSeconds: 6 } },
76+
{ id: releaseEnabledTask.id, payload: { id: "e2", waitSeconds: 6 } },
77+
{ id: releaseEnabledTask.id, payload: { id: "e3", waitSeconds: 6 } },
78+
]);
79+
80+
// Verify all tasks completed
81+
assert(
82+
enabledResults.runs.every((r) => r.ok),
83+
"All enabled tasks should complete"
84+
);
85+
86+
// Get executions sorted by start time
87+
const enabledExecutions = enabledResults.runs
88+
.map((r) => r.output)
89+
.sort((a, b) => a.startedAt - b.startedAt);
90+
91+
// Verify that task e3 could start before e1 and e2 completed
92+
// (because concurrency was released during wait)
93+
const e3 = enabledExecutions.find((e) => e.id === "e3");
94+
const e1e2CompletedAt = Math.max(
95+
...enabledExecutions.filter((e) => ["e1", "e2"].includes(e.id)).map((e) => e.completedAt)
96+
);
97+
98+
assert(
99+
e3.startedAt < e1e2CompletedAt,
100+
"Task e3 should start before e1/e2 complete due to released concurrency"
101+
);
102+
103+
logger.info("✅ test with release enabled");
104+
105+
// Test 2: Queue with release disabled
106+
logger.info("Testing queue with release disabled");
107+
const disabledResults = await batch.triggerAndWait([
108+
{ id: releaseDisabledTask.id, payload: { id: "d1", waitSeconds: 6 } },
109+
{ id: releaseDisabledTask.id, payload: { id: "d2", waitSeconds: 6 } },
110+
{ id: releaseDisabledTask.id, payload: { id: "d3", waitSeconds: 6 } },
111+
]);
112+
113+
// Verify all tasks completed
114+
assert(
115+
disabledResults.runs.every((r) => r.ok),
116+
"All disabled tasks should complete"
117+
);
118+
119+
// Get executions sorted by start time
120+
const disabledExecutions = disabledResults.runs
121+
.map((r) => r.output)
122+
.sort((a, b) => a.startedAt - b.startedAt);
123+
124+
// Verify that task d3 could NOT start before d1 or d2 completed
125+
// (because concurrency was not released during wait)
126+
const d3 = disabledExecutions.find((e) => e.id === "d3");
127+
const d1d2CompletedAt = Math.max(
128+
...disabledExecutions.filter((e) => ["d1", "d2"].includes(e.id)).map((e) => e.completedAt)
129+
);
130+
131+
assert(
132+
d3.startedAt >= d1d2CompletedAt,
133+
"Task d3 should not start before d1/d2 complete when concurrency is not released"
134+
);
135+
136+
logger.info("✅ test with release disabled");
137+
138+
return {
139+
enabledQueueResults: {
140+
executions: enabledExecutions,
141+
concurrencyReleased: true,
142+
},
143+
disabledQueueResults: {
144+
executions: disabledExecutions,
145+
concurrencyReleased: false,
146+
},
147+
};
148+
},
149+
});

0 commit comments

Comments
 (0)