Skip to content

Commit 53a0c9d

Browse files
authored
Merge pull request #397 from lutovich/1.6-tx-function-retry-fix
Rollback transaction after failure in transaction functions
2 parents 6deff52 + 1dd24f9 commit 53a0c9d

File tree

4 files changed

+249
-49
lines changed

4 files changed

+249
-49
lines changed

src/v1/internal/transaction-executor.js

Lines changed: 47 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -76,37 +76,62 @@ export default class TransactionExecutor {
7676
}
7777

7878
_executeTransactionInsidePromise(transactionCreator, transactionWork, resolve, reject) {
79+
let tx;
7980
try {
80-
const tx = transactionCreator();
81-
const transactionWorkResult = transactionWork(tx);
81+
tx = transactionCreator();
82+
} catch (error) {
83+
// failed to create a transaction
84+
reject(error);
85+
return;
86+
}
87+
88+
const resultPromise = this._safeExecuteTransactionWork(tx, transactionWork);
8289

90+
resultPromise
91+
.then(result => this._handleTransactionWorkSuccess(result, tx, resolve, reject))
92+
.catch(error => this._handleTransactionWorkFailure(error, tx, reject));
93+
}
94+
95+
_safeExecuteTransactionWork(tx, transactionWork) {
96+
try {
97+
const result = transactionWork(tx);
8398
// user defined callback is supposed to return a promise, but it might not; so to protect against an
8499
// incorrect API usage we wrap the returned value with a resolved promise; this is effectively a
85100
// validation step without type checks
86-
const resultPromise = Promise.resolve(transactionWorkResult);
87-
88-
resultPromise.then(result => {
89-
if (tx.isOpen()) {
90-
// transaction work returned resolved promise and transaction has not been committed/rolled back
91-
// try to commit the transaction
92-
tx.commit().then(() => {
93-
// transaction was committed, return result to the user
94-
resolve(result);
95-
}).catch(error => {
96-
// transaction failed to commit, propagate the failure
97-
reject(error);
98-
});
99-
} else {
100-
// transaction work returned resolved promise and transaction is already committed/rolled back
101-
// return the result returned by given transaction work
102-
resolve(result);
103-
}
101+
return Promise.resolve(result);
102+
} catch (error) {
103+
return Promise.reject(error);
104+
}
105+
}
106+
107+
_handleTransactionWorkSuccess(result, tx, resolve, reject) {
108+
if (tx.isOpen()) {
109+
// transaction work returned resolved promise and transaction has not been committed/rolled back
110+
// try to commit the transaction
111+
tx.commit().then(() => {
112+
// transaction was committed, return result to the user
113+
resolve(result);
104114
}).catch(error => {
105-
// transaction work returned rejected promise, propagate the failure
115+
// transaction failed to commit, propagate the failure
106116
reject(error);
107117
});
118+
} else {
119+
// transaction work returned resolved promise and transaction is already committed/rolled back
120+
// return the result returned by given transaction work
121+
resolve(result);
122+
}
123+
}
108124

109-
} catch (error) {
125+
_handleTransactionWorkFailure(error, tx, reject) {
126+
if (tx.isOpen()) {
127+
// transaction work failed and the transaction is still open, roll it back and propagate the failure
128+
tx.rollback()
129+
.catch(ignore => {
130+
// ignore the rollback error
131+
})
132+
.then(() => reject(error)); // propagate the original error we got from the transaction work
133+
} else {
134+
// transaction is already rolled back, propagate the error
110135
reject(error);
111136
}
112137
}

test/internal/transaction-executor.test.js

Lines changed: 74 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,13 @@ describe('TransactionExecutor', () => {
8080

8181
it('should stop retrying when time expires', done => {
8282
const executor = new TransactionExecutor();
83-
let workInvocationCounter = 0;
83+
const usedTransactions = [];
8484
const realWork = transactionWork([SERVICE_UNAVAILABLE, SESSION_EXPIRED, TRANSIENT_ERROR_1, TRANSIENT_ERROR_2], 42);
8585

8686
const result = executor.execute(transactionCreator(), tx => {
8787
expect(tx).toBeDefined();
88-
workInvocationCounter++;
89-
if (workInvocationCounter === 3) {
88+
usedTransactions.push(tx);
89+
if (usedTransactions.length === 3) {
9090
const currentTime = Date.now();
9191
clock = lolex.install();
9292
clock.setSystemTime(currentTime + 30001); // move `Date.now()` call forward by 30 seconds
@@ -95,7 +95,8 @@ describe('TransactionExecutor', () => {
9595
});
9696

9797
result.catch(error => {
98-
expect(workInvocationCounter).toEqual(3);
98+
expect(usedTransactions.length).toEqual(3);
99+
expectAllTransactionsToBeClosed(usedTransactions);
99100
expect(error.code).toEqual(TRANSIENT_ERROR_1);
100101
done();
101102
});
@@ -152,6 +153,14 @@ describe('TransactionExecutor', () => {
152153
);
153154
});
154155

156+
it('should retry when transaction work throws and rollback fails', done => {
157+
testRetryWhenTransactionWorkThrowsAndRollbackFails(
158+
[SERVICE_UNAVAILABLE, TRANSIENT_ERROR_2, SESSION_EXPIRED, SESSION_EXPIRED],
159+
[SESSION_EXPIRED, TRANSIENT_ERROR_1],
160+
done
161+
);
162+
});
163+
155164
it('should cancel in-flight timeouts when closed', done => {
156165
const executor = new TransactionExecutor();
157166
// do not execute setTimeout callbacks
@@ -190,16 +199,16 @@ describe('TransactionExecutor', () => {
190199
function testRetryWhenTransactionCreatorFails(errorCodes, done) {
191200
const executor = new TransactionExecutor();
192201
const transactionCreator = throwingTransactionCreator(errorCodes, new FakeTransaction());
193-
let workInvocationCounter = 0;
202+
const usedTransactions = [];
194203

195204
const result = executor.execute(transactionCreator, tx => {
196205
expect(tx).toBeDefined();
197-
workInvocationCounter++;
206+
usedTransactions.push(tx);
198207
return Promise.resolve(42);
199208
});
200209

201210
result.then(value => {
202-
expect(workInvocationCounter).toEqual(1);
211+
expect(usedTransactions.length).toEqual(1);
203212
expect(value).toEqual(42);
204213
verifyRetryDelays(fakeSetTimeout, errorCodes.length);
205214
done();
@@ -208,18 +217,19 @@ describe('TransactionExecutor', () => {
208217

209218
function testRetryWhenTransactionWorkReturnsRejectedPromise(errorCodes, done) {
210219
const executor = new TransactionExecutor();
211-
let workInvocationCounter = 0;
220+
const usedTransactions = [];
212221
const realWork = transactionWork(errorCodes, 42);
213222

214223
const result = executor.execute(transactionCreator(), tx => {
215224
expect(tx).toBeDefined();
216-
workInvocationCounter++;
225+
usedTransactions.push(tx);
217226
return realWork();
218227
});
219228

220229
result.then(value => {
221230
// work should have failed 'failures.length' times and succeeded 1 time
222-
expect(workInvocationCounter).toEqual(errorCodes.length + 1);
231+
expect(usedTransactions.length).toEqual(errorCodes.length + 1);
232+
expectAllTransactionsToBeClosed(usedTransactions);
223233
expect(value).toEqual(42);
224234
verifyRetryDelays(fakeSetTimeout, errorCodes.length);
225235
done();
@@ -228,18 +238,19 @@ describe('TransactionExecutor', () => {
228238

229239
function testRetryWhenTransactionCommitReturnsRejectedPromise(errorCodes, done) {
230240
const executor = new TransactionExecutor();
231-
let workInvocationCounter = 0;
241+
const usedTransactions = [];
232242
const realWork = () => Promise.resolve(4242);
233243

234244
const result = executor.execute(transactionCreator(errorCodes), tx => {
235245
expect(tx).toBeDefined();
236-
workInvocationCounter++;
246+
usedTransactions.push(tx);
237247
return realWork();
238248
});
239249

240250
result.then(value => {
241251
// work should have failed 'failures.length' times and succeeded 1 time
242-
expect(workInvocationCounter).toEqual(errorCodes.length + 1);
252+
expect(usedTransactions.length).toEqual(errorCodes.length + 1);
253+
expectAllTransactionsToBeClosed(usedTransactions);
243254
expect(value).toEqual(4242);
244255
verifyRetryDelays(fakeSetTimeout, errorCodes.length);
245256
done();
@@ -248,37 +259,60 @@ describe('TransactionExecutor', () => {
248259

249260
function testRetryWhenTransactionWorkThrows(errorCodes, done) {
250261
const executor = new TransactionExecutor();
251-
let workInvocationCounter = 0;
262+
const usedTransactions = [];
252263
const realWork = throwingTransactionWork(errorCodes, 42);
253264

254265
const result = executor.execute(transactionCreator(), tx => {
255266
expect(tx).toBeDefined();
256-
workInvocationCounter++;
267+
usedTransactions.push(tx);
257268
return realWork();
258269
});
259270

260271
result.then(value => {
261272
// work should have failed 'failures.length' times and succeeded 1 time
262-
expect(workInvocationCounter).toEqual(errorCodes.length + 1);
273+
expect(usedTransactions.length).toEqual(errorCodes.length + 1);
274+
expectAllTransactionsToBeClosed(usedTransactions);
263275
expect(value).toEqual(42);
264276
verifyRetryDelays(fakeSetTimeout, errorCodes.length);
265277
done();
266278
});
267279
}
268280

281+
function testRetryWhenTransactionWorkThrowsAndRollbackFails(txWorkErrorCodes, rollbackErrorCodes, done) {
282+
const executor = new TransactionExecutor();
283+
const usedTransactions = [];
284+
const realWork = throwingTransactionWork(txWorkErrorCodes, 424242);
285+
286+
const result = executor.execute(transactionCreator([], rollbackErrorCodes), tx => {
287+
expect(tx).toBeDefined();
288+
usedTransactions.push(tx);
289+
return realWork();
290+
});
291+
292+
result.then(value => {
293+
// work should have failed 'failures.length' times and succeeded 1 time
294+
expect(usedTransactions.length).toEqual(txWorkErrorCodes.length + 1);
295+
expectAllTransactionsToBeClosed(usedTransactions);
296+
expect(value).toEqual(424242);
297+
verifyRetryDelays(fakeSetTimeout, txWorkErrorCodes.length);
298+
done();
299+
});
300+
}
301+
269302
function testNoRetryOnUnknownError(errorCodes, expectedWorkInvocationCount, done) {
270303
const executor = new TransactionExecutor();
271-
let workInvocationCounter = 0;
304+
const usedTransactions = [];
272305
const realWork = transactionWork(errorCodes, 42);
273306

274307
const result = executor.execute(transactionCreator(), tx => {
275308
expect(tx).toBeDefined();
276-
workInvocationCounter++;
309+
usedTransactions.push(tx);
277310
return realWork();
278311
});
279312

280313
result.catch(error => {
281-
expect(workInvocationCounter).toEqual(expectedWorkInvocationCount);
314+
expect(usedTransactions.length).toEqual(expectedWorkInvocationCount);
315+
expectAllTransactionsToBeClosed(usedTransactions);
282316
if (errorCodes.length === 1) {
283317
expect(error.code).toEqual(errorCodes[0]);
284318
} else {
@@ -290,9 +324,10 @@ describe('TransactionExecutor', () => {
290324

291325
});
292326

293-
function transactionCreator(commitErrorCodes) {
294-
const remainingErrorCodes = (commitErrorCodes || []).slice().reverse();
295-
return () => new FakeTransaction(remainingErrorCodes.pop());
327+
function transactionCreator(commitErrorCodes, rollbackErrorCodes) {
328+
const remainingCommitErrorCodes = (commitErrorCodes || []).slice().reverse();
329+
const remainingRollbackErrorCodes = (rollbackErrorCodes || []).slice().reverse();
330+
return () => new FakeTransaction(remainingCommitErrorCodes.pop(), remainingRollbackErrorCodes.pop());
296331
}
297332

298333
function throwingTransactionCreator(errorCodes, result) {
@@ -348,20 +383,35 @@ function verifyRetryDelays(fakeSetTimeout, expectedInvocationCount) {
348383
});
349384
}
350385

386+
function expectAllTransactionsToBeClosed(transactions) {
387+
transactions.forEach(tx => expect(tx.isOpen()).toBeFalsy());
388+
}
389+
351390
class FakeTransaction {
352391

353-
constructor(commitErrorCode) {
392+
constructor(commitErrorCode, rollbackErrorCode) {
354393
this._commitErrorCode = commitErrorCode;
394+
this._rollbackErrorCode = rollbackErrorCode;
395+
this._open = true;
355396
}
356397

357398
isOpen() {
358-
return true;
399+
return this._open;
359400
}
360401

361402
commit() {
403+
this._open = false;
362404
if (this._commitErrorCode) {
363405
return Promise.reject(error(this._commitErrorCode));
364406
}
365407
return Promise.resolve();
366408
}
409+
410+
rollback() {
411+
this._open = false;
412+
if (this._rollbackErrorCode) {
413+
return Promise.reject(error(this._rollbackErrorCode));
414+
}
415+
return Promise.resolve();
416+
}
367417
}

test/v1/session.test.js

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import sharedNeo4j from '../internal/shared-neo4j';
2727
import _ from 'lodash';
2828
import {ServerVersion, VERSION_3_1_0} from '../../src/v1/internal/server-version';
2929
import {isString} from '../../src/v1/internal/util';
30+
import {newError, PROTOCOL_ERROR, SESSION_EXPIRED} from '../../src/v1/error';
3031

3132
describe('session', () => {
3233

@@ -1092,6 +1093,75 @@ describe('session', () => {
10921093
testUnsupportedQueryParameter(new neo4j.types.Path(node1, node2, []), done);
10931094
});
10941095

1096+
it('should retry transaction until success when function throws', done => {
1097+
testTransactionRetryUntilSuccess(() => {
1098+
throw newError('Error that can be retried', SESSION_EXPIRED);
1099+
}, done);
1100+
});
1101+
1102+
it('should retry transaction until success when function returns rejected promise', done => {
1103+
testTransactionRetryUntilSuccess(() => Promise.reject(newError('Error that can be retried', SESSION_EXPIRED)), done);
1104+
});
1105+
1106+
it('should not retry transaction when function throws fatal error', done => {
1107+
testTransactionRetryOnFatalError(() => {
1108+
throw newError('Error that is fatal', PROTOCOL_ERROR);
1109+
}, done);
1110+
});
1111+
1112+
it('should not retry transaction when function returns promise rejected with fatal error', done => {
1113+
testTransactionRetryOnFatalError(() => Promise.reject(newError('Error that is fatal', 'ReallyFatalErrorCode')), done);
1114+
});
1115+
1116+
function testTransactionRetryUntilSuccess(failureResponseFunction, done) {
1117+
const session = driver.session();
1118+
1119+
const failures = 3;
1120+
const usedTransactions = [];
1121+
1122+
const resultPromise = session.writeTransaction(tx => {
1123+
usedTransactions.push(tx);
1124+
if (usedTransactions.length < failures) {
1125+
return failureResponseFunction();
1126+
} else {
1127+
return tx.run('RETURN "424242"');
1128+
}
1129+
});
1130+
1131+
resultPromise.then(result => {
1132+
expect(result.records[0].get(0)).toEqual('424242');
1133+
expect(usedTransactions.length).toEqual(3);
1134+
usedTransactions.forEach(tx => expect(tx.isOpen()).toBeFalsy());
1135+
session.close();
1136+
done();
1137+
}).catch(error => {
1138+
done.fail(error);
1139+
});
1140+
}
1141+
1142+
function testTransactionRetryOnFatalError(failureResponseFunction, done) {
1143+
const session = driver.session();
1144+
1145+
const usedTransactions = [];
1146+
1147+
const resultPromise = session.writeTransaction(tx => {
1148+
usedTransactions.push(tx);
1149+
return failureResponseFunction();
1150+
});
1151+
1152+
resultPromise.then(result => {
1153+
session.close();
1154+
done.fail('Retries should not succeed: ' + JSON.stringify(result));
1155+
}).catch(error => {
1156+
session.close();
1157+
expect(error).toBeDefined();
1158+
expect(error).not.toBeNull();
1159+
expect(usedTransactions.length).toEqual(1);
1160+
expect(usedTransactions[0].isOpen()).toBeFalsy();
1161+
done();
1162+
});
1163+
}
1164+
10951165
function serverIs31OrLater(done) {
10961166
if (serverVersion.compareTo(VERSION_3_1_0) < 0) {
10971167
done();

0 commit comments

Comments
 (0)