Skip to content

Commit 381fd53

Browse files
committed
make Publisher generic
in terms of sources, incremental results, and final payload
1 parent 743f53e commit 381fd53

File tree

2 files changed

+61
-82
lines changed

2 files changed

+61
-82
lines changed

src/execution/execute.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,11 @@ export interface ExecutionContext {
122122
typeResolver: GraphQLTypeResolver<any, any>;
123123
subscribeFieldResolver: GraphQLFieldResolver<any, any>;
124124
errors: Array<GraphQLError>;
125-
publisher: Publisher;
125+
publisher: Publisher<
126+
AsyncPayloadRecord,
127+
IncrementalResult,
128+
SubsequentIncrementalExecutionResult
129+
>;
126130
}
127131

128132
/**
@@ -365,7 +369,7 @@ function executeImpl(
365369
...initialResult,
366370
hasNext: true,
367371
},
368-
subsequentResults: publisher.yieldSubsequentPayloads(),
372+
subsequentResults: publisher.subscribe(),
369373
};
370374
}
371375
return initialResult;
@@ -384,7 +388,7 @@ function executeImpl(
384388
...initialResult,
385389
hasNext: true,
386390
},
387-
subsequentResults: publisher.yieldSubsequentPayloads(),
391+
subsequentResults: publisher.subscribe(),
388392
};
389393
}
390394
return initialResult;
@@ -2187,9 +2191,7 @@ class StreamRecord {
21872191
}
21882192
}
21892193

2190-
export type { StreamRecord };
2191-
2192-
export type AsyncPayloadRecord = DeferredFragmentRecord | StreamRecord;
2194+
type AsyncPayloadRecord = DeferredFragmentRecord | StreamRecord;
21932195

21942196
function isStreamPayload(
21952197
asyncPayload: AsyncPayloadRecord,

src/execution/publisher.ts

Lines changed: 53 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,100 +1,90 @@
1-
import type {
2-
AsyncPayloadRecord,
3-
StreamRecord,
4-
IncrementalResult,
5-
SubsequentIncrementalExecutionResult,
6-
} from './execute.js';
7-
8-
type ToIncrementalResult = (
9-
asyncPayloadRecord: AsyncPayloadRecord,
10-
) => IncrementalResult;
11-
12-
type ToPayload = (
13-
incremental: ReadonlyArray<IncrementalResult>,
1+
interface Source {
2+
promise: Promise<void>;
3+
isCompleted: boolean;
4+
isCompletedIterator?: boolean | undefined;
5+
iterator?: AsyncIterator<unknown> | undefined;
6+
}
7+
8+
type ToIncrementalResult<TSource extends Source, TIncremental> = (
9+
source: TSource,
10+
) => TIncremental;
11+
12+
type ToPayload<TIncremental, TPayload> = (
13+
incremental: ReadonlyArray<TIncremental>,
1414
hasNext: boolean,
15-
) => SubsequentIncrementalExecutionResult;
15+
) => TPayload;
1616

1717
/**
1818
* @internal
1919
*/
20-
export class Publisher {
21-
subsequentPayloads: Set<AsyncPayloadRecord>;
22-
toIncrementalResult: ToIncrementalResult;
23-
toPayload: ToPayload;
24-
25-
constructor(toIncrementalResult: ToIncrementalResult, toPayload: ToPayload) {
26-
this.subsequentPayloads = new Set();
20+
export class Publisher<TSource extends Source, TIncremental, TPayload> {
21+
sources: Set<TSource>;
22+
toIncrementalResult: ToIncrementalResult<TSource, TIncremental>;
23+
toPayload: ToPayload<TIncremental, TPayload>;
24+
25+
constructor(
26+
toIncrementalResult: ToIncrementalResult<TSource, TIncremental>,
27+
toPayload: ToPayload<TIncremental, TPayload>,
28+
) {
29+
this.sources = new Set();
2730
this.toIncrementalResult = toIncrementalResult;
2831
this.toPayload = toPayload;
2932
}
3033

31-
add(payload: AsyncPayloadRecord) {
32-
this.subsequentPayloads.add(payload);
34+
add(source: TSource) {
35+
this.sources.add(source);
3336
}
3437

3538
hasNext(): boolean {
36-
return this.subsequentPayloads.size > 0;
39+
return this.sources.size > 0;
3740
}
3841

39-
filter(predicate: (payload: AsyncPayloadRecord) => boolean): void {
40-
this.subsequentPayloads.forEach((asyncRecord) => {
41-
if (predicate(asyncRecord)) {
42+
filter(predicate: (source: TSource) => boolean): void {
43+
this.sources.forEach((source) => {
44+
if (predicate(source)) {
4245
return;
4346
}
44-
// asyncRecord path points to nulled error field
45-
if (isStreamPayload(asyncRecord) && asyncRecord.iterator?.return) {
46-
asyncRecord.iterator.return().catch(() => {
47+
if (source.iterator?.return) {
48+
source.iterator.return().catch(() => {
4749
// ignore error
4850
});
4951
}
50-
this.subsequentPayloads.delete(asyncRecord);
52+
this.sources.delete(source);
5153
});
5254
}
5355

54-
getCompletedIncrementalResults(): Array<IncrementalResult> {
55-
const incrementalResults: Array<IncrementalResult> = [];
56-
for (const asyncPayloadRecord of this.subsequentPayloads) {
57-
if (!asyncPayloadRecord.isCompleted) {
56+
getCompletedIncrementalResults(): Array<TIncremental> {
57+
const incrementalResults: Array<TIncremental> = [];
58+
for (const source of this.sources) {
59+
if (!source.isCompleted) {
5860
continue;
5961
}
60-
this.subsequentPayloads.delete(asyncPayloadRecord);
61-
if (isStreamPayload(asyncPayloadRecord)) {
62-
if (asyncPayloadRecord.isCompletedIterator) {
63-
// async iterable resolver just finished but there may be pending payloads
64-
continue;
65-
}
62+
this.sources.delete(source);
63+
if (source.isCompletedIterator) {
64+
continue;
6665
}
67-
incrementalResults.push(this.toIncrementalResult(asyncPayloadRecord));
66+
incrementalResults.push(this.toIncrementalResult(source));
6867
}
6968
return incrementalResults;
7069
}
7170

72-
yieldSubsequentPayloads(): AsyncGenerator<
73-
SubsequentIncrementalExecutionResult,
74-
void,
75-
void
76-
> {
71+
subscribe(): AsyncGenerator<TPayload, void, void> {
7772
let isDone = false;
7873
const publisher = this;
7974

80-
async function next(): Promise<
81-
IteratorResult<SubsequentIncrementalExecutionResult, void>
82-
> {
75+
async function next(): Promise<IteratorResult<TPayload, void>> {
8376
if (isDone) {
8477
return { value: undefined, done: true };
8578
}
8679

87-
await Promise.race(
88-
Array.from(publisher.subsequentPayloads).map((p) => p.promise),
89-
);
80+
await Promise.race(Array.from(publisher.sources).map((p) => p.promise));
9081

9182
if (isDone) {
92-
// a different call to next has exhausted all payloads
9383
return { value: undefined, done: true };
9484
}
9585

9686
const incremental = publisher.getCompletedIncrementalResults();
97-
const hasNext = publisher.subsequentPayloads.size > 0;
87+
const hasNext = publisher.sources.size > 0;
9888

9989
if (!incremental.length && hasNext) {
10090
return next();
@@ -110,14 +100,11 @@ export class Publisher {
110100
};
111101
}
112102

113-
function returnStreamIterators() {
103+
function returnIterators() {
114104
const promises: Array<Promise<IteratorResult<unknown>>> = [];
115-
publisher.subsequentPayloads.forEach((asyncPayloadRecord) => {
116-
if (
117-
isStreamPayload(asyncPayloadRecord) &&
118-
asyncPayloadRecord.iterator?.return
119-
) {
120-
promises.push(asyncPayloadRecord.iterator.return());
105+
publisher.sources.forEach((source) => {
106+
if (source.iterator?.return) {
107+
promises.push(source.iterator.return());
121108
}
122109
});
123110
return Promise.all(promises);
@@ -128,26 +115,16 @@ export class Publisher {
128115
return this;
129116
},
130117
next,
131-
async return(): Promise<
132-
IteratorResult<SubsequentIncrementalExecutionResult, void>
133-
> {
134-
await returnStreamIterators();
118+
async return(): Promise<IteratorResult<TPayload, void>> {
119+
await returnIterators();
135120
isDone = true;
136121
return { value: undefined, done: true };
137122
},
138-
async throw(
139-
error?: unknown,
140-
): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> {
141-
await returnStreamIterators();
123+
async throw(error?: unknown): Promise<IteratorResult<TPayload, void>> {
124+
await returnIterators();
142125
isDone = true;
143126
return Promise.reject(error);
144127
},
145128
};
146129
}
147130
}
148-
149-
function isStreamPayload(
150-
asyncPayload: AsyncPayloadRecord,
151-
): asyncPayload is StreamRecord {
152-
return asyncPayload.type === 'stream';
153-
}

0 commit comments

Comments
 (0)