Skip to content

Commit f934150

Browse files
committed
Prevent resolve or iterate over already consumed Result
This `result.then` and `for await (const r of result)` in already consumed results could block for ever since no new event will came from wire. Blocking this kind of access improves not correct access to the Result object. This change also includes the addition of `Result.isOpen()`
1 parent 88c07fd commit f934150

File tree

5 files changed

+118
-6
lines changed

5 files changed

+118
-6
lines changed

packages/core/src/result.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import ResultSummary from './result-summary'
2121
import Record from './record'
2222
import { Query, PeekableAsyncIterator } from './types'
2323
import { observer, util, connectionHolder } from './internal'
24+
import { newError } from './error'
2425

2526
const { EMPTY_CONNECTION_HOLDER } = connectionHolder
2627

@@ -208,6 +209,9 @@ class Result implements Promise<QueryResult> {
208209
* @return {Promise} new Promise.
209210
*/
210211
private _getOrCreatePromise(): Promise<QueryResult> {
212+
if (!this.isOpen()) {
213+
return Promise.reject(newError('Result is already consumed'))
214+
}
211215
if (!this._p) {
212216
this._p = new Promise((resolve, reject) => {
213217
const records: Record[] = []
@@ -238,6 +242,13 @@ class Result implements Promise<QueryResult> {
238242
* @returns {PeekableAsyncIterator<Record, ResultSummary>} The async iterator for the Results
239243
*/
240244
[Symbol.asyncIterator](): PeekableAsyncIterator<Record, ResultSummary> {
245+
if (!this.isOpen()) {
246+
const error = newError('Result is already consumed')
247+
return {
248+
next: () => Promise.reject(error),
249+
peek: () => Promise.reject(error),
250+
}
251+
}
241252
const state: {
242253
paused: boolean,
243254
firstRun: boolean,
@@ -362,6 +373,10 @@ class Result implements Promise<QueryResult> {
362373
.catch(() => {})
363374
}
364375

376+
isOpen (): boolean {
377+
return this._summary === null && this._error === null
378+
}
379+
365380
/**
366381
* Stream records to observer as they come in, this is a more efficient method
367382
* of handling the results, and allows you to handle arbitrarily large results.

packages/core/src/transaction.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -600,7 +600,7 @@ function finishTransaction(
600600
.then(connection => {
601601
onConnection()
602602
pendingResults.forEach(r => r._cancel())
603-
return Promise.all(pendingResults).then(results => {
603+
return Promise.all(pendingResults.map(result => result.summary())).then(results => {
604604
if (connection) {
605605
if (commit) {
606606
return connection.protocol().commitTransaction({

packages/core/test/result.test.ts

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,22 @@ describe('Result', () => {
451451
result.catch(() => { }).finally(done)
452452
})
453453

454+
it.each([
455+
['success', async (stream: any) => stream.onCompleted({})],
456+
//['error', async (stream: any) => stream.onError(new Error('error'))],
457+
])('should thrown over an consumed result [%s]', async(_, completeStream) => {
458+
completeStream(streamObserverMock)
459+
460+
await result.summary().catch(() => {})
461+
462+
try {
463+
await result
464+
expect('not finish iteration over consumed result').toBe(true)
465+
} catch (e) {
466+
expect(e).toEqual(newError('Result is already consumed'))
467+
}
468+
})
469+
454470
describe.each([
455471
['query', {}, { query: 'query', parameters: {} }],
456472
['query', { a: 1 }, { query: 'query', parameters: { a: 1 } }],
@@ -860,6 +876,26 @@ describe('Result', () => {
860876
])
861877
})
862878

879+
it.each([
880+
['success', async (stream: any) => stream.onCompleted({})],
881+
['error', async (stream: any) => stream.onError(new Error('error'))],
882+
])('should thrown on iterate over an consumed result [%s]', async(_, completeStream) => {
883+
completeStream(streamObserverMock)
884+
885+
await result.summary().catch(() => {})
886+
887+
try {
888+
for await (const _ of result) {
889+
expect('not iterate over consumed result').toBe(true)
890+
}
891+
expect('not finish iteration over consumed result').toBe(true)
892+
} catch (e) {
893+
expect(e).toEqual(newError('Result is already consumed'))
894+
}
895+
896+
expect('not finish iteration over consumed result')
897+
})
898+
863899
describe('.return()', () => {
864900
it('should finished the operator when it get called', async () => {
865901
const keys = ['a', 'b']
@@ -1205,6 +1241,30 @@ describe('Result', () => {
12051241
})
12061242
})
12071243
})
1244+
1245+
describe('.isOpen()', () => {
1246+
it('should return true when the stream is open', async () => {
1247+
await result._subscribe({}).catch(() => {})
1248+
1249+
expect(result.isOpen()).toBe(true)
1250+
})
1251+
1252+
it('should return false when the stream is closed', async () => {
1253+
streamObserverMock.onCompleted({})
1254+
1255+
await result._subscribe({}).catch(() => {})
1256+
1257+
expect(result.isOpen()).toBe(false)
1258+
})
1259+
1260+
it('should return false when the stream is failed', async () => {
1261+
streamObserverMock.onError(new Error('test'))
1262+
1263+
await result._subscribe({}).catch(() => {})
1264+
1265+
expect(result.isOpen()).toBe(false)
1266+
})
1267+
})
12081268
})
12091269

12101270
describe.each([
@@ -1319,6 +1379,19 @@ describe('Result', () => {
13191379
})
13201380
})
13211381
})
1382+
1383+
describe('isOpen()', () => {
1384+
it('should be true', () => {
1385+
expect(result.isOpen()).toBe(true)
1386+
})
1387+
1388+
it('should be false after any interactio with the stream', async () => {
1389+
const it = result[Symbol.asyncIterator]()
1390+
await it.next()
1391+
1392+
expect(result.isOpen()).toBe(false)
1393+
})
1394+
})
13221395
})
13231396

13241397
describe.each([
@@ -1381,6 +1454,30 @@ describe('Result', () => {
13811454
})
13821455
})
13831456

1457+
describe('.isOpen()', () => {
1458+
it('should be true', async () => {
1459+
expect(result.isOpen()).toBe(true)
1460+
1461+
// consume the stream to avoid unhaddled errors
1462+
try {
1463+
await result.summary()
1464+
} catch (error) {
1465+
}
1466+
})
1467+
1468+
it('should be false after any interactio with the stream', async () => {
1469+
const it = result[Symbol.asyncIterator]()
1470+
1471+
try {
1472+
await it.next()
1473+
} catch (error) {
1474+
// this's fine
1475+
}
1476+
1477+
expect(result.isOpen()).toBe(false)
1478+
})
1479+
})
1480+
13841481
function shouldReturnRejectedPromiseWithTheExpectedError<T>(
13851482
supplier: () => Promise<T>
13861483
) {

packages/testkit-backend/src/controller/local.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ export default class LocalController extends Controller {
6363
const id = this._contexts.get(contextId).addError(e)
6464
this._writeResponse(contextId, newResponse('DriverError', {
6565
id,
66-
msg: e.message + ' (' + e.code + ')',
66+
msg: e.message,
6767
code: e.code
6868
}))
6969
}

packages/testkit-backend/src/skipped-tests/common.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,10 +138,10 @@ const skippedTests = [
138138
'stub.iteration.test_result_list.TestResultList.test_session_run_result_list_pulls_all_records_at_once_next_before_list'
139139
)
140140
),
141-
skip(
142-
'Results are always valid but don\'t return records when out of scope',
143-
ifStartsWith('stub.iteration.test_result_scope.TestResultScope.')
144-
),
141+
// skip(
142+
// 'Results are always valid but don\'t return records when out of scope',
143+
// ifStartsWith('stub.iteration.test_result_scope.TestResultScope.')
144+
// ),
145145
skip(
146146
'Driver (still) allows explicit managing of managed transaction',
147147
ifEquals('stub.tx_lifetime.test_tx_lifetime.TestTxLifetime.test_managed_tx_raises_tx_managed_exec')

0 commit comments

Comments
 (0)