Skip to content

Bolt V2 with point support #335

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Mar 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/v1/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import VERSION from '../version';
import {assertString, isEmptyObjectOrNull} from './internal/util';
import urlUtil from './internal/url-util';
import HttpDriver from './internal/http/http-driver';
import {Point} from './spatial-types';

/**
* @property {function(username: string, password: string, realm: ?string)} basic the function to create a
Expand Down Expand Up @@ -205,7 +206,8 @@ const types = {
Path,
Result,
ResultSummary,
Record
Record,
Point
};

/**
Expand Down
6 changes: 5 additions & 1 deletion src/v1/internal/ch-dummy.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,15 @@ class DummyChannel {
}

close(cb) {
this.written = [];
this.clear();
if (cb) {
return cb();
}
}

clear() {
this.written = [];
}
}

const channel = DummyChannel;
Expand Down
128 changes: 34 additions & 94 deletions src/v1/internal/connector.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@
import WebSocketChannel from './ch-websocket';
import NodeChannel from './ch-node';
import {Chunker, Dechunker} from './chunking';
import {Packer, Unpacker} from './packstream';
import packStreamUtil from './packstream-util';
import {alloc} from './buf';
import {Node, Path, PathSegment, Relationship, UnboundRelationship} from '../graph-types';
import {newError} from './../error';
import ChannelConfig from './ch-config';
import urlUtil from './url-util';
Expand Down Expand Up @@ -54,11 +53,6 @@ RECORD = 0x71, // 0111 0001 // RECORD <value>
IGNORED = 0x7E, // 0111 1110 // IGNORED <metadata>
FAILURE = 0x7F, // 0111 1111 // FAILURE <metadata>

// Signature bytes for higher-level graph objects
NODE = 0x4E,
RELATIONSHIP = 0x52,
UNBOUND_RELATIONSHIP = 0x72,
PATH = 0x50,
//sent before version negotiation
MAGIC_PREAMBLE = 0x6060B017,
DEBUG = false;
Expand All @@ -85,66 +79,6 @@ let NO_OP_OBSERVER = {
onError : NO_OP
};

/** Maps from packstream structures to Neo4j domain objects */
let _mappers = {
node : ( unpacker, buf ) => {
return new Node(
unpacker.unpack(buf), // Identity
unpacker.unpack(buf), // Labels
unpacker.unpack(buf) // Properties
);
},
rel : ( unpacker, buf ) => {
return new Relationship(
unpacker.unpack(buf), // Identity
unpacker.unpack(buf), // Start Node Identity
unpacker.unpack(buf), // End Node Identity
unpacker.unpack(buf), // Type
unpacker.unpack(buf) // Properties
);
},
unboundRel : ( unpacker, buf ) => {
return new UnboundRelationship(
unpacker.unpack(buf), // Identity
unpacker.unpack(buf), // Type
unpacker.unpack(buf) // Properties
);
},
path : ( unpacker, buf ) => {
let nodes = unpacker.unpack(buf),
rels = unpacker.unpack(buf),
sequence = unpacker.unpack(buf);
let prevNode = nodes[0],
segments = [];

for (let i = 0; i < sequence.length; i += 2) {
let relIndex = sequence[i],
nextNode = nodes[sequence[i + 1]],
rel;
if (relIndex > 0) {
rel = rels[relIndex - 1];
if( rel instanceof UnboundRelationship ) {
// To avoid duplication, relationships in a path do not contain
// information about their start and end nodes, that's instead
// inferred from the path sequence. This is us inferring (and,
// for performance reasons remembering) the start/end of a rel.
rels[relIndex - 1] = rel = rel.bind(prevNode.identity, nextNode.identity);
}
} else {
rel = rels[-relIndex - 1];
if( rel instanceof UnboundRelationship ) {
// See above
rels[-relIndex - 1] = rel = rel.bind(nextNode.identity, prevNode.identity);
}
}
// Done hydrating one path segment.
segments.push( new PathSegment( prevNode, rel, nextNode ) );
prevNode = nextNode;
}
return new Path(nodes[0], nodes[nodes.length - 1], segments );
}
};

/**
* A connection manages sending and recieving messages over a channel. A
* connector is very closely tied to the Bolt protocol, it implements the
Expand Down Expand Up @@ -175,13 +109,16 @@ class Connection {
this.url = url;
this.server = {address: url};
this.creationTimestamp = Date.now();
this._disableLosslessIntegers = disableLosslessIntegers;
this._pendingObservers = [];
this._currentObserver = undefined;
this._ch = channel;
this._dechunker = new Dechunker();
this._chunker = new Chunker( channel );
this._packer = new Packer(this._chunker);
this._unpacker = new Unpacker(disableLosslessIntegers);

// initially assume that database supports latest Bolt version, create latest packer and unpacker
this._packer = packStreamUtil.createLatestPacker(this._chunker);
this._unpacker = packStreamUtil.createLatestUnpacker(disableLosslessIntegers);

this._isHandlingFailure = false;
this._currentFailure = null;
Expand All @@ -191,34 +128,18 @@ class Connection {
// Set to true on fatal errors, to get this out of session pool.
this._isBroken = false;

// For deserialization, explain to the unpacker how to unpack nodes, rels, paths;
this._unpacker.structMappers[NODE] = _mappers.node;
this._unpacker.structMappers[RELATIONSHIP] = _mappers.rel;
this._unpacker.structMappers[UNBOUND_RELATIONSHIP] = _mappers.unboundRel;
this._unpacker.structMappers[PATH] = _mappers.path;

let self = this;
// TODO: Using `onmessage` and `onerror` came from the WebSocket API,
// it reads poorly and has several annoying drawbacks. Swap to having
// Channel extend EventEmitter instead, then we can use `on('data',..)`
this._ch.onmessage = (buf) => {
let proposed = buf.readInt32();
if( proposed == 1 ) {
// Ok, protocol running. Simply forward all messages past
// this to the dechunker
self._ch.onmessage = (buf) => {
self._dechunker.write(buf);
};

if( buf.hasRemaining() ) {
self._dechunker.write(buf.readSlice( buf.remaining() ));
}
const proposed = buf.readInt32();
if (proposed == 1 || proposed == 2) {
this._initializeProtocol(proposed, buf);
} else if (proposed == 1213486160) {//server responded 1213486160 == 0x48545450 == "HTTP"
self._handleFatalError(newError("Server responded HTTP. Make sure you are not trying to connect to the http endpoint " +
"(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)"));
}
else {
self._handleFatalError(newError("Unknown Bolt protocol version: " + proposed));
this._handleFatalError(newError('Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' +
'(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)'));
} else {
this._handleFatalError(newError('Unknown Bolt protocol version: ' + proposed));
}
};

Expand All @@ -235,21 +156,40 @@ class Connection {
}

this._dechunker.onmessage = (buf) => {
self._handleMessage( self._unpacker.unpack( buf ) );
this._handleMessage(this._unpacker.unpack(buf));
};

let handshake = alloc( 5 * 4 );
//magic preamble
handshake.writeInt32( MAGIC_PREAMBLE );
//proposed versions
handshake.writeInt32( 2 );
handshake.writeInt32( 1 );
handshake.writeInt32( 0 );
handshake.writeInt32( 0 );
handshake.writeInt32( 0 );
handshake.reset();
this._ch.write( handshake );
}

/**
* Complete protocol initialization.
* @param {number} version the selected protocol version.
* @param {BaseBuffer} buffer the handshake response buffer.
* @private
*/
_initializeProtocol(version, buffer) {
// re-create packer and unpacker because version might be lower than we initially assumed
this._packer = packStreamUtil.createPackerForProtocolVersion(version, this._chunker);
this._unpacker = packStreamUtil.createUnpackerForProtocolVersion(version, this._disableLosslessIntegers);

// Ok, protocol running. Simply forward all messages to the dechunker
this._ch.onmessage = buf => this._dechunker.write(buf);

if (buffer.hasRemaining()) {
this._dechunker.write(buffer.readSlice(buffer.remaining()));
}
}

/**
* "Fatal" means the connection is dead. Only call this if something
* happens that cannot be recovered from. This will lead to all subscribers
Expand Down
55 changes: 55 additions & 0 deletions src/v1/internal/packstream-util.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
* Copyright (c) 2002-2018 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import * as v1 from './packstream-v1';
import * as v2 from './packstream-v2';

const PACKER_CONSTRUCTORS_BY_VERSION = [null, v1.Packer, v2.Packer];
const UNPACKER_CONSTRUCTORS_BY_VERSION = [null, v1.Unpacker, v2.Unpacker];

function createLatestPacker(chunker) {
return createPackerForProtocolVersion(PACKER_CONSTRUCTORS_BY_VERSION.length - 1, chunker);
}

function createLatestUnpacker(disableLosslessIntegers) {
return createUnpackerForProtocolVersion(UNPACKER_CONSTRUCTORS_BY_VERSION.length - 1, disableLosslessIntegers);
}

function createPackerForProtocolVersion(version, chunker) {
const packerConstructor = PACKER_CONSTRUCTORS_BY_VERSION[version];
if (!packerConstructor) {
throw new Error(`Packer can't be created for protocol version ${version}`);
}
return new packerConstructor(chunker);
}

function createUnpackerForProtocolVersion(version, disableLosslessIntegers) {
const unpackerConstructor = UNPACKER_CONSTRUCTORS_BY_VERSION[version];
if (!unpackerConstructor) {
throw new Error(`Unpacker can't be created for protocol version ${version}`);
}
return new unpackerConstructor(disableLosslessIntegers);
}

export default {
createLatestPacker: createLatestPacker,
createLatestUnpacker: createLatestUnpacker,
createPackerForProtocolVersion: createPackerForProtocolVersion,
createUnpackerForProtocolVersion: createUnpackerForProtocolVersion
};
Loading