Skip to content

Commit 0bc2d10

Browse files
committed
publisher: iterate only through ready items
1 parent b56e737 commit 0bc2d10

File tree

3 files changed

+54
-41
lines changed

3 files changed

+54
-41
lines changed

src/execution/__tests__/stream-test.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1286,6 +1286,10 @@ describe('Execute: stream directive', () => {
12861286
},
12871287
{
12881288
incremental: [
1289+
{
1290+
items: [{ name: 'Luke' }],
1291+
path: ['nestedObject', 'nestedFriendList', 0],
1292+
},
12891293
{
12901294
data: { scalarField: null },
12911295
path: ['otherNestedObject'],
@@ -1297,11 +1301,10 @@ describe('Execute: stream directive', () => {
12971301
},
12981302
],
12991303
},
1300-
{
1301-
items: [{ name: 'Luke' }],
1302-
path: ['nestedObject', 'nestedFriendList', 0],
1303-
},
13041304
],
1305+
hasNext: true,
1306+
},
1307+
{
13051308
hasNext: false,
13061309
},
13071310
]);

src/execution/execute.ts

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2129,20 +2129,16 @@ function filterSubsequentPayloads(
21292129
}
21302130

21312131
function getIncrementalResult(
2132-
pending: ReadonlySet<IncrementalDataRecord>,
2132+
completedRecords: ReadonlySet<IncrementalDataRecord>,
21332133
publisher: Publisher<
21342134
IncrementalDataRecord,
21352135
SubsequentIncrementalExecutionResult
21362136
>,
21372137
): SubsequentIncrementalExecutionResult | undefined {
21382138
const incrementalResults: Array<IncrementalResult> = [];
21392139
let encounteredCompletedAsyncIterator = false;
2140-
for (const incrementalDataRecord of pending) {
2140+
for (const incrementalDataRecord of completedRecords) {
21412141
const incrementalResult: IncrementalResult = {};
2142-
if (!incrementalDataRecord.isCompleted) {
2143-
continue;
2144-
}
2145-
publisher.delete(incrementalDataRecord);
21462142
if (isStreamItemsRecord(incrementalDataRecord)) {
21472143
const items = incrementalDataRecord.items;
21482144
if (incrementalDataRecord.isCompletedAsyncIterator) {
@@ -2165,7 +2161,6 @@ function getIncrementalResult(
21652161
}
21662162
incrementalResults.push(incrementalResult);
21672163
}
2168-
21692164
return incrementalResults.length
21702165
? { incremental: incrementalResults, hasNext: publisher.hasNext() }
21712166
: encounteredCompletedAsyncIterator && !publisher.hasNext()
@@ -2211,7 +2206,7 @@ class DeferredFragmentRecord {
22112206
this.parentContext = opts.parentContext;
22122207
this.errors = [];
22132208
this._exeContext = opts.exeContext;
2214-
this._exeContext.publisher.add(this);
2209+
this._exeContext.publisher.introduce(this);
22152210
this.isCompleted = false;
22162211
this.data = null;
22172212
this.promise = new Promise<ObjMap<unknown> | null>((resolve) => {
@@ -2221,6 +2216,7 @@ class DeferredFragmentRecord {
22212216
}).then((data) => {
22222217
this.data = data;
22232218
this.isCompleted = true;
2219+
this._exeContext.publisher.release(this);
22242220
});
22252221
}
22262222

@@ -2262,7 +2258,7 @@ class StreamItemsRecord {
22622258
this.asyncIterator = opts.asyncIterator;
22632259
this.errors = [];
22642260
this._exeContext = opts.exeContext;
2265-
this._exeContext.publisher.add(this);
2261+
this._exeContext.publisher.introduce(this);
22662262
this.isCompleted = false;
22672263
this.items = null;
22682264
this.promise = new Promise<Array<unknown> | null>((resolve) => {
@@ -2272,6 +2268,7 @@ class StreamItemsRecord {
22722268
}).then((items) => {
22732269
this.items = items;
22742270
this.isCompleted = true;
2271+
this._exeContext.publisher.release(this);
22752272
});
22762273
}
22772274

src/jsutils/Publisher.ts

Lines changed: 41 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
interface ContainsPromise {
2-
promise: Promise<void>;
3-
}
4-
51
/** @internal */
6-
export class Publisher<I extends ContainsPromise, R> {
2+
export class Publisher<I, R> {
3+
_unreleased: Set<I>;
4+
_released: Set<I>;
75
_pending: Set<I>;
86
_update: (completed: Set<I>, publisher: Publisher<I, R>) => R | undefined;
97
_onAbruptClose: (pending: ReadonlySet<I>) => Promise<void>;
@@ -19,6 +17,8 @@ export class Publisher<I extends ContainsPromise, R> {
1917
) => R | undefined,
2018
onAbruptClose: (pending: ReadonlySet<I>) => Promise<void>,
2119
) {
20+
this._unreleased = new Set();
21+
this._released = new Set();
2222
this._pending = new Set();
2323
this._update = update;
2424
this._onAbruptClose = onAbruptClose;
@@ -42,11 +42,22 @@ export class Publisher<I extends ContainsPromise, R> {
4242
return this._pending.size > 0;
4343
}
4444

45-
add(item: I) {
45+
introduce(item: I) {
46+
this._unreleased.add(item);
4647
this._pending.add(item);
4748
}
4849

50+
release(item: I): void {
51+
if (this._pending.has(item)) {
52+
this._unreleased.delete(item);
53+
this._released.add(item);
54+
this._trigger();
55+
}
56+
}
57+
4958
delete(item: I) {
59+
this._unreleased.delete(item);
60+
this._released.delete(item);
5061
this._pending.delete(item);
5162
this._trigger();
5263
}
@@ -55,29 +66,31 @@ export class Publisher<I extends ContainsPromise, R> {
5566
let isDone = false;
5667

5768
const _next = async (): Promise<IteratorResult<R, void>> => {
58-
if (isDone) {
59-
return { value: undefined, done: true };
60-
}
61-
62-
await Promise.race(Array.from(this._pending).map((item) => item.promise));
63-
64-
if (isDone) {
65-
// a different call to next has exhausted all payloads
66-
return { value: undefined, done: true };
69+
// eslint-disable-next-line no-constant-condition
70+
while (true) {
71+
if (isDone) {
72+
return { value: undefined, done: true };
73+
}
74+
75+
for (const item of this._released) {
76+
this._pending.delete(item);
77+
}
78+
const released = this._released;
79+
this._released = new Set();
80+
81+
const result = this._update(released, this);
82+
83+
if (!this.hasNext()) {
84+
isDone = true;
85+
}
86+
87+
if (result !== undefined) {
88+
return { value: result, done: false };
89+
}
90+
91+
// eslint-disable-next-line no-await-in-loop
92+
await this._signalled;
6793
}
68-
69-
const result = this._update(this._pending, this);
70-
const hasNext = this._pending.size > 0;
71-
72-
if (result === undefined && hasNext) {
73-
return _next();
74-
}
75-
76-
if (!hasNext) {
77-
isDone = true;
78-
}
79-
80-
return { value: result as R, done: false };
8194
};
8295

8396
const _return = async (): Promise<IteratorResult<R, void>> => {

0 commit comments

Comments
 (0)