Skip to content

Commit 4ea959d

Browse files
committed
Handle stream as stream
Rather than creating a linked list where each incremental data record also includes the next record in addition to any new defers and/or streams. Enables easily batching all available stream items within the same incremental entry.
1 parent f3fafa7 commit 4ea959d

File tree

5 files changed

+235
-303
lines changed

5 files changed

+235
-303
lines changed

src/execution/IncrementalGraph.ts

Lines changed: 82 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,20 @@
11
import { isPromise } from '../jsutils/isPromise.js';
22
import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js';
33

4+
import type { GraphQLError } from '../error/GraphQLError.js';
5+
46
import type {
57
CompletedDeferredGroupedFieldSet,
68
CompletedIncrementalData,
79
CompletedReconcilableDeferredGroupedFieldSet,
810
DeferredFragmentRecord,
911
DeferredGroupedFieldSetRecord,
1012
IncrementalDataRecord,
11-
StreamItemsRecord,
13+
StreamItemRecord,
1214
StreamRecord,
1315
SubsequentResultRecord,
1416
} from './types.js';
15-
import {
16-
isDeferredGroupedFieldSetRecord,
17-
isStreamItemsRecord,
18-
} from './types.js';
17+
import { isDeferredGroupedFieldSetRecord } from './types.js';
1918

2019
interface DeferredFragmentNode {
2120
deferredFragmentRecord: DeferredFragmentRecord;
@@ -31,9 +30,9 @@ function isDeferredFragmentNode(
3130
}
3231

3332
function isStreamNode(
34-
subsequentResultNode: SubsequentResultNode,
35-
): subsequentResultNode is StreamRecord {
36-
return 'path' in subsequentResultNode;
33+
record: SubsequentResultNode | IncrementalDataRecord,
34+
): record is StreamRecord {
35+
return 'streamItemRecords' in record;
3736
}
3837

3938
type SubsequentResultNode = DeferredFragmentNode | StreamRecord;
@@ -71,7 +70,7 @@ export class IncrementalGraph {
7170
if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) {
7271
this._addDeferredGroupedFieldSetRecord(incrementalDataRecord);
7372
} else {
74-
this._addStreamItemsRecord(incrementalDataRecord);
73+
this._addStreamRecord(incrementalDataRecord);
7574
}
7675
}
7776
}
@@ -101,6 +100,7 @@ export class IncrementalGraph {
101100
if (isStreamNode(node)) {
102101
this._pending.add(node);
103102
newPending.add(node);
103+
this._newIncrementalDataRecords.add(node);
104104
} else if (node.deferredGroupedFieldSetRecords.size > 0) {
105105
for (const deferredGroupedFieldSetNode of node.deferredGroupedFieldSetRecords) {
106106
this._newIncrementalDataRecords.add(deferredGroupedFieldSetNode);
@@ -116,22 +116,12 @@ export class IncrementalGraph {
116116
this._newPending.clear();
117117

118118
for (const incrementalDataRecord of this._newIncrementalDataRecords) {
119-
if (isStreamItemsRecord(incrementalDataRecord)) {
120-
const result = incrementalDataRecord.streamItemsResult.value;
121-
if (isPromise(result)) {
122-
// eslint-disable-next-line @typescript-eslint/no-floating-promises
123-
result.then((resolved) =>
124-
this._enqueue({
125-
streamItemsRecord: incrementalDataRecord,
126-
streamItemsResult: resolved,
127-
}),
128-
);
129-
} else {
130-
this._enqueue({
131-
streamItemsRecord: incrementalDataRecord,
132-
streamItemsResult: result,
133-
});
134-
}
119+
if (isStreamNode(incrementalDataRecord)) {
120+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
121+
this._onStreamItems(
122+
incrementalDataRecord,
123+
incrementalDataRecord.streamItemRecords,
124+
);
135125
} else {
136126
const result =
137127
incrementalDataRecord.deferredGroupedFieldSetResult.value;
@@ -274,12 +264,8 @@ export class IncrementalGraph {
274264
}
275265
}
276266

277-
private _addStreamItemsRecord(streamItemsRecord: StreamItemsRecord): void {
278-
const streamRecord = streamItemsRecord.streamRecord;
279-
if (!this._pending.has(streamRecord)) {
280-
this._newPending.add(streamRecord);
281-
}
282-
this._newIncrementalDataRecords.add(streamItemsRecord);
267+
private _addStreamRecord(streamRecord: StreamRecord): void {
268+
this._newPending.add(streamRecord);
283269
}
284270

285271
private _addDeferredFragmentNode(
@@ -311,6 +297,71 @@ export class IncrementalGraph {
311297
return deferredFragmentNode;
312298
}
313299

300+
private async _onStreamItems(
301+
streamRecord: StreamRecord,
302+
streamItemRecords: Array<StreamItemRecord>,
303+
): Promise<void> {
304+
let items: Array<unknown> = [];
305+
let errors: Array<GraphQLError> = [];
306+
let incrementalDataRecords: Array<IncrementalDataRecord> = [];
307+
let streamItemRecord: StreamItemRecord | undefined;
308+
while ((streamItemRecord = streamItemRecords.shift()) !== undefined) {
309+
let result = streamItemRecord.value;
310+
if (isPromise(result)) {
311+
if (items.length > 0) {
312+
this._enqueue({
313+
streamRecord,
314+
streamItemsResult: {
315+
result:
316+
// TODO add additional test case or rework for coverage
317+
errors.length > 0 /* c8 ignore start */
318+
? { items, errors } /* c8 ignore stop */
319+
: { items },
320+
incrementalDataRecords,
321+
},
322+
});
323+
items = [];
324+
errors = [];
325+
incrementalDataRecords = [];
326+
}
327+
// eslint-disable-next-line no-await-in-loop
328+
result = await result;
329+
// wait an additional tick to coalesce resolving additional promises
330+
// within the queue
331+
// eslint-disable-next-line no-await-in-loop
332+
await Promise.resolve();
333+
}
334+
if (result.item === undefined) {
335+
if (items.length > 0) {
336+
this._enqueue({
337+
streamRecord,
338+
streamItemsResult: {
339+
result: errors.length > 0 ? { items, errors } : { items },
340+
incrementalDataRecords,
341+
},
342+
});
343+
}
344+
this._enqueue({
345+
streamRecord,
346+
streamItemsResult:
347+
result.errors === undefined
348+
? {}
349+
: {
350+
errors: result.errors,
351+
},
352+
});
353+
return;
354+
}
355+
items.push(result.item);
356+
if (result.errors !== undefined) {
357+
errors.push(...result.errors);
358+
}
359+
if (result.incrementalDataRecords !== undefined) {
360+
incrementalDataRecords.push(...result.incrementalDataRecords);
361+
}
362+
}
363+
}
364+
314365
private *_yieldCurrentCompletedIncrementalData(
315366
first: CompletedIncrementalData,
316367
): Generator<CompletedIncrementalData> {

src/execution/IncrementalPublisher.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -298,9 +298,8 @@ class IncrementalPublisher {
298298
completedStreamItems: CompletedStreamItems,
299299
context: SubsequentIncrementalExecutionResultContext,
300300
): void {
301-
const { streamItemsRecord, streamItemsResult } = completedStreamItems;
302-
const streamRecord = streamItemsRecord.streamRecord;
303-
const id = streamItemsRecord.streamRecord.id;
301+
const { streamRecord, streamItemsResult } = completedStreamItems;
302+
const id = streamRecord.id;
304303
invariant(id !== undefined);
305304
if (streamItemsResult.errors !== undefined) {
306305
this._incrementalGraph.removeStream(streamRecord);

0 commit comments

Comments
 (0)