Skip to content

Commit c537db4

Browse files
committed
move publisher to separate file
1 parent 9204d78 commit c537db4

File tree

2 files changed

+130
-130
lines changed

2 files changed

+130
-130
lines changed

src/execution/execute.ts

Lines changed: 1 addition & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ import {
5353
collectSubfields as _collectSubfields,
5454
} from './collectFields.js';
5555
import { mapAsyncIterable } from './mapAsyncIterable.js';
56+
import { Publisher } from './publisher.js';
5657
import {
5758
getArgumentValues,
5859
getDirectiveValues,
@@ -2197,133 +2198,3 @@ function isStreamPayload(
21972198
): asyncPayload is StreamRecord {
21982199
return asyncPayload.type === 'stream';
21992200
}
2200-
2201-
interface Source {
2202-
promise: Promise<void>;
2203-
isCompleted: boolean;
2204-
isCompletedIterator?: boolean | undefined;
2205-
iterator?: AsyncIterator<unknown> | undefined;
2206-
}
2207-
2208-
type ToIncrementalResult<TSource extends Source, TIncremental> = (
2209-
source: TSource,
2210-
) => TIncremental;
2211-
2212-
type ToPayload<TIncremental, TPayload> = (
2213-
incremental: ReadonlyArray<TIncremental>,
2214-
hasNext: boolean,
2215-
) => TPayload;
2216-
2217-
/**
2218-
* @internal
2219-
*/
2220-
export class Publisher<TSource extends Source, TIncremental, TPayload> {
2221-
sources: Set<TSource>;
2222-
toIncrementalResult: ToIncrementalResult<TSource, TIncremental>;
2223-
toPayload: ToPayload<TIncremental, TPayload>;
2224-
2225-
constructor(
2226-
toIncrementalResult: ToIncrementalResult<TSource, TIncremental>,
2227-
toPayload: ToPayload<TIncremental, TPayload>,
2228-
) {
2229-
this.sources = new Set();
2230-
this.toIncrementalResult = toIncrementalResult;
2231-
this.toPayload = toPayload;
2232-
}
2233-
2234-
add(source: TSource) {
2235-
this.sources.add(source);
2236-
}
2237-
2238-
hasNext(): boolean {
2239-
return this.sources.size > 0;
2240-
}
2241-
2242-
filter(predicate: (source: TSource) => boolean): void {
2243-
this.sources.forEach((source) => {
2244-
if (predicate(source)) {
2245-
return;
2246-
}
2247-
if (source.iterator?.return) {
2248-
source.iterator.return().catch(() => {
2249-
// ignore error
2250-
});
2251-
}
2252-
this.sources.delete(source);
2253-
});
2254-
}
2255-
2256-
_getCompletedIncrementalResults(): Array<TIncremental> {
2257-
const incrementalResults: Array<TIncremental> = [];
2258-
for (const source of this.sources) {
2259-
if (!source.isCompleted) {
2260-
continue;
2261-
}
2262-
this.sources.delete(source);
2263-
if (source.isCompletedIterator) {
2264-
continue;
2265-
}
2266-
incrementalResults.push(this.toIncrementalResult(source));
2267-
}
2268-
return incrementalResults;
2269-
}
2270-
2271-
subscribe(): AsyncGenerator<TPayload, void, void> {
2272-
let isDone = false;
2273-
2274-
const next = async (): Promise<IteratorResult<TPayload, void>> => {
2275-
if (isDone) {
2276-
return { value: undefined, done: true };
2277-
}
2278-
2279-
await Promise.race(Array.from(this.sources).map((p) => p.promise));
2280-
2281-
if (isDone) {
2282-
return { value: undefined, done: true };
2283-
}
2284-
2285-
const incremental = this._getCompletedIncrementalResults();
2286-
const hasNext = this.sources.size > 0;
2287-
2288-
if (!incremental.length && hasNext) {
2289-
return next();
2290-
}
2291-
2292-
if (!hasNext) {
2293-
isDone = true;
2294-
}
2295-
2296-
return {
2297-
value: this.toPayload(incremental, hasNext),
2298-
done: false,
2299-
};
2300-
};
2301-
2302-
const returnIterators = () => {
2303-
const promises: Array<Promise<IteratorResult<unknown>>> = [];
2304-
this.sources.forEach((source) => {
2305-
if (source.iterator?.return) {
2306-
promises.push(source.iterator.return());
2307-
}
2308-
});
2309-
return Promise.all(promises);
2310-
};
2311-
2312-
return {
2313-
[Symbol.asyncIterator]() {
2314-
return this;
2315-
},
2316-
next,
2317-
async return(): Promise<IteratorResult<TPayload, void>> {
2318-
await returnIterators();
2319-
isDone = true;
2320-
return { value: undefined, done: true };
2321-
},
2322-
async throw(error?: unknown): Promise<IteratorResult<TPayload, void>> {
2323-
await returnIterators();
2324-
isDone = true;
2325-
return Promise.reject(error);
2326-
},
2327-
};
2328-
}
2329-
}

src/execution/publisher.ts

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
interface Source {
2+
promise: Promise<void>;
3+
isCompleted: boolean;
4+
isCompletedIterator?: boolean | undefined;
5+
iterator?: AsyncIterator<unknown> | undefined;
6+
}
7+
8+
type ToIncrementalResult<TSource extends Source, TIncremental> = (
9+
source: TSource,
10+
) => TIncremental;
11+
12+
type ToPayload<TIncremental, TPayload> = (
13+
incremental: ReadonlyArray<TIncremental>,
14+
hasNext: boolean,
15+
) => TPayload;
16+
17+
/**
18+
* @internal
19+
*/
20+
export class Publisher<TSource extends Source, TIncremental, TPayload> {
21+
sources: Set<TSource>;
22+
toIncrementalResult: ToIncrementalResult<TSource, TIncremental>;
23+
toPayload: ToPayload<TIncremental, TPayload>;
24+
25+
constructor(
26+
toIncrementalResult: ToIncrementalResult<TSource, TIncremental>,
27+
toPayload: ToPayload<TIncremental, TPayload>,
28+
) {
29+
this.sources = new Set();
30+
this.toIncrementalResult = toIncrementalResult;
31+
this.toPayload = toPayload;
32+
}
33+
34+
add(source: TSource) {
35+
this.sources.add(source);
36+
}
37+
38+
hasNext(): boolean {
39+
return this.sources.size > 0;
40+
}
41+
42+
filter(predicate: (source: TSource) => boolean): void {
43+
this.sources.forEach((source) => {
44+
if (predicate(source)) {
45+
return;
46+
}
47+
if (source.iterator?.return) {
48+
source.iterator.return().catch(() => {
49+
// ignore error
50+
});
51+
}
52+
this.sources.delete(source);
53+
});
54+
}
55+
56+
_getCompletedIncrementalResults(): Array<TIncremental> {
57+
const incrementalResults: Array<TIncremental> = [];
58+
for (const source of this.sources) {
59+
if (!source.isCompleted) {
60+
continue;
61+
}
62+
this.sources.delete(source);
63+
if (source.isCompletedIterator) {
64+
continue;
65+
}
66+
incrementalResults.push(this.toIncrementalResult(source));
67+
}
68+
return incrementalResults;
69+
}
70+
71+
subscribe(): AsyncGenerator<TPayload, void, void> {
72+
let isDone = false;
73+
74+
const next = async (): Promise<IteratorResult<TPayload, void>> => {
75+
if (isDone) {
76+
return { value: undefined, done: true };
77+
}
78+
79+
await Promise.race(Array.from(this.sources).map((p) => p.promise));
80+
81+
if (isDone) {
82+
return { value: undefined, done: true };
83+
}
84+
85+
const incremental = this._getCompletedIncrementalResults();
86+
const hasNext = this.sources.size > 0;
87+
88+
if (!incremental.length && hasNext) {
89+
return next();
90+
}
91+
92+
if (!hasNext) {
93+
isDone = true;
94+
}
95+
96+
return {
97+
value: this.toPayload(incremental, hasNext),
98+
done: false,
99+
};
100+
};
101+
102+
const returnIterators = () => {
103+
const promises: Array<Promise<IteratorResult<unknown>>> = [];
104+
this.sources.forEach((source) => {
105+
if (source.iterator?.return) {
106+
promises.push(source.iterator.return());
107+
}
108+
});
109+
return Promise.all(promises);
110+
};
111+
112+
return {
113+
[Symbol.asyncIterator]() {
114+
return this;
115+
},
116+
next,
117+
async return(): Promise<IteratorResult<TPayload, void>> {
118+
await returnIterators();
119+
isDone = true;
120+
return { value: undefined, done: true };
121+
},
122+
async throw(error?: unknown): Promise<IteratorResult<TPayload, void>> {
123+
await returnIterators();
124+
isDone = true;
125+
return Promise.reject(error);
126+
},
127+
};
128+
}
129+
}

0 commit comments

Comments
 (0)