Skip to content

Commit 238f862

Browse files
committed
introduce IncrementalGraph class to manage the graph of pending subsequent results
1 parent 06bb157 commit 238f862

File tree

7 files changed

+843
-585
lines changed

7 files changed

+843
-585
lines changed

src/execution/IncrementalGraph.ts

Lines changed: 335 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,335 @@
1+
import { isPromise } from '../jsutils/isPromise.js';
2+
import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js';
3+
4+
import type {
5+
CompletedDeferredGroupedFieldSet,
6+
CompletedIncrementalData,
7+
CompletedReconcilableDeferredGroupedFieldSet,
8+
DeferredFragmentRecord,
9+
DeferredGroupedFieldSetRecord,
10+
IncrementalDataRecord,
11+
StreamItemsRecord,
12+
StreamRecord,
13+
SubsequentResultRecord,
14+
} from './types.js';
15+
import {
16+
isDeferredGroupedFieldSetRecord,
17+
isStreamItemsRecord,
18+
} from './types.js';
19+
20+
interface DeferredFragmentNode {
21+
deferredFragmentRecord: DeferredFragmentRecord;
22+
deferredGroupedFieldSetRecords: Set<DeferredGroupedFieldSetRecord>;
23+
completedReconcilableDeferredGroupedFieldSets: Set<CompletedReconcilableDeferredGroupedFieldSet>;
24+
children: Array<DeferredFragmentNode>;
25+
}
26+
27+
function isDeferredFragmentNode(
28+
node: DeferredFragmentNode | undefined,
29+
): node is DeferredFragmentNode {
30+
return node !== undefined;
31+
}
32+
33+
function isStreamNode(
34+
subsequentResultNode: SubsequentResultNode,
35+
): subsequentResultNode is StreamRecord {
36+
return 'path' in subsequentResultNode;
37+
}
38+
39+
type SubsequentResultNode = DeferredFragmentNode | StreamRecord;
40+
41+
/**
42+
* @internal
43+
*/
44+
export class IncrementalGraph {
45+
private _pending: Set<SubsequentResultNode>;
46+
private _newPending: Set<SubsequentResultNode>;
47+
private _newIncrementalDataRecords: Set<IncrementalDataRecord>;
48+
private _deferredFragmentNodes: Map<
49+
DeferredFragmentRecord,
50+
DeferredFragmentNode
51+
>;
52+
53+
private _completedQueue: Array<CompletedIncrementalData>;
54+
private _nextQueue: Array<
55+
(iterable: IteratorResult<Iterable<CompletedIncrementalData>>) => void
56+
>;
57+
58+
constructor() {
59+
this._pending = new Set();
60+
this._newIncrementalDataRecords = new Set();
61+
this._newPending = new Set();
62+
this._deferredFragmentNodes = new Map();
63+
this._completedQueue = [];
64+
this._nextQueue = [];
65+
}
66+
67+
addIncrementalDataRecords(
68+
incrementalDataRecords: ReadonlyArray<IncrementalDataRecord>,
69+
): void {
70+
for (const incrementalDataRecord of incrementalDataRecords) {
71+
if (isDeferredGroupedFieldSetRecord(incrementalDataRecord)) {
72+
this._addDeferredGroupedFieldSetRecord(incrementalDataRecord);
73+
} else {
74+
this._addStreamItemsRecord(incrementalDataRecord);
75+
}
76+
}
77+
}
78+
79+
addCompletedReconcilableDeferredGroupedFieldSet(
80+
completedDeferredGroupedFieldSet: CompletedReconcilableDeferredGroupedFieldSet,
81+
): void {
82+
const deferredFragmentNodes: Array<DeferredFragmentNode> =
83+
completedDeferredGroupedFieldSet.deferredGroupedFieldSetRecord.deferredFragmentRecords
84+
.map((deferredFragmentRecord) =>
85+
this._deferredFragmentNodes.get(deferredFragmentRecord),
86+
)
87+
.filter<DeferredFragmentNode>(isDeferredFragmentNode);
88+
for (const deferredFragmentNode of deferredFragmentNodes) {
89+
deferredFragmentNode.deferredGroupedFieldSetRecords.delete(
90+
completedDeferredGroupedFieldSet.deferredGroupedFieldSetRecord,
91+
);
92+
deferredFragmentNode.completedReconcilableDeferredGroupedFieldSets.add(
93+
completedDeferredGroupedFieldSet,
94+
);
95+
}
96+
}
97+
98+
getNewPending(): Set<SubsequentResultRecord> {
99+
const newPending = new Set<SubsequentResultRecord>();
100+
for (const node of this._newPending) {
101+
if (isStreamNode(node)) {
102+
this._pending.add(node);
103+
newPending.add(node);
104+
} else if (node.deferredGroupedFieldSetRecords.size > 0) {
105+
for (const deferredGroupedFieldSetNode of node.deferredGroupedFieldSetRecords) {
106+
this._newIncrementalDataRecords.add(deferredGroupedFieldSetNode);
107+
}
108+
this._pending.add(node);
109+
newPending.add(node.deferredFragmentRecord);
110+
} else {
111+
for (const child of node.children) {
112+
this._newPending.add(child);
113+
}
114+
}
115+
}
116+
this._newPending.clear();
117+
118+
for (const incrementalDataRecord of this._newIncrementalDataRecords) {
119+
if (isStreamItemsRecord(incrementalDataRecord)) {
120+
const result = incrementalDataRecord.streamItemsResult.value;
121+
if (isPromise(result)) {
122+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
123+
result.then((resolved) =>
124+
this._enqueue({
125+
streamItemsRecord: incrementalDataRecord,
126+
streamItemsResult: resolved,
127+
}),
128+
);
129+
} else {
130+
this._enqueue({
131+
streamItemsRecord: incrementalDataRecord,
132+
streamItemsResult: result,
133+
});
134+
}
135+
} else {
136+
const result =
137+
incrementalDataRecord.deferredGroupedFieldSetResult.value;
138+
if (isPromise(result)) {
139+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
140+
result.then((resolved) =>
141+
this._enqueue({
142+
deferredGroupedFieldSetRecord: incrementalDataRecord,
143+
deferredGroupedFieldSetResult: resolved,
144+
} as CompletedDeferredGroupedFieldSet),
145+
);
146+
} else {
147+
this._enqueue({
148+
deferredGroupedFieldSetRecord: incrementalDataRecord,
149+
deferredGroupedFieldSetResult: result,
150+
} as CompletedDeferredGroupedFieldSet);
151+
}
152+
}
153+
}
154+
this._newIncrementalDataRecords.clear();
155+
156+
return newPending;
157+
}
158+
159+
completedIncrementalData() {
160+
return {
161+
[Symbol.asyncIterator]() {
162+
return this;
163+
},
164+
next: (): Promise<IteratorResult<Iterable<CompletedIncrementalData>>> => {
165+
const firstResult = this._completedQueue.shift();
166+
if (firstResult !== undefined) {
167+
return Promise.resolve({
168+
value: this._yieldCurrentCompletedIncrementalData(firstResult),
169+
done: false,
170+
});
171+
}
172+
const { promise, resolve } =
173+
promiseWithResolvers<
174+
IteratorResult<Iterable<CompletedIncrementalData>>
175+
>();
176+
this._nextQueue.push(resolve);
177+
return promise;
178+
},
179+
return: (): Promise<
180+
IteratorResult<Iterable<CompletedIncrementalData>>
181+
> => {
182+
for (const resolve of this._nextQueue) {
183+
resolve({ value: undefined, done: true });
184+
}
185+
return Promise.resolve({ value: undefined, done: true });
186+
},
187+
};
188+
}
189+
190+
hasNext(): boolean {
191+
return this._pending.size > 0;
192+
}
193+
194+
completeDeferredFragment(
195+
deferredFragmentRecord: DeferredFragmentRecord,
196+
): Array<CompletedReconcilableDeferredGroupedFieldSet> | undefined {
197+
const deferredFragmentNode = this._deferredFragmentNodes.get(
198+
deferredFragmentRecord,
199+
);
200+
// TODO: add test case?
201+
/* c8 ignore next 3 */
202+
if (deferredFragmentNode === undefined) {
203+
return undefined;
204+
}
205+
const {
206+
deferredGroupedFieldSetRecords,
207+
completedReconcilableDeferredGroupedFieldSets,
208+
} = deferredFragmentNode;
209+
if (deferredGroupedFieldSetRecords.size > 0) {
210+
return undefined;
211+
}
212+
const results = Array.from(completedReconcilableDeferredGroupedFieldSets);
213+
for (const completedReconcilableDeferredGroupedFieldSet of completedReconcilableDeferredGroupedFieldSets) {
214+
for (const otherDeferredFragmentRecord of completedReconcilableDeferredGroupedFieldSet
215+
.deferredGroupedFieldSetRecord.deferredFragmentRecords) {
216+
const otherDeferredFragmentNode = this._deferredFragmentNodes.get(
217+
otherDeferredFragmentRecord,
218+
);
219+
if (otherDeferredFragmentNode === undefined) {
220+
continue;
221+
}
222+
otherDeferredFragmentNode.completedReconcilableDeferredGroupedFieldSets.delete(
223+
completedReconcilableDeferredGroupedFieldSet,
224+
);
225+
}
226+
}
227+
for (const child of deferredFragmentNode.children) {
228+
const childNode = this._addDeferredFragmentNode(
229+
child.deferredFragmentRecord,
230+
);
231+
this._newPending.add(childNode);
232+
}
233+
this._pending.delete(deferredFragmentNode);
234+
this._deferredFragmentNodes.delete(deferredFragmentRecord);
235+
return results;
236+
}
237+
238+
removeDeferredFragment(
239+
deferredFragmentRecord: DeferredFragmentRecord,
240+
): boolean {
241+
const deferredFragmentNode = this._deferredFragmentNodes.get(
242+
deferredFragmentRecord,
243+
);
244+
if (deferredFragmentNode === undefined) {
245+
return false;
246+
}
247+
this._pending.delete(deferredFragmentNode);
248+
this._deferredFragmentNodes.delete(deferredFragmentRecord);
249+
// TODO: add test case for an erroring deferred fragment with child defers
250+
/* c8 ignore next 3 */
251+
for (const child of deferredFragmentNode.children) {
252+
this.removeDeferredFragment(child.deferredFragmentRecord);
253+
}
254+
return true;
255+
}
256+
257+
removeStream(streamRecord: StreamRecord): void {
258+
this._pending.delete(streamRecord);
259+
}
260+
261+
private _addDeferredGroupedFieldSetRecord(
262+
deferredGroupedFieldSetRecord: DeferredGroupedFieldSetRecord,
263+
): void {
264+
for (const deferredFragmentRecord of deferredGroupedFieldSetRecord.deferredFragmentRecords) {
265+
const deferredFragmentNode = this._addDeferredFragmentNode(
266+
deferredFragmentRecord,
267+
);
268+
if (this._pending.has(deferredFragmentNode)) {
269+
this._newIncrementalDataRecords.add(deferredGroupedFieldSetRecord);
270+
}
271+
deferredFragmentNode.deferredGroupedFieldSetRecords.add(
272+
deferredGroupedFieldSetRecord,
273+
);
274+
}
275+
}
276+
277+
private _addStreamItemsRecord(streamItemsRecord: StreamItemsRecord): void {
278+
const streamRecord = streamItemsRecord.streamRecord;
279+
if (!this._pending.has(streamRecord)) {
280+
this._newPending.add(streamRecord);
281+
}
282+
this._newIncrementalDataRecords.add(streamItemsRecord);
283+
}
284+
285+
private _addDeferredFragmentNode(
286+
deferredFragmentRecord: DeferredFragmentRecord,
287+
): DeferredFragmentNode {
288+
let deferredFragmentNode = this._deferredFragmentNodes.get(
289+
deferredFragmentRecord,
290+
);
291+
if (deferredFragmentNode !== undefined) {
292+
return deferredFragmentNode;
293+
}
294+
deferredFragmentNode = {
295+
deferredFragmentRecord,
296+
deferredGroupedFieldSetRecords: new Set(),
297+
completedReconcilableDeferredGroupedFieldSets: new Set(),
298+
children: [],
299+
};
300+
this._deferredFragmentNodes.set(
301+
deferredFragmentRecord,
302+
deferredFragmentNode,
303+
);
304+
const parent = deferredFragmentRecord.parent;
305+
if (parent === undefined) {
306+
this._newPending.add(deferredFragmentNode);
307+
return deferredFragmentNode;
308+
}
309+
const parentNode = this._addDeferredFragmentNode(parent);
310+
parentNode.children.push(deferredFragmentNode);
311+
return deferredFragmentNode;
312+
}
313+
314+
private *_yieldCurrentCompletedIncrementalData(
315+
first: CompletedIncrementalData,
316+
): Generator<CompletedIncrementalData> {
317+
yield first;
318+
let completed;
319+
while ((completed = this._completedQueue.shift()) !== undefined) {
320+
yield completed;
321+
}
322+
}
323+
324+
private _enqueue(completed: CompletedIncrementalData): void {
325+
const next = this._nextQueue.shift();
326+
if (next !== undefined) {
327+
next({
328+
value: this._yieldCurrentCompletedIncrementalData(completed),
329+
done: false,
330+
});
331+
return;
332+
}
333+
this._completedQueue.push(completed);
334+
}
335+
}

0 commit comments

Comments
 (0)