Skip to content

Multi-database routing #471

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Jul 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 52 additions & 7 deletions gulpfile.babel.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ const file = require('gulp-file')
const semver = require('semver')
const sharedNeo4j = require('./test/internal/shared-neo4j').default
const ts = require('gulp-typescript')
const JasmineConsoleReporter = require('jasmine-console-reporter')
const JasmineReporter = require('jasmine-spec-reporter').SpecReporter
const karma = require('karma')
const log = require('fancy-log')
const JasmineExec = require('jasmine')

/**
* Useful to investigate resource leaks in tests. Enable to see active sockets and file handles after the 'test' task.
Expand Down Expand Up @@ -112,6 +113,18 @@ gulp.task(
})
)

gulp.task('test-nodejs-unit', () => {
return runJasmineTests('#unit*')
})

gulp.task('test-nodejs-stub', () => {
return runJasmineTests('#stub*')
})

gulp.task('test-nodejs-integration', () => {
return runJasmineTests('#integration*')
})

gulp.task('run-browser-test-chrome', function (cb) {
runKarma('chrome', cb)
})
Expand Down Expand Up @@ -218,12 +231,23 @@ function logActiveNodeHandles () {
}

function newJasmineConsoleReporter () {
return new JasmineConsoleReporter({
colors: 1,
cleanStack: 1,
verbosity: 4,
listStyle: 'indent',
activity: false
return new JasmineReporter({
colors: {
enabled: true
},
spec: {
displayDuration: true,
displayErrorMessages: true,
displayStacktrace: true,
displayFailed: true,
displaySuccessful: true,
displayPending: false
},
summary: {
displayFailed: true,
displayStacktrace: true,
displayErrorMessages: true
}
})
}

Expand All @@ -237,3 +261,24 @@ function runKarma (browser, cb) {
}
).start()
}

function runJasmineTests (filterString) {
return new Promise((resolve, reject) => {
const jasmine = new JasmineExec()
jasmine.loadConfigFile('./spec/support/jasmine.json')
jasmine.loadHelpers()
jasmine.loadSpecs()
jasmine.configureDefaultReporter({
print: () => {}
})
jasmine.addReporter(newJasmineConsoleReporter())
jasmine.onComplete(passed => {
if (passed) {
resolve()
} else {
reject(new Error('tests failed'))
}
})
jasmine.execute(null, filterString)
})
}
1,149 changes: 503 additions & 646 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
"gulp-uglify": "^3.0.2",
"gulp-watch": "^5.0.1",
"husky": "^2.3.0",
"jasmine-console-reporter": "^3.1.0",
"jasmine-spec-reporter": "^4.2.1",
"karma": "^4.1.0",
"karma-browserify": "^6.0.0",
"karma-chrome-launcher": "^2.2.0",
Expand Down
7 changes: 7 additions & 0 deletions spec/support/jasmine.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"spec_dir": "test",
"spec_files": ["**/*.test.js", "!**/browser/*.js"],
"helpers": ["../node_modules/@babel/register/lib/node.js"],
"stopSpecOnExpectationFailure": false,
"random": true
}
135 changes: 22 additions & 113 deletions src/driver.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

import Session from './session'
import Pool from './internal/pool'
import Connection from './internal/connection'
import ChannelConnection from './internal/connection-channel'
import { newError, SERVICE_UNAVAILABLE } from './error'
import { DirectConnectionProvider } from './internal/connection-providers'
import DirectConnectionProvider from './internal/connection-provider-direct'
import Bookmark from './internal/bookmark'
import ConnectivityVerifier from './internal/connectivity-verifier'
import PoolConfig, {
Expand Down Expand Up @@ -76,19 +76,9 @@ class Driver {
this._id = idGenerator++
this._address = address
this._userAgent = userAgent
this._openConnections = {}
this._authToken = authToken
this._config = config
this._log = Logger.create(config)
this._pool = new Pool({
create: this._createConnection.bind(this),
destroy: this._destroyConnection.bind(this),
validate: this._validateConnection.bind(this),
installIdleObserver: this._installIdleObserverOnConnection.bind(this),
removeIdleObserver: this._removeIdleObserverOnConnection.bind(this),
config: PoolConfig.fromDriverConfig(config),
log: this._log
})

/**
* Reference to the connection provider. Initialized lazily by {@link _getOrCreateConnectionProvider}.
Expand All @@ -111,73 +101,13 @@ class Driver {

/**
* Verifies connectivity of this driver by trying to open a connection with the provided driver options.
* @param {string} [db=''] the target database to verify connectivity for.
* @param {string} [database=''] the target database to verify connectivity for.
* @returns {Promise<object>} promise resolved with server info or rejected with error.
*/
verifyConnectivity ({ db = '' } = {}) {
verifyConnectivity ({ database = '' } = {}) {
const connectionProvider = this._getOrCreateConnectionProvider()
const connectivityVerifier = new ConnectivityVerifier(connectionProvider)
return connectivityVerifier.verify({ db })
}

/**
* Create a new connection and initialize it.
* @return {Promise<Connection>} promise resolved with a new connection or rejected when failed to connect.
* @access private
*/
_createConnection (address, release) {
const connection = Connection.create(
address,
this._config,
this._createConnectionErrorHandler(),
this._log
)
connection._release = () => release(address, connection)
this._openConnections[connection.id] = connection

return connection.connect(this._userAgent, this._authToken).catch(error => {
if (this.onError) {
// notify Driver.onError callback about connection initialization errors
this.onError(error)
}
// let's destroy this connection
this._destroyConnection(connection)
// propagate the error because connection failed to connect / initialize
throw error
})
}

/**
* Check that a connection is usable
* @return {boolean} true if the connection is open
* @access private
**/
_validateConnection (conn) {
if (!conn.isOpen()) {
return false
}

const maxConnectionLifetime = this._config.maxConnectionLifetime
const lifetime = Date.now() - conn.creationTimestamp
return lifetime <= maxConnectionLifetime
}

_installIdleObserverOnConnection (conn, observer) {
conn._queueObserver(observer)
}

_removeIdleObserverOnConnection (conn) {
conn._updateCurrentObserver()
}

/**
* Dispose of a connection.
* @return {Connection} the connection to dispose.
* @access private
*/
_destroyConnection (conn) {
delete this._openConnections[conn.id]
conn.close()
return connectivityVerifier.verify({ database })
}

/**
Expand All @@ -194,13 +124,13 @@ class Driver {
* @param {string} [defaultAccessMode=WRITE] the access mode of this session, allowed values are {@link READ} and {@link WRITE}.
* @param {string|string[]} [bookmarks=null] the initial reference or references to some previous
* transactions. Value is optional and absence indicates that that the bookmarks do not exist or are unknown.
* @param {string} [db=''] the database this session will connect to.
* @param {string} [database=''] the database this session will connect to.
* @return {Session} new session.
*/
session ({
defaultAccessMode = WRITE,
bookmarks: bookmarkOrBookmarks,
db = ''
database = ''
} = {}) {
const sessionMode = Driver._validateSessionMode(defaultAccessMode)
const connectionProvider = this._getOrCreateConnectionProvider()
Expand All @@ -209,7 +139,7 @@ class Driver {
: Bookmark.empty()
return new Session({
mode: sessionMode,
db,
database,
connectionProvider,
bookmark,
config: this._config
Expand All @@ -225,38 +155,27 @@ class Driver {
}

// Extension point
_createConnectionProvider (address, connectionPool, driverOnErrorCallback) {
return new DirectConnectionProvider(
address,
connectionPool,
driverOnErrorCallback
)
}

// Extension point
_createConnectionErrorHandler () {
return new ConnectionErrorHandler(SERVICE_UNAVAILABLE)
_createConnectionProvider (address, userAgent, authToken) {
return new DirectConnectionProvider({
id: this._id,
config: this._config,
log: this._log,
address: address,
userAgent: userAgent,
authToken: authToken
})
}

_getOrCreateConnectionProvider () {
if (!this._connectionProvider) {
const driverOnErrorCallback = this._driverOnErrorCallback.bind(this)
this._connectionProvider = this._createConnectionProvider(
this._address,
this._pool,
driverOnErrorCallback
this._userAgent,
this._authToken
)
}
return this._connectionProvider
}

_driverOnErrorCallback (error) {
const userDefinedOnErrorCallback = this.onError
if (userDefinedOnErrorCallback && error.code === SERVICE_UNAVAILABLE) {
userDefinedOnErrorCallback(error)
} else {
// we don't need to tell the driver about this error
}
return this._connectionProvider
}

/**
Expand All @@ -266,18 +185,8 @@ class Driver {
*/
close () {
this._log.info(`Driver ${this._id} closing`)

try {
// purge all idle connections in the connection pool
this._pool.purgeAll()
} finally {
// then close all connections driver has ever created
// it is needed to close connections that are active right now and are acquired from the pool
for (let connectionId in this._openConnections) {
if (this._openConnections.hasOwnProperty(connectionId)) {
this._openConnections[connectionId].close()
}
}
if (this._connectionProvider) {
this._connectionProvider.close()
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/internal/bolt-protocol-util.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ function assertTxConfigIsEmpty (txConfig, connection, observer) {

/**
* Asserts that the passed-in database name is empty.
* @param {string} db
* @param {string} database
* @param {Connection} connection
* @param {StreamObserver} observer
*/
function assertDbIsEmpty (db, connection, observer) {
if (db) {
function assertDatabaseIsEmpty (database, connection, observer) {
if (database) {
const error = newError(
'Driver is connected to the database that does not support multiple databases. ' +
'Please upgrade to neo4j 4.0.0 or later in order to use this functionality'
Expand All @@ -57,4 +57,4 @@ function assertDbIsEmpty (db, connection, observer) {
}
}

export { assertDbIsEmpty, assertTxConfigIsEmpty }
export { assertDatabaseIsEmpty, assertTxConfigIsEmpty }
19 changes: 11 additions & 8 deletions src/internal/bolt-protocol-v1.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ import * as v1 from './packstream-v1'
import Bookmark from './bookmark'
import TxConfig from './tx-config'
import { ACCESS_MODE_WRITE } from './constants'
import { assertDbIsEmpty, assertTxConfigIsEmpty } from './bolt-protocol-util'
import {
assertDatabaseIsEmpty,
assertTxConfigIsEmpty
} from './bolt-protocol-util'

export default class BoltProtocol {
/**
Expand Down Expand Up @@ -81,12 +84,12 @@ export default class BoltProtocol {
* @param {StreamObserver} observer the response observer.
* @param {Bookmark} bookmark the bookmark.
* @param {TxConfig} txConfig the configuration.
* @param {string} db the target database name.
* @param {string} database the target database name.
* @param {string} mode the access mode.
*/
beginTransaction (observer, { bookmark, txConfig, db, mode }) {
beginTransaction (observer, { bookmark, txConfig, database, mode }) {
assertTxConfigIsEmpty(txConfig, this._connection, observer)
assertDbIsEmpty(db, this._connection, observer)
assertDatabaseIsEmpty(database, this._connection, observer)

const runMessage = RequestMessage.run(
'BEGIN',
Expand Down Expand Up @@ -133,14 +136,14 @@ export default class BoltProtocol {
* @param {StreamObserver} observer the response observer.
* @param {Bookmark} bookmark the bookmark.
* @param {TxConfig} txConfig the auto-commit transaction configuration.
* @param {string} db the target database name.
* @param {string} database the target database name.
* @param {string} mode the access mode.
*/
run (statement, parameters, observer, { bookmark, txConfig, db, mode }) {
run (statement, parameters, observer, { bookmark, txConfig, database, mode }) {
// bookmark and mode are ignored in this version of the protocol
assertTxConfigIsEmpty(txConfig, this._connection, observer)
// passing in a db name on this protocol version throws an error
assertDbIsEmpty(db, this._connection, observer)
// passing in a database name on this protocol version throws an error
assertDatabaseIsEmpty(database, this._connection, observer)

const runMessage = RequestMessage.run(statement, parameters)
const pullAllMessage = RequestMessage.pullAll()
Expand Down
Loading