Skip to content

Commit 7bea9eb

Browse files
committed
A task for reproducing a race condition with checkpoints
1 parent 6395259 commit 7bea9eb

File tree

1 file changed

+35
-1
lines changed

1 file changed

+35
-1
lines changed

references/v3-catalog/src/trigger/checkpoints.ts

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { logger, task, wait } from "@trigger.dev/sdk/v3";
1+
import { logger, queue, task, wait } from "@trigger.dev/sdk/v3";
22

33
type Payload = {
44
count?: number;
@@ -70,6 +70,7 @@ export const nestedDependencies = task({
7070
maxDepth,
7171
waitSeconds,
7272
failAttemptChance,
73+
batchSize,
7374
});
7475
logger.log(`Triggered complete ${i + 1}/${batchSize}`);
7576

@@ -153,3 +154,36 @@ export const bulkPermanentlyFrozen = task({
153154
);
154155
},
155156
});
157+
158+
const oneAtATime = queue({
159+
name: "race-condition",
160+
concurrencyLimit: 1,
161+
});
162+
163+
export const raceConditionCheckpointDequeue = task({
164+
id: "race-condition-checkpoint-dequeue",
165+
queue: oneAtATime,
166+
run: async ({ isBatch = true }: { isBatch?: boolean }) => {
167+
await holdConcurrency.trigger({ waitSeconds: 45 });
168+
169+
if (isBatch) {
170+
await fixedLengthTask.batchTriggerAndWait(
171+
Array.from({ length: 1 }, (_, i) => ({
172+
payload: { waitSeconds: 5 },
173+
}))
174+
);
175+
} else {
176+
await fixedLengthTask.triggerAndWait({ waitSeconds: 5 });
177+
}
178+
179+
logger.log(`Successfully completed task`);
180+
},
181+
});
182+
183+
export const holdConcurrency = task({
184+
id: "hold-concurrency",
185+
queue: oneAtATime,
186+
run: async ({ waitSeconds = 60 }: { waitSeconds?: number }) => {
187+
await new Promise((resolve) => setTimeout(resolve, waitSeconds * 1000));
188+
},
189+
});

0 commit comments

Comments
 (0)