Skip to content

Commit 34080c0

Browse files
committed
Add executeRead and executeWrite to RxSession
1 parent 4aff4be commit 34080c0

12 files changed

+376
-6
lines changed

packages/neo4j-driver/src/session-rx.js

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ export default class RxSession {
8484
* Executes the provided unit of work in a {@link READ} reactive transaction which is created with the provided
8585
* transaction configuration.
8686
* @public
87+
* @deprecated This method will be removed in version 6.0. Please, use {@link RxSession#executeRead} instead.
8788
* @param {function(txc: RxTransaction): Observable} work - A unit of work to be executed.
8889
* @param {TransactionConfig} transactionConfig - Configuration for the enclosing transaction created by the driver.
8990
* @returns {Observable} - A reactive stream returned by the unit of work.
@@ -96,6 +97,7 @@ export default class RxSession {
9697
* Executes the provided unit of work in a {@link WRITE} reactive transaction which is created with the provided
9798
* transaction configuration.
9899
* @public
100+
* @deprecated This method will be removed in version 6.0. Please, use {@link RxSession#executeWrite} instead.
99101
* @param {function(txc: RxTransaction): Observable} work - A unit of work to be executed.
100102
* @param {TransactionConfig} transactionConfig - Configuration for the enclosing transaction created by the driver.
101103
* @returns {Observable} - A reactive stream returned by the unit of work.
@@ -104,6 +106,45 @@ export default class RxSession {
104106
return this._runTransaction(ACCESS_MODE_WRITE, work, transactionConfig)
105107
}
106108

109+
110+
/**
111+
* Executes the provided unit of work in a {@link READ} reactive transaction which is created with the provided
112+
* transaction configuration.
113+
* @public
114+
* @param {function(txc: RxManagedTransaction): Observable} work - A unit of work to be executed.
115+
* @param {TransactionConfig} transactionConfig - Configuration for the enclosing transaction created by the driver.
116+
* @returns {Observable} - A reactive stream returned by the unit of work.
117+
*/
118+
executeRead (work, transactionConfig) {
119+
return this._executeInTransaction(ACCESS_MODE_READ, work, transactionConfig)
120+
}
121+
122+
/**
123+
* Executes the provided unit of work in a {@link WRITE} reactive transaction which is created with the provided
124+
* transaction configuration.
125+
* @public
126+
* @param {function(txc: RxManagedTransaction): Observable} work - A unit of work to be executed.
127+
* @param {TransactionConfig} transactionConfig - Configuration for the enclosing transaction created by the driver.
128+
* @returns {Observable} - A reactive stream returned by the unit of work.
129+
*/
130+
executeWrite (work, transactionConfig) {
131+
return this._executeInTransaction(ACCESS_MODE_WRITE, work, transactionConfig)
132+
}
133+
134+
/**
135+
* @private
136+
* @param {function(txc: RxManagedTransaction): Observable} work
137+
* @param {TransactionConfig} transactionConfig
138+
* @returns {Observable}
139+
*/
140+
_executeInTransaction (accessMode, work, transactionConfig) {
141+
const wrapper = txc => new RxManagedTransaction({
142+
run: txc.run.bind(txc),
143+
isOpen: txc.isOpen.bind(txc)
144+
})
145+
return this._runTransaction(accessMode, work, transactionConfig, wrapper)
146+
}
147+
107148
/**
108149
* Closes this reactive session.
109150
*
@@ -181,7 +222,7 @@ export default class RxSession {
181222
/**
182223
* @private
183224
*/
184-
_runTransaction (accessMode, work, transactionConfig) {
225+
_runTransaction (accessMode, work, transactionConfig, transactionWrapper = (tx) => tx) {
185226
let txConfig = TxConfig.empty()
186227
if (transactionConfig) {
187228
txConfig = new TxConfig(transactionConfig)
@@ -192,7 +233,7 @@ export default class RxSession {
192233
flatMap(txc =>
193234
defer(() => {
194235
try {
195-
return work(txc)
236+
return work(transactionWrapper(txc))
196237
} catch (err) {
197238
return throwError(err)
198239
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/**
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
/**
21+
* Represents a rx transaction that is managed by the transaction executor.
22+
*
23+
* @public
24+
*/
25+
class RxManagedTransaction {
26+
constructor({ run, isOpen }) {
27+
this._run = run
28+
this._isOpen = isOpen
29+
}
30+
31+
/**
32+
* Creates a reactive result that will execute the query in this transaction, with the provided parameters.
33+
*
34+
* @public
35+
* @param {string} query - Query to be executed.
36+
* @param {Object} parameters - Parameter values to use in query execution.
37+
* @returns {RxResult} - A reactive result
38+
*/
39+
run (query, parameters) {
40+
return this._run(query, parameters)
41+
}
42+
43+
/**
44+
* Check if this transaction is active, which means commit and rollback did not happen.
45+
* @return {boolean} `true` when not committed and not rolled back, `false` otherwise.
46+
*/
47+
isOpen() {
48+
return this._isOpen()
49+
}
50+
}

packages/neo4j-driver/src/transaction-rx.js

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ export default class RxTransaction {
4141
* @param {Object} parameters - Parameter values to use in query execution.
4242
* @returns {RxResult} - A reactive result
4343
*/
44-
4544
run (query, parameters) {
4645
return new RxResult(
4746
new Observable(observer => {
@@ -91,6 +90,14 @@ export default class RxTransaction {
9190
})
9291
}
9392

93+
/**
94+
* Check if this transaction is active, which means commit and rollback did not happen.
95+
* @return {boolean} `true` when not committed and not rolled back, `false` otherwise.
96+
*/
97+
isOpen() {
98+
return this._txc.isOpen()
99+
}
100+
94101
/**
95102
* Closes the transaction
96103
*

packages/neo4j-driver/test/rx/session.test.js

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,138 @@ describe('#integration rx-session', () => {
239239
expect(await countNodes('Person')).toBe(0)
240240
}, 60000)
241241

242+
describe('.executeWrite()', () => {
243+
it('should run transactions without retries', async () => {
244+
if (protocolVersion < 4.0) {
245+
return
246+
}
247+
248+
const txcWork = new ConfigurableTransactionWork({
249+
query: 'CREATE (:WithoutRetry) RETURN 5'
250+
})
251+
252+
const result = await session
253+
.executeWrite(txc => txcWork.work(txc))
254+
.pipe(materialize(), toArray())
255+
.toPromise()
256+
expect(result).toEqual([
257+
Notification.createNext(5),
258+
Notification.createComplete()
259+
])
260+
261+
expect(txcWork.invocations).toBe(1)
262+
expect(await countNodes('WithoutRetry')).toBe(1)
263+
}, 60000)
264+
265+
266+
it('should run transaction with retries on reactive failures', async () => {
267+
if (protocolVersion < 4.0) {
268+
return
269+
}
270+
271+
const txcWork = new ConfigurableTransactionWork({
272+
query: 'CREATE (:WithReactiveFailure) RETURN 7',
273+
reactiveFailures: [
274+
newError('service unavailable', SERVICE_UNAVAILABLE),
275+
newError('session expired', SESSION_EXPIRED),
276+
newError('transient error', 'Neo.TransientError.Transaction.NotStarted')
277+
]
278+
})
279+
280+
const result = await session
281+
.executeWrite(txc => txcWork.work(txc))
282+
.pipe(materialize(), toArray())
283+
.toPromise()
284+
expect(result).toEqual([
285+
Notification.createNext(7),
286+
Notification.createComplete()
287+
])
288+
289+
expect(txcWork.invocations).toBe(4)
290+
expect(await countNodes('WithReactiveFailure')).toBe(1)
291+
}, 60000)
292+
293+
it('should run transaction with retries on synchronous failures', async () => {
294+
if (protocolVersion < 4.0) {
295+
return
296+
}
297+
298+
const txcWork = new ConfigurableTransactionWork({
299+
query: 'CREATE (:WithSyncFailure) RETURN 9',
300+
syncFailures: [
301+
newError('service unavailable', SERVICE_UNAVAILABLE),
302+
newError('session expired', SESSION_EXPIRED),
303+
newError('transient error', 'Neo.TransientError.Transaction.NotStarted')
304+
]
305+
})
306+
307+
const result = await session
308+
.executeWrite(txc => txcWork.work(txc))
309+
.pipe(materialize(), toArray())
310+
.toPromise()
311+
expect(result).toEqual([
312+
Notification.createNext(9),
313+
Notification.createComplete()
314+
])
315+
316+
expect(txcWork.invocations).toBe(4)
317+
expect(await countNodes('WithSyncFailure')).toBe(1)
318+
}, 60000)
319+
320+
it('should fail on transactions that cannot be retried', async () => {
321+
if (protocolVersion < 4.0) {
322+
return
323+
}
324+
325+
const txcWork = new ConfigurableTransactionWork({
326+
query: 'UNWIND [10, 5, 0] AS x CREATE (:Hi) RETURN 10/x'
327+
})
328+
329+
const result = await session
330+
.executeWrite(txc => txcWork.work(txc))
331+
.pipe(materialize(), toArray())
332+
.toPromise()
333+
expect(result).toEqual([
334+
Notification.createNext(1),
335+
Notification.createNext(2),
336+
Notification.createError(jasmine.stringMatching(/\/ by zero/))
337+
])
338+
339+
expect(txcWork.invocations).toBe(1)
340+
expect(await countNodes('Hi')).toBe(0)
341+
}, 60000)
342+
343+
it('should fail even after a transient error', async () => {
344+
if (protocolVersion < 4.0) {
345+
return
346+
}
347+
348+
const txcWork = new ConfigurableTransactionWork({
349+
query: 'CREATE (:Person) RETURN 1',
350+
syncFailures: [
351+
newError(
352+
'a transient error',
353+
'Neo.TransientError.Transaction.NotStarted'
354+
)
355+
],
356+
reactiveFailures: [
357+
newError('a database error', 'Neo.Database.Not.Started')
358+
]
359+
})
360+
361+
const result = await session
362+
.executeWrite(txc => txcWork.work(txc))
363+
.pipe(materialize(), toArray())
364+
.toPromise()
365+
expect(result).toEqual([
366+
Notification.createError(jasmine.stringMatching(/a database error/))
367+
])
368+
369+
expect(txcWork.invocations).toBe(2)
370+
expect(await countNodes('Person')).toBe(0)
371+
}, 60000)
372+
})
373+
242374
async function countNodes (label) {
243375
const session = driver.rxSession()
244376
return await session

packages/neo4j-driver/test/types/session-rx.test.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import RxSession from '../../types/session-rx'
2121
import RxTransaction from '../../types/transaction-rx'
22+
import { RxManagedTransaction } from '../../types'
2223
import RxResult from '../../types/result-rx'
2324
import {
2425
Integer,
@@ -152,3 +153,27 @@ const observable6: Observable<number> = rxSession.writeTransaction(
152153
(tx: RxTransaction) => of(42),
153154
txConfig4
154155
)
156+
157+
const observable7: Observable<number> = rxSession.executeRead(
158+
(tx: RxManagedTransaction) => {
159+
return of(10)
160+
}
161+
)
162+
163+
const observable8: Observable<string> = rxSession.executeRead(
164+
(tx: RxManagedTransaction) => {
165+
return of('42')
166+
}
167+
)
168+
169+
const observable9: Observable<number> = rxSession.executeWrite(
170+
(tx: RxManagedTransaction) => {
171+
return of(10)
172+
}
173+
)
174+
175+
const observable10: Observable<string> = rxSession.executeWrite(
176+
(tx: RxManagedTransaction) => {
177+
return of('42')
178+
}
179+
)
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/**
2+
* Copyright (c) "Neo4j"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import RxManagedTransaction from '../../types/transaction-managed-rx'
21+
import { Record, ResultSummary } from 'neo4j-driver-core'
22+
import RxResult from '../../types/result-rx'
23+
import { Observable, of, Observer, throwError } from 'rxjs'
24+
import { concat, finalize, catchError } from 'rxjs/operators'
25+
26+
const dummy: any = null
27+
28+
const stringObserver: Observer<string> = {
29+
next: value => console.log(value),
30+
complete: () => console.log('complete'),
31+
error: error => console.log(`error: ${error}`)
32+
}
33+
34+
const keysObserver: Observer<string[]> = {
35+
next: value => console.log(`keys: ${value}`),
36+
complete: () => console.log('keys complete'),
37+
error: error => console.log(`keys error: ${error}`)
38+
}
39+
40+
const recordsObserver: Observer<Record> = {
41+
next: value => console.log(`record: ${value}`),
42+
complete: () => console.log('records complete'),
43+
error: error => console.log(`records error: ${error}`)
44+
}
45+
46+
const summaryObserver: Observer<ResultSummary> = {
47+
next: value => console.log(`summary: ${value}`),
48+
complete: () => console.log('summary complete'),
49+
error: error => console.log(`summary error: ${error}`)
50+
}
51+
52+
const tx: RxManagedTransaction = dummy
53+
54+
const result1: RxResult = tx.run('RETURN 1')
55+
result1.keys().subscribe(keysObserver)
56+
result1.records().subscribe(recordsObserver)
57+
result1.consume().subscribe(summaryObserver)
58+
59+
const result2: RxResult = tx.run('RETURN $value', { value: '42' })
60+
result2.keys().subscribe(keysObserver)
61+
result2.records().subscribe(recordsObserver)
62+
result2.consume().subscribe(summaryObserver)
63+
64+
const isOpen: boolean = tx.isOpen()

0 commit comments

Comments
 (0)