Skip to content

Commit 20904e5

Browse files
committed
incremental: add highWaterMark option to apply backpressure when using async streams
This protects against a potential OOM error if we end up pulling data from a stream faster than it is consumed. Default is set at 100; after pulling 100 entries, we will pause until some have been flushed.
1 parent e15c3ec commit 20904e5

File tree

3 files changed

+177
-15
lines changed

3 files changed

+177
-15
lines changed

src/execution/IncrementalPublisher.ts

Lines changed: 49 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ export function buildIncrementalResponse(
185185
}
186186

187187
interface IncrementalPublisherContext {
188+
streamHighWaterMark: number;
188189
cancellableStreams: Set<CancellableStreamRecord> | undefined;
189190
}
190191

@@ -201,6 +202,7 @@ class IncrementalPublisher {
201202
private _completedResultQueue: Array<IncrementalDataRecordResult>;
202203
private _newPending: Set<SubsequentResultRecord>;
203204
private _incremental: Array<IncrementalResult>;
205+
private _asyncStreamCounts: Map<AsyncStreamRecord, number>;
204206
private _completed: Array<CompletedResult>;
205207
// these are assigned within the Promise executor called synchronously within the constructor
206208
private _signalled!: Promise<unknown>;
@@ -213,6 +215,7 @@ class IncrementalPublisher {
213215
this._completedResultQueue = [];
214216
this._newPending = new Set();
215217
this._incremental = [];
218+
this._asyncStreamCounts = new Map();
216219
this._completed = [];
217220
this._reset();
218221
}
@@ -427,7 +430,18 @@ class IncrementalPublisher {
427430
subsequentIncrementalExecutionResult.completed = this._completed;
428431
}
429432

433+
for (const [streamRecord, count] of this._asyncStreamCounts) {
434+
streamRecord.waterMark -= count;
435+
if (
436+
streamRecord.resume !== undefined &&
437+
streamRecord.waterMark < this._context.streamHighWaterMark
438+
) {
439+
streamRecord.resume();
440+
}
441+
}
442+
430443
this._incremental = [];
444+
this._asyncStreamCounts.clear();
431445
this._completed = [];
432446

433447
return { value: subsequentIncrementalExecutionResult, done: false };
@@ -593,20 +607,26 @@ class IncrementalPublisher {
593607
errors: streamItemsResult.errors,
594608
});
595609
this._pending.delete(streamRecord);
596-
if (isCancellableStreamRecord(streamRecord)) {
597-
invariant(this._context.cancellableStreams !== undefined);
598-
this._context.cancellableStreams.delete(streamRecord);
599-
streamRecord.earlyReturn().catch(() => {
600-
/* c8 ignore next 1 */
601-
// ignore error
602-
});
610+
if (isAsyncStreamRecord(streamRecord)) {
611+
this._asyncStreamCounts.delete(streamRecord);
612+
if (isCancellableStreamRecord(streamRecord)) {
613+
invariant(this._context.cancellableStreams !== undefined);
614+
this._context.cancellableStreams.delete(streamRecord);
615+
streamRecord.earlyReturn().catch(() => {
616+
/* c8 ignore next 1 */
617+
// ignore error
618+
});
619+
}
603620
}
604621
} else if (streamItemsResult.result === undefined) {
605622
this._completed.push({ id });
606623
this._pending.delete(streamRecord);
607-
if (isCancellableStreamRecord(streamRecord)) {
608-
invariant(this._context.cancellableStreams !== undefined);
609-
this._context.cancellableStreams.delete(streamRecord);
624+
if (isAsyncStreamRecord(streamRecord)) {
625+
this._asyncStreamCounts.delete(streamRecord);
626+
if (isCancellableStreamRecord(streamRecord)) {
627+
invariant(this._context.cancellableStreams !== undefined);
628+
this._context.cancellableStreams.delete(streamRecord);
629+
}
610630
}
611631
} else {
612632
const incrementalEntry: IncrementalStreamResult = {
@@ -615,6 +635,13 @@ class IncrementalPublisher {
615635
};
616636

617637
this._incremental.push(incrementalEntry);
638+
if (isAsyncStreamRecord(streamRecord)) {
639+
const count = this._asyncStreamCounts.get(streamRecord);
640+
this._asyncStreamCounts.set(
641+
streamRecord,
642+
count === undefined ? 1 : count + 1,
643+
);
644+
}
618645

619646
if (streamItemsResult.incrementalDataRecords !== undefined) {
620647
this._addIncrementalDataRecords(
@@ -739,7 +766,18 @@ export class DeferredFragmentRecord implements SubsequentResultRecord {
739766
}
740767
}
741768

742-
export interface CancellableStreamRecord extends SubsequentResultRecord {
769+
export interface AsyncStreamRecord extends SubsequentResultRecord {
770+
waterMark: number;
771+
resume: (() => void) | undefined;
772+
}
773+
774+
function isAsyncStreamRecord(
775+
subsequentResultRecord: SubsequentResultRecord,
776+
): subsequentResultRecord is AsyncStreamRecord {
777+
return 'waterMark' in subsequentResultRecord;
778+
}
779+
780+
export interface CancellableStreamRecord extends AsyncStreamRecord {
743781
earlyReturn: () => Promise<unknown>;
744782
}
745783

src/execution/__tests__/stream-test.ts

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -667,6 +667,106 @@ describe('Execute: stream directive', () => {
667667
},
668668
});
669669
});
670+
it('Can stream a field that returns an async iterable with backpressure', async () => {
671+
const document = parse(`
672+
query {
673+
friendList @stream {
674+
name
675+
id
676+
}
677+
}
678+
`);
679+
let count = 0;
680+
const executeResult = await experimentalExecuteIncrementally({
681+
schema,
682+
document,
683+
rootValue: {
684+
async *friendList() {
685+
for (const friend of friends) {
686+
count++;
687+
// eslint-disable-next-line no-await-in-loop
688+
yield await Promise.resolve(friend);
689+
}
690+
},
691+
},
692+
streamHighWaterMark: 2,
693+
});
694+
assert('initialResult' in executeResult);
695+
const iterator = executeResult.subsequentResults[Symbol.asyncIterator]();
696+
697+
const result1 = executeResult.initialResult;
698+
expectJSON(result1).toDeepEqual({
699+
data: {
700+
friendList: [],
701+
},
702+
pending: [{ id: '0', path: ['friendList'] }],
703+
hasNext: true,
704+
});
705+
706+
expect(count).to.equal(2);
707+
708+
await resolveOnNextTick();
709+
await resolveOnNextTick();
710+
await resolveOnNextTick();
711+
await resolveOnNextTick();
712+
await resolveOnNextTick();
713+
714+
const result2 = await iterator.next();
715+
expectJSON(result2).toDeepEqual({
716+
done: false,
717+
value: {
718+
incremental: [
719+
{
720+
items: [{ name: 'Luke', id: '1' }],
721+
id: '0',
722+
},
723+
],
724+
hasNext: true,
725+
},
726+
});
727+
728+
expect(count).to.equal(3);
729+
730+
const result3 = await iterator.next();
731+
expectJSON(result3).toDeepEqual({
732+
done: false,
733+
value: {
734+
incremental: [
735+
{
736+
items: [{ name: 'Han', id: '2' }],
737+
id: '0',
738+
},
739+
],
740+
hasNext: true,
741+
},
742+
});
743+
744+
const result4 = await iterator.next();
745+
expectJSON(result4).toDeepEqual({
746+
done: false,
747+
value: {
748+
incremental: [
749+
{
750+
items: [{ name: 'Leia', id: '3' }],
751+
id: '0',
752+
},
753+
],
754+
hasNext: true,
755+
},
756+
});
757+
758+
const result5 = await iterator.next();
759+
expectJSON(result5).toDeepEqual({
760+
done: false,
761+
value: {
762+
completed: [{ id: '0' }],
763+
hasNext: false,
764+
},
765+
});
766+
767+
const result6 = await iterator.next();
768+
expectJSON(result6).toDeepEqual({ done: true, value: undefined });
769+
});
670770
it('Can handle concurrent calls to .next() without waiting', async () => {
671771
const document = parse(`
672772
query {

src/execution/execute.ts

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { addPath, pathToArray } from '../jsutils/Path.js';
1212
import { promiseForObject } from '../jsutils/promiseForObject.js';
1313
import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';
1414
import { promiseReduce } from '../jsutils/promiseReduce.js';
15+
import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js';
1516

1617
import { GraphQLError } from '../error/GraphQLError.js';
1718
import { locatedError } from '../error/locatedError.js';
@@ -59,6 +60,7 @@ import {
5960
collectSubfields as _collectSubfields,
6061
} from './collectFields.js';
6162
import type {
63+
AsyncStreamRecord,
6264
CancellableStreamRecord,
6365
DeferredGroupedFieldSetRecord,
6466
DeferredGroupedFieldSetResult,
@@ -143,6 +145,7 @@ export interface ExecutionContext {
143145
typeResolver: GraphQLTypeResolver<any, any>;
144146
subscribeFieldResolver: GraphQLFieldResolver<any, any>;
145147
errors: Array<GraphQLError> | undefined;
148+
streamHighWaterMark: number;
146149
cancellableStreams: Set<CancellableStreamRecord> | undefined;
147150
}
148151

@@ -161,6 +164,7 @@ export interface ExecutionArgs {
161164
fieldResolver?: Maybe<GraphQLFieldResolver<any, any>>;
162165
typeResolver?: Maybe<GraphQLTypeResolver<any, any>>;
163166
subscribeFieldResolver?: Maybe<GraphQLFieldResolver<any, any>>;
167+
streamHighWaterMark?: Maybe<number>;
164168
}
165169

166170
export interface StreamUsage {
@@ -439,6 +443,7 @@ export function buildExecutionContext(
439443
fieldResolver,
440444
typeResolver,
441445
subscribeFieldResolver,
446+
streamHighWaterMark,
442447
} = args;
443448

444449
// If the schema used for execution is invalid, throw an error.
@@ -504,6 +509,7 @@ export function buildExecutionContext(
504509
subscribeFieldResolver: subscribeFieldResolver ?? defaultFieldResolver,
505510
errors: undefined,
506511
cancellableStreams: undefined,
512+
streamHighWaterMark: streamHighWaterMark ?? 100,
507513
};
508514
}
509515

@@ -1096,16 +1102,20 @@ async function completeAsyncIteratorValue(
10961102
while (true) {
10971103
if (streamUsage && index >= streamUsage.initialCount) {
10981104
const returnFn = asyncIterator.return;
1099-
let streamRecord: SubsequentResultRecord | CancellableStreamRecord;
1105+
let streamRecord: AsyncStreamRecord | CancellableStreamRecord;
11001106
if (returnFn === undefined) {
11011107
streamRecord = {
11021108
label: streamUsage.label,
11031109
path,
1104-
} as SubsequentResultRecord;
1110+
waterMark: 0,
1111+
resume: undefined,
1112+
};
11051113
} else {
11061114
streamRecord = {
11071115
label: streamUsage.label,
11081116
path,
1117+
waterMark: 0,
1118+
resume: undefined,
11091119
earlyReturn: returnFn.bind(asyncIterator),
11101120
};
11111121
if (exeContext.cancellableStreams === undefined) {
@@ -2317,7 +2327,7 @@ function prependNextResolvedStreamItems(
23172327
}
23182328

23192329
function firstAsyncStreamItems(
2320-
streamRecord: SubsequentResultRecord,
2330+
streamRecord: AsyncStreamRecord,
23212331
path: Path,
23222332
initialIndex: number,
23232333
asyncIterator: AsyncIterator<unknown>,
@@ -2343,7 +2353,7 @@ function firstAsyncStreamItems(
23432353
}
23442354

23452355
async function getNextAsyncStreamItemsResult(
2346-
streamRecord: SubsequentResultRecord,
2356+
streamRecord: AsyncStreamRecord,
23472357
path: Path,
23482358
index: number,
23492359
asyncIterator: AsyncIterator<unknown>,
@@ -2353,6 +2363,18 @@ async function getNextAsyncStreamItemsResult(
23532363
itemType: GraphQLOutputType,
23542364
): Promise<StreamItemsResult> {
23552365
let iteration;
2366+
2367+
const waterMark = streamRecord.waterMark;
2368+
2369+
if (waterMark === exeContext.streamHighWaterMark) {
2370+
// promiseWithResolvers uses void only as a generic type parameter
2371+
// see: https://typescript-eslint.io/rules/no-invalid-void-type/
2372+
// eslint-disable-next-line @typescript-eslint/no-invalid-void-type
2373+
const { promise: resumed, resolve: resume } = promiseWithResolvers<void>();
2374+
streamRecord.resume = resume;
2375+
await resumed;
2376+
}
2377+
23562378
try {
23572379
iteration = await asyncIterator.next();
23582380
} catch (error) {
@@ -2366,6 +2388,8 @@ async function getNextAsyncStreamItemsResult(
23662388
return { streamRecord };
23672389
}
23682390

2391+
streamRecord.waterMark++;
2392+
23692393
const itemPath = addPath(path, index, undefined);
23702394

23712395
const result = completeStreamItems(

0 commit comments

Comments
 (0)