Skip to content

Commit d417920

Browse files
authored
Make Result[Symbol.asyncIterator].return wait for Summary (#965)
The lack of result wait for result summary was causing confusion with API usage. When `await it.return()` is called, the caller expect the result summary to be return by the method call.
1 parent dca78b3 commit d417920

File tree

2 files changed

+68
-18
lines changed

2 files changed

+68
-18
lines changed

packages/core/src/result.ts

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ interface ResultObserver {
9999
*/
100100
interface QueuedResultObserver extends ResultObserver {
101101
dequeue: () => Promise<IteratorResult<Record, ResultSummary>>
102+
dequeueUntilDone: () => Promise<IteratorResult<Record, ResultSummary>>
102103
head: () => Promise<IteratorResult<Record, ResultSummary>>
103104
size: number
104105
}
@@ -308,11 +309,19 @@ class Result implements Promise<QueryResult> {
308309
}
309310
return next
310311
},
311-
return: async (value: ResultSummary) => {
312-
state.finished = true
313-
state.summary = value
312+
return: async (value?: ResultSummary) => {
313+
if (state.finished) {
314+
if (assertSummary(state.summary)) {
315+
return { done: true, value: value ?? state.summary }
316+
}
317+
}
314318
state.streaming?.cancel()
315-
return { done: true, value: value }
319+
const queuedObserver = await initializeObserver()
320+
const last = await queuedObserver.dequeueUntilDone()
321+
state.finished = true
322+
last.value = value ?? last.value
323+
state.summary = last.value as ResultSummary
324+
return last
316325
},
317326
peek: async () => {
318327
if (state.finished) {
@@ -540,6 +549,19 @@ class Result implements Promise<QueryResult> {
540549
return elementOrError instanceof Error
541550
}
542551

552+
async function dequeue (): Promise<IteratorResult<Record, ResultSummary>> {
553+
if (buffer.length > 0) {
554+
const element = buffer.shift() ?? newError('Unexpected empty buffer', PROTOCOL_ERROR)
555+
onQueueSizeChanged()
556+
if (isError(element)) {
557+
throw element
558+
}
559+
return element
560+
}
561+
promiseHolder.resolvable = createResolvablePromise()
562+
return await promiseHolder.resolvable.promise
563+
}
564+
543565
const buffer: QueuedResultElementOrError[] = []
544566
const promiseHolder: {
545567
resolvable: ResolvablePromise<IteratorResult<Record, ResultSummary>> | null
@@ -569,17 +591,14 @@ class Result implements Promise<QueryResult> {
569591
onQueueSizeChanged()
570592
}
571593
},
572-
dequeue: async () => {
573-
if (buffer.length > 0) {
574-
const element = buffer.shift() ?? newError('Unexpected empty buffer', PROTOCOL_ERROR)
575-
onQueueSizeChanged()
576-
if (isError(element)) {
577-
throw element
594+
dequeue: dequeue,
595+
dequeueUntilDone: async () => {
596+
while (true) {
597+
const element = await dequeue()
598+
if (element.done === true) {
599+
return element
578600
}
579-
return element
580601
}
581-
promiseHolder.resolvable = createResolvablePromise()
582-
return await promiseHolder.resolvable.promise
583602
},
584603
head: async () => {
585604
if (buffer.length > 0) {

packages/core/test/result.test.ts

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -939,8 +939,33 @@ describe('Result', () => {
939939
const it = result[Symbol.asyncIterator]()
940940
await it.next()
941941
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
942-
const { value, done } = await it.return!(summary)
942+
const summaryPromise = it.return!(summary)
943943

944+
streamObserverMock.onCompleted({})
945+
946+
const { value, done } = await summaryPromise
947+
expect(value).toBe(summary)
948+
expect(done).toEqual(true)
949+
})
950+
951+
it('should return resultant summary when it get called without params', async () => {
952+
const keys = ['a', 'b']
953+
const rawRecord1 = [1, 2]
954+
const rawRecord2 = [3, 4]
955+
const summary = new ResultSummary('query', {}, {})
956+
957+
streamObserverMock.onKeys(keys)
958+
streamObserverMock.onNext(rawRecord1)
959+
streamObserverMock.onNext(rawRecord2)
960+
961+
const it = result[Symbol.asyncIterator]()
962+
await it.next()
963+
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
964+
const summaryPromise = it.return!()
965+
966+
streamObserverMock.onCompleted({})
967+
968+
const { value, done } = await summaryPromise
944969
expect(value).toEqual(summary)
945970
expect(done).toEqual(true)
946971
})
@@ -953,6 +978,7 @@ describe('Result', () => {
953978
streamObserverMock.onKeys(keys)
954979
streamObserverMock.onNext(rawRecord1)
955980
streamObserverMock.onNext(rawRecord2)
981+
streamObserverMock.onCompleted({})
956982

957983
const it = result[Symbol.asyncIterator]()
958984

@@ -966,8 +992,9 @@ describe('Result', () => {
966992
expect(done).toEqual(true)
967993
})
968994

969-
it('should not subscribe to the observer when it is the first api called', async () => {
995+
it('should subscribe to the observer when it is the first api called', async () => {
970996
const subscribe = jest.spyOn(streamObserverMock, 'subscribe')
997+
streamObserverMock.onCompleted({})
971998

972999
const it = result[Symbol.asyncIterator]()
9731000

@@ -976,11 +1003,12 @@ describe('Result', () => {
9761003

9771004
await it.next()
9781005

979-
expect(subscribe).not.toBeCalled()
1006+
expect(subscribe).toBeCalled()
9801007
})
9811008

9821009
it('should not canceld stream when it is the first api called', async () => {
9831010
const cancel = jest.spyOn(streamObserverMock, 'cancel')
1011+
streamObserverMock.onCompleted({})
9841012

9851013
const it = result[Symbol.asyncIterator]()
9861014

@@ -1001,6 +1029,7 @@ describe('Result', () => {
10011029
streamObserverMock.onKeys(keys)
10021030
streamObserverMock.onNext(rawRecord1)
10031031
streamObserverMock.onNext(rawRecord2)
1032+
streamObserverMock.onCompleted({})
10041033

10051034
const it = result[Symbol.asyncIterator]()
10061035

@@ -1013,26 +1042,28 @@ describe('Result', () => {
10131042

10141043
it('should prevent following next requests to subscribe to the stream', async () => {
10151044
const subscribe = jest.spyOn(streamObserverMock, 'subscribe')
1045+
streamObserverMock.onCompleted({})
10161046

10171047
const it = result[Symbol.asyncIterator]()
10181048

10191049
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
10201050
await it.return!(new ResultSummary('query', {}, {}))
10211051
await it.next()
10221052

1023-
expect(subscribe).not.toBeCalled()
1053+
expect(subscribe).toBeCalledTimes(1)
10241054
})
10251055

10261056
it('should prevent following peek requests to subscribe to the stream', async () => {
10271057
const subscribe = jest.spyOn(streamObserverMock, 'subscribe')
1058+
streamObserverMock.onCompleted({})
10281059

10291060
const it = result[Symbol.asyncIterator]()
10301061

10311062
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
10321063
await it.return!(new ResultSummary('query', {}, {}))
10331064
await it.peek()
10341065

1035-
expect(subscribe).not.toBeCalled()
1066+
expect(subscribe).toBeCalledTimes(1)
10361067
})
10371068
})
10381069

0 commit comments

Comments
 (0)