Skip to content

Make Result[Symbol.asyncIterator].return wait for Summary #965

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 32 additions & 13 deletions packages/core/src/result.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ interface ResultObserver {
*/
interface QueuedResultObserver extends ResultObserver {
dequeue: () => Promise<IteratorResult<Record, ResultSummary>>
dequeueUntilDone: () => Promise<IteratorResult<Record, ResultSummary>>
head: () => Promise<IteratorResult<Record, ResultSummary>>
size: number
}
Expand Down Expand Up @@ -308,11 +309,19 @@ class Result implements Promise<QueryResult> {
}
return next
},
return: async (value: ResultSummary) => {
state.finished = true
state.summary = value
return: async (value?: ResultSummary) => {
if (state.finished) {
if (assertSummary(state.summary)) {
return { done: true, value: value ?? state.summary }
}
}
state.streaming?.cancel()
return { done: true, value: value }
const queuedObserver = await initializeObserver()
const last = await queuedObserver.dequeueUntilDone()
state.finished = true
last.value = value ?? last.value
state.summary = last.value as ResultSummary
return last
},
peek: async () => {
if (state.finished) {
Expand Down Expand Up @@ -540,6 +549,19 @@ class Result implements Promise<QueryResult> {
return elementOrError instanceof Error
}

async function dequeue (): Promise<IteratorResult<Record, ResultSummary>> {
if (buffer.length > 0) {
const element = buffer.shift() ?? newError('Unexpected empty buffer', PROTOCOL_ERROR)
onQueueSizeChanged()
if (isError(element)) {
throw element
}
return element
}
promiseHolder.resolvable = createResolvablePromise()
return await promiseHolder.resolvable.promise
}

const buffer: QueuedResultElementOrError[] = []
const promiseHolder: {
resolvable: ResolvablePromise<IteratorResult<Record, ResultSummary>> | null
Expand Down Expand Up @@ -569,17 +591,14 @@ class Result implements Promise<QueryResult> {
onQueueSizeChanged()
}
},
dequeue: async () => {
if (buffer.length > 0) {
const element = buffer.shift() ?? newError('Unexpected empty buffer', PROTOCOL_ERROR)
onQueueSizeChanged()
if (isError(element)) {
throw element
dequeue: dequeue,
dequeueUntilDone: async () => {
while (true) {
const element = await dequeue()
if (element.done === true) {
return element
}
return element
}
promiseHolder.resolvable = createResolvablePromise()
return await promiseHolder.resolvable.promise
},
head: async () => {
if (buffer.length > 0) {
Expand Down
41 changes: 36 additions & 5 deletions packages/core/test/result.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -939,8 +939,33 @@ describe('Result', () => {
const it = result[Symbol.asyncIterator]()
await it.next()
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const { value, done } = await it.return!(summary)
const summaryPromise = it.return!(summary)

streamObserverMock.onCompleted({})

const { value, done } = await summaryPromise
expect(value).toBe(summary)
expect(done).toEqual(true)
})

it('should return resultant summary when it get called without params', async () => {
const keys = ['a', 'b']
const rawRecord1 = [1, 2]
const rawRecord2 = [3, 4]
const summary = new ResultSummary('query', {}, {})

streamObserverMock.onKeys(keys)
streamObserverMock.onNext(rawRecord1)
streamObserverMock.onNext(rawRecord2)

const it = result[Symbol.asyncIterator]()
await it.next()
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const summaryPromise = it.return!()

streamObserverMock.onCompleted({})

const { value, done } = await summaryPromise
expect(value).toEqual(summary)
expect(done).toEqual(true)
})
Expand All @@ -953,6 +978,7 @@ describe('Result', () => {
streamObserverMock.onKeys(keys)
streamObserverMock.onNext(rawRecord1)
streamObserverMock.onNext(rawRecord2)
streamObserverMock.onCompleted({})

const it = result[Symbol.asyncIterator]()

Expand All @@ -966,8 +992,9 @@ describe('Result', () => {
expect(done).toEqual(true)
})

it('should not subscribe to the observer when it is the first api called', async () => {
it('should subscribe to the observer when it is the first api called', async () => {
const subscribe = jest.spyOn(streamObserverMock, 'subscribe')
streamObserverMock.onCompleted({})

const it = result[Symbol.asyncIterator]()

Expand All @@ -976,11 +1003,12 @@ describe('Result', () => {

await it.next()

expect(subscribe).not.toBeCalled()
expect(subscribe).toBeCalled()
})

it('should not canceld stream when it is the first api called', async () => {
const cancel = jest.spyOn(streamObserverMock, 'cancel')
streamObserverMock.onCompleted({})

const it = result[Symbol.asyncIterator]()

Expand All @@ -1001,6 +1029,7 @@ describe('Result', () => {
streamObserverMock.onKeys(keys)
streamObserverMock.onNext(rawRecord1)
streamObserverMock.onNext(rawRecord2)
streamObserverMock.onCompleted({})

const it = result[Symbol.asyncIterator]()

Expand All @@ -1013,26 +1042,28 @@ describe('Result', () => {

it('should prevent following next requests to subscribe to the stream', async () => {
const subscribe = jest.spyOn(streamObserverMock, 'subscribe')
streamObserverMock.onCompleted({})

const it = result[Symbol.asyncIterator]()

// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
await it.return!(new ResultSummary('query', {}, {}))
await it.next()

expect(subscribe).not.toBeCalled()
expect(subscribe).toBeCalledTimes(1)
})

it('should prevent following peek requests to subscribe to the stream', async () => {
const subscribe = jest.spyOn(streamObserverMock, 'subscribe')
streamObserverMock.onCompleted({})

const it = result[Symbol.asyncIterator]()

// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
await it.return!(new ResultSummary('query', {}, {}))
await it.peek()

expect(subscribe).not.toBeCalled()
expect(subscribe).toBeCalledTimes(1)
})
})

Expand Down