Skip to content

Connection Hint: supporting connection.recv_timeout_seconds #761

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions bolt-connection/src/channel/browser/browser-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,16 @@ export default class WebSocketChannel {
})
}

/**
* Setup the receive timeout for the channel.
*
* Not supported for the browser channel.
*
* @param {number} receiveTimeout The amount of time the channel will keep without receive any data before timeout (ms)
* @returns {void}
*/
setupReceiveTimeout (receiveTimeout) {}

/**
* Set connection timeout on the given WebSocket, if configured.
* @return {number} the timeout id or null.
Expand Down
43 changes: 37 additions & 6 deletions bolt-connection/src/channel/node/node-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ const TrustStrategy = {
* @param {function} onFailure - callback to execute on connection failure.
* @return {*} socket connection.
*/
function connect (config, onSuccess, onFailure = () => null) {
function _connect (config, onSuccess, onFailure = () => null) {
const trustStrategy = trustStrategyName(config)
if (!isEncrypted(config)) {
const socket = net.connect(
Expand Down Expand Up @@ -230,7 +230,7 @@ export default class NodeChannel {
* Create new instance
* @param {ChannelConfig} config - configuration for this channel.
*/
constructor (config) {
constructor (config, connect = _connect) {
const self = this

this.id = _CONNECTION_IDGEN++
Expand Down Expand Up @@ -305,12 +305,12 @@ export default class NodeChannel {
_setupConnectionTimeout (config, socket) {
const timeout = config.connectionTimeout
if (timeout) {
socket.on('connect', () => {
const connectListener = () => {
// connected - clear connection timeout
socket.setTimeout(0)
})
}

socket.on('timeout', () => {
const timeoutListener = () => {
// timeout fired - not connected within configured time. cancel timeout and destroy socket
socket.setTimeout(0)
socket.destroy(
Expand All @@ -319,12 +319,43 @@ export default class NodeChannel {
config.connectionErrorCode
)
)
})
}

socket.on('connect', connectListener)
socket.on('timeout', timeoutListener)

this._removeConnectionTimeoutListeners = () => {
this._conn.off('connect', connectListener)
this._conn.off('timeout', timeoutListener)
}

socket.setTimeout(timeout)
}
}

/**
* Setup the receive timeout for the channel.
*
* @param {number} receiveTimeout The amount of time the channel will keep without receive any data before timeout (ms)
* @returns {void}
*/
setupReceiveTimeout (receiveTimeout) {
if (this._removeConnectionTimeoutListeners) {
this._removeConnectionTimeoutListeners()
}

this._conn.on('timeout', () => {
this._conn.destroy(
newError(
`Connection lost. Server didn't respond in ${receiveTimeout}ms`,
this._connectionErrorCode
)
)
})

this._conn.setTimeout(receiveTimeout)
}

/**
* Write the passed in buffer to connection
* @param {NodeBuffer} buffer - Buffer to write
Expand Down
21 changes: 20 additions & 1 deletion bolt-connection/src/connection/connection-channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/

import { Chunker, Dechunker, ChannelConfig, Channel } from '../channel'
import { newError, error, json, internal } from 'neo4j-driver-core'
import { newError, error, json, internal, toNumber } from 'neo4j-driver-core'
import Connection from './connection'
import Bolt from '../bolt'

Expand Down Expand Up @@ -198,6 +198,25 @@ export default class ChannelConnection extends Connection {
if (!this.databaseId) {
this.databaseId = dbConnectionId
}

if (metadata.hints) {
const receiveTimeoutRaw =
metadata.hints['connection.recv_timeout_seconds']
if (
receiveTimeoutRaw !== null &&
receiveTimeoutRaw !== undefined
) {
const receiveTimeoutInSeconds = toNumber(receiveTimeoutRaw)
if (receiveTimeoutInSeconds > 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if even feasible in JS's limited number type, but we also don't expect floats from the server.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will create tests with float number to limit this option. One question, should 3.0 be considered an invalid value?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know how the JS drivers handles numbers internally. But the server is expected to sent an Integer (strongly typed through packstream). It might well be, however, that at this point in the code, the type information is already lost. In that case, I'd say 3.0 is also ok.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At that point, I already lost this information. Since 3 is the same as 3.0 for JS. So, I'm verifying if it's a Integer using the JS Number definition of integer.

this._ch.setupReceiveTimeout(receiveTimeoutInSeconds * 1000)
} else {
this._log.info(
`Server located at ${this._address} supplied an invalid connection receive timeout value (${receiveTimeoutInSeconds}). ` +
'Please, verify the server configuration and status because this can a symptom of a bigger issue.'
)
}
}
}
}
resolve(self)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
* limitations under the License.
*/

import WebSocketChannel from '../../../bolt-connection/lib/channel/browser/browser-channel'
import ChannelConfig from '../../../bolt-connection/lib/channel/channel-config'
import WebSocketChannel from '../../../src/channel/browser/browser-channel'
import ChannelConfig from '../../../src/channel/channel-config'
import { error, internal } from 'neo4j-driver-core'
import { setTimeoutMock } from '../timers-util'
import { setTimeoutMock } from '../../timers-util'

const {
serverAddress: { ServerAddress },
Expand All @@ -35,7 +35,7 @@ const WS_CLOSING = 2
const WS_CLOSED = 3

/* eslint-disable no-global-assign */
describe('#unit WebSocketChannel', () => {
describe('WebSocketChannel', () => {
let webSocketChannel

afterEach(async () => {
Expand Down Expand Up @@ -173,7 +173,7 @@ describe('#unit WebSocketChannel', () => {
createWebSocketFactory(WS_CLOSED)
)

await expectAsync(channel.close()).toBeResolved()
await expect(channel.close()).resolves.not.toThrow()
})

it('should resolve close when websocket is closed', async () => {
Expand All @@ -186,7 +186,7 @@ describe('#unit WebSocketChannel', () => {
createWebSocketFactory(WS_OPEN)
)

await expectAsync(channel.close()).toBeResolved()
await expect(channel.close()).resolves.not.toThrow()
})

function testFallbackToLiteralIPv6 (boltAddress, expectedWsAddress) {
Expand Down Expand Up @@ -294,6 +294,39 @@ describe('#unit WebSocketChannel', () => {
}
})

describe('.setupReceiveTimeout()', () => {
beforeEach(() => {
const address = ServerAddress.fromUrl('http://localhost:8989')
const channelConfig = new ChannelConfig(
address,
{ connectionTimeout: 0 },
SERVICE_UNAVAILABLE
)
webSocketChannel = new WebSocketChannel(
channelConfig,
undefined,
createWebSocketFactory(WS_OPEN)
)
})

it('should exists', () => {
expect(webSocketChannel).toHaveProperty('setupReceiveTimeout')
expect(typeof webSocketChannel.setupReceiveTimeout).toBe('function')
})

it('should not setTimeout', () => {
const fakeSetTimeout = setTimeoutMock.install()
try {
webSocketChannel.setupReceiveTimeout()

expect(fakeSetTimeout._timeoutIdCounter).toEqual(0)
expect(webSocketChannel._connectionTimeoutId).toBe(null)
} finally {
fakeSetTimeout.uninstall()
}
})
})

function createWebSocketFactory (readyState) {
const ws = {}
ws.readyState = readyState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* limitations under the License.
*/

import BrowserHostNameResolver from '../../../bolt-connection/lib/channel/browser/browser-host-name-resolver'
import BrowserHostNameResolver from '../../../src/channel/browser/browser-host-name-resolver'

describe('#unit BrowserHostNameResolver', () => {
it('should resolve given address to itself', done => {
Expand Down
138 changes: 138 additions & 0 deletions bolt-connection/test/channel/node/node-channel.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/**
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import NodeChannel from '../../../src/channel/node/node-channel'
import ChannelConfig from '../../../src/channel/channel-config'
import { error, internal, newError } from 'neo4j-driver-core'

const {
serverAddress: { ServerAddress }
} = internal

const { SERVICE_UNAVAILABLE } = error

describe('NodeChannel', () => {
it('should resolve close if websocket is already closed', () => {
const address = ServerAddress.fromUrl('bolt://localhost:9999')
const channelConfig = new ChannelConfig(address, {}, SERVICE_UNAVAILABLE)
const channel = new NodeChannel(channelConfig)

// Modify the connection to be closed
channel._open = false

return expect(channel.close()).resolves.not.toThrow()
})

it('should resolve close when websocket is connected', () => {
const channel = createMockedChannel(true)

return expect(channel.close()).resolves.not.toThrow()
})

describe('.setupReceiveTimeout()', () => {
it('should call socket.setTimeout(receiveTimeout)', () => {
const receiveTimeout = 42
const channel = createMockedChannel(true)

channel.setupReceiveTimeout(receiveTimeout)

expect(channel._conn.getCalls().setTimeout[1]).toEqual([receiveTimeout])
})

it('should unsubscribe to the on connect and on timeout created on the create socket', () => {
const receiveTimeout = 42
const channel = createMockedChannel(true)

channel.setupReceiveTimeout(receiveTimeout)

expect(channel._conn.getCalls().on.slice(0, 2)).toEqual(
channel._conn.getCalls().off
)
})

it('should destroy the connection when time out', () => {
const receiveTimeout = 42
const channel = createMockedChannel(true)

channel.setupReceiveTimeout(receiveTimeout)

const [event, listener] = channel._conn.getCalls().on[2]
expect(event).toEqual('timeout')
listener()

expect(channel._conn.getCalls().destroy).toEqual([
[
newError(
"Connection lost. Server didn't respond in 42ms",
SERVICE_UNAVAILABLE
)
]
])
})

it('should not unsubscribe to the any on connect or on timeout if connectionTimeout is not set', () => {
const receiveTimeout = 42
const channel = createMockedChannel(true, { connectionTimeout: 0 })

channel.setupReceiveTimeout(receiveTimeout)

expect(channel._conn.getCalls().off).toEqual([])
})
})
})

function createMockedChannel (connected, config = {}) {
let endCallback = null
const address = ServerAddress.fromUrl('bolt://localhost:9999')
const channelConfig = new ChannelConfig(address, config, SERVICE_UNAVAILABLE)
const socketFactory = () => {
const on = []
const off = []
const setTimeout = []
const destroy = []
return {
destroyed: false,
destroy: () => {
destroy.push([...arguments])
},
end: () => {
channel._open = false
endCallback()
},
removeListener: () => {},
on: (key, cb) => {
on.push([...arguments])
if (key === 'end') {
endCallback = cb
}
},
off: () => {
off.push([...arguments])
},
setTimeout: () => {
setTimeout.push([...arguments])
},
getCalls: () => {
return { on, off, setTimeout, destroy }
}
}
}
const channel = new NodeChannel(channelConfig, socketFactory)
channel._open = connected
return channel
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* limitations under the License.
*/

import NodeHostNameResolver from '../../../bolt-connection/lib/channel/node/node-host-name-resolver'
import NodeHostNameResolver from '../../../src/channel/node/node-host-name-resolver'
import { internal } from 'neo4j-driver-core'

const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1719,7 +1719,6 @@ describe('#unit RoutingConnectionProvider', () => {
database: 'databaseX'
})
} catch (error) {
console.error('Message', error)
expect(error instanceof Neo4jError).toBeTruthy()
expect(error.code).toBe(SERVICE_UNAVAILABLE)
expect(error.message).toContain(
Expand Down
Loading