Skip to content

Commit 3a14f7e

Browse files
committed
coalesce multiple stream items into single incremental entry
1 parent 9c9d80d commit 3a14f7e

File tree

3 files changed

+320
-338
lines changed

3 files changed

+320
-338
lines changed

src/execution/IncrementalPublisher.ts

Lines changed: 172 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ class IncrementalPublisher {
224224
): ExperimentalIncrementalExecutionResults {
225225
this._addIncrementalDataRecords(incrementalDataRecords);
226226
this._pruneEmpty();
227-
this._enqueueNewRootDeferredFragments();
227+
this._enqueueNew();
228228

229229
const pending = this._pendingSourcesToResults();
230230

@@ -257,22 +257,17 @@ class IncrementalPublisher {
257257
}
258258

259259
if (pending) {
260-
this._enqueueCompletedIncrementalDataRecordResult(
260+
this._enqueueCompletedDeferredGroupedFieldSetResult(
261261
incrementalDataRecord.result,
262262
);
263263
}
264264

265265
continue;
266266
}
267267

268-
const streamRecord = incrementalDataRecord.streamRecord;
269-
if (streamRecord.id === undefined) {
270-
this._newPending.add(streamRecord);
268+
if (incrementalDataRecord.id === undefined) {
269+
this._newPending.add(incrementalDataRecord);
271270
}
272-
273-
this._enqueueCompletedIncrementalDataRecordResult(
274-
incrementalDataRecord.result,
275-
);
276271
}
277272
}
278273

@@ -333,8 +328,9 @@ class IncrementalPublisher {
333328
}
334329
}
335330

336-
private _enqueueNewRootDeferredFragments(): void {
337-
const incrementalDataRecordsToWatch = new Set<IncrementalDataRecord>();
331+
private _enqueueNew(): void {
332+
const deferredGroupedFieldSetRecords =
333+
new Set<DeferredGroupedFieldSetRecord>();
338334
for (const subsequentResultRecord of this._newPending) {
339335
if (isDeferredFragmentRecord(subsequentResultRecord)) {
340336
for (const deferredGroupedFieldSetRecord of subsequentResultRecord.deferredGroupedFieldSetRecords) {
@@ -351,19 +347,21 @@ class IncrementalPublisher {
351347
continue;
352348
}
353349
}
354-
incrementalDataRecordsToWatch.add(deferredGroupedFieldSetRecord);
350+
deferredGroupedFieldSetRecords.add(deferredGroupedFieldSetRecord);
355351
}
352+
} else if (isStreamRecord(subsequentResultRecord)) {
353+
this._enqueueStreamRecord(subsequentResultRecord);
356354
}
357355
}
358356

359-
for (const incrementalDataRecord of incrementalDataRecordsToWatch) {
360-
this._enqueueCompletedIncrementalDataRecordResult(
361-
incrementalDataRecord.result,
357+
for (const deferredGroupedFieldSetRecord of deferredGroupedFieldSetRecords) {
358+
this._enqueueCompletedDeferredGroupedFieldSetResult(
359+
deferredGroupedFieldSetRecord.result,
362360
);
363361
}
364362
}
365363

366-
private _enqueueCompletedIncrementalDataRecordResult(
364+
private _enqueueCompletedDeferredGroupedFieldSetResult(
367365
result: PromiseOrValue<IncrementalDataRecordResult>,
368366
): void {
369367
if (isPromise(result)) {
@@ -378,6 +376,81 @@ class IncrementalPublisher {
378376
}
379377
}
380378

379+
private _enqueueStreamRecord(record: StreamRecord): void {
380+
const streamItemRecords = record.streamItemRecords;
381+
if (isPromise(streamItemRecords)) {
382+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
383+
streamItemRecords.then((resolved) =>
384+
this._enqueueStreamItems(record, resolved),
385+
);
386+
} else {
387+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
388+
this._enqueueStreamItems(record, streamItemRecords);
389+
}
390+
}
391+
392+
private async _enqueueStreamItems(
393+
streamRecord: StreamRecord,
394+
streamItemRecords: Array<StreamItemRecord>,
395+
): Promise<void> {
396+
let items: Array<unknown> = [];
397+
let errors: Array<GraphQLError> = [];
398+
let incrementalDataRecords: Array<IncrementalDataRecord> = [];
399+
let streamItemRecord: StreamItemRecord | undefined;
400+
while ((streamItemRecord = streamItemRecords.shift()) !== undefined) {
401+
let result = streamItemRecord.result;
402+
if (isPromise(result)) {
403+
if (items.length > 0) {
404+
this._completedResultQueue.push({
405+
streamRecord,
406+
result:
407+
// TODO: add test case or rework for coverage
408+
errors.length > 0 /* c8 ignore start */
409+
? { items, errors } /* c8 ignore stop */
410+
: { items },
411+
incrementalDataRecords,
412+
});
413+
this._trigger();
414+
items = [];
415+
errors = [];
416+
incrementalDataRecords = [];
417+
}
418+
// eslint-disable-next-line no-await-in-loop
419+
result = await result;
420+
// wait an additional tick to coalesce resolving additional promises
421+
// within the queue
422+
// eslint-disable-next-line no-await-in-loop
423+
await Promise.resolve();
424+
}
425+
if (result.item === undefined) {
426+
if (items.length > 0) {
427+
this._completedResultQueue.push({
428+
streamRecord,
429+
result: errors.length > 0 ? { items, errors } : { items },
430+
incrementalDataRecords,
431+
});
432+
}
433+
this._completedResultQueue.push(
434+
result.errors === undefined
435+
? { streamRecord }
436+
: {
437+
streamRecord,
438+
errors: result.errors,
439+
},
440+
);
441+
this._trigger();
442+
return;
443+
}
444+
items.push(result.item);
445+
if (result.errors !== undefined) {
446+
errors.push(...result.errors);
447+
}
448+
if (result.incrementalDataRecords !== undefined) {
449+
incrementalDataRecords.push(...result.incrementalDataRecords);
450+
}
451+
}
452+
}
453+
381454
private _pendingSourcesToResults(): Array<PendingResult> {
382455
const pendingResults: Array<PendingResult> = [];
383456
for (const pendingSource of this._newPending) {
@@ -424,7 +497,7 @@ class IncrementalPublisher {
424497
}
425498

426499
this._pruneEmpty();
427-
this._enqueueNewRootDeferredFragments();
500+
this._enqueueNew();
428501

429502
pending = [...pending, ...this._pendingSourcesToResults()];
430503
}
@@ -720,9 +793,25 @@ function isNonReconcilableDeferredGroupedFieldSetResult(
720793
return deferredGroupedFieldSetResult.errors !== undefined;
721794
}
722795

723-
export interface DeferredGroupedFieldSetRecord {
796+
/** @internal */
797+
export class DeferredGroupedFieldSetRecord {
724798
deferredFragmentRecords: ReadonlyArray<DeferredFragmentRecord>;
725799
result: PromiseOrValue<DeferredGroupedFieldSetResult>;
800+
801+
constructor(opts: {
802+
deferredFragmentRecords: ReadonlyArray<DeferredFragmentRecord>;
803+
result: PromiseOrValue<DeferredGroupedFieldSetResult>;
804+
}) {
805+
this.deferredFragmentRecords = opts.deferredFragmentRecords;
806+
const result = opts.result;
807+
this.result = result;
808+
if (isPromise(result)) {
809+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
810+
result.then((resolved) => {
811+
this.result = resolved;
812+
});
813+
}
814+
}
726815
}
727816

728817
export interface SubsequentResultRecord {
@@ -755,55 +844,93 @@ export class DeferredFragmentRecord {
755844
}
756845
}
757846

758-
export interface CancellableStreamRecord extends SubsequentResultRecord {
847+
/** @internal */
848+
export class StreamRecord {
849+
path: Path | undefined;
850+
label: string | undefined;
851+
id?: string | undefined;
852+
streamItemRecords: PromiseOrValue<Array<StreamItemRecord>>;
853+
854+
constructor(opts: {
855+
path: Path | undefined;
856+
label: string | undefined;
857+
streamItemRecords: PromiseOrValue<Array<StreamItemRecord>>;
858+
}) {
859+
this.path = opts.path;
860+
this.label = opts.label;
861+
const streamItemRecords = opts.streamItemRecords;
862+
this.streamItemRecords = streamItemRecords;
863+
if (isPromise(streamItemRecords)) {
864+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
865+
streamItemRecords.then((resolved) => {
866+
this.streamItemRecords = resolved;
867+
});
868+
}
869+
}
870+
}
871+
872+
/** @internal */
873+
export class CancellableStreamRecord extends StreamRecord {
759874
earlyReturn: () => Promise<unknown>;
875+
876+
constructor(opts: {
877+
path: Path | undefined;
878+
label: string | undefined;
879+
streamItemRecords: PromiseOrValue<Array<StreamItemRecord>>;
880+
earlyReturn: () => Promise<unknown>;
881+
}) {
882+
super(opts);
883+
this.earlyReturn = opts.earlyReturn;
884+
}
885+
}
886+
887+
function isStreamRecord(
888+
record: SubsequentResultRecord | IncrementalDataRecord,
889+
): record is StreamRecord {
890+
return record instanceof StreamRecord;
760891
}
761892

762893
function isCancellableStreamRecord(
763894
subsequentResultRecord: SubsequentResultRecord,
764895
): subsequentResultRecord is CancellableStreamRecord {
765-
return 'earlyReturn' in subsequentResultRecord;
766-
}
767-
768-
interface ReconcilableStreamItemsResult {
769-
streamRecord: SubsequentResultRecord;
770-
result: BareStreamItemsResult;
771-
incrementalDataRecords: ReadonlyArray<IncrementalDataRecord> | undefined;
772-
errors?: never;
896+
return subsequentResultRecord instanceof CancellableStreamRecord;
773897
}
774898

775-
export function isReconcilableStreamItemsResult(
776-
streamItemsResult: StreamItemsResult,
777-
): streamItemsResult is ReconcilableStreamItemsResult {
778-
return streamItemsResult.result !== undefined;
899+
/** @internal */
900+
export class StreamItemRecord {
901+
result: PromiseOrValue<StreamItemResult>;
902+
constructor(result: PromiseOrValue<StreamItemResult>) {
903+
this.result = result;
904+
if (isPromise(result)) {
905+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
906+
result.then((resolved) => {
907+
this.result = resolved;
908+
});
909+
}
910+
}
779911
}
780912

781-
interface TerminatingStreamItemsResult {
782-
streamRecord: SubsequentResultRecord;
783-
result?: never;
784-
incrementalDataRecords?: never;
785-
errors?: never;
913+
export interface StreamItemResult {
914+
item?: unknown;
915+
incrementalDataRecords?: ReadonlyArray<IncrementalDataRecord> | undefined;
916+
errors?: ReadonlyArray<GraphQLError> | undefined;
786917
}
787918

788-
interface NonReconcilableStreamItemsResult {
919+
interface StreamItemsResult {
789920
streamRecord: SubsequentResultRecord;
790-
errors: ReadonlyArray<GraphQLError>;
791-
result?: never;
921+
result?: BareStreamItemsResult | undefined;
922+
incrementalDataRecords?: ReadonlyArray<IncrementalDataRecord> | undefined;
923+
errors?: ReadonlyArray<GraphQLError> | undefined;
792924
}
793925

794-
export type StreamItemsResult =
795-
| ReconcilableStreamItemsResult
796-
| TerminatingStreamItemsResult
797-
| NonReconcilableStreamItemsResult;
798-
799926
export interface StreamItemsRecord {
800927
streamRecord: SubsequentResultRecord;
801928
result: PromiseOrValue<StreamItemsResult>;
802929
}
803930

804931
export type IncrementalDataRecord =
805932
| DeferredGroupedFieldSetRecord
806-
| StreamItemsRecord;
933+
| StreamRecord;
807934

808935
export type IncrementalDataRecordResult =
809936
| DeferredGroupedFieldSetResult

0 commit comments

Comments
 (0)