Skip to content

Commit fff3a35

Browse files
committed
Don’t have a default visibility timeout in the queue
1 parent 1c31aa0 commit fff3a35

File tree

2 files changed

+28
-48
lines changed

2 files changed

+28
-48
lines changed

internal-packages/redis-worker/src/queue.test.ts

Lines changed: 22 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,9 @@ describe("SimpleQueue", () => {
1010
const queue = new SimpleQueue({
1111
name: "test-1",
1212
schema: {
13-
test: {
14-
schema: z.object({
15-
value: z.number(),
16-
}),
17-
defaultVisibilityTimeoutMs: 2000,
18-
},
13+
test: z.object({
14+
value: z.number(),
15+
}),
1916
},
2017
redisOptions: {
2118
host: redisContainer.getHost(),
@@ -26,10 +23,10 @@ describe("SimpleQueue", () => {
2623
});
2724

2825
try {
29-
await queue.enqueue({ id: "1", job: "test", item: { value: 1 } });
26+
await queue.enqueue({ id: "1", job: "test", item: { value: 1 }, visibilityTimeoutMs: 2000 });
3027
expect(await queue.size()).toBe(1);
3128

32-
await queue.enqueue({ id: "2", job: "test", item: { value: 2 } });
29+
await queue.enqueue({ id: "2", job: "test", item: { value: 2 }, visibilityTimeoutMs: 2000 });
3330
expect(await queue.size()).toBe(2);
3431

3532
const [first] = await queue.dequeue(1);
@@ -64,12 +61,9 @@ describe("SimpleQueue", () => {
6461
const queue = new SimpleQueue({
6562
name: "test-1",
6663
schema: {
67-
test: {
68-
schema: z.object({
69-
value: z.number(),
70-
}),
71-
defaultVisibilityTimeoutMs: 2000,
72-
},
64+
test: z.object({
65+
value: z.number(),
66+
}),
7367
},
7468
redisOptions: {
7569
host: redisContainer.getHost(),
@@ -103,12 +97,9 @@ describe("SimpleQueue", () => {
10397
const queue = new SimpleQueue({
10498
name: "test-1",
10599
schema: {
106-
test: {
107-
schema: z.object({
108-
value: z.number(),
109-
}),
110-
defaultVisibilityTimeoutMs: 2000,
111-
},
100+
test: z.object({
101+
value: z.number(),
102+
}),
112103
},
113104
redisOptions: {
114105
host: redisContainer.getHost(),
@@ -124,6 +115,7 @@ describe("SimpleQueue", () => {
124115
job: "test",
125116
item: { value: 1 },
126117
availableAt: new Date(Date.now() + 50),
118+
visibilityTimeoutMs: 2000,
127119
});
128120

129121
const miss = await queue.dequeue(1);
@@ -147,12 +139,9 @@ describe("SimpleQueue", () => {
147139
const queue = new SimpleQueue({
148140
name: "test-1",
149141
schema: {
150-
test: {
151-
schema: z.object({
152-
value: z.number(),
153-
}),
154-
defaultVisibilityTimeoutMs: 2000,
155-
},
142+
test: z.object({
143+
value: z.number(),
144+
}),
156145
},
157146
redisOptions: {
158147
host: redisContainer.getHost(),
@@ -193,14 +182,11 @@ describe("SimpleQueue", () => {
193182

194183
redisTest("dequeue multiple items", { timeout: 20_000 }, async ({ redisContainer }) => {
195184
const queue = new SimpleQueue({
196-
name: "test-multi",
185+
name: "test-1",
197186
schema: {
198-
test: {
199-
schema: z.object({
200-
value: z.number(),
201-
}),
202-
defaultVisibilityTimeoutMs: 2000,
203-
},
187+
test: z.object({
188+
value: z.number(),
189+
}),
204190
},
205191
redisOptions: {
206192
host: redisContainer.getHost(),
@@ -211,9 +197,9 @@ redisTest("dequeue multiple items", { timeout: 20_000 }, async ({ redisContainer
211197
});
212198

213199
try {
214-
await queue.enqueue({ id: "1", job: "test", item: { value: 1 } });
215-
await queue.enqueue({ id: "2", job: "test", item: { value: 2 } });
216-
await queue.enqueue({ id: "3", job: "test", item: { value: 3 } });
200+
await queue.enqueue({ id: "1", job: "test", item: { value: 1 }, visibilityTimeoutMs: 2000 });
201+
await queue.enqueue({ id: "2", job: "test", item: { value: 2 }, visibilityTimeoutMs: 2000 });
202+
await queue.enqueue({ id: "3", job: "test", item: { value: 3 }, visibilityTimeoutMs: 2000 });
217203

218204
expect(await queue.size()).toBe(3);
219205

internal-packages/redis-worker/src/queue.ts

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,14 @@ import { nanoid } from "nanoid";
44
import { z } from "zod";
55

66
export interface MessageCatalogSchema {
7-
[key: string]: {
8-
schema: z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion<any, any>;
9-
defaultVisibilityTimeoutMs: number;
10-
};
7+
[key: string]: z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion<any, any>;
118
}
129

1310
export type MessageCatalogKey<TMessageCatalog extends MessageCatalogSchema> = keyof TMessageCatalog;
1411
export type MessageCatalogValue<
1512
TMessageCatalog extends MessageCatalogSchema,
1613
TKey extends MessageCatalogKey<TMessageCatalog>,
17-
> = z.infer<TMessageCatalog[TKey]["schema"]>;
14+
> = z.infer<TMessageCatalog[TKey]>;
1815

1916
export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
2017
name: string;
@@ -77,16 +74,14 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
7774
job: MessageCatalogKey<TMessageCatalog>;
7875
item: MessageCatalogValue<TMessageCatalog, MessageCatalogKey<TMessageCatalog>>;
7976
availableAt?: Date;
80-
visibilityTimeoutMs?: number;
77+
visibilityTimeoutMs: number;
8178
}): Promise<void> {
8279
try {
8380
const score = availableAt ? availableAt.getTime() : Date.now();
84-
const jobSchema = this.schema[job];
85-
const actualVisibilityTimeoutMs = visibilityTimeoutMs ?? jobSchema.defaultVisibilityTimeoutMs;
8681
const serializedItem = JSON.stringify({
8782
job,
8883
item,
89-
visibilityTimeoutMs: actualVisibilityTimeoutMs,
84+
visibilityTimeoutMs,
9085
});
9186

9287
const result = await this.redis.enqueueItem(
@@ -148,7 +143,7 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
148143
continue;
149144
}
150145

151-
const validatedItem = schema.schema.safeParse(parsedItem.item);
146+
const validatedItem = schema.safeParse(parsedItem.item);
152147

153148
if (!validatedItem.success) {
154149
this.logger.error("Invalid item in queue", {
@@ -160,8 +155,7 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
160155
continue;
161156
}
162157

163-
const visibilityTimeoutMs =
164-
(parsedItem.visibilityTimeoutMs as number) || schema.defaultVisibilityTimeoutMs;
158+
const visibilityTimeoutMs = parsedItem.visibilityTimeoutMs as number;
165159
const invisibleUntil = now + visibilityTimeoutMs;
166160

167161
await this.redis.zadd(`queue`, invisibleUntil, id);

0 commit comments

Comments
 (0)