Skip to content

Commit 2de06f8

Browse files
committed
extract publisher to separate file
1 parent 773c90c commit 2de06f8

File tree

2 files changed

+168
-155
lines changed

2 files changed

+168
-155
lines changed

src/execution/execute.ts

Lines changed: 3 additions & 155 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ import {
5454
} from './collectFields.js';
5555
import { flattenAsyncIterable } from './flattenAsyncIterable.js';
5656
import { mapAsyncIterable } from './mapAsyncIterable.js';
57+
import { Publisher } from './publisher.js';
5758
import {
5859
getArgumentValues,
5960
getDirectiveValues,
@@ -2104,155 +2105,6 @@ async function executeStreamIterator(
21042105
}
21052106
}
21062107

2107-
/**
2108-
* @internal
2109-
*/
2110-
class Publisher {
2111-
subsequentPayloads: Set<AsyncPayloadRecord>;
2112-
2113-
constructor() {
2114-
this.subsequentPayloads = new Set();
2115-
}
2116-
2117-
add(payload: AsyncPayloadRecord) {
2118-
this.subsequentPayloads.add(payload);
2119-
}
2120-
2121-
filterSubsequentPayloads(
2122-
nullPath: Path,
2123-
currentAsyncRecord: AsyncPayloadRecord | undefined,
2124-
): void {
2125-
const nullPathArray = pathToArray(nullPath);
2126-
this.subsequentPayloads.forEach((asyncRecord) => {
2127-
if (asyncRecord === currentAsyncRecord) {
2128-
// don't remove payload from where error originates
2129-
return;
2130-
}
2131-
for (let i = 0; i < nullPathArray.length; i++) {
2132-
if (asyncRecord.path[i] !== nullPathArray[i]) {
2133-
// asyncRecord points to a path unaffected by this payload
2134-
return;
2135-
}
2136-
}
2137-
// asyncRecord path points to nulled error field
2138-
if (isStreamPayload(asyncRecord) && asyncRecord.iterator?.return) {
2139-
asyncRecord.iterator.return().catch(() => {
2140-
// ignore error
2141-
});
2142-
}
2143-
this.subsequentPayloads.delete(asyncRecord);
2144-
});
2145-
}
2146-
2147-
getCompletedIncrementalResults(): Array<IncrementalResult> {
2148-
const incrementalResults: Array<IncrementalResult> = [];
2149-
for (const asyncPayloadRecord of this.subsequentPayloads) {
2150-
const incrementalResult: IncrementalResult = {};
2151-
if (!asyncPayloadRecord.isCompleted) {
2152-
continue;
2153-
}
2154-
this.subsequentPayloads.delete(asyncPayloadRecord);
2155-
if (isStreamPayload(asyncPayloadRecord)) {
2156-
const items = asyncPayloadRecord.items;
2157-
if (asyncPayloadRecord.isCompletedIterator) {
2158-
// async iterable resolver just finished but there may be pending payloads
2159-
continue;
2160-
}
2161-
(incrementalResult as IncrementalStreamResult).items = items;
2162-
} else {
2163-
const data = asyncPayloadRecord.data;
2164-
(incrementalResult as IncrementalDeferResult).data = data ?? null;
2165-
}
2166-
2167-
incrementalResult.path = asyncPayloadRecord.path;
2168-
if (asyncPayloadRecord.label) {
2169-
incrementalResult.label = asyncPayloadRecord.label;
2170-
}
2171-
if (asyncPayloadRecord.errors.length > 0) {
2172-
incrementalResult.errors = asyncPayloadRecord.errors;
2173-
}
2174-
incrementalResults.push(incrementalResult);
2175-
}
2176-
return incrementalResults;
2177-
}
2178-
2179-
yieldSubsequentPayloads(): AsyncGenerator<
2180-
SubsequentIncrementalExecutionResult,
2181-
void,
2182-
void
2183-
> {
2184-
let isDone = false;
2185-
const publisher = this;
2186-
2187-
async function next(): Promise<
2188-
IteratorResult<SubsequentIncrementalExecutionResult, void>
2189-
> {
2190-
if (isDone) {
2191-
return { value: undefined, done: true };
2192-
}
2193-
2194-
await Promise.race(
2195-
Array.from(publisher.subsequentPayloads).map((p) => p.promise),
2196-
);
2197-
2198-
if (isDone) {
2199-
// a different call to next has exhausted all payloads
2200-
return { value: undefined, done: true };
2201-
}
2202-
2203-
const incremental = publisher.getCompletedIncrementalResults();
2204-
const hasNext = publisher.subsequentPayloads.size > 0;
2205-
2206-
if (!incremental.length && hasNext) {
2207-
return next();
2208-
}
2209-
2210-
if (!hasNext) {
2211-
isDone = true;
2212-
}
2213-
2214-
return {
2215-
value: incremental.length ? { incremental, hasNext } : { hasNext },
2216-
done: false,
2217-
};
2218-
}
2219-
2220-
function returnStreamIterators() {
2221-
const promises: Array<Promise<IteratorResult<unknown>>> = [];
2222-
publisher.subsequentPayloads.forEach((asyncPayloadRecord) => {
2223-
if (
2224-
isStreamPayload(asyncPayloadRecord) &&
2225-
asyncPayloadRecord.iterator?.return
2226-
) {
2227-
promises.push(asyncPayloadRecord.iterator.return());
2228-
}
2229-
});
2230-
return Promise.all(promises);
2231-
}
2232-
2233-
return {
2234-
[Symbol.asyncIterator]() {
2235-
return this;
2236-
},
2237-
next,
2238-
async return(): Promise<
2239-
IteratorResult<SubsequentIncrementalExecutionResult, void>
2240-
> {
2241-
await returnStreamIterators();
2242-
isDone = true;
2243-
return { value: undefined, done: true };
2244-
},
2245-
async throw(
2246-
error?: unknown,
2247-
): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> {
2248-
await returnStreamIterators();
2249-
isDone = true;
2250-
return Promise.reject(error);
2251-
},
2252-
};
2253-
}
2254-
}
2255-
22562108
class DeferredFragmentRecord {
22572109
type: 'defer';
22582110
errors: Array<GraphQLError>;
@@ -2354,10 +2206,6 @@ class StreamRecord {
23542206
}
23552207
}
23562208

2357-
type AsyncPayloadRecord = DeferredFragmentRecord | StreamRecord;
2209+
export type { StreamRecord };
23582210

2359-
function isStreamPayload(
2360-
asyncPayload: AsyncPayloadRecord,
2361-
): asyncPayload is StreamRecord {
2362-
return asyncPayload.type === 'stream';
2363-
}
2211+
export type AsyncPayloadRecord = DeferredFragmentRecord | StreamRecord;

src/execution/publisher.ts

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
import { Path, pathToArray } from '../jsutils/Path.js';
2+
3+
import type {
4+
AsyncPayloadRecord,
5+
StreamRecord,
6+
IncrementalDeferResult,
7+
IncrementalResult,
8+
IncrementalStreamResult,
9+
SubsequentIncrementalExecutionResult,
10+
} from './execute.js';
11+
12+
/**
13+
* @internal
14+
*/
15+
export class Publisher {
16+
subsequentPayloads: Set<AsyncPayloadRecord>;
17+
18+
constructor() {
19+
this.subsequentPayloads = new Set();
20+
}
21+
22+
add(payload: AsyncPayloadRecord) {
23+
this.subsequentPayloads.add(payload);
24+
}
25+
26+
filterSubsequentPayloads(
27+
nullPath: Path,
28+
currentAsyncRecord: AsyncPayloadRecord | undefined,
29+
): void {
30+
const nullPathArray = pathToArray(nullPath);
31+
this.subsequentPayloads.forEach((asyncRecord) => {
32+
if (asyncRecord === currentAsyncRecord) {
33+
// don't remove payload from where error originates
34+
return;
35+
}
36+
for (let i = 0; i < nullPathArray.length; i++) {
37+
if (asyncRecord.path[i] !== nullPathArray[i]) {
38+
// asyncRecord points to a path unaffected by this payload
39+
return;
40+
}
41+
}
42+
// asyncRecord path points to nulled error field
43+
if (isStreamPayload(asyncRecord) && asyncRecord.iterator?.return) {
44+
asyncRecord.iterator.return().catch(() => {
45+
// ignore error
46+
});
47+
}
48+
this.subsequentPayloads.delete(asyncRecord);
49+
});
50+
}
51+
52+
getCompletedIncrementalResults(): Array<IncrementalResult> {
53+
const incrementalResults: Array<IncrementalResult> = [];
54+
for (const asyncPayloadRecord of this.subsequentPayloads) {
55+
const incrementalResult: IncrementalResult = {};
56+
if (!asyncPayloadRecord.isCompleted) {
57+
continue;
58+
}
59+
this.subsequentPayloads.delete(asyncPayloadRecord);
60+
if (isStreamPayload(asyncPayloadRecord)) {
61+
const items = asyncPayloadRecord.items;
62+
if (asyncPayloadRecord.isCompletedIterator) {
63+
// async iterable resolver just finished but there may be pending payloads
64+
continue;
65+
}
66+
(incrementalResult as IncrementalStreamResult).items = items;
67+
} else {
68+
const data = asyncPayloadRecord.data;
69+
(incrementalResult as IncrementalDeferResult).data = data ?? null;
70+
}
71+
72+
incrementalResult.path = asyncPayloadRecord.path;
73+
if (asyncPayloadRecord.label) {
74+
incrementalResult.label = asyncPayloadRecord.label;
75+
}
76+
if (asyncPayloadRecord.errors.length > 0) {
77+
incrementalResult.errors = asyncPayloadRecord.errors;
78+
}
79+
incrementalResults.push(incrementalResult);
80+
}
81+
return incrementalResults;
82+
}
83+
84+
yieldSubsequentPayloads(): AsyncGenerator<
85+
SubsequentIncrementalExecutionResult,
86+
void,
87+
void
88+
> {
89+
let isDone = false;
90+
const publisher = this;
91+
92+
async function next(): Promise<
93+
IteratorResult<SubsequentIncrementalExecutionResult, void>
94+
> {
95+
if (isDone) {
96+
return { value: undefined, done: true };
97+
}
98+
99+
await Promise.race(
100+
Array.from(publisher.subsequentPayloads).map((p) => p.promise),
101+
);
102+
103+
if (isDone) {
104+
// a different call to next has exhausted all payloads
105+
return { value: undefined, done: true };
106+
}
107+
108+
const incremental = publisher.getCompletedIncrementalResults();
109+
const hasNext = publisher.subsequentPayloads.size > 0;
110+
111+
if (!incremental.length && hasNext) {
112+
return next();
113+
}
114+
115+
if (!hasNext) {
116+
isDone = true;
117+
}
118+
119+
return {
120+
value: incremental.length ? { incremental, hasNext } : { hasNext },
121+
done: false,
122+
};
123+
}
124+
125+
function returnStreamIterators() {
126+
const promises: Array<Promise<IteratorResult<unknown>>> = [];
127+
publisher.subsequentPayloads.forEach((asyncPayloadRecord) => {
128+
if (
129+
isStreamPayload(asyncPayloadRecord) &&
130+
asyncPayloadRecord.iterator?.return
131+
) {
132+
promises.push(asyncPayloadRecord.iterator.return());
133+
}
134+
});
135+
return Promise.all(promises);
136+
}
137+
138+
return {
139+
[Symbol.asyncIterator]() {
140+
return this;
141+
},
142+
next,
143+
async return(): Promise<
144+
IteratorResult<SubsequentIncrementalExecutionResult, void>
145+
> {
146+
await returnStreamIterators();
147+
isDone = true;
148+
return { value: undefined, done: true };
149+
},
150+
async throw(
151+
error?: unknown,
152+
): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> {
153+
await returnStreamIterators();
154+
isDone = true;
155+
return Promise.reject(error);
156+
},
157+
};
158+
}
159+
}
160+
161+
function isStreamPayload(
162+
asyncPayload: AsyncPayloadRecord,
163+
): asyncPayload is StreamRecord {
164+
return asyncPayload.type === 'stream';
165+
}

0 commit comments

Comments
 (0)