Skip to content

Commit b020207

Browse files
committed
fix(incrementalDelivery) fixes null bubbling for async iterables
when a null bubbles up, no further payloads should be sent.
1 parent 04a94fb commit b020207

File tree

2 files changed

+50
-46
lines changed

2 files changed

+50
-46
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1056,8 +1056,8 @@ describe('Execute: stream directive', () => {
10561056
nestedObject: {
10571057
nonNullScalarField: () => Promise.resolve(null),
10581058
async *nestedFriendList() {
1059-
yield await Promise.resolve(friends[0]);
1060-
},
1059+
yield await Promise.resolve(friends[0]); /* c8 ignore start */
1060+
} /* c8 ignore stop */,
10611061
},
10621062
});
10631063
expectJSON(result).toDeepEqual({
@@ -1156,9 +1156,6 @@ describe('Execute: stream directive', () => {
11561156
path: ['nestedObject', 'nestedFriendList', 0],
11571157
},
11581158
],
1159-
hasNext: true,
1160-
},
1161-
{
11621159
hasNext: false,
11631160
},
11641161
]);
@@ -1183,8 +1180,8 @@ describe('Execute: stream directive', () => {
11831180
deeperNestedObject: {
11841181
nonNullScalarField: () => Promise.resolve(null),
11851182
async *deeperNestedFriendList() {
1186-
yield await Promise.resolve(friends[0]);
1187-
},
1183+
yield await Promise.resolve(friends[0]); /* c8 ignore start */
1184+
} /* c8 ignore stop */,
11881185
},
11891186
},
11901187
});
@@ -1271,14 +1268,17 @@ describe('Execute: stream directive', () => {
12711268

12721269
it('Returns iterator and ignores errors when stream payloads are filtered', async () => {
12731270
let returned = false;
1274-
let index = 0;
1271+
let requested = false;
12751272
const iterable = {
12761273
[Symbol.asyncIterator]: () => ({
12771274
next: () => {
1278-
const friend = friends[index++];
1279-
if (!friend) {
1280-
return Promise.resolve({ done: true, value: undefined });
1275+
if (requested) {
1276+
/* c8 ignore next 3 */
1277+
// Not reached, iterator should end immediately.
1278+
expect.fail('Not reached');
12811279
}
1280+
requested = true;
1281+
const friend = friends[0];
12821282
return Promise.resolve({
12831283
done: false,
12841284
value: {
@@ -1356,17 +1356,12 @@ describe('Execute: stream directive', () => {
13561356
],
13571357
},
13581358
],
1359-
hasNext: true,
1359+
hasNext: false,
13601360
},
13611361
});
1362-
const result3 = await iterator.next();
1363-
expectJSON(result3).toDeepEqual({
1364-
done: false,
1365-
value: { hasNext: false },
1366-
});
13671362

1368-
const result4 = await iterator.next();
1369-
expectJSON(result4).toDeepEqual({ done: true, value: undefined });
1363+
const result3 = await iterator.next();
1364+
expectJSON(result3).toDeepEqual({ done: true, value: undefined });
13701365

13711366
assert(returned);
13721367
});

src/execution/execute.ts

Lines changed: 36 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2026,43 +2026,52 @@ async function executeStreamIterator(
20262026
exeContext,
20272027
});
20282028

2029-
const dataPromise = executeStreamIteratorItem(
2030-
iterator,
2031-
exeContext,
2032-
fieldNodes,
2033-
info,
2034-
itemType,
2035-
asyncPayloadRecord,
2036-
itemPath,
2037-
);
2038-
2039-
asyncPayloadRecord.addItems(
2040-
dataPromise
2041-
.then(({ value }) => value)
2042-
.then(
2043-
(value) => [value],
2044-
(err) => {
2045-
asyncPayloadRecord.errors.push(err);
2046-
return null;
2047-
},
2048-
),
2049-
);
2029+
let iteration;
20502030
try {
20512031
// eslint-disable-next-line no-await-in-loop
2052-
const { done } = await dataPromise;
2053-
if (done) {
2054-
break;
2055-
}
2056-
} catch (err) {
2057-
// entire stream has errored and bubbled upwards
2032+
iteration = await executeStreamIteratorItem(
2033+
iterator,
2034+
exeContext,
2035+
fieldNodes,
2036+
info,
2037+
itemType,
2038+
asyncPayloadRecord,
2039+
itemPath,
2040+
);
2041+
} catch (error) {
2042+
asyncPayloadRecord.errors.push(error);
20582043
filterSubsequentPayloads(exeContext, path, asyncPayloadRecord);
2044+
asyncPayloadRecord.addItems(null);
2045+
// entire stream has errored and bubbled upwards
20592046
if (iterator?.return) {
20602047
iterator.return().catch(() => {
20612048
// ignore errors
20622049
});
20632050
}
20642051
return;
20652052
}
2053+
2054+
const { done, value: completedItem } = iteration;
2055+
2056+
let completedItems: PromiseOrValue<Array<unknown> | null>;
2057+
if (isPromise(completedItem)) {
2058+
completedItems = completedItem.then(
2059+
(value) => [value],
2060+
(error) => {
2061+
asyncPayloadRecord.errors.push(error);
2062+
filterSubsequentPayloads(exeContext, path, asyncPayloadRecord);
2063+
return null;
2064+
},
2065+
);
2066+
} else {
2067+
completedItems = [completedItem];
2068+
}
2069+
2070+
asyncPayloadRecord.addItems(completedItems);
2071+
2072+
if (done) {
2073+
break;
2074+
}
20662075
previousAsyncPayloadRecord = asyncPayloadRecord;
20672076
index++;
20682077
}

0 commit comments

Comments
 (0)