Skip to content

Commit 9910908

Browse files
committed
add generic ToIncrementalResult and ToPayload methods to Publisher
1 parent f9ff0d4 commit 9910908

File tree

2 files changed

+53
-21
lines changed

2 files changed

+53
-21
lines changed

src/execution/execute.ts

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,7 @@ export function buildExecutionContext(
497497
fieldResolver: fieldResolver ?? defaultFieldResolver,
498498
typeResolver: typeResolver ?? defaultTypeResolver,
499499
subscribeFieldResolver: subscribeFieldResolver ?? defaultFieldResolver,
500-
publisher: new Publisher(),
500+
publisher: new Publisher(toIncrementalResult, toPayload),
501501
errors: [],
502502
};
503503
}
@@ -509,7 +509,7 @@ function buildPerEventExecutionContext(
509509
return {
510510
...exeContext,
511511
rootValue: payload,
512-
publisher: new Publisher(),
512+
publisher: new Publisher(toIncrementalResult, toPayload),
513513
errors: [],
514514
};
515515
}
@@ -2090,6 +2090,35 @@ async function executeStreamIterator(
20902090
}
20912091
}
20922092

2093+
function toIncrementalResult(
2094+
asyncPayloadRecord: AsyncPayloadRecord,
2095+
): IncrementalResult {
2096+
const incrementalResult: IncrementalResult = {};
2097+
if (isStreamPayload(asyncPayloadRecord)) {
2098+
const items = asyncPayloadRecord.items;
2099+
(incrementalResult as IncrementalStreamResult).items = items;
2100+
} else {
2101+
const data = asyncPayloadRecord.data;
2102+
(incrementalResult as IncrementalDeferResult).data = data ?? null;
2103+
}
2104+
2105+
incrementalResult.path = asyncPayloadRecord.path;
2106+
if (asyncPayloadRecord.label) {
2107+
incrementalResult.label = asyncPayloadRecord.label;
2108+
}
2109+
if (asyncPayloadRecord.errors.length > 0) {
2110+
incrementalResult.errors = asyncPayloadRecord.errors;
2111+
}
2112+
return incrementalResult;
2113+
}
2114+
2115+
function toPayload(
2116+
incremental: ReadonlyArray<IncrementalResult>,
2117+
hasNext: boolean,
2118+
): SubsequentIncrementalExecutionResult {
2119+
return incremental.length ? { incremental, hasNext } : { hasNext };
2120+
}
2121+
20932122
function filterSubsequentPayloads(
20942123
exeContext: ExecutionContext,
20952124
nullPath: Path,
@@ -2216,3 +2245,9 @@ class StreamRecord {
22162245
export type { StreamRecord };
22172246

22182247
export type AsyncPayloadRecord = DeferredFragmentRecord | StreamRecord;
2248+
2249+
function isStreamPayload(
2250+
asyncPayload: AsyncPayloadRecord,
2251+
): asyncPayload is StreamRecord {
2252+
return asyncPayload.type === 'stream';
2253+
}

src/execution/publisher.ts

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,31 @@
11
import type {
22
AsyncPayloadRecord,
33
StreamRecord,
4-
IncrementalDeferResult,
54
IncrementalResult,
6-
IncrementalStreamResult,
75
SubsequentIncrementalExecutionResult,
86
} from './execute.js';
97

8+
type ToIncrementalResult = (
9+
asyncPayloadRecord: AsyncPayloadRecord,
10+
) => IncrementalResult;
11+
12+
type ToPayload = (
13+
incremental: ReadonlyArray<IncrementalResult>,
14+
hasNext: boolean,
15+
) => SubsequentIncrementalExecutionResult;
16+
1017
/**
1118
* @internal
1219
*/
1320
export class Publisher {
1421
subsequentPayloads: Set<AsyncPayloadRecord>;
22+
toIncrementalResult: ToIncrementalResult;
23+
toPayload: ToPayload;
1524

16-
constructor() {
25+
constructor(toIncrementalResult: ToIncrementalResult, toPayload: ToPayload) {
1726
this.subsequentPayloads = new Set();
27+
this.toIncrementalResult = toIncrementalResult;
28+
this.toPayload = toPayload;
1829
}
1930

2031
add(payload: AsyncPayloadRecord) {
@@ -39,31 +50,17 @@ export class Publisher {
3950
getCompletedIncrementalResults(): Array<IncrementalResult> {
4051
const incrementalResults: Array<IncrementalResult> = [];
4152
for (const asyncPayloadRecord of this.subsequentPayloads) {
42-
const incrementalResult: IncrementalResult = {};
4353
if (!asyncPayloadRecord.isCompleted) {
4454
continue;
4555
}
4656
this.subsequentPayloads.delete(asyncPayloadRecord);
4757
if (isStreamPayload(asyncPayloadRecord)) {
48-
const items = asyncPayloadRecord.items;
4958
if (asyncPayloadRecord.isCompletedIterator) {
5059
// async iterable resolver just finished but there may be pending payloads
5160
continue;
5261
}
53-
(incrementalResult as IncrementalStreamResult).items = items;
54-
} else {
55-
const data = asyncPayloadRecord.data;
56-
(incrementalResult as IncrementalDeferResult).data = data ?? null;
57-
}
58-
59-
incrementalResult.path = asyncPayloadRecord.path;
60-
if (asyncPayloadRecord.label) {
61-
incrementalResult.label = asyncPayloadRecord.label;
62-
}
63-
if (asyncPayloadRecord.errors.length > 0) {
64-
incrementalResult.errors = asyncPayloadRecord.errors;
6562
}
66-
incrementalResults.push(incrementalResult);
63+
incrementalResults.push(this.toIncrementalResult(asyncPayloadRecord));
6764
}
6865
return incrementalResults;
6966
}
@@ -104,7 +101,7 @@ export class Publisher {
104101
}
105102

106103
return {
107-
value: incremental.length ? { incremental, hasNext } : { hasNext },
104+
value: publisher.toPayload(incremental, hasNext),
108105
done: false,
109106
};
110107
}

0 commit comments

Comments
 (0)