Skip to content

Fix a connection leak issue #465

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions src/v1/driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,15 @@ class Driver {
this._authToken = authToken
this._config = config
this._log = Logger.create(config)
this._pool = new Pool(
this._createConnection.bind(this),
this._destroyConnection.bind(this),
this._validateConnection.bind(this),
PoolConfig.fromDriverConfig(config),
this._log
)
this._pool = new Pool({
create: this._createConnection.bind(this),
destroy: this._destroyConnection.bind(this),
validate: this._validateConnection.bind(this),
installIdleObserver: this._installIdleObserverOnConnection.bind(this),
removeIdleObserver: this._removeIdleObserverOnConnection.bind(this),
config: PoolConfig.fromDriverConfig(config),
log: this._log
})

/**
* Reference to the connection provider. Initialized lazily by {@link _getOrCreateConnectionProvider}.
Expand Down Expand Up @@ -177,6 +179,14 @@ class Driver {
return lifetime <= maxConnectionLifetime
}

_installIdleObserverOnConnection (conn, observer) {
conn._queueObserver(observer)
}

_removeIdleObserverOnConnection (conn) {
conn._updateCurrentObserver()
}

/**
* Dispose of a connection.
* @return {Connection} the connection to dispose.
Expand Down
28 changes: 23 additions & 5 deletions src/v1/internal/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,25 @@ class Pool {
* @param {function} validate called at various times (like when an instance is acquired and
* when it is returned). If this returns false, the resource will
* be evicted
* @param {function} installIdleObserver called when the resource is released back to pool
* @param {function} removeIdleObserver called when the resource is acquired from the pool
* @param {PoolConfig} config configuration for the new driver.
* @param {Logger} log the driver logger.
*/
constructor (
create,
destroy = () => true,
validate = () => true,
constructor ({
create = (address, release) => {},
destroy = conn => true,
validate = conn => true,
installIdleObserver = (conn, observer) => {},
removeIdleObserver = conn => {},
config = PoolConfig.defaultConfig(),
log = Logger.noOp()
) {
} = {}) {
this._create = create
this._destroy = destroy
this._validate = validate
this._installIdleObserver = installIdleObserver
this._removeIdleObserver = removeIdleObserver
this._maxSize = config.maxSize
this._acquisitionTimeout = config.acquisitionTimeout
this._pools = {}
Expand Down Expand Up @@ -165,6 +171,10 @@ class Pool {
const resource = pool.pop()

if (this._validate(resource)) {
if (this._removeIdleObserver) {
this._removeIdleObserver(resource)
}

// idle resource is valid and can be acquired
return Promise.resolve(resource)
} else {
Expand Down Expand Up @@ -197,6 +207,14 @@ class Pool {
if (this._log.isDebugEnabled()) {
this._log.debug(`${resource} released to the pool ${key}`)
}
if (this._installIdleObserver) {
this._installIdleObserver(resource, {
onError: () => {
this._pools[key] = this._pools[key].filter(r => r !== resource)
this._destroy(resource)
}
})
}
pool.push(resource)
}
} else {
Expand Down
7 changes: 4 additions & 3 deletions test/internal/connection-providers.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1210,9 +1210,10 @@ function setupLoadBalancerToRememberRouters (loadBalancer, routersArray) {
}

function newPool () {
return new Pool((address, release) =>
Promise.resolve(new FakeConnection(address, release))
)
return new Pool({
create: (address, release) =>
Promise.resolve(new FakeConnection(address, release))
})
}

function expectRoutingTable (loadBalancer, routers, readers, writers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ describe('LeastConnectedLoadBalancingStrategy', () => {

class DummyPool extends Pool {
constructor (activeConnections) {
super(() => 42)
super({ create: () => 42 })
this._activeConnections = activeConnections
}

Expand Down
54 changes: 53 additions & 1 deletion test/internal/node/direct.driver.boltkit.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import neo4j from '../../../src/v1'
import { READ, WRITE } from '../../../src/v1/driver'
import boltStub from '../bolt-stub'
import { SERVICE_UNAVAILABLE } from '../../../src/v1/error'
import { newError, SERVICE_UNAVAILABLE } from '../../../src/v1/error'

describe('direct driver with stub server', () => {
let originalTimeout
Expand Down Expand Up @@ -423,6 +423,58 @@ describe('direct driver with stub server', () => {
})
})

it('should close connection if it dies sitting idle in connection pool', done => {
if (!boltStub.supported) {
done()
return
}

const server = boltStub.start(
'./test/resources/boltstub/read_server_v3_read.script',
9001
)

boltStub.run(() => {
const driver = boltStub.newDriver('bolt://127.0.0.1:9001')
const session = driver.session(READ)

session
.run('MATCH (n) RETURN n.name')
.then(result => {
const records = result.records
expect(records.length).toEqual(3)
expect(records[0].get(0)).toBe('Bob')
expect(records[1].get(0)).toBe('Alice')
expect(records[2].get(0)).toBe('Tina')

const connectionKey = Object.keys(driver._openConnections)[0]
expect(connectionKey).toBeTruthy()

const connection = driver._openConnections[connectionKey]
session.close(() => {
// generate a fake fatal error
connection._handleFatalError(
newError('connection reset', SERVICE_UNAVAILABLE)
)

// expect that the connection to be removed from the pool
expect(driver._pool._pools['127.0.0.1:9001'].length).toEqual(0)
expect(
driver._pool._activeResourceCounts['127.0.0.1:9001']
).toBeFalsy()
// expect that the connection to be unregistered from the open connections registry
expect(driver._openConnections[connectionKey]).toBeFalsy()
driver.close()
server.exit(code => {
expect(code).toEqual(0)
done()
})
})
})
.catch(error => done.fail(error))
})
})

describe('should fail if commit fails due to broken connection', () => {
it('v1', done => {
verifyFailureOnConnectionFailureWhenExplicitTransactionIsCommitted(
Expand Down
Loading