Skip to content

Commit 598608e

Browse files
committed
the IncrementalPublisher should handle response building
1 parent fae5da5 commit 598608e

File tree

10 files changed

+190
-215
lines changed

10 files changed

+190
-215
lines changed

src/execution/IncrementalPublisher.ts

Lines changed: 161 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,63 @@ import type {
88
GraphQLFormattedError,
99
} from '../error/GraphQLError.js';
1010

11+
/**
12+
* The result of GraphQL execution.
13+
*
14+
* - `errors` is included when any errors occurred as a non-empty array.
15+
* - `data` is the result of a successful execution of the query.
16+
* - `hasNext` is true if a future payload is expected.
17+
* - `extensions` is reserved for adding non-standard properties.
18+
* - `incremental` is a list of the results from defer/stream directives.
19+
*/
20+
export interface ExecutionResult<
21+
TData = ObjMap<unknown>,
22+
TExtensions = ObjMap<unknown>,
23+
> {
24+
errors?: ReadonlyArray<GraphQLError>;
25+
data?: TData | null;
26+
extensions?: TExtensions;
27+
}
28+
29+
export interface FormattedExecutionResult<
30+
TData = ObjMap<unknown>,
31+
TExtensions = ObjMap<unknown>,
32+
> {
33+
errors?: ReadonlyArray<GraphQLFormattedError>;
34+
data?: TData | null;
35+
extensions?: TExtensions;
36+
}
37+
38+
export interface ExperimentalIncrementalExecutionResults<
39+
TData = ObjMap<unknown>,
40+
TExtensions = ObjMap<unknown>,
41+
> {
42+
initialResult: InitialIncrementalExecutionResult<TData, TExtensions>;
43+
subsequentResults: AsyncGenerator<
44+
SubsequentIncrementalExecutionResult<TData, TExtensions>,
45+
void,
46+
void
47+
>;
48+
}
49+
50+
export interface InitialIncrementalExecutionResult<
51+
TData = ObjMap<unknown>,
52+
TExtensions = ObjMap<unknown>,
53+
> extends ExecutionResult<TData, TExtensions> {
54+
hasNext: boolean;
55+
incremental?: ReadonlyArray<IncrementalResult<TData, TExtensions>>;
56+
extensions?: TExtensions;
57+
}
58+
59+
export interface FormattedInitialIncrementalExecutionResult<
60+
TData = ObjMap<unknown>,
61+
TExtensions = ObjMap<unknown>,
62+
> extends FormattedExecutionResult<TData, TExtensions> {
63+
hasNext: boolean;
64+
incremental?: ReadonlyArray<FormattedIncrementalResult<TData, TExtensions>>;
65+
extensions?: TExtensions;
66+
}
67+
1168
export interface SubsequentIncrementalExecutionResult<
1269
TData = ObjMap<unknown>,
1370
TExtensions = ObjMap<unknown>,
@@ -113,86 +170,6 @@ export class IncrementalPublisher {
113170
this._reset();
114171
}
115172

116-
hasNext(): boolean {
117-
return this._pending.size > 0;
118-
}
119-
120-
subscribe(): AsyncGenerator<
121-
SubsequentIncrementalExecutionResult,
122-
void,
123-
void
124-
> {
125-
let isDone = false;
126-
127-
const _next = async (): Promise<
128-
IteratorResult<SubsequentIncrementalExecutionResult, void>
129-
> => {
130-
// eslint-disable-next-line no-constant-condition
131-
while (true) {
132-
if (isDone) {
133-
return { value: undefined, done: true };
134-
}
135-
136-
for (const item of this._released) {
137-
this._pending.delete(item);
138-
}
139-
const released = this._released;
140-
this._released = new Set();
141-
142-
const result = this._getIncrementalResult(released);
143-
144-
if (!this.hasNext()) {
145-
isDone = true;
146-
}
147-
148-
if (result !== undefined) {
149-
return { value: result, done: false };
150-
}
151-
152-
// eslint-disable-next-line no-await-in-loop
153-
await this._signalled;
154-
}
155-
};
156-
157-
const returnStreamIterators = async (): Promise<void> => {
158-
const promises: Array<Promise<IteratorResult<unknown>>> = [];
159-
this._pending.forEach((incrementalDataRecord) => {
160-
if (
161-
isStreamItemsRecord(incrementalDataRecord) &&
162-
incrementalDataRecord.asyncIterator?.return
163-
) {
164-
promises.push(incrementalDataRecord.asyncIterator.return());
165-
}
166-
});
167-
await Promise.all(promises);
168-
};
169-
170-
const _return = async (): Promise<
171-
IteratorResult<SubsequentIncrementalExecutionResult, void>
172-
> => {
173-
isDone = true;
174-
await returnStreamIterators();
175-
return { value: undefined, done: true };
176-
};
177-
178-
const _throw = async (
179-
error?: unknown,
180-
): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> => {
181-
isDone = true;
182-
await returnStreamIterators();
183-
return Promise.reject(error);
184-
};
185-
186-
return {
187-
[Symbol.asyncIterator]() {
188-
return this;
189-
},
190-
next: _next,
191-
return: _return,
192-
throw: _throw,
193-
};
194-
}
195-
196173
prepareInitialResultRecord(): InitialResultRecord {
197174
return {
198175
errors: [],
@@ -256,19 +233,38 @@ export class IncrementalPublisher {
256233
incrementalDataRecord.errors.push(error);
257234
}
258235

259-
publishInitial(initialResult: InitialResultRecord) {
260-
for (const child of initialResult.children) {
236+
buildDataResponse(
237+
initialResultRecord: InitialResultRecord,
238+
data: ObjMap<unknown> | null,
239+
): ExecutionResult | ExperimentalIncrementalExecutionResults {
240+
for (const child of initialResultRecord.children) {
261241
if (child.filtered) {
262242
continue;
263243
}
264244
this._publish(child);
265245
}
246+
247+
const errors = initialResultRecord.errors;
248+
const initialResult = errors.length === 0 ? { data } : { errors, data };
249+
if (this._pending.size > 0) {
250+
return {
251+
initialResult: {
252+
...initialResult,
253+
hasNext: true,
254+
},
255+
subsequentResults: this._subscribe(),
256+
};
257+
}
258+
return initialResult;
266259
}
267260

268-
getInitialErrors(
269-
initialResult: InitialResultRecord,
270-
): ReadonlyArray<GraphQLError> {
271-
return initialResult.errors;
261+
buildErrorResponse(
262+
initialResultRecord: InitialResultRecord,
263+
error: GraphQLError,
264+
): ExecutionResult {
265+
const errors = initialResultRecord.errors;
266+
errors.push(error);
267+
return { data: null, errors };
272268
}
273269

274270
filter(nullPath: Path, erroringIncrementalDataRecord: IncrementalDataRecord) {
@@ -301,6 +297,82 @@ export class IncrementalPublisher {
301297
});
302298
}
303299

300+
private _subscribe(): AsyncGenerator<
301+
SubsequentIncrementalExecutionResult,
302+
void,
303+
void
304+
> {
305+
let isDone = false;
306+
307+
const _next = async (): Promise<
308+
IteratorResult<SubsequentIncrementalExecutionResult, void>
309+
> => {
310+
// eslint-disable-next-line no-constant-condition
311+
while (true) {
312+
if (isDone) {
313+
return { value: undefined, done: true };
314+
}
315+
316+
for (const item of this._released) {
317+
this._pending.delete(item);
318+
}
319+
const released = this._released;
320+
this._released = new Set();
321+
322+
const result = this._getIncrementalResult(released);
323+
324+
if (this._pending.size === 0) {
325+
isDone = true;
326+
}
327+
328+
if (result !== undefined) {
329+
return { value: result, done: false };
330+
}
331+
332+
// eslint-disable-next-line no-await-in-loop
333+
await this._signalled;
334+
}
335+
};
336+
337+
const returnStreamIterators = async (): Promise<void> => {
338+
const promises: Array<Promise<IteratorResult<unknown>>> = [];
339+
this._pending.forEach((incrementalDataRecord) => {
340+
if (
341+
isStreamItemsRecord(incrementalDataRecord) &&
342+
incrementalDataRecord.asyncIterator?.return
343+
) {
344+
promises.push(incrementalDataRecord.asyncIterator.return());
345+
}
346+
});
347+
await Promise.all(promises);
348+
};
349+
350+
const _return = async (): Promise<
351+
IteratorResult<SubsequentIncrementalExecutionResult, void>
352+
> => {
353+
isDone = true;
354+
await returnStreamIterators();
355+
return { value: undefined, done: true };
356+
};
357+
358+
const _throw = async (
359+
error?: unknown,
360+
): Promise<IteratorResult<SubsequentIncrementalExecutionResult, void>> => {
361+
isDone = true;
362+
await returnStreamIterators();
363+
return Promise.reject(error);
364+
};
365+
366+
return {
367+
[Symbol.asyncIterator]() {
368+
return this;
369+
},
370+
next: _next,
371+
return: _return,
372+
throw: _throw,
373+
};
374+
}
375+
304376
private _trigger() {
305377
this._resolve();
306378
this._reset();
@@ -368,9 +440,10 @@ export class IncrementalPublisher {
368440
incrementalResults.push(incrementalResult);
369441
}
370442

443+
const hasNext = this._pending.size > 0;
371444
return incrementalResults.length
372-
? { incremental: incrementalResults, hasNext: this.hasNext() }
373-
: encounteredCompletedAsyncIterator && !this.hasNext()
445+
? { incremental: incrementalResults, hasNext }
446+
: encounteredCompletedAsyncIterator && !hasNext
374447
? { hasNext: false }
375448
: undefined;
376449
}

src/execution/__tests__/defer-test.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@ import {
1616
import { GraphQLID, GraphQLString } from '../../type/scalars.js';
1717
import { GraphQLSchema } from '../../type/schema.js';
1818

19-
import type { InitialIncrementalExecutionResult } from '../execute.js';
2019
import { execute, experimentalExecuteIncrementally } from '../execute.js';
21-
import type { SubsequentIncrementalExecutionResult } from '../IncrementalPublisher.js';
20+
import type {
21+
InitialIncrementalExecutionResult,
22+
SubsequentIncrementalExecutionResult,
23+
} from '../IncrementalPublisher.js';
2224

2325
const friendType = new GraphQLObjectType({
2426
fields: {

src/execution/__tests__/lists-test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ import { GraphQLSchema } from '../../type/schema.js';
1818

1919
import { buildSchema } from '../../utilities/buildASTSchema.js';
2020

21-
import type { ExecutionResult } from '../execute.js';
2221
import { execute, executeSync } from '../execute.js';
22+
import type { ExecutionResult } from '../IncrementalPublisher.js';
2323

2424
describe('Execute: Accepts any iterable as list value', () => {
2525
function complete(rootValue: unknown) {

src/execution/__tests__/nonnull-test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ import { GraphQLSchema } from '../../type/schema.js';
1313

1414
import { buildSchema } from '../../utilities/buildASTSchema.js';
1515

16-
import type { ExecutionResult } from '../execute.js';
1716
import { execute, executeSync } from '../execute.js';
17+
import type { ExecutionResult } from '../IncrementalPublisher.js';
1818

1919
const syncError = new Error('sync');
2020
const syncNonNullError = new Error('syncNonNull');

src/execution/__tests__/oneof-test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ import { parse } from '../../language/parser.js';
66

77
import { buildSchema } from '../../utilities/buildASTSchema.js';
88

9-
import type { ExecutionResult } from '../execute.js';
109
import { execute } from '../execute.js';
10+
import type { ExecutionResult } from '../IncrementalPublisher.js';
1111

1212
const schema = buildSchema(`
1313
type Query {

src/execution/__tests__/stream-test.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@ import {
1717
import { GraphQLID, GraphQLString } from '../../type/scalars.js';
1818
import { GraphQLSchema } from '../../type/schema.js';
1919

20-
import type { InitialIncrementalExecutionResult } from '../execute.js';
2120
import { experimentalExecuteIncrementally } from '../execute.js';
22-
import type { SubsequentIncrementalExecutionResult } from '../IncrementalPublisher.js';
21+
import type {
22+
InitialIncrementalExecutionResult,
23+
SubsequentIncrementalExecutionResult,
24+
} from '../IncrementalPublisher.js';
2325

2426
const friendType = new GraphQLObjectType({
2527
fields: {

src/execution/__tests__/subscribe-test.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ import {
2020
} from '../../type/scalars.js';
2121
import { GraphQLSchema } from '../../type/schema.js';
2222

23-
import type { ExecutionArgs, ExecutionResult } from '../execute.js';
23+
import type { ExecutionArgs } from '../execute.js';
2424
import { createSourceEventStream, subscribe } from '../execute.js';
25+
import type { ExecutionResult } from '../IncrementalPublisher.js';
2526

2627
import { SimplePubSub } from './simplePubSub.js';
2728

0 commit comments

Comments
 (0)