Skip to content

Introduce sessionConnectionTimeout and updateRoutingTableTimeout #966

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jul 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
ConnectionErrorHandler
} from '../connection'
import { internal, error } from 'neo4j-driver-core'
import { controller } from '../lang'

const {
constants: { BOLT_PROTOCOL_V3, BOLT_PROTOCOL_V4_0, BOLT_PROTOCOL_V4_4 }
Expand All @@ -49,12 +50,17 @@ export default class DirectConnectionProvider extends PooledConnectionProvider {
this._handleAuthorizationExpired(error, address, database)
})

return this._connectionPool
.acquire(this._address)
.then(
connection =>
new DelegateConnection(connection, databaseSpecificErrorHandler)
)
const acquireConnectionJob = {
run: () => this._connectionPool
.acquire(this._address)
.then(
connection =>
new DelegateConnection(connection, databaseSpecificErrorHandler)
),
onTimeout: connection => connection._release()
}

return controller.runWithTimeout(this._sessionConnectionTimeoutConfig, acquireConnectionJob)
}

_handleAuthorizationExpired (error, address, database) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import { createChannelConnection, ConnectionErrorHandler } from '../connection'
import Pool, { PoolConfig } from '../pool'
import { error, ConnectionProvider, ServerInfo } from 'neo4j-driver-core'
import { error, newError, ConnectionProvider, ServerInfo } from 'neo4j-driver-core'

const { SERVICE_UNAVAILABLE } = error
export default class PooledConnectionProvider extends ConnectionProvider {
Expand Down Expand Up @@ -58,6 +58,12 @@ export default class PooledConnectionProvider extends ConnectionProvider {
log: this._log
})
this._openConnections = {}
this._sessionConnectionTimeoutConfig = {
timeout: this._config.sessionConnectionTimeout,
reason: () => newError(
`Session acquisition timed out in ${this._config.sessionConnectionTimeout} ms.`
)
}
}

_createConnectionErrorHandler () {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { HostNameResolver } from '../channel'
import SingleConnectionProvider from './connection-provider-single'
import PooledConnectionProvider from './connection-provider-pooled'
import { LeastConnectedLoadBalancingStrategy } from '../load-balancing'
import { controller } from '../lang'
import {
createChannelConnection,
ConnectionErrorHandler,
Expand Down Expand Up @@ -75,6 +76,13 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
)
})

this._updateRoutingTableTimeoutConfig = {
timeout: this._config.updateRoutingTableTimeout,
reason: () => newError(
`Routing table update timed out in ${this._config.updateRoutingTableTimeout} ms.`
)
}

this._routingContext = { ...routingContext, address: address.toString() }
this._seedRouter = address
this._rediscovery = new Rediscovery(this._routingContext)
Expand Down Expand Up @@ -143,53 +151,66 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
this._handleAuthorizationExpired(error, address, context.database)
)

const routingTable = await this._freshRoutingTable({
accessMode,
database: context.database,
bookmarks: bookmarks,
impersonatedUser,
onDatabaseNameResolved: (databaseName) => {
context.database = context.database || databaseName
if (onDatabaseNameResolved) {
onDatabaseNameResolved(databaseName)
const refreshRoutingTableJob = {
run: async (_, cancelationToken) => {
const routingTable = await this._freshRoutingTable({
accessMode,
database: context.database,
bookmarks: bookmarks,
impersonatedUser,
onDatabaseNameResolved: (databaseName) => {
context.database = context.database || databaseName
if (onDatabaseNameResolved) {
onDatabaseNameResolved(databaseName)
}
},
cancelationToken
})

// select a target server based on specified access mode
if (accessMode === READ) {
address = this._loadBalancingStrategy.selectReader(routingTable.readers)
name = 'read'
} else if (accessMode === WRITE) {
address = this._loadBalancingStrategy.selectWriter(routingTable.writers)
name = 'write'
} else {
throw newError('Illegal mode ' + accessMode)
}
}
})

// select a target server based on specified access mode
if (accessMode === READ) {
address = this._loadBalancingStrategy.selectReader(routingTable.readers)
name = 'read'
} else if (accessMode === WRITE) {
address = this._loadBalancingStrategy.selectWriter(routingTable.writers)
name = 'write'
} else {
throw newError('Illegal mode ' + accessMode)
}

// we couldn't select a target server
if (!address) {
throw newError(
`Failed to obtain connection towards ${name} server. Known routing table is: ${routingTable}`,
SESSION_EXPIRED
)
// we couldn't select a target server
if (!address) {
throw newError(
`Failed to obtain connection towards ${name} server. Known routing table is: ${routingTable}`,
SESSION_EXPIRED
)
}
return { routingTable, address }
}
}

try {
const connection = await this._acquireConnectionToServer(
address,
name,
routingTable
)
const acquireConnectionJob = {
run: async ({ routingTable, address }) => {
try {
const connection = await this._acquireConnectionToServer(
address,
name,
routingTable
)

return new DelegateConnection(connection, databaseSpecificErrorHandler)
} catch (error) {
const transformed = databaseSpecificErrorHandler.handleAndTransformError(
error,
address
)
throw transformed
return new DelegateConnection(connection, databaseSpecificErrorHandler)
} catch (error) {
const transformed = databaseSpecificErrorHandler.handleAndTransformError(
error,
address
)
throw transformed
}
},
onTimeout: connection => connection._release()
}

return controller.runWithTimeout(this._sessionConnectionTimeoutConfig, refreshRoutingTableJob, acquireConnectionJob)
}

async _hasProtocolVersion (versionPredicate) {
Expand Down Expand Up @@ -301,22 +322,28 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
return this._connectionPool.acquire(address)
}

_freshRoutingTable ({ accessMode, database, bookmarks, impersonatedUser, onDatabaseNameResolved } = {}) {
const currentRoutingTable = this._routingTableRegistry.get(
database,
() => new RoutingTable({ database })
)
_freshRoutingTable ({ accessMode, database, bookmarks, impersonatedUser, onDatabaseNameResolved, cancelationToken = new controller.CancelationToken(() => false) } = {}) {
const refreshRoutingTableJob = {
run: (_, refreshCancelationToken) => {
const combinedCancelationToken = refreshCancelationToken.combine(cancelationToken)
const currentRoutingTable = this._routingTableRegistry.get(
database,
() => new RoutingTable({ database })
)

if (!currentRoutingTable.isStaleFor(accessMode)) {
return currentRoutingTable
if (!currentRoutingTable.isStaleFor(accessMode)) {
return currentRoutingTable
}
this._log.info(
`Routing table is stale for database: "${database}" and access mode: "${accessMode}": ${currentRoutingTable}`
)
return this._refreshRoutingTable(currentRoutingTable, bookmarks, impersonatedUser, onDatabaseNameResolved, combinedCancelationToken)
}
}
this._log.info(
`Routing table is stale for database: "${database}" and access mode: "${accessMode}": ${currentRoutingTable}`
)
return this._refreshRoutingTable(currentRoutingTable, bookmarks, impersonatedUser, onDatabaseNameResolved)
return controller.runWithTimeout(this._updateRoutingTableTimeoutConfig, refreshRoutingTableJob)
}

_refreshRoutingTable (currentRoutingTable, bookmarks, impersonatedUser, onDatabaseNameResolved) {
_refreshRoutingTable (currentRoutingTable, bookmarks, impersonatedUser, onDatabaseNameResolved, cancelationToken) {
const knownRouters = currentRoutingTable.routers

if (this._useSeedRouter) {
Expand All @@ -325,15 +352,17 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
currentRoutingTable,
bookmarks,
impersonatedUser,
onDatabaseNameResolved
onDatabaseNameResolved,
cancelationToken
)
}
return this._fetchRoutingTableFromKnownRoutersFallbackToSeedRouter(
knownRouters,
currentRoutingTable,
bookmarks,
impersonatedUser,
onDatabaseNameResolved
onDatabaseNameResolved,
cancelationToken
)
}

Expand All @@ -342,7 +371,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
currentRoutingTable,
bookmarks,
impersonatedUser,
onDatabaseNameResolved
onDatabaseNameResolved,
cancelationToken
) {
// we start with seed router, no routers were probed before
const seenRouters = []
Expand All @@ -351,7 +381,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
this._seedRouter,
currentRoutingTable,
bookmarks,
impersonatedUser
impersonatedUser,
cancelationToken
)

if (newRoutingTable) {
Expand All @@ -362,7 +393,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
knownRouters,
currentRoutingTable,
bookmarks,
impersonatedUser
impersonatedUser,
cancelationToken
)
newRoutingTable = newRoutingTable2
error = error2 || error
Expand All @@ -381,13 +413,15 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
currentRoutingTable,
bookmarks,
impersonatedUser,
onDatabaseNameResolved
onDatabaseNameResolved,
cancelationToken
) {
let [newRoutingTable, error] = await this._fetchRoutingTableUsingKnownRouters(
knownRouters,
currentRoutingTable,
bookmarks,
impersonatedUser
impersonatedUser,
cancelationToken
)

if (!newRoutingTable) {
Expand All @@ -397,7 +431,8 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
this._seedRouter,
currentRoutingTable,
bookmarks,
impersonatedUser
impersonatedUser,
cancelationToken
)
}

Expand All @@ -413,13 +448,15 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
knownRouters,
currentRoutingTable,
bookmarks,
impersonatedUser
impersonatedUser,
cancelationToken
) {
const [newRoutingTable, error] = await this._fetchRoutingTable(
knownRouters,
currentRoutingTable,
bookmarks,
impersonatedUser
impersonatedUser,
cancelationToken
)

if (newRoutingTable) {
Expand All @@ -444,16 +481,19 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
seedRouter,
routingTable,
bookmarks,
impersonatedUser
impersonatedUser,
cancelationToken
) {
const resolvedAddresses = await this._resolveSeedRouter(seedRouter)

cancelationToken.throwIfCancellationRequested()

// filter out all addresses that we've already tried
const newAddresses = resolvedAddresses.filter(
address => seenRouters.indexOf(address) < 0
)

return await this._fetchRoutingTable(newAddresses, routingTable, bookmarks, impersonatedUser)
return await this._fetchRoutingTable(newAddresses, routingTable, bookmarks, impersonatedUser, cancelationToken)
}

async _resolveSeedRouter (seedRouter) {
Expand All @@ -465,7 +505,7 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
return [].concat.apply([], dnsResolvedAddresses)
}

async _fetchRoutingTable (routerAddresses, routingTable, bookmarks, impersonatedUser) {
async _fetchRoutingTable (routerAddresses, routingTable, bookmarks, impersonatedUser, cancelationToken) {
return routerAddresses.reduce(
async (refreshedTablePromise, currentRouter, currentIndex) => {
const [newRoutingTable] = await refreshedTablePromise
Expand Down Expand Up @@ -499,11 +539,13 @@ export default class RoutingConnectionProvider extends PooledConnectionProvider
impersonatedUser
), null]
} catch (error) {
cancelationToken.throwIfCancellationRequested()
return this._handleRediscoveryError(error, currentRouter)
} finally {
session.close()
await session.close()
}
} else {
cancelationToken.throwIfCancellationRequested()
// unable to acquire connection and create session towards the current router
// return null to signal that the next router should be tried
return [null, error]
Expand Down
Loading