Skip to content

Commit 0399963

Browse files
committed
publisher: iterate only through ready items
1 parent 5e1fe3b commit 0399963

File tree

3 files changed

+54
-41
lines changed

3 files changed

+54
-41
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1286,6 +1286,10 @@ describe('Execute: stream directive', () => {
12861286
},
12871287
{
12881288
incremental: [
1289+
{
1290+
items: [{ name: 'Luke' }],
1291+
path: ['nestedObject', 'nestedFriendList', 0],
1292+
},
12891293
{
12901294
data: { scalarField: null },
12911295
path: ['otherNestedObject'],
@@ -1297,11 +1301,10 @@ describe('Execute: stream directive', () => {
12971301
},
12981302
],
12991303
},
1300-
{
1301-
items: [{ name: 'Luke' }],
1302-
path: ['nestedObject', 'nestedFriendList', 0],
1303-
},
13041304
],
1305+
hasNext: true,
1306+
},
1307+
{
13051308
hasNext: false,
13061309
},
13071310
]);

src/execution/execute.ts

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2113,20 +2113,16 @@ function filterSubsequentPayloads(
21132113
}
21142114

21152115
function getIncrementalResult(
2116-
pending: ReadonlySet<IncrementalDataRecord>,
2116+
completedRecords: ReadonlySet<IncrementalDataRecord>,
21172117
publisher: Publisher<
21182118
IncrementalDataRecord,
21192119
SubsequentIncrementalExecutionResult
21202120
>,
21212121
): SubsequentIncrementalExecutionResult | undefined {
21222122
const incrementalResults: Array<IncrementalResult> = [];
21232123
let encounteredCompletedAsyncIterator = false;
2124-
for (const incrementalDataRecord of pending) {
2124+
for (const incrementalDataRecord of completedRecords) {
21252125
const incrementalResult: IncrementalResult = {};
2126-
if (!incrementalDataRecord.isCompleted) {
2127-
continue;
2128-
}
2129-
publisher.delete(incrementalDataRecord);
21302126
if (isStreamItemsRecord(incrementalDataRecord)) {
21312127
const items = incrementalDataRecord.items;
21322128
if (incrementalDataRecord.isCompletedAsyncIterator) {
@@ -2149,7 +2145,6 @@ function getIncrementalResult(
21492145
}
21502146
incrementalResults.push(incrementalResult);
21512147
}
2152-
21532148
return incrementalResults.length
21542149
? { incremental: incrementalResults, hasNext: publisher.hasNext() }
21552150
: encounteredCompletedAsyncIterator && !publisher.hasNext()
@@ -2195,7 +2190,7 @@ class DeferredFragmentRecord {
21952190
this.parentContext = opts.parentContext;
21962191
this.errors = [];
21972192
this._exeContext = opts.exeContext;
2198-
this._exeContext.publisher.add(this);
2193+
this._exeContext.publisher.introduce(this);
21992194
this.isCompleted = false;
22002195
this.data = null;
22012196
this.promise = new Promise<ObjMap<unknown> | null>((resolve) => {
@@ -2205,6 +2200,7 @@ class DeferredFragmentRecord {
22052200
}).then((data) => {
22062201
this.data = data;
22072202
this.isCompleted = true;
2203+
this._exeContext.publisher.release(this);
22082204
});
22092205
}
22102206

@@ -2246,7 +2242,7 @@ class StreamItemsRecord {
22462242
this.asyncIterator = opts.asyncIterator;
22472243
this.errors = [];
22482244
this._exeContext = opts.exeContext;
2249-
this._exeContext.publisher.add(this);
2245+
this._exeContext.publisher.introduce(this);
22502246
this.isCompleted = false;
22512247
this.items = null;
22522248
this.promise = new Promise<Array<unknown> | null>((resolve) => {
@@ -2256,6 +2252,7 @@ class StreamItemsRecord {
22562252
}).then((items) => {
22572253
this.items = items;
22582254
this.isCompleted = true;
2255+
this._exeContext.publisher.release(this);
22592256
});
22602257
}
22612258

src/jsutils/Publisher.ts

Lines changed: 41 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
interface ContainsPromise {
2-
promise: Promise<void>;
3-
}
4-
51
/** @internal */
6-
export class Publisher<I extends ContainsPromise, R> {
2+
export class Publisher<I, R> {
3+
_unreleased: Set<I>;
4+
_released: Set<I>;
75
_pending: Set<I>;
86
_update: (completed: Set<I>, publisher: Publisher<I, R>) => R | undefined;
97
_onAbruptClose: (pending: ReadonlySet<I>) => Promise<void>;
@@ -19,6 +17,8 @@ export class Publisher<I extends ContainsPromise, R> {
1917
) => R | undefined,
2018
onAbruptClose: (pending: ReadonlySet<I>) => Promise<void>,
2119
) {
20+
this._unreleased = new Set();
21+
this._released = new Set();
2222
this._pending = new Set();
2323
this._update = update;
2424
this._onAbruptClose = onAbruptClose;
@@ -42,11 +42,22 @@ export class Publisher<I extends ContainsPromise, R> {
4242
return this._pending.size > 0;
4343
}
4444

45-
add(item: I) {
45+
introduce(item: I) {
46+
this._unreleased.add(item);
4647
this._pending.add(item);
4748
}
4849

50+
release(item: I): void {
51+
if (this._pending.has(item)) {
52+
this._unreleased.delete(item);
53+
this._released.add(item);
54+
this._trigger();
55+
}
56+
}
57+
4958
delete(item: I) {
59+
this._unreleased.delete(item);
60+
this._released.delete(item);
5061
this._pending.delete(item);
5162
this._trigger();
5263
}
@@ -55,29 +66,31 @@ export class Publisher<I extends ContainsPromise, R> {
5566
let isDone = false;
5667

5768
const _next = async (): Promise<IteratorResult<R, void>> => {
58-
if (isDone) {
59-
return { value: undefined, done: true };
60-
}
61-
62-
await Promise.race(Array.from(this._pending).map((item) => item.promise));
63-
64-
if (isDone) {
65-
// a different call to next has exhausted all payloads
66-
return { value: undefined, done: true };
69+
// eslint-disable-next-line no-constant-condition
70+
while (true) {
71+
if (isDone) {
72+
return { value: undefined, done: true };
73+
}
74+
75+
for (const item of this._released) {
76+
this._pending.delete(item);
77+
}
78+
const released = this._released;
79+
this._released = new Set();
80+
81+
const result = this._update(released, this);
82+
83+
if (!this.hasNext()) {
84+
isDone = true;
85+
}
86+
87+
if (result !== undefined) {
88+
return { value: result, done: false };
89+
}
90+
91+
// eslint-disable-next-line no-await-in-loop
92+
await this._signalled;
6793
}
68-
69-
const result = this._update(this._pending, this);
70-
const hasNext = this._pending.size > 0;
71-
72-
if (result === undefined && hasNext) {
73-
return _next();
74-
}
75-
76-
if (!hasNext) {
77-
isDone = true;
78-
}
79-
80-
return { value: result as R, done: false };
8194
};
8295

8396
const _return = async (): Promise<IteratorResult<R, void>> => {

0 commit comments

Comments
 (0)