Skip to content

Commit 98204f8

Browse files
Add a helper for chaining async iterators. (#13528)
This will be used in upcoming locator-related changes.
1 parent aeca6dd commit 98204f8

File tree

2 files changed

+345
-1
lines changed

2 files changed

+345
-1
lines changed

src/client/common/utils/async.ts

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,3 +110,86 @@ export function createDeferredFromPromise<T>(promise: Promise<T>): Deferred<T> {
110110
promise.then(deferred.resolve.bind(deferred)).catch(deferred.reject.bind(deferred));
111111
return deferred;
112112
}
113+
114+
//================================
115+
// iterators
116+
117+
/**
118+
* An iterator that yields nothing.
119+
*/
120+
export function iterEmpty<T, R = void>(): AsyncIterator<T, R> {
121+
// tslint:disable-next-line:no-empty
122+
return ((async function* () {})() as unknown) as AsyncIterator<T, R>;
123+
}
124+
125+
type NextResult<T, R = void> = { index: number } & (
126+
| { result: IteratorResult<T, R>; err: null }
127+
| { result: null; err: Error }
128+
);
129+
async function getNext<T, R = void>(it: AsyncIterator<T, R>, indexMaybe?: number): Promise<NextResult<T, R>> {
130+
const index = indexMaybe === undefined ? -1 : indexMaybe;
131+
try {
132+
const result = await it.next();
133+
return { index, result, err: null };
134+
} catch (err) {
135+
return { index, err, result: null };
136+
}
137+
}
138+
139+
// tslint:disable-next-line:promise-must-complete no-empty
140+
const NEVER: Promise<unknown> = new Promise(() => {});
141+
142+
/**
143+
* Yield everything produced by the given iterators as soon as each is ready.
144+
*
145+
* When one of the iterators has something to yield then it gets yielded
146+
* right away, regardless of where the iterator is located in the array
147+
* of iterators.
148+
*
149+
* @param iterators - the async iterators from which to yield items
150+
* @param onError - called/awaited once for each iterator that fails
151+
*/
152+
export async function* chain<T, R = void>(
153+
iterators: AsyncIterator<T | void, R>[],
154+
onError?: (err: Error, index: number) => Promise<void>
155+
// Ultimately we may also want to support cancellation.
156+
): AsyncIterator<T | R | void, void> {
157+
const promises = iterators.map(getNext);
158+
let numRunning = iterators.length;
159+
while (numRunning > 0) {
160+
const { index, result, err } = await Promise.race(promises);
161+
if (err !== null) {
162+
promises[index] = NEVER as Promise<NextResult<T, R>>;
163+
numRunning -= 1;
164+
if (onError !== undefined) {
165+
await onError(err, index);
166+
}
167+
// XXX Log the error.
168+
} else if (result!.done) {
169+
promises[index] = NEVER as Promise<NextResult<T, R>>;
170+
numRunning -= 1;
171+
// If R is void then result.value will be undefined.
172+
if (result!.value !== undefined) {
173+
yield result!.value;
174+
}
175+
} else {
176+
promises[index] = getNext(iterators[index], index);
177+
yield result!.value;
178+
}
179+
}
180+
}
181+
182+
/**
183+
* Get everything yielded by the iterator.
184+
*/
185+
export async function flattenIterator<T>(iterator: AsyncIterator<T, void>): Promise<T[]> {
186+
const results: T[] = [];
187+
// We are dealing with an iterator, not an iterable, so we have
188+
// to iterate manually rather than with a for-await loop.
189+
let result = await iterator.next();
190+
while (!result.done) {
191+
results.push(result.value);
192+
result = await iterator.next();
193+
}
194+
return results;
195+
}

src/test/common/utils/async.unit.test.ts

Lines changed: 262 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
'use strict';
55

66
import * as assert from 'assert';
7-
import { createDeferred } from '../../../client/common/utils/async';
7+
import { chain, createDeferred, flattenIterator } from '../../../client/common/utils/async';
88

99
suite('Deferred', () => {
1010
test('Resolve', (done) => {
@@ -53,3 +53,264 @@ suite('Deferred', () => {
5353
assert.equal(def.completed, true, 'Promise is not completed even when it should not be');
5454
});
5555
});
56+
57+
suite('chain async iterators', () => {
58+
const flatten = flattenIterator;
59+
60+
test('no iterators', async () => {
61+
const expected: string[] = [];
62+
63+
const results = await flatten(chain([]));
64+
65+
assert.deepEqual(results, expected);
66+
});
67+
68+
test('one iterator, one item', async () => {
69+
const expected = ['foo'];
70+
const it = (async function* () {
71+
yield 'foo';
72+
})();
73+
74+
const results = await flatten(chain([it]));
75+
76+
assert.deepEqual(results, expected);
77+
});
78+
79+
test('one iterator, many items', async () => {
80+
const expected = ['foo', 'bar', 'baz'];
81+
const it = (async function* () {
82+
yield* expected;
83+
})();
84+
85+
const results = await flatten(chain([it]));
86+
87+
assert.deepEqual(results, expected);
88+
});
89+
90+
test('one iterator, no items', async () => {
91+
const deferred = createDeferred<void>();
92+
const it = (async function* () {
93+
deferred.resolve();
94+
})();
95+
96+
const results = await flatten(chain([it]));
97+
98+
assert.deepEqual(results, []);
99+
// Make sure chain() actually used up the iterator,
100+
// even through it didn't yield anything.
101+
assert.ok(deferred.resolved);
102+
});
103+
104+
test('many iterators, one item each', async () => {
105+
// For deterministic results we must control when each iterator starts.
106+
const deferred12 = createDeferred<void>();
107+
const deferred23 = createDeferred<void>();
108+
const expected = ['a', 'b', 'c'];
109+
const it1 = (async function* () {
110+
yield 'a';
111+
deferred12.resolve();
112+
})();
113+
const it2 = (async function* () {
114+
await deferred12.promise;
115+
yield 'b';
116+
deferred23.resolve();
117+
})();
118+
const it3 = (async function* () {
119+
await deferred23.promise;
120+
yield 'c';
121+
})();
122+
123+
const results = await flatten(chain([it1, it2, it3]));
124+
125+
assert.deepEqual(results, expected);
126+
});
127+
128+
test('many iterators, many items each', async () => {
129+
// For deterministic results we must control when each iterator starts.
130+
const deferred12 = createDeferred<void>();
131+
const deferred23 = createDeferred<void>();
132+
const expected = ['a1', 'a2', 'a3', 'b1', 'b2', 'b3', 'c1', 'c2', 'c3'];
133+
const it1 = (async function* () {
134+
yield 'a1';
135+
yield 'a2';
136+
yield 'a3';
137+
deferred12.resolve();
138+
})();
139+
const it2 = (async function* () {
140+
await deferred12.promise;
141+
yield 'b1';
142+
yield 'b2';
143+
yield 'b3';
144+
deferred23.resolve();
145+
})();
146+
const it3 = (async function* () {
147+
await deferred23.promise;
148+
yield 'c1';
149+
yield 'c2';
150+
yield 'c3';
151+
})();
152+
153+
const results = await flatten(chain([it1, it2, it3]));
154+
155+
assert.deepEqual(results, expected);
156+
});
157+
158+
test('many iterators, one empty', async () => {
159+
// For deterministic results we must control when each iterator starts.
160+
const deferred12 = createDeferred<void>();
161+
const deferred23 = createDeferred<void>();
162+
const expected = ['a', 'c'];
163+
const it1 = (async function* () {
164+
yield 'a';
165+
deferred12.resolve();
166+
})();
167+
const it2 = (async function* () {
168+
await deferred12.promise;
169+
// We do not yield anything.
170+
deferred23.resolve();
171+
})();
172+
const it3 = (async function* () {
173+
await deferred23.promise;
174+
yield 'c';
175+
})();
176+
const empty = it2;
177+
178+
const results = await flatten(chain([it1, empty, it3]));
179+
180+
assert.deepEqual(results, expected);
181+
});
182+
183+
test('Results are yielded as soon as ready, regardless of source iterator.', async () => {
184+
// For deterministic results we must control when each iterator starts.
185+
const deferred24 = createDeferred<void>();
186+
const deferred41 = createDeferred<void>();
187+
const deferred13 = createDeferred<void>();
188+
const deferred35 = createDeferred<void>();
189+
const deferred56 = createDeferred<void>();
190+
const expected = ['b', 'd', 'a', 'c', 'e', 'f'];
191+
const it1 = (async function* () {
192+
await deferred41.promise;
193+
yield 'a';
194+
deferred13.resolve();
195+
})();
196+
const it2 = (async function* () {
197+
yield 'b';
198+
deferred24.resolve();
199+
})();
200+
const it3 = (async function* () {
201+
await deferred13.promise;
202+
yield 'c';
203+
deferred35.resolve();
204+
})();
205+
const it4 = (async function* () {
206+
await deferred24.promise;
207+
yield 'd';
208+
deferred41.resolve();
209+
})();
210+
const it5 = (async function* () {
211+
await deferred35.promise;
212+
yield 'e';
213+
deferred56.resolve();
214+
})();
215+
const it6 = (async function* () {
216+
await deferred56.promise;
217+
yield 'f';
218+
})();
219+
220+
const results = await flatten(chain([it1, it2, it3, it4, it5, it6]));
221+
222+
assert.deepEqual(results, expected);
223+
});
224+
225+
test('A failed iterator does not block the others, with onError.', async () => {
226+
// For deterministic results we must control when each iterator starts.
227+
const deferred12 = createDeferred<void>();
228+
const deferred23 = createDeferred<void>();
229+
const expected = ['a', 'b', 'c'];
230+
const it1 = (async function* () {
231+
yield 'a';
232+
deferred12.resolve();
233+
})();
234+
const failure = new Error('uh-oh!');
235+
const it2 = (async function* () {
236+
await deferred12.promise;
237+
yield 'b';
238+
throw failure;
239+
})();
240+
const it3 = (async function* () {
241+
await deferred23.promise;
242+
yield 'c';
243+
})();
244+
const fails = it2;
245+
let gotErr: { err: Error; index: number } | undefined;
246+
async function onError(err: Error, index: number) {
247+
gotErr = { err, index };
248+
deferred23.resolve();
249+
}
250+
251+
const results = await flatten(chain([it1, fails, it3], onError));
252+
253+
assert.deepEqual(results, expected);
254+
assert.deepEqual(gotErr, { err: failure, index: 1 });
255+
});
256+
257+
test('A failed iterator does not block the others, without onError.', async () => {
258+
// If this test fails then it will likely fail intermittently.
259+
// For (mostly) deterministic results we must control when each iterator starts.
260+
const deferred12 = createDeferred<void>();
261+
const deferred23 = createDeferred<void>();
262+
const expected = ['a', 'b', 'c'];
263+
const it1 = (async function* () {
264+
yield 'a';
265+
deferred12.resolve();
266+
})();
267+
const failure = new Error('uh-oh!');
268+
const it2 = (async function* () {
269+
await deferred12.promise;
270+
yield 'b';
271+
deferred23.resolve();
272+
// This is ignored by chain() since we did not provide onError().
273+
throw failure;
274+
})();
275+
const it3 = (async function* () {
276+
await deferred23.promise;
277+
yield 'c';
278+
})();
279+
const fails = it2;
280+
281+
const results = await flatten(chain([it1, fails, it3]));
282+
283+
assert.deepEqual(results, expected);
284+
});
285+
286+
test('int results', async () => {
287+
const expected = [42, 7, 11, 13];
288+
const it = (async function* () {
289+
yield 42;
290+
yield* [7, 11, 13];
291+
})();
292+
293+
const results = await flatten(chain([it]));
294+
295+
assert.deepEqual(results, expected);
296+
});
297+
298+
test('object results', async () => {
299+
type Result = {
300+
value: string;
301+
};
302+
const expected: Result[] = [
303+
// We don't need anything special here.
304+
{ value: 'foo' },
305+
{ value: 'bar' },
306+
{ value: 'baz' }
307+
];
308+
const it = (async function* () {
309+
yield* expected;
310+
})();
311+
312+
const results = await flatten(chain([it]));
313+
314+
assert.deepEqual(results, expected);
315+
});
316+
});

0 commit comments

Comments
 (0)