Skip to content

Commit a521d69

Browse files
committed
clean up PromiseCanceller when used with subscriptions
1 parent d91b66f commit a521d69

File tree

6 files changed

+143
-159
lines changed

6 files changed

+143
-159
lines changed

src/execution/PromiseCanceller.ts

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,14 @@ export class PromiseCanceller {
2828
this.abortSignal.removeEventListener('abort', this.abort);
2929
}
3030

31-
cancellablePromise<T>(
32-
originalPromise: Promise<T>,
33-
onCancel?: (() => unknown) | undefined,
34-
): Promise<T> {
31+
cancellablePromise<T>(originalPromise: Promise<T>): Promise<T> {
3532
if (this.abortSignal.aborted) {
36-
onCancel?.();
3733
// eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors
3834
return Promise.reject(this.abortSignal.reason);
3935
}
4036

4137
const { promise, resolve, reject } = promiseWithResolvers<T>();
4238
const abort = () => {
43-
onCancel?.();
4439
reject(this.abortSignal.reason);
4540
};
4641
this._aborts.add(abort);
@@ -61,28 +56,22 @@ export class PromiseCanceller {
6156
cancellableIterable<T>(iterable: AsyncIterable<T>): AsyncIterable<T> {
6257
const iterator = iterable[Symbol.asyncIterator]();
6358

59+
const _next = iterator.next.bind(iterator);
60+
6461
if (iterator.return) {
6562
const _return = iterator.return.bind(iterator);
66-
const _returnIgnoringErrors = async (): Promise<IteratorResult<T>> => {
67-
_return().catch(() => {
68-
/* c8 ignore next */
69-
// ignore
70-
});
71-
return Promise.resolve({ value: undefined, done: true });
72-
};
7363

7464
return {
7565
[Symbol.asyncIterator]: () => ({
76-
next: () =>
77-
this.cancellablePromise(iterator.next(), _returnIgnoringErrors),
66+
next: () => this.cancellablePromise(_next()),
7867
return: () => this.cancellablePromise(_return()),
7968
}),
8069
};
8170
}
8271

8372
return {
8473
[Symbol.asyncIterator]: () => ({
85-
next: () => this.cancellablePromise(iterator.next()),
74+
next: () => this.cancellablePromise(_next()),
8675
}),
8776
};
8877
}
Lines changed: 0 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { expect } from 'chai';
21
import { describe, it } from 'mocha';
32

43
import { expectPromise } from '../../__testUtils__/expectPromise.js';
@@ -70,62 +69,6 @@ describe('PromiseCanceller', () => {
7069

7170
await expectPromise(withCancellation).toRejectWith('Cancelled!');
7271
});
73-
74-
it('works to trigger onCancel when cancelling a hanging promise', async () => {
75-
const abortController = new AbortController();
76-
const abortSignal = abortController.signal;
77-
78-
const promiseCanceller = new PromiseCanceller(abortSignal);
79-
80-
const promise = new Promise(() => {
81-
/* never resolves */
82-
});
83-
84-
let onCancelCalled = false;
85-
const onCancel = () => {
86-
onCancelCalled = true;
87-
};
88-
89-
const withCancellation = promiseCanceller.cancellablePromise(
90-
promise,
91-
onCancel,
92-
);
93-
94-
expect(onCancelCalled).to.equal(false);
95-
96-
abortController.abort(new Error('Cancelled!'));
97-
98-
expect(onCancelCalled).to.equal(true);
99-
100-
await expectPromise(withCancellation).toRejectWith('Cancelled!');
101-
});
102-
103-
it('works to trigger onCancel when cancelling a hanging promise created after abort signal triggered', async () => {
104-
const abortController = new AbortController();
105-
const abortSignal = abortController.signal;
106-
107-
abortController.abort(new Error('Cancelled!'));
108-
109-
const promiseCanceller = new PromiseCanceller(abortSignal);
110-
111-
const promise = new Promise(() => {
112-
/* never resolves */
113-
});
114-
115-
let onCancelCalled = false;
116-
const onCancel = () => {
117-
onCancelCalled = true;
118-
};
119-
120-
const withCancellation = promiseCanceller.cancellablePromise(
121-
promise,
122-
onCancel,
123-
);
124-
125-
expect(onCancelCalled).to.equal(true);
126-
127-
await expectPromise(withCancellation).toRejectWith('Cancelled!');
128-
});
12972
});
13073

13174
describe('cancellableAsyncIterable', () => {
@@ -174,69 +117,5 @@ describe('PromiseCanceller', () => {
174117

175118
await expectPromise(nextPromise).toRejectWith('Cancelled!');
176119
});
177-
178-
it('works to call return', async () => {
179-
const abortController = new AbortController();
180-
const abortSignal = abortController.signal;
181-
182-
const promiseCanceller = new PromiseCanceller(abortSignal);
183-
184-
let returned = false;
185-
const asyncIterable = {
186-
[Symbol.asyncIterator]: () => ({
187-
next: () => Promise.resolve({ value: 1, done: false }),
188-
return: () => {
189-
returned = true;
190-
return Promise.resolve({ value: undefined, done: true });
191-
},
192-
}),
193-
};
194-
195-
const cancellableAsyncIterable =
196-
promiseCanceller.cancellableIterable(asyncIterable);
197-
198-
abortController.abort(new Error('Cancelled!'));
199-
200-
expect(returned).to.equal(false);
201-
202-
const nextPromise =
203-
cancellableAsyncIterable[Symbol.asyncIterator]().next();
204-
205-
expect(returned).to.equal(true);
206-
207-
await expectPromise(nextPromise).toRejectWith('Cancelled!');
208-
});
209-
210-
it('works to call return when already aborted', async () => {
211-
const abortController = new AbortController();
212-
const abortSignal = abortController.signal;
213-
214-
abortController.abort(new Error('Cancelled!'));
215-
216-
const promiseCanceller = new PromiseCanceller(abortSignal);
217-
218-
let returned = false;
219-
const asyncIterable = {
220-
[Symbol.asyncIterator]: () => ({
221-
next: () => Promise.resolve({ value: 1, done: false }),
222-
return: () => {
223-
returned = true;
224-
return Promise.resolve({ value: undefined, done: true });
225-
},
226-
}),
227-
};
228-
229-
const cancellableAsyncIterable =
230-
promiseCanceller.cancellableIterable(asyncIterable);
231-
232-
expect(returned).to.equal(false);
233-
234-
const nextPromise =
235-
cancellableAsyncIterable[Symbol.asyncIterator]().next();
236-
237-
expect(returned).to.equal(true);
238-
239-
await expectPromise(nextPromise).toRejectWith('Cancelled!');
240-
});
241120
});
242121
});

src/execution/__tests__/abort-signal-test.ts

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -795,7 +795,7 @@ describe('Execute: Cancellation', () => {
795795
});
796796
});
797797

798-
it('should stop the execution when aborted during subscription', async () => {
798+
it('should stop the execution when aborted prior to return of a subscription resolver', async () => {
799799
const abortController = new AbortController();
800800
const document = parse(`
801801
subscription {
@@ -830,6 +830,44 @@ describe('Execute: Cancellation', () => {
830830
});
831831
});
832832

833+
it('should successfully wrap the subscription', async () => {
834+
const abortController = new AbortController();
835+
const document = parse(`
836+
subscription {
837+
foo
838+
}
839+
`);
840+
841+
async function* foo() {
842+
yield await Promise.resolve({ foo: 'foo' });
843+
}
844+
845+
const subscription = await subscribe({
846+
document,
847+
schema,
848+
abortSignal: abortController.signal,
849+
rootValue: {
850+
foo: Promise.resolve(foo()),
851+
},
852+
});
853+
854+
assert(isAsyncIterable(subscription));
855+
856+
expectJSON(await subscription.next()).toDeepEqual({
857+
value: {
858+
data: {
859+
foo: 'foo',
860+
},
861+
},
862+
done: false,
863+
});
864+
865+
expectJSON(await subscription.next()).toDeepEqual({
866+
value: undefined,
867+
done: true,
868+
});
869+
});
870+
833871
it('should stop the execution when aborted during subscription', async () => {
834872
const abortController = new AbortController();
835873
const document = parse(`
@@ -838,15 +876,16 @@ describe('Execute: Cancellation', () => {
838876
}
839877
`);
840878

879+
async function* foo() {
880+
yield await Promise.resolve({ foo: 'foo' });
881+
}
882+
841883
const subscription = subscribe({
842884
document,
843885
schema,
844886
abortSignal: abortController.signal,
845887
rootValue: {
846-
async *foo() {
847-
yield await Promise.resolve({ foo: 'foo' });
848-
yield await Promise.resolve({ foo: 'foo' }); /* c8 ignore start */
849-
} /* c8 ignore stop */,
888+
foo: foo(),
850889
},
851890
});
852891

src/execution/__tests__/mapAsyncIterable-test.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,54 @@ describe('mapAsyncIterable', () => {
8989
});
9090
});
9191

92+
it('calls done when completes', async () => {
93+
async function* source() {
94+
yield 1;
95+
yield 2;
96+
yield 3;
97+
}
98+
99+
let done = false;
100+
const doubles = mapAsyncIterable(
101+
source(),
102+
(x) => Promise.resolve(x + x),
103+
() => {
104+
done = true;
105+
},
106+
);
107+
108+
expect(await doubles.next()).to.deep.equal({ value: 2, done: false });
109+
expect(await doubles.next()).to.deep.equal({ value: 4, done: false });
110+
expect(await doubles.next()).to.deep.equal({ value: 6, done: false });
111+
expect(done).to.equal(false);
112+
expect(await doubles.next()).to.deep.equal({
113+
value: undefined,
114+
done: true,
115+
});
116+
expect(done).to.equal(true);
117+
});
118+
119+
it('calls done when completes with error', async () => {
120+
async function* source() {
121+
yield 1;
122+
throw new Error('Oops');
123+
}
124+
125+
let done = false;
126+
const doubles = mapAsyncIterable(
127+
source(),
128+
(x) => Promise.resolve(x + x),
129+
() => {
130+
done = true;
131+
},
132+
);
133+
134+
expect(await doubles.next()).to.deep.equal({ value: 2, done: false });
135+
expect(done).to.equal(false);
136+
await expectPromise(doubles.next()).toRejectWith('Oops');
137+
expect(done).to.equal(true);
138+
});
139+
92140
it('allows returning early from mapped async generator', async () => {
93141
async function* source() {
94142
try {

src/execution/execute.ts

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2099,6 +2099,13 @@ export function subscribe(
20992099
return mapSourceToResponse(validatedExecutionArgs, resultOrStream);
21002100
}
21012101

2102+
/**
2103+
*
2104+
* For each payload yielded from a subscription, map it over the normal
2105+
* GraphQL `execute` function, with `payload` as the rootValue.
2106+
* This implements the "MapSourceToResponseEvent" algorithm described in
2107+
* the GraphQL specification..
2108+
*/
21022109
function mapSourceToResponse(
21032110
validatedExecutionArgs: ValidatedExecutionArgs,
21042111
resultOrStream: ExecutionResult | AsyncIterable<unknown>,
@@ -2107,10 +2114,22 @@ function mapSourceToResponse(
21072114
return resultOrStream;
21082115
}
21092116

2110-
// For each payload yielded from a subscription, map it over the normal
2111-
// GraphQL `execute` function, with `payload` as the rootValue.
2112-
// This implements the "MapSourceToResponseEvent" algorithm described in
2113-
// the GraphQL specification..
2117+
const abortSignal = validatedExecutionArgs.abortSignal;
2118+
if (abortSignal) {
2119+
const promiseCanceller = new PromiseCanceller(abortSignal);
2120+
return mapAsyncIterable(
2121+
promiseCanceller?.cancellableIterable(resultOrStream),
2122+
(payload: unknown) => {
2123+
const perEventExecutionArgs: ValidatedExecutionArgs = {
2124+
...validatedExecutionArgs,
2125+
rootValue: payload,
2126+
};
2127+
return validatedExecutionArgs.perEventExecutor(perEventExecutionArgs);
2128+
},
2129+
() => promiseCanceller.disconnect(),
2130+
);
2131+
}
2132+
21142133
return mapAsyncIterable(resultOrStream, (payload: unknown) => {
21152134
const perEventExecutionArgs: ValidatedExecutionArgs = {
21162135
...validatedExecutionArgs,
@@ -2265,16 +2284,16 @@ function executeSubscription(
22652284
// used to represent an authenticated user, or request-specific caches.
22662285
const result = resolveFn(rootValue, args, contextValue, info, abortSignal);
22672286

2268-
const promiseCanceller = abortSignal
2269-
? new PromiseCanceller(abortSignal)
2270-
: undefined;
2271-
22722287
if (isPromise(result)) {
2288+
const promiseCanceller = abortSignal
2289+
? new PromiseCanceller(abortSignal)
2290+
: undefined;
2291+
22732292
const promise = promiseCanceller?.cancellablePromise(result) ?? result;
22742293
return promise.then(assertEventStream).then(
22752294
(resolved) => {
22762295
promiseCanceller?.disconnect();
2277-
return promiseCanceller?.cancellableIterable(resolved) ?? resolved;
2296+
return resolved;
22782297
},
22792298
(error: unknown) => {
22802299
promiseCanceller?.disconnect();
@@ -2284,7 +2303,7 @@ function executeSubscription(
22842303
}
22852304

22862305
const eventStream = assertEventStream(result);
2287-
return promiseCanceller?.cancellableIterable(eventStream) ?? eventStream;
2306+
return eventStream;
22882307
} catch (error) {
22892308
throw locatedError(error, fieldNodes, pathToArray(path));
22902309
}

0 commit comments

Comments
 (0)