Skip to content

Commit 5fc2723

Browse files
committed
Handle stream as stream
Rather than creating a linked list where each incremental data record also includes the next record in addition to any new defers and/or streams. Enables easily batching all available stream items within the same incremental entry.
1 parent cba24de commit 5fc2723

File tree

5 files changed

+235
-303
lines changed

5 files changed

+235
-303
lines changed

src/execution/IncrementalGraph.ts

Lines changed: 82 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,20 @@
11
import { isPromise } from '../jsutils/isPromise.js';
22
import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js';
33

4+
import type { GraphQLError } from '../error/GraphQLError.js';
5+
46
import type {
57
CompletedDeferredGroupedFieldSet,
68
CompletedIncrementalData,
79
CompletedReconcilableDeferredGroupedFieldSet,
810
DeferredFragmentRecord,
911
DeferredGroupedFieldSetRecord,
1012
IncrementalDataRecord,
11-
StreamItemsRecord,
13+
StreamItemRecord,
1214
StreamRecord,
1315
SubsequentResultRecord,
1416
} from './types.js';
15-
import {
16-
isDeferredGroupedFieldSetRecord,
17-
isStreamItemsRecord,
18-
} from './types.js';
17+
import { isDeferredGroupedFieldSetRecord } from './types.js';
1918

2019
interface DeferredFragmentNode {
2120
deferredFragmentRecord: DeferredFragmentRecord;
@@ -31,9 +30,9 @@ function isDeferredFragmentNode(
3130
}
3231

3332
function isStreamNode(
34-
subsequentResultNode: SubsequentResultNode,
35-
): subsequentResultNode is StreamRecord {
36-
return 'path' in subsequentResultNode;
33+
record: SubsequentResultNode | IncrementalDataRecord,
34+
): record is StreamRecord {
35+
return 'streamItemRecords' in record;
3736
}
3837

3938
type SubsequentResultNode = DeferredFragmentNode | StreamRecord;
@@ -71,7 +70,7 @@ export class IncrementalGraph {
7170
if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) {
7271
this._addDeferredGroupedFieldSetRecord(incrementalDataRecord);
7372
} else {
74-
this._addStreamItemsRecord(incrementalDataRecord);
73+
this._addStreamRecord(incrementalDataRecord);
7574
}
7675
}
7776
}
@@ -101,6 +100,7 @@ export class IncrementalGraph {
101100
if (isStreamNode(node)) {
102101
this._pending.add(node);
103102
newPending.add(node);
103+
this._newIncrementalDataRecords.add(node);
104104
} else if (node.deferredGroupedFieldSetRecords.size > 0) {
105105
for (const deferredGroupedFieldSetNode of node.deferredGroupedFieldSetRecords) {
106106
this._newIncrementalDataRecords.add(deferredGroupedFieldSetNode);
@@ -116,22 +116,12 @@ export class IncrementalGraph {
116116
this._newPending.clear();
117117

118118
for (const incrementalDataRecord of this._newIncrementalDataRecords) {
119-
if (isStreamItemsRecord(incrementalDataRecord)) {
120-
const result = incrementalDataRecord.streamItemsResult.value;
121-
if (isPromise(result)) {
122-
// eslint-disable-next-line @typescript-eslint/no-floating-promises
123-
result.then((resolved) =>
124-
this._enqueue({
125-
streamItemsRecord: incrementalDataRecord,
126-
streamItemsResult: resolved,
127-
}),
128-
);
129-
} else {
130-
this._enqueue({
131-
streamItemsRecord: incrementalDataRecord,
132-
streamItemsResult: result,
133-
});
134-
}
119+
if (isStreamNode(incrementalDataRecord)) {
120+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
121+
this._onStreamItems(
122+
incrementalDataRecord,
123+
incrementalDataRecord.streamItemRecords,
124+
);
135125
} else {
136126
const result =
137127
incrementalDataRecord.deferredGroupedFieldSetResult.value;
@@ -283,12 +273,8 @@ export class IncrementalGraph {
283273
}
284274
}
285275

286-
private _addStreamItemsRecord(streamItemsRecord: StreamItemsRecord): void {
287-
const streamRecord = streamItemsRecord.streamRecord;
288-
if (!this._pending.has(streamRecord)) {
289-
this._newPending.add(streamRecord);
290-
}
291-
this._newIncrementalDataRecords.add(streamItemsRecord);
276+
private _addStreamRecord(streamRecord: StreamRecord): void {
277+
this._newPending.add(streamRecord);
292278
}
293279

294280
private _addDeferredFragmentNode(
@@ -320,6 +306,71 @@ export class IncrementalGraph {
320306
return deferredFragmentNode;
321307
}
322308

309+
private async _onStreamItems(
310+
streamRecord: StreamRecord,
311+
streamItemRecords: Array<StreamItemRecord>,
312+
): Promise<void> {
313+
let items: Array<unknown> = [];
314+
let errors: Array<GraphQLError> = [];
315+
let incrementalDataRecords: Array<IncrementalDataRecord> = [];
316+
let streamItemRecord: StreamItemRecord | undefined;
317+
while ((streamItemRecord = streamItemRecords.shift()) !== undefined) {
318+
let result = streamItemRecord.value;
319+
if (isPromise(result)) {
320+
if (items.length > 0) {
321+
this._enqueue({
322+
streamRecord,
323+
streamItemsResult: {
324+
result:
325+
// TODO add additional test case or rework for coverage
326+
errors.length > 0 /* c8 ignore start */
327+
? { items, errors } /* c8 ignore stop */
328+
: { items },
329+
incrementalDataRecords,
330+
},
331+
});
332+
items = [];
333+
errors = [];
334+
incrementalDataRecords = [];
335+
}
336+
// eslint-disable-next-line no-await-in-loop
337+
result = await result;
338+
// wait an additional tick to coalesce resolving additional promises
339+
// within the queue
340+
// eslint-disable-next-line no-await-in-loop
341+
await Promise.resolve();
342+
}
343+
if (result.item === undefined) {
344+
if (items.length > 0) {
345+
this._enqueue({
346+
streamRecord,
347+
streamItemsResult: {
348+
result: errors.length > 0 ? { items, errors } : { items },
349+
incrementalDataRecords,
350+
},
351+
});
352+
}
353+
this._enqueue({
354+
streamRecord,
355+
streamItemsResult:
356+
result.errors === undefined
357+
? {}
358+
: {
359+
errors: result.errors,
360+
},
361+
});
362+
return;
363+
}
364+
items.push(result.item);
365+
if (result.errors !== undefined) {
366+
errors.push(...result.errors);
367+
}
368+
if (result.incrementalDataRecords !== undefined) {
369+
incrementalDataRecords.push(...result.incrementalDataRecords);
370+
}
371+
}
372+
}
373+
323374
private *_yieldCurrentCompletedIncrementalData(
324375
first: CompletedIncrementalData,
325376
): Generator<CompletedIncrementalData> {

src/execution/IncrementalPublisher.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -296,9 +296,8 @@ class IncrementalPublisher {
296296
completedStreamItems: CompletedStreamItems,
297297
context: SubsequentIncrementalExecutionResultContext,
298298
): void {
299-
const { streamItemsRecord, streamItemsResult } = completedStreamItems;
300-
const streamRecord = streamItemsRecord.streamRecord;
301-
const id = streamItemsRecord.streamRecord.id;
299+
const { streamRecord, streamItemsResult } = completedStreamItems;
300+
const id = streamRecord.id;
302301
invariant(id !== undefined);
303302
if (streamItemsResult.errors !== undefined) {
304303
this._incrementalGraph.removeStream(streamRecord);

0 commit comments

Comments
 (0)