Skip to content

Commit 024eb8f

Browse files
committed
feat: Add basic queue implementation
1 parent d0ce076 commit 024eb8f

File tree

2 files changed

+99
-0
lines changed

2 files changed

+99
-0
lines changed

packages/core/src/queue.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
export class Queue<T> {
2+
private queue: Set<Promise<T>> = new Set();
3+
4+
public add(task: Promise<T>): Promise<T> {
5+
this.queue.add(task);
6+
task.then(() => this.queue.delete(task));
7+
return task;
8+
}
9+
10+
public length(): number {
11+
return this.queue.size;
12+
}
13+
14+
public drain(timeout?: number): Promise<void> {
15+
return new Promise((resolve, reject) => {
16+
const capturedSetTimeout = setTimeout(() => {
17+
if (timeout && timeout > 0) {
18+
reject('Drain timeout reached');
19+
}
20+
}, timeout);
21+
Promise.all(this.queue.values()).then(() => {
22+
clearTimeout(capturedSetTimeout);
23+
resolve();
24+
});
25+
});
26+
}
27+
}

packages/core/test/lib/queue.test.ts

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import { Queue } from '../../src/queue';
2+
3+
describe('Queue', () => {
4+
beforeEach(() => {
5+
jest.useFakeTimers();
6+
});
7+
8+
test('add()', () => {
9+
const q = new Queue<void>();
10+
const p = new Promise<void>(resolve => setTimeout(resolve, 1));
11+
q.add(p);
12+
expect(q.length()).toBe(1);
13+
});
14+
15+
test('resolved promises should not show up in queue length', async () => {
16+
expect.assertions(2);
17+
const q = new Queue<void>();
18+
const p = new Promise<void>(resolve => setTimeout(resolve, 1));
19+
q.add(p).then(() => {
20+
expect(q.length()).toBe(0);
21+
});
22+
expect(q.length()).toBe(1);
23+
jest.runAllTimers();
24+
});
25+
26+
test('receive promise result outside and from queue', async () => {
27+
expect.assertions(4);
28+
const q = new Queue<string>();
29+
const p = new Promise<string>(resolve =>
30+
setTimeout(() => {
31+
resolve('test');
32+
}, 1),
33+
);
34+
q.add(p).then(result => {
35+
expect(q.length()).toBe(0);
36+
expect(result).toBe('test');
37+
});
38+
expect(q.length()).toBe(1);
39+
p.then(result => {
40+
expect(result).toBe('test');
41+
});
42+
jest.runAllTimers();
43+
});
44+
45+
test('drain()', async () => {
46+
expect.assertions(2);
47+
const q = new Queue<void>();
48+
for (let i = 0; i < 5; i++) {
49+
const p = new Promise<void>(resolve => setTimeout(resolve, 1));
50+
q.add(p);
51+
}
52+
expect(q.length()).toBe(5);
53+
q.drain().then(() => {
54+
expect(q.length()).toBe(0);
55+
});
56+
jest.runAllTimers();
57+
});
58+
59+
test('drain() with timeout', async () => {
60+
expect.assertions(2);
61+
const q = new Queue<void>();
62+
for (let i = 0; i < 5; i++) {
63+
const p = new Promise<void>(resolve => setTimeout(resolve, 100));
64+
q.add(p);
65+
}
66+
expect(q.length()).toBe(5);
67+
q.drain(50).catch(e => {
68+
expect(e).toBeTruthy();
69+
});
70+
jest.runAllTimers();
71+
});
72+
});

0 commit comments

Comments
 (0)