Skip to content

Commit 9204d78

Browse files
committed
extract publisher
1 parent 7a609a2 commit 9204d78

File tree

1 file changed

+172
-119
lines changed

1 file changed

+172
-119
lines changed

src/execution/execute.ts

Lines changed: 172 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,11 @@ export interface ExecutionContext {
121121
typeResolver: GraphQLTypeResolver<any, any>;
122122
subscribeFieldResolver: GraphQLFieldResolver<any, any>;
123123
errors: Array<GraphQLError>;
124-
subsequentPayloads: Set<AsyncPayloadRecord>;
124+
publisher: Publisher<
125+
AsyncPayloadRecord,
126+
IncrementalResult,
127+
SubsequentIncrementalExecutionResult
128+
>;
125129
}
126130

127131
/**
@@ -357,13 +361,14 @@ function executeImpl(
357361
return result.then(
358362
(data) => {
359363
const initialResult = buildResponse(data, exeContext.errors);
360-
if (exeContext.subsequentPayloads.size > 0) {
364+
const publisher = exeContext.publisher;
365+
if (publisher.hasNext()) {
361366
return {
362367
initialResult: {
363368
...initialResult,
364369
hasNext: true,
365370
},
366-
subsequentResults: yieldSubsequentPayloads(exeContext),
371+
subsequentResults: publisher.subscribe(),
367372
};
368373
}
369374
return initialResult;
@@ -375,13 +380,14 @@ function executeImpl(
375380
);
376381
}
377382
const initialResult = buildResponse(result, exeContext.errors);
378-
if (exeContext.subsequentPayloads.size > 0) {
383+
const publisher = exeContext.publisher;
384+
if (publisher.hasNext()) {
379385
return {
380386
initialResult: {
381387
...initialResult,
382388
hasNext: true,
383389
},
384-
subsequentResults: yieldSubsequentPayloads(exeContext),
390+
subsequentResults: publisher.subscribe(),
385391
};
386392
}
387393
return initialResult;
@@ -503,7 +509,7 @@ export function buildExecutionContext(
503509
fieldResolver: fieldResolver ?? defaultFieldResolver,
504510
typeResolver: typeResolver ?? defaultTypeResolver,
505511
subscribeFieldResolver: subscribeFieldResolver ?? defaultFieldResolver,
506-
subsequentPayloads: new Set(),
512+
publisher: new Publisher(resultFromAsyncPayloadRecord, payloadFromResults),
507513
errors: [],
508514
};
509515
}
@@ -515,7 +521,7 @@ function buildPerEventExecutionContext(
515521
return {
516522
...exeContext,
517523
rootValue: payload,
518-
subsequentPayloads: new Set(),
524+
publisher: new Publisher(resultFromAsyncPayloadRecord, payloadFromResults),
519525
errors: [],
520526
};
521527
}
@@ -2038,132 +2044,49 @@ function filterSubsequentPayloads(
20382044
currentAsyncRecord: AsyncPayloadRecord | undefined,
20392045
): void {
20402046
const nullPathArray = pathToArray(nullPath);
2041-
exeContext.subsequentPayloads.forEach((asyncRecord) => {
2047+
exeContext.publisher.filter((asyncRecord) => {
20422048
if (asyncRecord === currentAsyncRecord) {
20432049
// don't remove payload from where error originates
2044-
return;
2050+
return true;
20452051
}
20462052
for (let i = 0; i < nullPathArray.length; i++) {
20472053
if (asyncRecord.path[i] !== nullPathArray[i]) {
20482054
// asyncRecord points to a path unaffected by this payload
2049-
return;
2055+
return true;
20502056
}
20512057
}
2052-
// asyncRecord path points to nulled error field
2053-
if (isStreamPayload(asyncRecord) && asyncRecord.iterator?.return) {
2054-
asyncRecord.iterator.return().catch(() => {
2055-
// ignore error
2056-
});
2057-
}
2058-
exeContext.subsequentPayloads.delete(asyncRecord);
2058+
2059+
return false;
20592060
});
20602061
}
20612062

2062-
function getCompletedIncrementalResults(
2063-
exeContext: ExecutionContext,
2064-
): Array<IncrementalResult> {
2065-
const incrementalResults: Array<IncrementalResult> = [];
2066-
for (const asyncPayloadRecord of exeContext.subsequentPayloads) {
2067-
const incrementalResult: IncrementalResult = {};
2068-
if (!asyncPayloadRecord.isCompleted) {
2069-
continue;
2070-
}
2071-
exeContext.subsequentPayloads.delete(asyncPayloadRecord);
2072-
if (isStreamPayload(asyncPayloadRecord)) {
2073-
const items = asyncPayloadRecord.items;
2074-
if (asyncPayloadRecord.isCompletedIterator) {
2075-
// async iterable resolver just finished but there may be pending payloads
2076-
continue;
2077-
}
2078-
(incrementalResult as IncrementalStreamResult).items = items;
2079-
} else {
2080-
const data = asyncPayloadRecord.data;
2081-
(incrementalResult as IncrementalDeferResult).data = data ?? null;
2082-
}
2083-
2084-
incrementalResult.path = asyncPayloadRecord.path;
2085-
if (asyncPayloadRecord.label) {
2086-
incrementalResult.label = asyncPayloadRecord.label;
2087-
}
2088-
if (asyncPayloadRecord.errors.length > 0) {
2089-
incrementalResult.errors = asyncPayloadRecord.errors;
2090-
}
2091-
incrementalResults.push(incrementalResult);
2063+
function resultFromAsyncPayloadRecord(
2064+
asyncPayloadRecord: AsyncPayloadRecord,
2065+
): IncrementalResult {
2066+
const incrementalResult: IncrementalResult = {};
2067+
if (isStreamPayload(asyncPayloadRecord)) {
2068+
const items = asyncPayloadRecord.items;
2069+
(incrementalResult as IncrementalStreamResult).items = items;
2070+
} else {
2071+
const data = asyncPayloadRecord.data;
2072+
(incrementalResult as IncrementalDeferResult).data = data ?? null;
20922073
}
2093-
return incrementalResults;
2094-
}
2095-
2096-
function yieldSubsequentPayloads(
2097-
exeContext: ExecutionContext,
2098-
): AsyncGenerator<SubsequentIncrementalExecutionResult, void, void> {
2099-
let isDone = false;
2100-
2101-
async function next(): Promise<
2102-
IteratorResult<SubsequentIncrementalExecutionResult, void>
2103-
> {
2104-
if (isDone) {
2105-
return { value: undefined, done: true };
2106-
}
2107-
2108-
await Promise.race(
2109-
Array.from(exeContext.subsequentPayloads).map((p) => p.promise),
2110-
);
2111-
2112-
if (isDone) {
2113-
// a different call to next has exhausted all payloads
2114-
return { value: undefined, done: true };
2115-
}
21162074

2117-
const incremental = getCompletedIncrementalResults(exeContext);
2118-
const hasNext = exeContext.subsequentPayloads.size > 0;
2119-
2120-
if (!incremental.length && hasNext) {
2121-
return next();
2122-
}
2123-
2124-
if (!hasNext) {
2125-
isDone = true;
2126-
}
2127-
2128-
return {
2129-
value: incremental.length ? { incremental, hasNext } : { hasNext },
2130-
done: false,
2131-
};
2075+
incrementalResult.path = asyncPayloadRecord.path;
2076+
if (asyncPayloadRecord.label) {
2077+
incrementalResult.label = asyncPayloadRecord.label;
21322078
}
2133-
2134-
function returnStreamIterators() {
2135-
const promises: Array<Promise<IteratorResult<unknown>>> = [];
2136-
exeContext.subsequentPayloads.forEach((asyncPayloadRecord) => {
2137-
if (
2138-
isStreamPayload(asyncPayloadRecord) &&
2139-
asyncPayloadRecord.iterator?.return
2140-
) {
2141-
promises.push(asyncPayloadRecord.iterator.return());
2142-
}
2143-
});
2144-
return Promise.all(promises);
2079+
if (asyncPayloadRecord.errors.length > 0) {
2080+
incrementalResult.errors = asyncPayloadRecord.errors;
21452081
}
2082+
return incrementalResult;
2083+
}
21462084

2147-
return {
2148-
[Symbol.asyncIterator]() {
2149-
return this;
2150-
},
2151-
next,
2152-
async return(): Promise<
2153-
IteratorResult<SubsequentIncrementalExecutionResult, void>
2154-
> {
2155-
await returnStreamIterators();
2156-
isDone = true;
2157-
return { value: undefined, done: true };
2158-
},
2159-
async throw(
2160-
error?: unknown,
2161-
): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> {
2162-
await returnStreamIterators();
2163-
isDone = true;
2164-
return Promise.reject(error);
2165-
},
2166-
};
2085+
function payloadFromResults(
2086+
incremental: ReadonlyArray<IncrementalResult>,
2087+
hasNext: boolean,
2088+
): SubsequentIncrementalExecutionResult {
2089+
return incremental.length ? { incremental, hasNext } : { hasNext };
21672090
}
21682091

21692092
class DeferredFragmentRecord {
@@ -2189,7 +2112,7 @@ class DeferredFragmentRecord {
21892112
this.parentContext = opts.parentContext;
21902113
this.errors = [];
21912114
this._exeContext = opts.exeContext;
2192-
this._exeContext.subsequentPayloads.add(this);
2115+
this._exeContext.publisher.add(this);
21932116
this.isCompleted = false;
21942117
this.data = null;
21952118
this.promise = new Promise<ObjMap<unknown> | null>((resolve) => {
@@ -2240,7 +2163,7 @@ class StreamRecord {
22402163
this.iterator = opts.iterator;
22412164
this.errors = [];
22422165
this._exeContext = opts.exeContext;
2243-
this._exeContext.subsequentPayloads.add(this);
2166+
this._exeContext.publisher.add(this);
22442167
this.isCompleted = false;
22452168
this.items = null;
22462169
this.promise = new Promise<Array<unknown> | null>((resolve) => {
@@ -2274,3 +2197,133 @@ function isStreamPayload(
22742197
): asyncPayload is StreamRecord {
22752198
return asyncPayload.type === 'stream';
22762199
}
2200+
2201+
interface Source {
2202+
promise: Promise<void>;
2203+
isCompleted: boolean;
2204+
isCompletedIterator?: boolean | undefined;
2205+
iterator?: AsyncIterator<unknown> | undefined;
2206+
}
2207+
2208+
type ToIncrementalResult<TSource extends Source, TIncremental> = (
2209+
source: TSource,
2210+
) => TIncremental;
2211+
2212+
type ToPayload<TIncremental, TPayload> = (
2213+
incremental: ReadonlyArray<TIncremental>,
2214+
hasNext: boolean,
2215+
) => TPayload;
2216+
2217+
/**
2218+
* @internal
2219+
*/
2220+
export class Publisher<TSource extends Source, TIncremental, TPayload> {
2221+
sources: Set<TSource>;
2222+
toIncrementalResult: ToIncrementalResult<TSource, TIncremental>;
2223+
toPayload: ToPayload<TIncremental, TPayload>;
2224+
2225+
constructor(
2226+
toIncrementalResult: ToIncrementalResult<TSource, TIncremental>,
2227+
toPayload: ToPayload<TIncremental, TPayload>,
2228+
) {
2229+
this.sources = new Set();
2230+
this.toIncrementalResult = toIncrementalResult;
2231+
this.toPayload = toPayload;
2232+
}
2233+
2234+
add(source: TSource) {
2235+
this.sources.add(source);
2236+
}
2237+
2238+
hasNext(): boolean {
2239+
return this.sources.size > 0;
2240+
}
2241+
2242+
filter(predicate: (source: TSource) => boolean): void {
2243+
this.sources.forEach((source) => {
2244+
if (predicate(source)) {
2245+
return;
2246+
}
2247+
if (source.iterator?.return) {
2248+
source.iterator.return().catch(() => {
2249+
// ignore error
2250+
});
2251+
}
2252+
this.sources.delete(source);
2253+
});
2254+
}
2255+
2256+
_getCompletedIncrementalResults(): Array<TIncremental> {
2257+
const incrementalResults: Array<TIncremental> = [];
2258+
for (const source of this.sources) {
2259+
if (!source.isCompleted) {
2260+
continue;
2261+
}
2262+
this.sources.delete(source);
2263+
if (source.isCompletedIterator) {
2264+
continue;
2265+
}
2266+
incrementalResults.push(this.toIncrementalResult(source));
2267+
}
2268+
return incrementalResults;
2269+
}
2270+
2271+
subscribe(): AsyncGenerator<TPayload, void, void> {
2272+
let isDone = false;
2273+
2274+
const next = async (): Promise<IteratorResult<TPayload, void>> => {
2275+
if (isDone) {
2276+
return { value: undefined, done: true };
2277+
}
2278+
2279+
await Promise.race(Array.from(this.sources).map((p) => p.promise));
2280+
2281+
if (isDone) {
2282+
return { value: undefined, done: true };
2283+
}
2284+
2285+
const incremental = this._getCompletedIncrementalResults();
2286+
const hasNext = this.sources.size > 0;
2287+
2288+
if (!incremental.length && hasNext) {
2289+
return next();
2290+
}
2291+
2292+
if (!hasNext) {
2293+
isDone = true;
2294+
}
2295+
2296+
return {
2297+
value: this.toPayload(incremental, hasNext),
2298+
done: false,
2299+
};
2300+
};
2301+
2302+
const returnIterators = () => {
2303+
const promises: Array<Promise<IteratorResult<unknown>>> = [];
2304+
this.sources.forEach((source) => {
2305+
if (source.iterator?.return) {
2306+
promises.push(source.iterator.return());
2307+
}
2308+
});
2309+
return Promise.all(promises);
2310+
};
2311+
2312+
return {
2313+
[Symbol.asyncIterator]() {
2314+
return this;
2315+
},
2316+
next,
2317+
async return(): Promise<IteratorResult<TPayload, void>> {
2318+
await returnIterators();
2319+
isDone = true;
2320+
return { value: undefined, done: true };
2321+
},
2322+
async throw(error?: unknown): Promise<IteratorResult<TPayload, void>> {
2323+
await returnIterators();
2324+
isDone = true;
2325+
return Promise.reject(error);
2326+
},
2327+
};
2328+
}
2329+
}

0 commit comments

Comments
 (0)