Skip to content

Commit 4dbdc64

Browse files
committed
add generic ToIncrementalResult and ToPayload methods to Publisher
1 parent 51dfa20 commit 4dbdc64

File tree

2 files changed

+53
-21
lines changed

2 files changed

+53
-21
lines changed

src/execution/execute.ts

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -506,7 +506,7 @@ export function buildExecutionContext(
506506
fieldResolver: fieldResolver ?? defaultFieldResolver,
507507
typeResolver: typeResolver ?? defaultTypeResolver,
508508
subscribeFieldResolver: subscribeFieldResolver ?? defaultFieldResolver,
509-
publisher: new Publisher(),
509+
publisher: new Publisher(toIncrementalResult, toPayload),
510510
errors: [],
511511
};
512512
}
@@ -518,7 +518,7 @@ function buildPerEventExecutionContext(
518518
return {
519519
...exeContext,
520520
rootValue: payload,
521-
publisher: new Publisher(),
521+
publisher: new Publisher(toIncrementalResult, toPayload),
522522
errors: [],
523523
};
524524
}
@@ -2035,6 +2035,35 @@ async function executeStreamIterator(
20352035
}
20362036
}
20372037

2038+
function toIncrementalResult(
2039+
asyncPayloadRecord: AsyncPayloadRecord,
2040+
): IncrementalResult {
2041+
const incrementalResult: IncrementalResult = {};
2042+
if (isStreamPayload(asyncPayloadRecord)) {
2043+
const items = asyncPayloadRecord.items;
2044+
(incrementalResult as IncrementalStreamResult).items = items;
2045+
} else {
2046+
const data = asyncPayloadRecord.data;
2047+
(incrementalResult as IncrementalDeferResult).data = data ?? null;
2048+
}
2049+
2050+
incrementalResult.path = asyncPayloadRecord.path;
2051+
if (asyncPayloadRecord.label) {
2052+
incrementalResult.label = asyncPayloadRecord.label;
2053+
}
2054+
if (asyncPayloadRecord.errors.length > 0) {
2055+
incrementalResult.errors = asyncPayloadRecord.errors;
2056+
}
2057+
return incrementalResult;
2058+
}
2059+
2060+
function toPayload(
2061+
incremental: ReadonlyArray<IncrementalResult>,
2062+
hasNext: boolean,
2063+
): SubsequentIncrementalExecutionResult {
2064+
return incremental.length ? { incremental, hasNext } : { hasNext };
2065+
}
2066+
20382067
function filterSubsequentPayloads(
20392068
exeContext: ExecutionContext,
20402069
nullPath: Path,
@@ -2161,3 +2190,9 @@ class StreamRecord {
21612190
export type { StreamRecord };
21622191

21632192
export type AsyncPayloadRecord = DeferredFragmentRecord | StreamRecord;
2193+
2194+
function isStreamPayload(
2195+
asyncPayload: AsyncPayloadRecord,
2196+
): asyncPayload is StreamRecord {
2197+
return asyncPayload.type === 'stream';
2198+
}

src/execution/publisher.ts

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,31 @@
11
import type {
22
AsyncPayloadRecord,
33
StreamRecord,
4-
IncrementalDeferResult,
54
IncrementalResult,
6-
IncrementalStreamResult,
75
SubsequentIncrementalExecutionResult,
86
} from './execute.js';
97

8+
type ToIncrementalResult = (
9+
asyncPayloadRecord: AsyncPayloadRecord,
10+
) => IncrementalResult;
11+
12+
type ToPayload = (
13+
incremental: ReadonlyArray<IncrementalResult>,
14+
hasNext: boolean,
15+
) => SubsequentIncrementalExecutionResult;
16+
1017
/**
1118
* @internal
1219
*/
1320
export class Publisher {
1421
subsequentPayloads: Set<AsyncPayloadRecord>;
22+
toIncrementalResult: ToIncrementalResult;
23+
toPayload: ToPayload;
1524

16-
constructor() {
25+
constructor(toIncrementalResult: ToIncrementalResult, toPayload: ToPayload) {
1726
this.subsequentPayloads = new Set();
27+
this.toIncrementalResult = toIncrementalResult;
28+
this.toPayload = toPayload;
1829
}
1930

2031
add(payload: AsyncPayloadRecord) {
@@ -39,31 +50,17 @@ export class Publisher {
3950
getCompletedIncrementalResults(): Array<IncrementalResult> {
4051
const incrementalResults: Array<IncrementalResult> = [];
4152
for (const asyncPayloadRecord of this.subsequentPayloads) {
42-
const incrementalResult: IncrementalResult = {};
4353
if (!asyncPayloadRecord.isCompleted) {
4454
continue;
4555
}
4656
this.subsequentPayloads.delete(asyncPayloadRecord);
4757
if (isStreamPayload(asyncPayloadRecord)) {
48-
const items = asyncPayloadRecord.items;
4958
if (asyncPayloadRecord.isCompletedIterator) {
5059
// async iterable resolver just finished but there may be pending payloads
5160
continue;
5261
}
53-
(incrementalResult as IncrementalStreamResult).items = items;
54-
} else {
55-
const data = asyncPayloadRecord.data;
56-
(incrementalResult as IncrementalDeferResult).data = data ?? null;
57-
}
58-
59-
incrementalResult.path = asyncPayloadRecord.path;
60-
if (asyncPayloadRecord.label) {
61-
incrementalResult.label = asyncPayloadRecord.label;
62-
}
63-
if (asyncPayloadRecord.errors.length > 0) {
64-
incrementalResult.errors = asyncPayloadRecord.errors;
6562
}
66-
incrementalResults.push(incrementalResult);
63+
incrementalResults.push(this.toIncrementalResult(asyncPayloadRecord));
6764
}
6865
return incrementalResults;
6966
}
@@ -104,7 +101,7 @@ export class Publisher {
104101
}
105102

106103
return {
107-
value: incremental.length ? { incremental, hasNext } : { hasNext },
104+
value: publisher.toPayload(incremental, hasNext),
108105
done: false,
109106
};
110107
}

0 commit comments

Comments
 (0)