Skip to content

Commit cc36653

Browse files
committed
add abort signal support to our async iterables
1 parent 46643cd commit cc36653

File tree

7 files changed

+558
-138
lines changed

7 files changed

+558
-138
lines changed

src/execution/AbortSignalListener.ts

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js';
2+
3+
/**
4+
* A AbortSignalListener object can be used to trigger multiple responses
5+
* in response to an abort signal.
6+
*
7+
* @internal
8+
*/
9+
export class AbortSignalListener {
10+
abortSignal: AbortSignal;
11+
abort: () => void;
12+
13+
private _aborts: Set<() => void>;
14+
15+
constructor(abortSignal: AbortSignal) {
16+
this.abortSignal = abortSignal;
17+
this._aborts = new Set<() => void>();
18+
this.abort = () => {
19+
for (const abort of this._aborts) {
20+
abort();
21+
}
22+
};
23+
24+
abortSignal.addEventListener('abort', this.abort);
25+
}
26+
27+
disconnect(): void {
28+
this.abortSignal.removeEventListener('abort', this.abort);
29+
}
30+
31+
cancellablePromise<T>(
32+
originalPromise: Promise<T>,
33+
onCancel?: (() => unknown) | undefined,
34+
): Promise<T> {
35+
if (this.abortSignal.aborted) {
36+
onCancel?.();
37+
// eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors
38+
return Promise.reject(this.abortSignal.reason);
39+
}
40+
41+
const { promise, resolve, reject } = promiseWithResolvers<T>();
42+
const abort = () => {
43+
onCancel?.();
44+
reject(this.abortSignal.reason);
45+
};
46+
this._aborts.add(abort);
47+
originalPromise.then(
48+
(resolved) => {
49+
this._aborts.delete(abort);
50+
resolve(resolved);
51+
},
52+
(error: unknown) => {
53+
this._aborts.delete(abort);
54+
reject(error);
55+
},
56+
);
57+
58+
return promise;
59+
}
60+
61+
cancellableIterable<T>(iterable: AsyncIterable<T>): AsyncIterable<T> {
62+
const iterator = iterable[Symbol.asyncIterator]();
63+
64+
if (iterator.return) {
65+
const _return = iterator.return.bind(iterator);
66+
const _returnIgnoringErrors = async (): Promise<IteratorResult<T>> => {
67+
_return().catch(() => {
68+
/* c8 ignore next */
69+
// ignore
70+
});
71+
return Promise.resolve({ value: undefined, done: true });
72+
};
73+
74+
return {
75+
[Symbol.asyncIterator]: () => ({
76+
next: () =>
77+
this.cancellablePromise(iterator.next(), _returnIgnoringErrors),
78+
return: () => this.cancellablePromise(_return()),
79+
}),
80+
};
81+
}
82+
83+
return {
84+
[Symbol.asyncIterator]: () => ({
85+
next: () => this.cancellablePromise(iterator.next()),
86+
}),
87+
};
88+
}
89+
}

src/execution/IncrementalPublisher.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import { pathToArray } from '../jsutils/Path.js';
44

55
import type { GraphQLError } from '../error/GraphQLError.js';
66

7+
import type { AbortSignalListener } from './AbortSignalListener.js';
78
import { IncrementalGraph } from './IncrementalGraph.js';
8-
import type { PromiseCanceller } from './PromiseCanceller.js';
99
import type {
1010
CancellableStreamRecord,
1111
CompletedExecutionGroup,
@@ -44,7 +44,7 @@ export function buildIncrementalResponse(
4444
}
4545

4646
interface IncrementalPublisherContext {
47-
promiseCanceller: PromiseCanceller | undefined;
47+
abortSignalListener: AbortSignalListener | undefined;
4848
cancellableStreams: Set<CancellableStreamRecord> | undefined;
4949
}
5050

@@ -127,7 +127,7 @@ class IncrementalPublisher {
127127
IteratorResult<SubsequentIncrementalExecutionResult, void>
128128
> => {
129129
if (isDone) {
130-
this._context.promiseCanceller?.disconnect();
130+
this._context.abortSignalListener?.disconnect();
131131
await this._returnAsyncIteratorsIgnoringErrors();
132132
return { value: undefined, done: true };
133133
}
@@ -176,7 +176,7 @@ class IncrementalPublisher {
176176

177177
// TODO: add test for this case
178178
/* c8 ignore next */
179-
this._context.promiseCanceller?.disconnect();
179+
this._context.abortSignalListener?.disconnect();
180180
await this._returnAsyncIteratorsIgnoringErrors();
181181
return { value: undefined, done: true };
182182
};

src/execution/PromiseCanceller.ts

Lines changed: 0 additions & 53 deletions
This file was deleted.

0 commit comments

Comments
 (0)