Skip to content

Commit fb36bff

Browse files
committed
introduce new Publisher for incremental delivery
Depends on #3784 The proposed new Publisher: 1. does not use the event loop to manage AsyncRecord dependencies 2. uses separate sets to store pending vs ready AsyncRecords 3. does not use `Promise.race` ## No event loop for managing AsyncRecord dependencies The current Publisher wraps every AsyncRecord result in a promise that only resolves after the parent AsyncRecord resolves. If multiple items within a stream are released for publishing because their parent has just been published, the stream cannot be in its entirety until after all of these promises unwind. The new Publisher keeps track of dependencies manually. When an AsyncRecord is pushed by the publisher, all of its dependent AsyncRecords are synchronously pushed, repeating as necessary, without using the event loop. ## Separate sets for pending vs ready AsyncRecords The current publisher inspects all pending AsyncRecords whenever any of them resolves. All that are completed are added to the response. The new Publisher moves AsyncRecords from the pending set to the ready set as they are pushed, so that on each call to next, only the ready set must be iterated. As a side-effect of this change, the incremental array is ordered by which items are ready for delivery first, and not by the initial document. This seems like a worthwhile tradeoff, and is still adherent to the spec, as far as I can tell. ## No `Promise.race` The old Publisher uses `Promise.race` as the trigger to determine whether payloads are ready. The new Publisher uses a single triggering promise that is triggered whenever an AsyncRecord is pushed, and then reset. This may be beneficial as the implementation of `Promise.race` within V8 has a known memory leak for long-running promises. (see https://bugs.chromium.org/p/v8/issues/detail?id=9858). An alternative would be to utilize @brainkim 's memory-safe version detailed in that issue.
1 parent dce5f9d commit fb36bff

File tree

4 files changed

+173
-154
lines changed

4 files changed

+173
-154
lines changed

src/execution/__tests__/defer-test.ts

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -605,11 +605,6 @@ describe('Execute: defer directive', () => {
605605
data: { slowField: 'slow', friends: [{}, {}, {}] },
606606
path: ['hero'],
607607
},
608-
],
609-
hasNext: true,
610-
},
611-
{
612-
incremental: [
613608
{ data: { name: 'Han' }, path: ['hero', 'friends', 0] },
614609
{ data: { name: 'Leia' }, path: ['hero', 'friends', 1] },
615610
{ data: { name: 'C-3PO' }, path: ['hero', 'friends', 2] },
@@ -653,11 +648,6 @@ describe('Execute: defer directive', () => {
653648
},
654649
path: ['hero'],
655650
},
656-
],
657-
hasNext: true,
658-
},
659-
{
660-
incremental: [
661651
{ data: { name: 'Han' }, path: ['hero', 'friends', 0] },
662652
{ data: { name: 'Leia' }, path: ['hero', 'friends', 1] },
663653
{ data: { name: 'C-3PO' }, path: ['hero', 'friends', 2] },

src/execution/__tests__/stream-test.ts

Lines changed: 21 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -151,11 +151,10 @@ describe('Execute: stream directive', () => {
151151
hasNext: true,
152152
},
153153
{
154-
incremental: [{ items: ['banana'], path: ['scalarList', 1] }],
155-
hasNext: true,
156-
},
157-
{
158-
incremental: [{ items: ['coconut'], path: ['scalarList', 2] }],
154+
incremental: [
155+
{ items: ['banana'], path: ['scalarList', 1] },
156+
{ items: ['coconut'], path: ['scalarList', 2] },
157+
],
159158
hasNext: false,
160159
},
161160
]);
@@ -173,15 +172,11 @@ describe('Execute: stream directive', () => {
173172
hasNext: true,
174173
},
175174
{
176-
incremental: [{ items: ['apple'], path: ['scalarList', 0] }],
177-
hasNext: true,
178-
},
179-
{
180-
incremental: [{ items: ['banana'], path: ['scalarList', 1] }],
181-
hasNext: true,
182-
},
183-
{
184-
incremental: [{ items: ['coconut'], path: ['scalarList', 2] }],
175+
incremental: [
176+
{ items: ['apple'], path: ['scalarList', 0] },
177+
{ items: ['banana'], path: ['scalarList', 1] },
178+
{ items: ['coconut'], path: ['scalarList', 2] },
179+
],
185180
hasNext: false,
186181
},
187182
]);
@@ -230,11 +225,6 @@ describe('Execute: stream directive', () => {
230225
path: ['scalarList', 1],
231226
label: 'scalar-stream',
232227
},
233-
],
234-
hasNext: true,
235-
},
236-
{
237-
incremental: [
238228
{
239229
items: ['coconut'],
240230
path: ['scalarList', 2],
@@ -296,11 +286,6 @@ describe('Execute: stream directive', () => {
296286
items: [['banana', 'banana', 'banana']],
297287
path: ['scalarListList', 1],
298288
},
299-
],
300-
hasNext: true,
301-
},
302-
{
303-
incremental: [
304289
{
305290
items: [['coconut', 'coconut', 'coconut']],
306291
path: ['scalarListList', 2],
@@ -379,20 +364,10 @@ describe('Execute: stream directive', () => {
379364
items: [{ name: 'Luke', id: '1' }],
380365
path: ['friendList', 0],
381366
},
382-
],
383-
hasNext: true,
384-
},
385-
{
386-
incremental: [
387367
{
388368
items: [{ name: 'Han', id: '2' }],
389369
path: ['friendList', 1],
390370
},
391-
],
392-
hasNext: true,
393-
},
394-
{
395-
incremental: [
396371
{
397372
items: [{ name: 'Leia', id: '3' }],
398373
path: ['friendList', 2],
@@ -628,9 +603,6 @@ describe('Execute: stream directive', () => {
628603
path: ['friendList', 2],
629604
},
630605
],
631-
hasNext: true,
632-
},
633-
{
634606
hasNext: false,
635607
},
636608
]);
@@ -670,7 +642,7 @@ describe('Execute: stream directive', () => {
670642
}
671643
}
672644
`);
673-
const result = await completeAsync(document, 3, {
645+
const result = await completeAsync(document, 2, {
674646
async *friendList() {
675647
yield await Promise.resolve(friends[0]);
676648
yield await Promise.resolve(friends[1]);
@@ -699,10 +671,9 @@ describe('Execute: stream directive', () => {
699671
path: ['friendList', 2],
700672
},
701673
],
702-
hasNext: true,
674+
hasNext: false,
703675
},
704676
},
705-
{ done: false, value: { hasNext: false } },
706677
{ done: true, value: undefined },
707678
]);
708679
});
@@ -930,11 +901,6 @@ describe('Execute: stream directive', () => {
930901
},
931902
],
932903
},
933-
],
934-
hasNext: true,
935-
},
936-
{
937-
incremental: [
938904
{
939905
items: [{ nonNullName: 'Han' }],
940906
path: ['friendList', 2],
@@ -1107,11 +1073,6 @@ describe('Execute: stream directive', () => {
11071073
},
11081074
],
11091075
},
1110-
],
1111-
hasNext: true,
1112-
},
1113-
{
1114-
incremental: [
11151076
{
11161077
items: [{ nonNullName: 'Han' }],
11171078
path: ['friendList', 2],
@@ -1276,6 +1237,10 @@ describe('Execute: stream directive', () => {
12761237
},
12771238
{
12781239
incremental: [
1240+
{
1241+
items: [{ name: 'Luke' }],
1242+
path: ['nestedObject', 'nestedFriendList', 0],
1243+
},
12791244
{
12801245
data: { scalarField: null },
12811246
path: ['otherNestedObject'],
@@ -1287,10 +1252,6 @@ describe('Execute: stream directive', () => {
12871252
},
12881253
],
12891254
},
1290-
{
1291-
items: [{ name: 'Luke' }],
1292-
path: ['nestedObject', 'nestedFriendList', 0],
1293-
},
12941255
],
12951256
hasNext: false,
12961257
},
@@ -1397,9 +1358,6 @@ describe('Execute: stream directive', () => {
13971358
],
13981359
},
13991360
],
1400-
hasNext: true,
1401-
},
1402-
{
14031361
hasNext: false,
14041362
},
14051363
]);
@@ -1546,9 +1504,6 @@ describe('Execute: stream directive', () => {
15461504
path: ['friendList', 2],
15471505
},
15481506
],
1549-
hasNext: true,
1550-
},
1551-
{
15521507
hasNext: false,
15531508
},
15541509
]);
@@ -1602,15 +1557,6 @@ describe('Execute: stream directive', () => {
16021557
data: { scalarField: 'slow', nestedFriendList: [] },
16031558
path: ['nestedObject'],
16041559
},
1605-
],
1606-
hasNext: true,
1607-
},
1608-
done: false,
1609-
});
1610-
const result3 = await iterator.next();
1611-
expectJSON(result3).toDeepEqual({
1612-
value: {
1613-
incremental: [
16141560
{
16151561
items: [{ name: 'Luke' }],
16161562
path: ['nestedObject', 'nestedFriendList', 0],
@@ -1620,8 +1566,8 @@ describe('Execute: stream directive', () => {
16201566
},
16211567
done: false,
16221568
});
1623-
const result4 = await iterator.next();
1624-
expectJSON(result4).toDeepEqual({
1569+
const result3 = await iterator.next();
1570+
expectJSON(result3).toDeepEqual({
16251571
value: {
16261572
incremental: [
16271573
{
@@ -1633,13 +1579,13 @@ describe('Execute: stream directive', () => {
16331579
},
16341580
done: false,
16351581
});
1636-
const result5 = await iterator.next();
1637-
expectJSON(result5).toDeepEqual({
1582+
const result4 = await iterator.next();
1583+
expectJSON(result4).toDeepEqual({
16381584
value: { hasNext: false },
16391585
done: false,
16401586
});
1641-
const result6 = await iterator.next();
1642-
expectJSON(result6).toDeepEqual({
1587+
const result5 = await iterator.next();
1588+
expectJSON(result5).toDeepEqual({
16431589
value: undefined,
16441590
done: true,
16451591
});

src/execution/execute.ts

Lines changed: 20 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -2095,12 +2095,14 @@ class DeferredFragmentRecord {
20952095
errors: Array<GraphQLError>;
20962096
label: string | undefined;
20972097
path: Array<string | number>;
2098-
promise: Promise<void>;
20992098
data: ObjMap<unknown> | null;
21002099
parentContext: AsyncPayloadRecord | undefined;
2101-
isCompleted: boolean;
2102-
_exeContext: ExecutionContext;
2103-
_resolve?: (arg: PromiseOrValue<ObjMap<unknown> | null>) => void;
2100+
_publisher: Publisher<
2101+
AsyncPayloadRecord,
2102+
IncrementalResult,
2103+
SubsequentIncrementalExecutionResult
2104+
>;
2105+
21042106
constructor(opts: {
21052107
label: string | undefined;
21062108
path: Path | undefined;
@@ -2112,27 +2114,14 @@ class DeferredFragmentRecord {
21122114
this.path = pathToArray(opts.path);
21132115
this.parentContext = opts.parentContext;
21142116
this.errors = [];
2115-
this._exeContext = opts.exeContext;
2116-
this._exeContext.publisher.add(this);
2117-
this.isCompleted = false;
2117+
this._publisher = opts.exeContext.publisher;
2118+
this._publisher.add(this);
21182119
this.data = null;
2119-
this.promise = new Promise<ObjMap<unknown> | null>((resolve) => {
2120-
this._resolve = (promiseOrValue) => {
2121-
resolve(promiseOrValue);
2122-
};
2123-
}).then((data) => {
2124-
this.data = data;
2125-
this.isCompleted = true;
2126-
});
21272120
}
21282121

21292122
addData(data: ObjMap<unknown> | null) {
2130-
const parentData = this.parentContext?.promise;
2131-
if (parentData) {
2132-
this._resolve?.(parentData.then(() => data));
2133-
return;
2134-
}
2135-
this._resolve?.(data);
2123+
this.data = data;
2124+
this._publisher.complete(this);
21362125
}
21372126
}
21382127

@@ -2142,13 +2131,15 @@ class StreamRecord {
21422131
label: string | undefined;
21432132
path: Array<string | number>;
21442133
items: Array<unknown> | null;
2145-
promise: Promise<void>;
21462134
parentContext: AsyncPayloadRecord | undefined;
21472135
iterator: AsyncIterator<unknown> | undefined;
21482136
isCompletedIterator?: boolean;
2149-
isCompleted: boolean;
2150-
_exeContext: ExecutionContext;
2151-
_resolve?: (arg: PromiseOrValue<Array<unknown> | null>) => void;
2137+
_publisher: Publisher<
2138+
AsyncPayloadRecord,
2139+
IncrementalResult,
2140+
SubsequentIncrementalExecutionResult
2141+
>;
2142+
21522143
constructor(opts: {
21532144
label: string | undefined;
21542145
path: Path | undefined;
@@ -2163,27 +2154,14 @@ class StreamRecord {
21632154
this.parentContext = opts.parentContext;
21642155
this.iterator = opts.iterator;
21652156
this.errors = [];
2166-
this._exeContext = opts.exeContext;
2167-
this._exeContext.publisher.add(this);
2168-
this.isCompleted = false;
2157+
this._publisher = opts.exeContext.publisher;
2158+
this._publisher.add(this);
21692159
this.items = null;
2170-
this.promise = new Promise<Array<unknown> | null>((resolve) => {
2171-
this._resolve = (promiseOrValue) => {
2172-
resolve(promiseOrValue);
2173-
};
2174-
}).then((items) => {
2175-
this.items = items;
2176-
this.isCompleted = true;
2177-
});
21782160
}
21792161

21802162
addItems(items: Array<unknown> | null) {
2181-
const parentData = this.parentContext?.promise;
2182-
if (parentData) {
2183-
this._resolve?.(parentData.then(() => items));
2184-
return;
2185-
}
2186-
this._resolve?.(items);
2163+
this.items = items;
2164+
this._publisher.complete(this);
21872165
}
21882166

21892167
setIsCompletedIterator() {

0 commit comments

Comments
 (0)