Skip to content

Commit 1db5a6b

Browse files
committed
Merge remote-tracking branch 'origin/main' into increase-v4-worker-visibility-timeouts
2 parents 7e4c896 + 32ff569 commit 1db5a6b

File tree

5 files changed

+105
-8
lines changed

5 files changed

+105
-8
lines changed

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.queues/route.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ export default function Page() {
285285
{success ? (
286286
<div
287287
className={cn(
288-
"grid h-fit max-h-full min-h-full grid-rows-[1fr] overflow-x-auto",
288+
"grid max-h-full min-h-full grid-rows-[1fr] overflow-x-auto",
289289
pagination.totalPages > 1 && "grid-rows-[1fr_auto]"
290290
)}
291291
>

apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.schedules/route.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ export default function Page() {
229229

230230
<div
231231
className={cn(
232-
"grid h-fit max-h-full min-h-full overflow-x-auto",
232+
"grid max-h-full min-h-full overflow-x-auto",
233233
totalPages > 1 ? "grid-rows-[1fr_auto]" : "grid-rows-[1fr]"
234234
)}
235235
>

docs/upgrade-to-v4.mdx

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,7 @@ pnpm dlx trigger.dev@v4-beta dev
508508

509509
During the beta we will be tracking issues and releasing regular fixes.
510510

511-
There are no known issues at the moment.
511+
**ISSUE:** Runs not continuing after a child run(s) have completed.
512512

513513
## Deprecations
514514

@@ -703,7 +703,8 @@ import { task } from "@trigger.dev/sdk";
703703
export const myTask = task({
704704
id: "my-task",
705705
onStart: ({ payload, ctx }) => {},
706-
run: async ({ payload, ctx }) => {},
706+
// The run function still uses separate parameters
707+
run: async ( payload, { ctx }) => {},
707708
});
708709
```
709710

@@ -721,7 +722,7 @@ export const myTask = task({
721722
onResume: ({ payload, ctx, task, wait }) => {},
722723
onComplete: ({ payload, ctx, task, result }) => {},
723724
catchError: ({ payload, ctx, task, error, retry, retryAt, retryDelayInMs }) => {},
724-
run: async ({ payload, ctx }) => {},
725+
run: async (payload, { ctx }) => {},
725726
});
726727
```
727728

@@ -731,3 +732,32 @@ We've made a few small changes to the `ctx` object:
731732

732733
- `ctx.attempt.id` and `ctx.attempt.status` have been removed. `ctx.attempt.number` is still available.
733734
- `ctx.task.exportName` has been removed (since we no longer require tasks to be exported to be triggered).
735+
736+
### BatchTrigger changes
737+
738+
The `batchTrigger` function no longer returns a `runs` list directly. In v3, you could access the runs directly from the batch handle:
739+
740+
```ts
741+
// In v3
742+
const batchHandle = await tasks.batchTrigger([
743+
[myTask, { foo: "bar" }],
744+
[myOtherTask, { baz: "qux" }],
745+
]);
746+
747+
// You could access runs directly
748+
console.log(batchHandle.runs);
749+
```
750+
751+
In v4, you now need to use the `runs.list()` method to get the list of runs:
752+
753+
```ts
754+
// In v4
755+
const batchHandle = await tasks.batchTrigger([
756+
[myTask, { foo: "bar" }],
757+
[myOtherTask, { baz: "qux" }],
758+
]);
759+
760+
// Now you need to call runs.list()
761+
const runs = await batchHandle.runs.list();
762+
console.log(runs);
763+
```

packages/redis-worker/src/queue.test.ts

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { expect } from "vitest";
44
import { z } from "zod";
55
import { SimpleQueue } from "./queue.js";
66
import { Logger } from "@trigger.dev/core/logger";
7+
import { createRedisClient } from "@internal/redis";
78

89
describe("SimpleQueue", () => {
910
redisTest("enqueue/dequeue", { timeout: 20_000 }, async ({ redisContainer }) => {
@@ -209,6 +210,10 @@ describe("SimpleQueue", () => {
209210
timestamp: expect.any(Date),
210211
})
211212
);
213+
214+
// Acknowledge the item and verify it's removed
215+
await queue.ack(second!.id);
216+
expect(await queue.size({ includeFuture: true })).toBe(0);
212217
} finally {
213218
await queue.close();
214219
}
@@ -328,6 +333,7 @@ describe("SimpleQueue", () => {
328333

329334
// Redrive item from DLQ
330335
await queue.redriveFromDeadLetterQueue("1");
336+
await new Promise((resolve) => setTimeout(resolve, 200));
331337
expect(await queue.size()).toBe(1);
332338
expect(await queue.sizeOfDeadLetterQueue()).toBe(0);
333339

@@ -357,4 +363,64 @@ describe("SimpleQueue", () => {
357363
await queue.close();
358364
}
359365
});
366+
367+
redisTest("cleanup orphaned queue entries", { timeout: 20_000 }, async ({ redisContainer }) => {
368+
const queue = new SimpleQueue({
369+
name: "test-orphaned",
370+
schema: {
371+
test: z.object({
372+
value: z.number(),
373+
}),
374+
},
375+
redisOptions: {
376+
host: redisContainer.getHost(),
377+
port: redisContainer.getPort(),
378+
password: redisContainer.getPassword(),
379+
},
380+
logger: new Logger("test", "log"),
381+
});
382+
383+
try {
384+
// First, add a normal item
385+
await queue.enqueue({ id: "1", job: "test", item: { value: 1 }, visibilityTimeoutMs: 2000 });
386+
387+
const redisClient = createRedisClient({
388+
host: redisContainer.getHost(),
389+
port: redisContainer.getPort(),
390+
password: redisContainer.getPassword(),
391+
});
392+
393+
// Manually add an orphaned item to the queue (without corresponding hash entry)
394+
await redisClient.zadd(`{queue:test-orphaned:}queue`, Date.now(), "orphaned-id");
395+
396+
// Verify both items are in the queue
397+
expect(await queue.size()).toBe(2);
398+
399+
// Dequeue should process both items, but only return the valid one
400+
// and clean up the orphaned entry
401+
const dequeued = await queue.dequeue(2);
402+
403+
// Should only get the valid item
404+
expect(dequeued).toHaveLength(1);
405+
expect(dequeued[0]).toEqual(
406+
expect.objectContaining({
407+
id: "1",
408+
job: "test",
409+
item: { value: 1 },
410+
visibilityTimeoutMs: 2000,
411+
attempt: 0,
412+
timestamp: expect.any(Date),
413+
})
414+
);
415+
416+
// The orphaned item should have been removed
417+
expect(await queue.size({ includeFuture: true })).toBe(1);
418+
419+
// Verify the orphaned ID is no longer in the queue
420+
const orphanedScore = await redisClient.zscore(`{queue:test-orphaned:}queue`, "orphaned-id");
421+
expect(orphanedScore).toBeNull();
422+
} finally {
423+
await queue.close();
424+
}
425+
});
360426
});

packages/redis-worker/src/queue.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
130130
throw e;
131131
}
132132
}
133+
133134
async dequeue(count: number = 1): Promise<Array<QueueItem<TMessageCatalog>>> {
134135
const now = Date.now();
135136

@@ -179,9 +180,6 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
179180
}
180181

181182
const visibilityTimeoutMs = parsedItem.visibilityTimeoutMs as number;
182-
const invisibleUntil = now + visibilityTimeoutMs;
183-
184-
await this.redis.zadd(`queue`, invisibleUntil, id);
185183

186184
dequeuedItems.push({
187185
id,
@@ -374,6 +372,9 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
374372
375373
redis.call('ZADD', queue, invisibleUntil, id)
376374
table.insert(dequeued, {id, serializedItem, score})
375+
else
376+
-- Remove the orphaned queue entry if no corresponding item exists
377+
redis.call('ZREM', queue, id)
377378
end
378379
end
379380

0 commit comments

Comments
 (0)