Skip to content

Commit 0fda94a

Browse files
committed
extract publisher to separate file
1 parent 400bdf0 commit 0fda94a

File tree

2 files changed

+168
-154
lines changed

2 files changed

+168
-154
lines changed

src/execution/execute.ts

Lines changed: 3 additions & 154 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ import {
5353
collectSubfields as _collectSubfields,
5454
} from './collectFields.js';
5555
import { mapAsyncIterable } from './mapAsyncIterable.js';
56+
import { Publisher } from './publisher.js';
5657
import {
5758
getArgumentValues,
5859
getDirectiveValues,
@@ -2052,154 +2053,6 @@ async function executeStreamIterator(
20522053
}
20532054
}
20542055

2055-
/**
2056-
* @internal
2057-
*/
2058-
class Publisher {
2059-
subsequentPayloads: Set<AsyncPayloadRecord>;
2060-
2061-
constructor() {
2062-
this.subsequentPayloads = new Set();
2063-
}
2064-
2065-
add(payload: AsyncPayloadRecord) {
2066-
this.subsequentPayloads.add(payload);
2067-
}
2068-
2069-
filterSubsequentPayloads(
2070-
nullPath: Path,
2071-
currentAsyncRecord: AsyncPayloadRecord | undefined,
2072-
): void {
2073-
const nullPathArray = pathToArray(nullPath);
2074-
this.subsequentPayloads.forEach((asyncRecord) => {
2075-
if (asyncRecord === currentAsyncRecord) {
2076-
// don't remove payload from where error originates
2077-
return;
2078-
}
2079-
for (let i = 0; i < nullPathArray.length; i++) {
2080-
if (asyncRecord.path[i] !== nullPathArray[i]) {
2081-
// asyncRecord points to a path unaffected by this payload
2082-
return;
2083-
}
2084-
}
2085-
// asyncRecord path points to nulled error field
2086-
if (isStreamPayload(asyncRecord) && asyncRecord.iterator?.return) {
2087-
asyncRecord.iterator.return().catch(() => {
2088-
// ignore error
2089-
});
2090-
}
2091-
this.subsequentPayloads.delete(asyncRecord);
2092-
});
2093-
}
2094-
2095-
getCompletedIncrementalResults(): Array<IncrementalResult> {
2096-
const incrementalResults: Array<IncrementalResult> = [];
2097-
for (const asyncPayloadRecord of this.subsequentPayloads) {
2098-
const incrementalResult: IncrementalResult = {};
2099-
if (!asyncPayloadRecord.isCompleted) {
2100-
continue;
2101-
}
2102-
this.subsequentPayloads.delete(asyncPayloadRecord);
2103-
if (isStreamPayload(asyncPayloadRecord)) {
2104-
const items = asyncPayloadRecord.items;
2105-
if (asyncPayloadRecord.isCompletedIterator) {
2106-
// async iterable resolver just finished but there may be pending payloads
2107-
continue;
2108-
}
2109-
(incrementalResult as IncrementalStreamResult).items = items;
2110-
} else {
2111-
const data = asyncPayloadRecord.data;
2112-
(incrementalResult as IncrementalDeferResult).data = data ?? null;
2113-
}
2114-
2115-
incrementalResult.path = asyncPayloadRecord.path;
2116-
if (asyncPayloadRecord.label) {
2117-
incrementalResult.label = asyncPayloadRecord.label;
2118-
}
2119-
if (asyncPayloadRecord.errors.length > 0) {
2120-
incrementalResult.errors = asyncPayloadRecord.errors;
2121-
}
2122-
incrementalResults.push(incrementalResult);
2123-
}
2124-
return incrementalResults;
2125-
}
2126-
2127-
yieldSubsequentPayloads(): AsyncGenerator<
2128-
SubsequentIncrementalExecutionResult,
2129-
void,
2130-
void
2131-
> {
2132-
let isDone = false;
2133-
2134-
const next = async (): Promise<
2135-
IteratorResult<SubsequentIncrementalExecutionResult, void>
2136-
> => {
2137-
if (isDone) {
2138-
return { value: undefined, done: true };
2139-
}
2140-
2141-
await Promise.race(
2142-
Array.from(this.subsequentPayloads).map((p) => p.promise),
2143-
);
2144-
2145-
if (isDone) {
2146-
// a different call to next has exhausted all payloads
2147-
return { value: undefined, done: true };
2148-
}
2149-
2150-
const incremental = this.getCompletedIncrementalResults();
2151-
const hasNext = this.subsequentPayloads.size > 0;
2152-
2153-
if (!incremental.length && hasNext) {
2154-
return next();
2155-
}
2156-
2157-
if (!hasNext) {
2158-
isDone = true;
2159-
}
2160-
2161-
return {
2162-
value: incremental.length ? { incremental, hasNext } : { hasNext },
2163-
done: false,
2164-
};
2165-
};
2166-
2167-
const returnStreamIterators = () => {
2168-
const promises: Array<Promise<IteratorResult<unknown>>> = [];
2169-
this.subsequentPayloads.forEach((asyncPayloadRecord) => {
2170-
if (
2171-
isStreamPayload(asyncPayloadRecord) &&
2172-
asyncPayloadRecord.iterator?.return
2173-
) {
2174-
promises.push(asyncPayloadRecord.iterator.return());
2175-
}
2176-
});
2177-
return Promise.all(promises);
2178-
};
2179-
2180-
return {
2181-
[Symbol.asyncIterator]() {
2182-
return this;
2183-
},
2184-
next,
2185-
async return(): Promise<
2186-
IteratorResult<SubsequentIncrementalExecutionResult, void>
2187-
> {
2188-
await returnStreamIterators();
2189-
isDone = true;
2190-
return { value: undefined, done: true };
2191-
},
2192-
async throw(
2193-
error?: unknown,
2194-
): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> {
2195-
await returnStreamIterators();
2196-
isDone = true;
2197-
return Promise.reject(error);
2198-
},
2199-
};
2200-
}
2201-
}
2202-
22032056
class DeferredFragmentRecord {
22042057
type: 'defer';
22052058
errors: Array<GraphQLError>;
@@ -2301,10 +2154,6 @@ class StreamRecord {
23012154
}
23022155
}
23032156

2304-
type AsyncPayloadRecord = DeferredFragmentRecord | StreamRecord;
2157+
export type { StreamRecord };
23052158

2306-
function isStreamPayload(
2307-
asyncPayload: AsyncPayloadRecord,
2308-
): asyncPayload is StreamRecord {
2309-
return asyncPayload.type === 'stream';
2310-
}
2159+
export type AsyncPayloadRecord = DeferredFragmentRecord | StreamRecord;

src/execution/publisher.ts

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
import { Path, pathToArray } from '../jsutils/Path.js';
2+
3+
import type {
4+
AsyncPayloadRecord,
5+
StreamRecord,
6+
IncrementalDeferResult,
7+
IncrementalResult,
8+
IncrementalStreamResult,
9+
SubsequentIncrementalExecutionResult,
10+
} from './execute.js';
11+
12+
/**
13+
* @internal
14+
*/
15+
export class Publisher {
16+
subsequentPayloads: Set<AsyncPayloadRecord>;
17+
18+
constructor() {
19+
this.subsequentPayloads = new Set();
20+
}
21+
22+
add(payload: AsyncPayloadRecord) {
23+
this.subsequentPayloads.add(payload);
24+
}
25+
26+
filterSubsequentPayloads(
27+
nullPath: Path,
28+
currentAsyncRecord: AsyncPayloadRecord | undefined,
29+
): void {
30+
const nullPathArray = pathToArray(nullPath);
31+
this.subsequentPayloads.forEach((asyncRecord) => {
32+
if (asyncRecord === currentAsyncRecord) {
33+
// don't remove payload from where error originates
34+
return;
35+
}
36+
for (let i = 0; i < nullPathArray.length; i++) {
37+
if (asyncRecord.path[i] !== nullPathArray[i]) {
38+
// asyncRecord points to a path unaffected by this payload
39+
return;
40+
}
41+
}
42+
// asyncRecord path points to nulled error field
43+
if (isStreamPayload(asyncRecord) && asyncRecord.iterator?.return) {
44+
asyncRecord.iterator.return().catch(() => {
45+
// ignore error
46+
});
47+
}
48+
this.subsequentPayloads.delete(asyncRecord);
49+
});
50+
}
51+
52+
getCompletedIncrementalResults(): Array<IncrementalResult> {
53+
const incrementalResults: Array<IncrementalResult> = [];
54+
for (const asyncPayloadRecord of this.subsequentPayloads) {
55+
const incrementalResult: IncrementalResult = {};
56+
if (!asyncPayloadRecord.isCompleted) {
57+
continue;
58+
}
59+
this.subsequentPayloads.delete(asyncPayloadRecord);
60+
if (isStreamPayload(asyncPayloadRecord)) {
61+
const items = asyncPayloadRecord.items;
62+
if (asyncPayloadRecord.isCompletedIterator) {
63+
// async iterable resolver just finished but there may be pending payloads
64+
continue;
65+
}
66+
(incrementalResult as IncrementalStreamResult).items = items;
67+
} else {
68+
const data = asyncPayloadRecord.data;
69+
(incrementalResult as IncrementalDeferResult).data = data ?? null;
70+
}
71+
72+
incrementalResult.path = asyncPayloadRecord.path;
73+
if (asyncPayloadRecord.label) {
74+
incrementalResult.label = asyncPayloadRecord.label;
75+
}
76+
if (asyncPayloadRecord.errors.length > 0) {
77+
incrementalResult.errors = asyncPayloadRecord.errors;
78+
}
79+
incrementalResults.push(incrementalResult);
80+
}
81+
return incrementalResults;
82+
}
83+
84+
yieldSubsequentPayloads(): AsyncGenerator<
85+
SubsequentIncrementalExecutionResult,
86+
void,
87+
void
88+
> {
89+
let isDone = false;
90+
const publisher = this;
91+
92+
async function next(): Promise<
93+
IteratorResult<SubsequentIncrementalExecutionResult, void>
94+
> {
95+
if (isDone) {
96+
return { value: undefined, done: true };
97+
}
98+
99+
await Promise.race(
100+
Array.from(publisher.subsequentPayloads).map((p) => p.promise),
101+
);
102+
103+
if (isDone) {
104+
// a different call to next has exhausted all payloads
105+
return { value: undefined, done: true };
106+
}
107+
108+
const incremental = publisher.getCompletedIncrementalResults();
109+
const hasNext = publisher.subsequentPayloads.size > 0;
110+
111+
if (!incremental.length && hasNext) {
112+
return next();
113+
}
114+
115+
if (!hasNext) {
116+
isDone = true;
117+
}
118+
119+
return {
120+
value: incremental.length ? { incremental, hasNext } : { hasNext },
121+
done: false,
122+
};
123+
}
124+
125+
function returnStreamIterators() {
126+
const promises: Array<Promise<IteratorResult<unknown>>> = [];
127+
publisher.subsequentPayloads.forEach((asyncPayloadRecord) => {
128+
if (
129+
isStreamPayload(asyncPayloadRecord) &&
130+
asyncPayloadRecord.iterator?.return
131+
) {
132+
promises.push(asyncPayloadRecord.iterator.return());
133+
}
134+
});
135+
return Promise.all(promises);
136+
}
137+
138+
return {
139+
[Symbol.asyncIterator]() {
140+
return this;
141+
},
142+
next,
143+
async return(): Promise<
144+
IteratorResult<SubsequentIncrementalExecutionResult, void>
145+
> {
146+
await returnStreamIterators();
147+
isDone = true;
148+
return { value: undefined, done: true };
149+
},
150+
async throw(
151+
error?: unknown,
152+
): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> {
153+
await returnStreamIterators();
154+
isDone = true;
155+
return Promise.reject(error);
156+
},
157+
};
158+
}
159+
}
160+
161+
function isStreamPayload(
162+
asyncPayload: AsyncPayloadRecord,
163+
): asyncPayload is StreamRecord {
164+
return asyncPayload.type === 'stream';
165+
}

0 commit comments

Comments
 (0)