Skip to content

Commit f324878

Browse files
committed
Improve failure handling for RESET and ACK_FAILURE
Both RESET and ACK_FAILURE messages will now be immediately flushed to the network to make sure connections are fully cleaned up before being returned to the pool. Outbound messages will only be sent if the connection is not broken. Failure to RESET or ACK_FAILURE will be considered a fatal error, unless ACK_FAILURE was ignored because of a subsequent RESET.
1 parent 1fe5df0 commit f324878

File tree

6 files changed

+259
-66
lines changed

6 files changed

+259
-66
lines changed

src/v1/internal/connection-holder.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,9 @@ export default class ConnectionHolder {
9797
_releaseConnection() {
9898
this._connectionPromise = this._connectionPromise.then(connection => {
9999
if (connection) {
100-
connection.reset();
101-
connection.sync();
102-
connection._release();
100+
return connection.resetAndFlush().finally(() => connection._release());
101+
} else {
102+
return Promise.resolve();
103103
}
104104
}).catch(ignoredError => {
105105
});

src/v1/internal/connector.js

Lines changed: 96 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import NodeChannel from './ch-node';
2222
import {Chunker, Dechunker} from './chunking';
2323
import packStreamUtil from './packstream-util';
2424
import {alloc} from './buf';
25-
import {newError} from './../error';
25+
import {newError, PROTOCOL_ERROR} from './../error';
2626
import ChannelConfig from './ch-config';
2727
import urlUtil from './url-util';
2828
import StreamObserver from './stream-observer';
@@ -120,7 +120,7 @@ class Connection {
120120
this._packer = packStreamUtil.createLatestPacker(this._chunker);
121121
this._unpacker = packStreamUtil.createLatestUnpacker(disableLosslessIntegers);
122122

123-
this._isHandlingFailure = false;
123+
this._ackFailureMuted = false;
124124
this._currentFailure = null;
125125

126126
this._state = new ConnectionState(this);
@@ -241,25 +241,8 @@ class Connection {
241241
this._currentObserver.onError( this._currentFailure );
242242
} finally {
243243
this._updateCurrentObserver();
244-
// Things are now broken. Pending observers will get FAILURE messages routed until
245-
// We are done handling this failure.
246-
if( !this._isHandlingFailure ) {
247-
this._isHandlingFailure = true;
248-
249-
// isHandlingFailure was false, meaning this is the first failure message
250-
// we see from this failure. We may see several others, one for each message
251-
// we had "optimistically" already sent after whatever it was that failed.
252-
// We only want to and need to ACK the first one, which is why we are tracking
253-
// this _isHandlingFailure thing.
254-
this._ackFailure({
255-
onNext: NO_OP,
256-
onError: NO_OP,
257-
onCompleted: () => {
258-
this._isHandlingFailure = false;
259-
this._currentFailure = null;
260-
}
261-
});
262-
}
244+
// Things are now broken. Pending observers will get FAILURE messages routed until we are done handling this failure.
245+
this._ackFailureIfNeeded();
263246
}
264247
break;
265248
case IGNORED:
@@ -268,7 +251,7 @@ class Connection {
268251
if (this._currentFailure && this._currentObserver.onError)
269252
this._currentObserver.onError(this._currentFailure);
270253
else if(this._currentObserver.onError)
271-
this._currentObserver.onError(payload);
254+
this._currentObserver.onError(newError('Ignored either because of an error or RESET'));
272255
} finally {
273256
this._updateCurrentObserver();
274257
}
@@ -282,72 +265,116 @@ class Connection {
282265
initialize( clientName, token, observer ) {
283266
log("C", "INIT", clientName, token);
284267
const initObserver = this._state.wrap(observer);
285-
this._queueObserver(initObserver);
286-
this._packer.packStruct( INIT, [this._packable(clientName), this._packable(token)],
287-
(err) => this._handleFatalError(err) );
288-
this._chunker.messageBoundary();
289-
this.sync();
268+
const queued = this._queueObserver(initObserver);
269+
if (queued) {
270+
this._packer.packStruct(INIT, [this._packable(clientName), this._packable(token)],
271+
(err) => this._handleFatalError(err));
272+
this._chunker.messageBoundary();
273+
this.sync();
274+
}
290275
}
291276

292277
/** Queue a RUN-message to be sent to the database */
293278
run( statement, params, observer ) {
294279
log("C", "RUN", statement, params);
295-
this._queueObserver(observer);
296-
this._packer.packStruct( RUN, [this._packable(statement), this._packable(params)],
297-
(err) => this._handleFatalError(err) );
298-
this._chunker.messageBoundary();
280+
const queued = this._queueObserver(observer);
281+
if (queued) {
282+
this._packer.packStruct(RUN, [this._packable(statement), this._packable(params)],
283+
(err) => this._handleFatalError(err));
284+
this._chunker.messageBoundary();
285+
}
299286
}
300287

301288
/** Queue a PULL_ALL-message to be sent to the database */
302289
pullAll( observer ) {
303290
log("C", "PULL_ALL");
304-
this._queueObserver(observer);
305-
this._packer.packStruct( PULL_ALL, [], (err) => this._handleFatalError(err) );
306-
this._chunker.messageBoundary();
291+
const queued = this._queueObserver(observer);
292+
if (queued) {
293+
this._packer.packStruct(PULL_ALL, [], (err) => this._handleFatalError(err));
294+
this._chunker.messageBoundary();
295+
}
307296
}
308297

309298
/** Queue a DISCARD_ALL-message to be sent to the database */
310299
discardAll( observer ) {
311300
log("C", "DISCARD_ALL");
312-
this._queueObserver(observer);
313-
this._packer.packStruct( DISCARD_ALL, [], (err) => this._handleFatalError(err) );
314-
this._chunker.messageBoundary();
301+
const queued = this._queueObserver(observer);
302+
if (queued) {
303+
this._packer.packStruct(DISCARD_ALL, [], (err) => this._handleFatalError(err));
304+
this._chunker.messageBoundary();
305+
}
315306
}
316307

317-
/** Queue a RESET-message to be sent to the database. Mutes failure handling. */
318-
reset(observer) {
308+
/**
309+
* Send a RESET-message to the database. Mutes failure handling.
310+
* Message is immediately flushed to the network. Separate {@link Connection#sync()} call is not required.
311+
* @return {Promise<void>} promise resolved when SUCCESS-message response arrives, or failed when other response messages arrives.
312+
*/
313+
resetAndFlush() {
319314
log('C', 'RESET');
320-
this._isHandlingFailure = true;
321-
let self = this;
322-
let wrappedObs = {
323-
onNext: observer ? observer.onNext : NO_OP,
324-
onError: observer ? observer.onError : NO_OP,
325-
onCompleted: () => {
326-
self._isHandlingFailure = false;
327-
if (observer) {
328-
observer.onCompleted();
315+
this._ackFailureMuted = true;
316+
317+
return new Promise((resolve, reject) => {
318+
const observer = {
319+
onNext: record => {
320+
const neo4jError = this._handleProtocolError('Received RECORD as a response for RESET: ' + JSON.stringify(record));
321+
reject(neo4jError);
322+
},
323+
onError: error => {
324+
const neo4jError = this._handleProtocolError('Received FAILURE as a response for RESET: ' + error);
325+
reject(neo4jError);
326+
},
327+
onCompleted: () => {
328+
this._ackFailureMuted = false;
329+
resolve();
329330
}
331+
};
332+
const queued = this._queueObserver(observer);
333+
if (queued) {
334+
this._packer.packStruct(RESET, [], err => this._handleFatalError(err));
335+
this._chunker.messageBoundary();
336+
this.sync();
330337
}
331-
};
332-
this._queueObserver(wrappedObs);
333-
this._packer.packStruct(RESET, [], (err) => this._handleFatalError(err));
334-
this._chunker.messageBoundary();
338+
});
335339
}
336340

337-
/** Queue a ACK_FAILURE-message to be sent to the database */
338-
_ackFailure( observer ) {
339-
log("C", "ACK_FAILURE");
340-
this._queueObserver(observer);
341-
this._packer.packStruct( ACK_FAILURE, [], (err) => this._handleFatalError(err) );
342-
this._chunker.messageBoundary();
341+
_ackFailureIfNeeded() {
342+
if (this._ackFailureMuted) {
343+
return;
344+
}
345+
346+
log('C', 'ACK_FAILURE');
347+
348+
const observer = {
349+
onNext: record => {
350+
this._handleProtocolError('Received RECORD as a response for ACK_FAILURE: ' + JSON.stringify(record));
351+
},
352+
onError: error => {
353+
if (!this._ackFailureMuted) {
354+
this._handleProtocolError('Received FAILURE as a response for ACK_FAILURE: ' + error);
355+
} else {
356+
this._currentFailure = null;
357+
}
358+
},
359+
onCompleted: () => {
360+
this._currentFailure = null;
361+
}
362+
};
363+
364+
const queued = this._queueObserver(observer);
365+
if (queued) {
366+
this._packer.packStruct(ACK_FAILURE, [], err => this._handleFatalError(err));
367+
this._chunker.messageBoundary();
368+
this.sync();
369+
}
343370
}
344371

345372
_queueObserver(observer) {
346373
if( this._isBroken ) {
347374
if( observer && observer.onError ) {
348375
observer.onError(this._error);
349376
}
350-
return;
377+
return false;
351378
}
352379
observer = observer || NO_OP_OBSERVER;
353380
observer.onCompleted = observer.onCompleted || NO_OP;
@@ -358,6 +385,7 @@ class Connection {
358385
} else {
359386
this._pendingObservers.push( observer );
360387
}
388+
return true;
361389
}
362390

363391
/**
@@ -419,6 +447,15 @@ class Connection {
419447
}
420448
}
421449
}
450+
451+
_handleProtocolError(message) {
452+
this._ackFailureMuted = false;
453+
this._currentFailure = null;
454+
this._updateCurrentObserver();
455+
const error = newError(message, PROTOCOL_ERROR);
456+
this._handleFatalError(error);
457+
return error;
458+
}
422459
}
423460

424461
class ConnectionState {

test/internal/connector.test.js

Lines changed: 115 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,16 @@ import {connect, Connection} from '../../src/v1/internal/connector';
2222
import {Packer} from '../../src/v1/internal/packstream-v1';
2323
import {Chunker} from '../../src/v1/internal/chunking';
2424
import {alloc} from '../../src/v1/internal/buf';
25-
import {Neo4jError} from '../../src/v1/error';
25+
import {Neo4jError, newError} from '../../src/v1/error';
2626
import sharedNeo4j from '../internal/shared-neo4j';
2727
import {ServerVersion} from '../../src/v1/internal/server-version';
2828
import lolex from 'lolex';
2929

30+
const ILLEGAL_MESSAGE = {signature: 42, fields: []};
31+
const SUCCESS_MESSAGE = {signature: 0x70, fields: [{}]};
32+
const FAILURE_MESSAGE = {signature: 0x7F, fields: [newError('Hello')]};
33+
const RECORD_MESSAGE = {signature: 0x71, fields: [{value: 'Hello'}]};
34+
3035
describe('connector', () => {
3136

3237
let clock;
@@ -241,6 +246,104 @@ describe('connector', () => {
241246
testConnectionTimeout(true, done);
242247
});
243248

249+
it('should not queue INIT observer when broken', () => {
250+
testQueueingOfObserversWithBrokenConnection(connection => connection.initialize('Hello', {}, {}));
251+
});
252+
253+
it('should not queue RUN observer when broken', () => {
254+
testQueueingOfObserversWithBrokenConnection(connection => connection.run('RETURN 1', {}, {}));
255+
});
256+
257+
it('should not queue PULL_ALL observer when broken', () => {
258+
testQueueingOfObserversWithBrokenConnection(connection => connection.pullAll({}));
259+
});
260+
261+
it('should not queue DISCARD_ALL observer when broken', () => {
262+
testQueueingOfObserversWithBrokenConnection(connection => connection.discardAll({}));
263+
});
264+
265+
it('should not queue RESET observer when broken', () => {
266+
const resetAction = connection => connection.resetAndFlush().catch(ignore => {
267+
});
268+
269+
testQueueingOfObserversWithBrokenConnection(resetAction);
270+
});
271+
272+
it('should not queue ACK_FAILURE observer when broken', () => {
273+
testQueueingOfObserversWithBrokenConnection(connection => connection._ackFailureIfNeeded());
274+
});
275+
276+
it('should reset and flush when SUCCESS received', done => {
277+
connection = connect('bolt://localhost');
278+
279+
connection.resetAndFlush().then(() => {
280+
expect(connection.isOpen()).toBeTruthy();
281+
done();
282+
}).catch(error => done.fail(error));
283+
284+
connection._handleMessage(SUCCESS_MESSAGE);
285+
});
286+
287+
it('should fail to reset and flush when FAILURE received', done => {
288+
connection = connect('bolt://localhost');
289+
290+
connection.resetAndFlush()
291+
.then(() => done.fail('Should fail'))
292+
.catch(error => {
293+
expect(error.message).toEqual('Received FAILURE as a response for RESET: Neo4jError: Hello');
294+
expect(connection._isBroken).toBeTruthy();
295+
expect(connection.isOpen()).toBeFalsy();
296+
done();
297+
});
298+
299+
connection._handleMessage(FAILURE_MESSAGE);
300+
});
301+
302+
it('should fail to reset and flush when RECORD received', done => {
303+
connection = connect('bolt://localhost');
304+
305+
connection.resetAndFlush()
306+
.then(() => done.fail('Should fail'))
307+
.catch(error => {
308+
expect(error.message).toEqual('Received RECORD as a response for RESET: {"value":"Hello"}');
309+
expect(connection._isBroken).toBeTruthy();
310+
expect(connection.isOpen()).toBeFalsy();
311+
done();
312+
});
313+
314+
connection._handleMessage(RECORD_MESSAGE);
315+
});
316+
317+
it('should ACK_FAILURE when SUCCESS received', () => {
318+
connection = connect('bolt://localhost');
319+
320+
connection._currentFailure = newError('Hello');
321+
connection._ackFailureIfNeeded();
322+
323+
connection._handleMessage(SUCCESS_MESSAGE);
324+
expect(connection._currentFailure).toBeNull();
325+
});
326+
327+
it('should fail the connection when ACK_FAILURE receives FAILURE', () => {
328+
connection = connect('bolt://localhost');
329+
330+
connection._ackFailureIfNeeded();
331+
332+
connection._handleMessage(FAILURE_MESSAGE);
333+
expect(connection._isBroken).toBeTruthy();
334+
expect(connection.isOpen()).toBeFalsy();
335+
});
336+
337+
it('should fail the connection when ACK_FAILURE receives RECORD', () => {
338+
connection = connect('bolt://localhost');
339+
340+
connection._ackFailureIfNeeded();
341+
342+
connection._handleMessage(RECORD_MESSAGE);
343+
expect(connection._isBroken).toBeTruthy();
344+
expect(connection.isOpen()).toBeFalsy();
345+
});
346+
244347
function packedHandshakeMessage() {
245348
const result = alloc(4);
246349
result.putInt32(0, 1);
@@ -301,4 +404,15 @@ describe('connector', () => {
301404
});
302405
}
303406

407+
function testQueueingOfObserversWithBrokenConnection(connectionAction) {
408+
connection = connect('bolt://localhost');
409+
410+
connection._handleMessage(ILLEGAL_MESSAGE);
411+
expect(connection.isOpen()).toBeFalsy();
412+
413+
expect(connection._pendingObservers.length).toEqual(0);
414+
connectionAction(connection);
415+
expect(connection._pendingObservers.length).toEqual(0);
416+
}
417+
304418
});

0 commit comments

Comments
 (0)