Skip to content

Commit b56e737

Browse files
committed
extract current Publisher without change
1 parent d75872f commit b56e737

File tree

2 files changed

+158
-93
lines changed

2 files changed

+158
-93
lines changed

src/execution/execute.ts

Lines changed: 52 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { addPath, pathToArray } from '../jsutils/Path.js';
1212
import { promiseForObject } from '../jsutils/promiseForObject.js';
1313
import type { PromiseOrValue } from '../jsutils/PromiseOrValue.js';
1414
import { promiseReduce } from '../jsutils/promiseReduce.js';
15+
import { Publisher } from '../jsutils/Publisher.js';
1516

1617
import type { GraphQLFormattedError } from '../error/GraphQLError.js';
1718
import { GraphQLError } from '../error/GraphQLError.js';
@@ -121,7 +122,10 @@ export interface ExecutionContext {
121122
typeResolver: GraphQLTypeResolver<any, any>;
122123
subscribeFieldResolver: GraphQLFieldResolver<any, any>;
123124
errors: Array<GraphQLError>;
124-
subsequentPayloads: Set<IncrementalDataRecord>;
125+
publisher: Publisher<
126+
IncrementalDataRecord,
127+
SubsequentIncrementalExecutionResult
128+
>;
125129
}
126130

127131
/**
@@ -351,43 +355,44 @@ function executeImpl(
351355
// Errors from sub-fields of a NonNull type may propagate to the top level,
352356
// at which point we still log the error and null the parent field, which
353357
// in this case is the entire response.
358+
const { publisher, errors } = exeContext;
354359
try {
355360
const result = executeOperation(exeContext);
356361
if (isPromise(result)) {
357362
return result.then(
358363
(data) => {
359-
const initialResult = buildResponse(data, exeContext.errors);
360-
if (exeContext.subsequentPayloads.size > 0) {
364+
const initialResult = buildResponse(data, errors);
365+
if (publisher.hasNext()) {
361366
return {
362367
initialResult: {
363368
...initialResult,
364369
hasNext: true,
365370
},
366-
subsequentResults: yieldSubsequentPayloads(exeContext),
371+
subsequentResults: publisher.subscribe(),
367372
};
368373
}
369374
return initialResult;
370375
},
371376
(error) => {
372-
exeContext.errors.push(error);
373-
return buildResponse(null, exeContext.errors);
377+
errors.push(error);
378+
return buildResponse(null, errors);
374379
},
375380
);
376381
}
377-
const initialResult = buildResponse(result, exeContext.errors);
378-
if (exeContext.subsequentPayloads.size > 0) {
382+
const initialResult = buildResponse(result, errors);
383+
if (publisher.hasNext()) {
379384
return {
380385
initialResult: {
381386
...initialResult,
382387
hasNext: true,
383388
},
384-
subsequentResults: yieldSubsequentPayloads(exeContext),
389+
subsequentResults: publisher.subscribe(),
385390
};
386391
}
387392
return initialResult;
388393
} catch (error) {
389-
exeContext.errors.push(error);
390-
return buildResponse(null, exeContext.errors);
394+
errors.push(error);
395+
return buildResponse(null, errors);
391396
}
392397
}
393398

@@ -503,7 +508,7 @@ export function buildExecutionContext(
503508
fieldResolver: fieldResolver ?? defaultFieldResolver,
504509
typeResolver: typeResolver ?? defaultTypeResolver,
505510
subscribeFieldResolver: subscribeFieldResolver ?? defaultFieldResolver,
506-
subsequentPayloads: new Set(),
511+
publisher: new Publisher(getIncrementalResult, returnStreamIterators),
507512
errors: [],
508513
};
509514
}
@@ -515,7 +520,7 @@ function buildPerEventExecutionContext(
515520
return {
516521
...exeContext,
517522
rootValue: payload,
518-
subsequentPayloads: new Set(),
523+
// no need to update publisher, incremental delivery is not supported for subscriptions
519524
errors: [],
520525
};
521526
}
@@ -2098,7 +2103,8 @@ function filterSubsequentPayloads(
20982103
currentIncrementalDataRecord: IncrementalDataRecord | undefined,
20992104
): void {
21002105
const nullPathArray = pathToArray(nullPath);
2101-
exeContext.subsequentPayloads.forEach((incrementalDataRecord) => {
2106+
const publisher = exeContext.publisher;
2107+
publisher.getPending().forEach((incrementalDataRecord) => {
21022108
if (incrementalDataRecord === currentIncrementalDataRecord) {
21032109
// don't remove payload from where error originates
21042110
return;
@@ -2118,24 +2124,30 @@ function filterSubsequentPayloads(
21182124
// ignore error
21192125
});
21202126
}
2121-
exeContext.subsequentPayloads.delete(incrementalDataRecord);
2127+
publisher.delete(incrementalDataRecord);
21222128
});
21232129
}
21242130

2125-
function getCompletedIncrementalResults(
2126-
exeContext: ExecutionContext,
2127-
): Array<IncrementalResult> {
2131+
function getIncrementalResult(
2132+
pending: ReadonlySet<IncrementalDataRecord>,
2133+
publisher: Publisher<
2134+
IncrementalDataRecord,
2135+
SubsequentIncrementalExecutionResult
2136+
>,
2137+
): SubsequentIncrementalExecutionResult | undefined {
21282138
const incrementalResults: Array<IncrementalResult> = [];
2129-
for (const incrementalDataRecord of exeContext.subsequentPayloads) {
2139+
let encounteredCompletedAsyncIterator = false;
2140+
for (const incrementalDataRecord of pending) {
21302141
const incrementalResult: IncrementalResult = {};
21312142
if (!incrementalDataRecord.isCompleted) {
21322143
continue;
21332144
}
2134-
exeContext.subsequentPayloads.delete(incrementalDataRecord);
2145+
publisher.delete(incrementalDataRecord);
21352146
if (isStreamItemsRecord(incrementalDataRecord)) {
21362147
const items = incrementalDataRecord.items;
21372148
if (incrementalDataRecord.isCompletedAsyncIterator) {
21382149
// async iterable resolver just finished but there may be pending payloads
2150+
encounteredCompletedAsyncIterator = true;
21392151
continue;
21402152
}
21412153
(incrementalResult as IncrementalStreamResult).items = items;
@@ -2153,80 +2165,27 @@ function getCompletedIncrementalResults(
21532165
}
21542166
incrementalResults.push(incrementalResult);
21552167
}
2156-
return incrementalResults;
2157-
}
2158-
2159-
function yieldSubsequentPayloads(
2160-
exeContext: ExecutionContext,
2161-
): AsyncGenerator<SubsequentIncrementalExecutionResult, void, void> {
2162-
let isDone = false;
2163-
2164-
async function next(): Promise<
2165-
IteratorResult<SubsequentIncrementalExecutionResult, void>
2166-
> {
2167-
if (isDone) {
2168-
return { value: undefined, done: true };
2169-
}
2170-
2171-
await Promise.race(
2172-
Array.from(exeContext.subsequentPayloads).map((p) => p.promise),
2173-
);
21742168

2175-
if (isDone) {
2176-
// a different call to next has exhausted all payloads
2177-
return { value: undefined, done: true };
2178-
}
2179-
2180-
const incremental = getCompletedIncrementalResults(exeContext);
2181-
const hasNext = exeContext.subsequentPayloads.size > 0;
2182-
2183-
if (!incremental.length && hasNext) {
2184-
return next();
2185-
}
2169+
return incrementalResults.length
2170+
? { incremental: incrementalResults, hasNext: publisher.hasNext() }
2171+
: encounteredCompletedAsyncIterator && !publisher.hasNext()
2172+
? { hasNext: false }
2173+
: undefined;
2174+
}
21862175

2187-
if (!hasNext) {
2188-
isDone = true;
2176+
async function returnStreamIterators(
2177+
pendingRecords: ReadonlySet<IncrementalDataRecord>,
2178+
): Promise<void> {
2179+
const promises: Array<Promise<IteratorResult<unknown>>> = [];
2180+
pendingRecords.forEach((incrementalDataRecord) => {
2181+
if (
2182+
isStreamItemsRecord(incrementalDataRecord) &&
2183+
incrementalDataRecord.asyncIterator?.return
2184+
) {
2185+
promises.push(incrementalDataRecord.asyncIterator.return());
21892186
}
2190-
2191-
return {
2192-
value: incremental.length ? { incremental, hasNext } : { hasNext },
2193-
done: false,
2194-
};
2195-
}
2196-
2197-
function returnStreamIterators() {
2198-
const promises: Array<Promise<IteratorResult<unknown>>> = [];
2199-
exeContext.subsequentPayloads.forEach((incrementalDataRecord) => {
2200-
if (
2201-
isStreamItemsRecord(incrementalDataRecord) &&
2202-
incrementalDataRecord.asyncIterator?.return
2203-
) {
2204-
promises.push(incrementalDataRecord.asyncIterator.return());
2205-
}
2206-
});
2207-
return Promise.all(promises);
2208-
}
2209-
2210-
return {
2211-
[Symbol.asyncIterator]() {
2212-
return this;
2213-
},
2214-
next,
2215-
async return(): Promise<
2216-
IteratorResult<SubsequentIncrementalExecutionResult, void>
2217-
> {
2218-
await returnStreamIterators();
2219-
isDone = true;
2220-
return { value: undefined, done: true };
2221-
},
2222-
async throw(
2223-
error?: unknown,
2224-
): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> {
2225-
await returnStreamIterators();
2226-
isDone = true;
2227-
return Promise.reject(error);
2228-
},
2229-
};
2187+
});
2188+
await Promise.all(promises);
22302189
}
22312190

22322191
class DeferredFragmentRecord {
@@ -2252,7 +2211,7 @@ class DeferredFragmentRecord {
22522211
this.parentContext = opts.parentContext;
22532212
this.errors = [];
22542213
this._exeContext = opts.exeContext;
2255-
this._exeContext.subsequentPayloads.add(this);
2214+
this._exeContext.publisher.add(this);
22562215
this.isCompleted = false;
22572216
this.data = null;
22582217
this.promise = new Promise<ObjMap<unknown> | null>((resolve) => {
@@ -2303,7 +2262,7 @@ class StreamItemsRecord {
23032262
this.asyncIterator = opts.asyncIterator;
23042263
this.errors = [];
23052264
this._exeContext = opts.exeContext;
2306-
this._exeContext.subsequentPayloads.add(this);
2265+
this._exeContext.publisher.add(this);
23072266
this.isCompleted = false;
23082267
this.items = null;
23092268
this.promise = new Promise<Array<unknown> | null>((resolve) => {

src/jsutils/Publisher.ts

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
interface ContainsPromise {
2+
promise: Promise<void>;
3+
}
4+
5+
/** @internal */
6+
export class Publisher<I extends ContainsPromise, R> {
7+
_pending: Set<I>;
8+
_update: (completed: Set<I>, publisher: Publisher<I, R>) => R | undefined;
9+
_onAbruptClose: (pending: ReadonlySet<I>) => Promise<void>;
10+
11+
// these are assigned within the Promise executor called synchronously within the constructor
12+
_signalled!: Promise<void>;
13+
_resolve!: () => void;
14+
15+
constructor(
16+
update: (
17+
released: ReadonlySet<I>,
18+
publisher: Publisher<I, R>,
19+
) => R | undefined,
20+
onAbruptClose: (pending: ReadonlySet<I>) => Promise<void>,
21+
) {
22+
this._pending = new Set();
23+
this._update = update;
24+
this._onAbruptClose = onAbruptClose;
25+
this._reset();
26+
}
27+
28+
_trigger() {
29+
this._resolve();
30+
this._reset();
31+
}
32+
33+
_reset() {
34+
this._signalled = new Promise<void>((resolve) => (this._resolve = resolve));
35+
}
36+
37+
getPending(): ReadonlySet<I> {
38+
return this._pending;
39+
}
40+
41+
hasNext(): boolean {
42+
return this._pending.size > 0;
43+
}
44+
45+
add(item: I) {
46+
this._pending.add(item);
47+
}
48+
49+
delete(item: I) {
50+
this._pending.delete(item);
51+
this._trigger();
52+
}
53+
54+
subscribe(): AsyncGenerator<R, void, void> {
55+
let isDone = false;
56+
57+
const _next = async (): Promise<IteratorResult<R, void>> => {
58+
if (isDone) {
59+
return { value: undefined, done: true };
60+
}
61+
62+
await Promise.race(Array.from(this._pending).map((item) => item.promise));
63+
64+
if (isDone) {
65+
// a different call to next has exhausted all payloads
66+
return { value: undefined, done: true };
67+
}
68+
69+
const result = this._update(this._pending, this);
70+
const hasNext = this._pending.size > 0;
71+
72+
if (result === undefined && hasNext) {
73+
return _next();
74+
}
75+
76+
if (!hasNext) {
77+
isDone = true;
78+
}
79+
80+
return { value: result as R, done: false };
81+
};
82+
83+
const _return = async (): Promise<IteratorResult<R, void>> => {
84+
isDone = true;
85+
await this._onAbruptClose(this._pending);
86+
return { value: undefined, done: true };
87+
};
88+
89+
const _throw = async (
90+
error?: unknown,
91+
): Promise<IteratorResult<R, void>> => {
92+
isDone = true;
93+
await this._onAbruptClose(this._pending);
94+
return Promise.reject(error);
95+
};
96+
97+
return {
98+
[Symbol.asyncIterator]() {
99+
return this;
100+
},
101+
next: _next,
102+
return: _return,
103+
throw: _throw,
104+
};
105+
}
106+
}

0 commit comments

Comments
 (0)