Skip to content

Fixed purging of all connections in a pool #266

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions src/v1/internal/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,7 @@ class Pool {
}

purgeAll() {
for (let key in this._pools.keys) {
if (this._pools.hasOwnPropertykey) {
this.purge(key);
}
}
Object.keys(this._pools).forEach(key => this.purge(key));
}

has(key) {
Expand All @@ -84,7 +80,8 @@ class Pool {
_release(key, resource) {
let pool = this._pools[key];
if (!pool) {
//key has been purged, don't put it back
// key has been purged, don't put it back, just destroy the resource
this._destroy(resource);
return;
}
if( pool.length >= this._maxIdle || !this._validate(resource) ) {
Expand All @@ -96,4 +93,3 @@ class Pool {
}

export default Pool

271 changes: 191 additions & 80 deletions test/internal/pool.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,145 +17,256 @@
* limitations under the License.
*/

var Pool = require('../../lib/v1/internal/pool').default;
import Pool from '../../src/v1/internal/pool';

describe('Pool', function() {
it('allocates if pool is empty', function() {
describe('Pool', () => {

it('allocates if pool is empty', () => {
// Given
var counter = 0;
var key = "bolt://localhost:7687";
var pool = new Pool( function (url, release) { return new Resource(url, counter++, release) } );
let counter = 0;
const key = 'bolt://localhost:7687';
const pool = new Pool((url, release) => new Resource(url, counter++, release));

// When
var r0 = pool.acquire(key);
var r1 = pool.acquire(key);
const r0 = pool.acquire(key);
const r1 = pool.acquire(key);

// Then
expect( r0.id ).toBe( 0 );
expect( r1.id ).toBe( 1 );
expect(r0.id).toBe(0);
expect(r1.id).toBe(1);
expect(r0).not.toBe(r1);
});

it('pools if resources are returned', function() {
it('pools if resources are returned', () => {
// Given a pool that allocates
var counter = 0;
var key = "bolt://localhost:7687";
var pool = new Pool( function (url, release) { return new Resource(url, counter++, release) } );
let counter = 0;
const key = 'bolt://localhost:7687';
const pool = new Pool((url, release) => new Resource(url, counter++, release));

// When
var r0 = pool.acquire(key);
const r0 = pool.acquire(key);
r0.close();
var r1 = pool.acquire(key);
const r1 = pool.acquire(key);

// Then
expect( r0.id ).toBe( 0 );
expect( r1.id ).toBe( 0 );
expect(r0.id).toBe(0);
expect(r1.id).toBe(0);
expect(r0).toBe(r1);
});

it('handles multiple keys', function() {
it('handles multiple keys', () => {
// Given a pool that allocates
var counter = 0;
var key1 = "bolt://localhost:7687";
var key2 = "bolt://localhost:7688";
var pool = new Pool( function (url, release) { return new Resource(url, counter++, release) } );
let counter = 0;
const key1 = 'bolt://localhost:7687';
const key2 = 'bolt://localhost:7688';
const pool = new Pool((url, release) => new Resource(url, counter++, release));

// When
var r0 = pool.acquire(key1);
var r1 = pool.acquire(key2);
const r0 = pool.acquire(key1);
const r1 = pool.acquire(key2);
r0.close();
var r2 = pool.acquire(key1);
var r3 = pool.acquire(key2);
const r2 = pool.acquire(key1);
const r3 = pool.acquire(key2);

// Then
expect( r0.id ).toBe( 0 );
expect( r1.id ).toBe( 1 );
expect( r2.id ).toBe( 0 );
expect( r3.id ).toBe( 2 );
expect(r0.id).toBe(0);
expect(r1.id).toBe(1);
expect(r2.id).toBe(0);
expect(r3.id).toBe(2);

expect(r0).toBe(r2);
expect(r1).not.toBe(r3);
});

it('frees if pool reaches max size', function() {
it('frees if pool reaches max size', () => {
// Given a pool that tracks destroyed resources
var counter = 0,
destroyed = [];
var key = "bolt://localhost:7687";
var pool = new Pool(
function (url, release) { return new Resource(url, counter++, release) },
function (resource) { destroyed.push(resource); },
function (resource) { return true; },
let counter = 0;
let destroyed = [];
const key = 'bolt://localhost:7687';
const pool = new Pool(
(url, release) => new Resource(url, counter++, release),
resource => {
destroyed.push(resource);
},
resource => true,
2 // maxIdle
);

// When
var r0 = pool.acquire(key);
var r1 = pool.acquire(key);
var r2 = pool.acquire(key);
const r0 = pool.acquire(key);
const r1 = pool.acquire(key);
const r2 = pool.acquire(key);
r0.close();
r1.close();
r2.close();

// Then
expect( destroyed.length ).toBe( 1 );
expect( destroyed[0].id ).toBe( r2.id );
expect(destroyed.length).toBe(1);
expect(destroyed[0].id).toBe(r2.id);
});

it('frees if validate returns false', function() {
it('frees if validate returns false', () => {
// Given a pool that allocates
var counter = 0,
destroyed = [];
var key = "bolt://localhost:7687";
var pool = new Pool(
function (url, release) { return new Resource(url, counter++, release) },
function (resource) { destroyed.push(resource); },
function (resource) { return false; },
let counter = 0;
let destroyed = [];
const key = 'bolt://localhost:7687';
const pool = new Pool(
(url, release) => new Resource(url, counter++, release),
resource => {
destroyed.push(resource);
},
resource => false,
1000 // maxIdle
);

// When
var r0 = pool.acquire(key);
var r1 = pool.acquire(key);
const r0 = pool.acquire(key);
const r1 = pool.acquire(key);
r0.close();
r1.close();

// Then
expect( destroyed.length ).toBe( 2 );
expect( destroyed[0].id ).toBe( r0.id );
expect( destroyed[1].id ).toBe( r1.id );
expect(destroyed.length).toBe(2);
expect(destroyed[0].id).toBe(r0.id);
expect(destroyed[1].id).toBe(r1.id);
});


it('purges keys', function() {
it('purges keys', () => {
// Given a pool that allocates
var counter = 0;
var key1 = "bolt://localhost:7687";
var key2 = "bolt://localhost:7688";
var pool = new Pool( function (url, release) { return new Resource(url, counter++, release) },
function (res) {res.destroyed = true; return true}
let counter = 0;
const key1 = 'bolt://localhost:7687';
const key2 = 'bolt://localhost:7688';
const pool = new Pool((url, release) => new Resource(url, counter++, release),
res => {
res.destroyed = true;
return true;
}
);

// When
var r0 = pool.acquire(key1);
var r1 = pool.acquire(key2);
const r0 = pool.acquire(key1);
const r1 = pool.acquire(key2);
r0.close();
r1.close();
expect(pool.has(key1)).toBe(true);
expect(pool.has(key2)).toBe(true);
expect(pool.has(key1)).toBeTruthy();
expect(pool.has(key2)).toBeTruthy();
pool.purge(key1);
expect(pool.has(key1)).toBe(false);
expect(pool.has(key2)).toBe(true);
expect(pool.has(key1)).toBeFalsy();
expect(pool.has(key2)).toBeTruthy();

var r2 = pool.acquire(key1);
var r3 = pool.acquire(key2);
const r2 = pool.acquire(key1);
const r3 = pool.acquire(key2);

// Then
expect( r0.id ).toBe( 0 );
expect( r0.destroyed ).toBe( true );
expect( r1.id ).toBe( 1 );
expect( r2.id ).toBe( 2 );
expect( r3.id ).toBe( 1 );
expect(r0.id).toBe(0);
expect(r0.destroyed).toBeTruthy();
expect(r1.id).toBe(1);
expect(r2.id).toBe(2);
expect(r3.id).toBe(1);
});

it('destroys resource when key was purged', () => {
let counter = 0;
const key = 'bolt://localhost:7687';
const pool = new Pool((url, release) => new Resource(url, counter++, release),
res => {
res.destroyed = true;
return true;
}
);

const r0 = pool.acquire(key);
expect(pool.has(key)).toBeTruthy();
expect(r0.id).toEqual(0);

pool.purge(key);
expect(pool.has(key)).toBeFalsy();
expect(r0.destroyed).toBeFalsy();

r0.close();
expect(pool.has(key)).toBeFalsy();
expect(r0.destroyed).toBeTruthy();
});

it('purges all keys', () => {
let counter = 0;

const key1 = 'bolt://localhost:7687';
const key2 = 'bolt://localhost:7688';
const key3 = 'bolt://localhost:7689';

const pool = new Pool((url, release) => new Resource(url, counter++, release),
res => {
res.destroyed = true;
return true;
}
);

const acquiredResources = [
pool.acquire(key1),
pool.acquire(key2),
pool.acquire(key3),
pool.acquire(key1),
pool.acquire(key2),
pool.acquire(key3)
];
acquiredResources.forEach(resource => resource.close());

pool.purgeAll();

acquiredResources.forEach(resource => expect(resource.destroyed).toBeTruthy());
});

it('skips broken connections during acquire', () => {
let validated = false;
let counter = 0;
const key = 'bolt://localhost:7687';
const pool = new Pool((url, release) => new Resource(url, counter++, release),
res => {
res.destroyed = true;
return true;
},
() => {
if (validated) {
return false;
}
validated = true;
return true;
}
);

const r0 = pool.acquire(key);
r0.close();

const r1 = pool.acquire(key);
expect(r1).not.toBe(r0);
});

it('reports presence of the key', () => {
const existingKey = 'bolt://localhost:7687';
const absentKey = 'bolt://localhost:7688';

const pool = new Pool((url, release) => new Resource(url, 42, release));

pool.acquire(existingKey);
pool.acquire(existingKey);

expect(pool.has(existingKey)).toBeTruthy();
expect(pool.has(absentKey)).toBeFalsy();
});
});

function Resource( key, id, release) {
var self = this;
this.id = id;
this.close = function() { release(key, self); };
class Resource {

constructor(key, id, release) {
this.id = id;
this.key = key;
this.release = release;
this.destroyed = false;
}

close() {
this.release(this.key, this);
}
}
14 changes: 12 additions & 2 deletions test/v1/routing.driver.boltkit.it.js
Original file line number Diff line number Diff line change
Expand Up @@ -747,13 +747,19 @@ describe('routing driver', () => {
const session1 = driver.session(neo4j.session.WRITE);
session1.run("CREATE (n {name:'Bob'})").then(() => {
session1.close(() => {
const connections = Object.keys(driver._openSessions).length;
const openConnectionsCount = numberOfOpenConnections(driver);
const session2 = driver.session(neo4j.session.WRITE);
session2.run("CREATE ()").then(() => {
// driver should have same amount of open connections at this point;
// no new connections should be created, existing connections should be reused
expect(numberOfOpenConnections(driver)).toEqual(openConnectionsCount);
driver.close();

// all connections should be closed when driver is closed
expect(numberOfOpenConnections(driver)).toEqual(0);

seedServer.exit(code1 => {
writeServer.exit(code2 => {
expect(connections).toEqual(Object.keys(driver._openSessions).length);
expect(code1).toEqual(0);
expect(code2).toEqual(0);
done();
Expand Down Expand Up @@ -2064,6 +2070,10 @@ describe('routing driver', () => {
return '[' + array.map(s => '"' + s + '"').join(',') + ']';
}

function numberOfOpenConnections(driver) {
return Object.keys(driver._openSessions).length;
}

class MemorizingRoutingTable extends RoutingTable {

constructor(initialTable) {
Expand Down