Skip to content

Commit 4b0a2a8

Browse files
committed
Bolt V2 negotiation
Allow driver to negotiate version 2 of the Bolt protocol and select `Packer`/`Unpacker` based on the agreed protocol version. Struct unpacking logic moved to the V1 unpacker. Driver initially tries to newest available `Packer`/`Unpacker` and downgrades them after the handshake response, if needed.
1 parent b401576 commit 4b0a2a8

File tree

8 files changed

+290
-114
lines changed

8 files changed

+290
-114
lines changed

src/v1/internal/ch-dummy.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,15 @@ class DummyChannel {
5858
}
5959

6060
close(cb) {
61-
this.written = [];
61+
this.clear();
6262
if (cb) {
6363
return cb();
6464
}
6565
}
66+
67+
clear() {
68+
this.written = [];
69+
}
6670
}
6771

6872
const channel = DummyChannel;

src/v1/internal/connector.js

Lines changed: 34 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,8 @@
2020
import WebSocketChannel from './ch-websocket';
2121
import NodeChannel from './ch-node';
2222
import {Chunker, Dechunker} from './chunking';
23-
import {Packer, Unpacker} from './packstream';
23+
import packStreamUtil from './packstream-util';
2424
import {alloc} from './buf';
25-
import {Node, Path, PathSegment, Relationship, UnboundRelationship} from '../graph-types';
2625
import {newError} from './../error';
2726
import ChannelConfig from './ch-config';
2827
import urlUtil from './url-util';
@@ -54,11 +53,6 @@ RECORD = 0x71, // 0111 0001 // RECORD <value>
5453
IGNORED = 0x7E, // 0111 1110 // IGNORED <metadata>
5554
FAILURE = 0x7F, // 0111 1111 // FAILURE <metadata>
5655

57-
// Signature bytes for higher-level graph objects
58-
NODE = 0x4E,
59-
RELATIONSHIP = 0x52,
60-
UNBOUND_RELATIONSHIP = 0x72,
61-
PATH = 0x50,
6256
//sent before version negotiation
6357
MAGIC_PREAMBLE = 0x6060B017,
6458
DEBUG = false;
@@ -85,66 +79,6 @@ let NO_OP_OBSERVER = {
8579
onError : NO_OP
8680
};
8781

88-
/** Maps from packstream structures to Neo4j domain objects */
89-
let _mappers = {
90-
node : ( unpacker, buf ) => {
91-
return new Node(
92-
unpacker.unpack(buf), // Identity
93-
unpacker.unpack(buf), // Labels
94-
unpacker.unpack(buf) // Properties
95-
);
96-
},
97-
rel : ( unpacker, buf ) => {
98-
return new Relationship(
99-
unpacker.unpack(buf), // Identity
100-
unpacker.unpack(buf), // Start Node Identity
101-
unpacker.unpack(buf), // End Node Identity
102-
unpacker.unpack(buf), // Type
103-
unpacker.unpack(buf) // Properties
104-
);
105-
},
106-
unboundRel : ( unpacker, buf ) => {
107-
return new UnboundRelationship(
108-
unpacker.unpack(buf), // Identity
109-
unpacker.unpack(buf), // Type
110-
unpacker.unpack(buf) // Properties
111-
);
112-
},
113-
path : ( unpacker, buf ) => {
114-
let nodes = unpacker.unpack(buf),
115-
rels = unpacker.unpack(buf),
116-
sequence = unpacker.unpack(buf);
117-
let prevNode = nodes[0],
118-
segments = [];
119-
120-
for (let i = 0; i < sequence.length; i += 2) {
121-
let relIndex = sequence[i],
122-
nextNode = nodes[sequence[i + 1]],
123-
rel;
124-
if (relIndex > 0) {
125-
rel = rels[relIndex - 1];
126-
if( rel instanceof UnboundRelationship ) {
127-
// To avoid duplication, relationships in a path do not contain
128-
// information about their start and end nodes, that's instead
129-
// inferred from the path sequence. This is us inferring (and,
130-
// for performance reasons remembering) the start/end of a rel.
131-
rels[relIndex - 1] = rel = rel.bind(prevNode.identity, nextNode.identity);
132-
}
133-
} else {
134-
rel = rels[-relIndex - 1];
135-
if( rel instanceof UnboundRelationship ) {
136-
// See above
137-
rels[-relIndex - 1] = rel = rel.bind(nextNode.identity, prevNode.identity);
138-
}
139-
}
140-
// Done hydrating one path segment.
141-
segments.push( new PathSegment( prevNode, rel, nextNode ) );
142-
prevNode = nextNode;
143-
}
144-
return new Path(nodes[0], nodes[nodes.length - 1], segments );
145-
}
146-
};
147-
14882
/**
14983
* A connection manages sending and recieving messages over a channel. A
15084
* connector is very closely tied to the Bolt protocol, it implements the
@@ -175,13 +109,16 @@ class Connection {
175109
this.url = url;
176110
this.server = {address: url};
177111
this.creationTimestamp = Date.now();
112+
this._disableLosslessIntegers = disableLosslessIntegers;
178113
this._pendingObservers = [];
179114
this._currentObserver = undefined;
180115
this._ch = channel;
181116
this._dechunker = new Dechunker();
182117
this._chunker = new Chunker( channel );
183-
this._packer = new Packer(this._chunker);
184-
this._unpacker = new Unpacker(disableLosslessIntegers);
118+
119+
// initially assume that database supports latest Bolt version, create latest packer and unpacker
120+
this._packer = packStreamUtil.createLatestPacker(this._chunker);
121+
this._unpacker = packStreamUtil.createLatestUnpacker(disableLosslessIntegers);
185122

186123
this._isHandlingFailure = false;
187124
this._currentFailure = null;
@@ -191,34 +128,18 @@ class Connection {
191128
// Set to true on fatal errors, to get this out of session pool.
192129
this._isBroken = false;
193130

194-
// For deserialization, explain to the unpacker how to unpack nodes, rels, paths;
195-
this._unpacker.structMappers[NODE] = _mappers.node;
196-
this._unpacker.structMappers[RELATIONSHIP] = _mappers.rel;
197-
this._unpacker.structMappers[UNBOUND_RELATIONSHIP] = _mappers.unboundRel;
198-
this._unpacker.structMappers[PATH] = _mappers.path;
199-
200-
let self = this;
201131
// TODO: Using `onmessage` and `onerror` came from the WebSocket API,
202132
// it reads poorly and has several annoying drawbacks. Swap to having
203133
// Channel extend EventEmitter instead, then we can use `on('data',..)`
204134
this._ch.onmessage = (buf) => {
205-
let proposed = buf.readInt32();
206-
if( proposed == 1 ) {
207-
// Ok, protocol running. Simply forward all messages past
208-
// this to the dechunker
209-
self._ch.onmessage = (buf) => {
210-
self._dechunker.write(buf);
211-
};
212-
213-
if( buf.hasRemaining() ) {
214-
self._dechunker.write(buf.readSlice( buf.remaining() ));
215-
}
135+
const proposed = buf.readInt32();
136+
if (proposed == 1 || proposed == 2) {
137+
this._initializeProtocol(proposed, buf);
216138
} else if (proposed == 1213486160) {//server responded 1213486160 == 0x48545450 == "HTTP"
217-
self._handleFatalError(newError("Server responded HTTP. Make sure you are not trying to connect to the http endpoint " +
218-
"(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)"));
219-
}
220-
else {
221-
self._handleFatalError(newError("Unknown Bolt protocol version: " + proposed));
139+
this._handleFatalError(newError('Server responded HTTP. Make sure you are not trying to connect to the http endpoint ' +
140+
'(HTTP defaults to port 7474 whereas BOLT defaults to port 7687)'));
141+
} else {
142+
this._handleFatalError(newError('Unknown Bolt protocol version: ' + proposed));
222143
}
223144
};
224145

@@ -235,21 +156,40 @@ class Connection {
235156
}
236157

237158
this._dechunker.onmessage = (buf) => {
238-
self._handleMessage( self._unpacker.unpack( buf ) );
159+
this._handleMessage(this._unpacker.unpack(buf));
239160
};
240161

241162
let handshake = alloc( 5 * 4 );
242163
//magic preamble
243164
handshake.writeInt32( MAGIC_PREAMBLE );
244165
//proposed versions
166+
handshake.writeInt32( 2 );
245167
handshake.writeInt32( 1 );
246168
handshake.writeInt32( 0 );
247169
handshake.writeInt32( 0 );
248-
handshake.writeInt32( 0 );
249170
handshake.reset();
250171
this._ch.write( handshake );
251172
}
252173

174+
/**
175+
* Complete protocol initialization.
176+
* @param {number} version the selected protocol version.
177+
* @param {BaseBuffer} buffer the handshake response buffer.
178+
* @private
179+
*/
180+
_initializeProtocol(version, buffer) {
181+
// re-create packer and unpacker because version might be lower than we initially assumed
182+
this._packer = packStreamUtil.createPackerForProtocolVersion(version, this._chunker);
183+
this._unpacker = packStreamUtil.createUnpackerForProtocolVersion(version, this._disableLosslessIntegers);
184+
185+
// Ok, protocol running. Simply forward all messages to the dechunker
186+
this._ch.onmessage = buf => this._dechunker.write(buf);
187+
188+
if (buffer.hasRemaining()) {
189+
this._dechunker.write(buffer.readSlice(buffer.remaining()));
190+
}
191+
}
192+
253193
/**
254194
* "Fatal" means the connection is dead. Only call this if something
255195
* happens that cannot be recovered from. This will lead to all subscribers

src/v1/internal/packstream-util.js

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/**
2+
* Copyright (c) 2002-2018 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
import * as v1 from './packstream-v1';
21+
import * as v2 from './packstream-v2';
22+
23+
const PACKER_CONSTRUCTORS_BY_VERSION = [null, v1.Packer, v2.Packer];
24+
const UNPACKER_CONSTRUCTORS_BY_VERSION = [null, v1.Unpacker, v2.Unpacker];
25+
26+
function createLatestPacker(chunker) {
27+
return createPackerForProtocolVersion(PACKER_CONSTRUCTORS_BY_VERSION.length - 1, chunker);
28+
}
29+
30+
function createLatestUnpacker(disableLosslessIntegers) {
31+
return createUnpackerForProtocolVersion(UNPACKER_CONSTRUCTORS_BY_VERSION.length - 1, disableLosslessIntegers);
32+
}
33+
34+
function createPackerForProtocolVersion(version, chunker) {
35+
const packerConstructor = PACKER_CONSTRUCTORS_BY_VERSION[version];
36+
if (!packerConstructor) {
37+
throw new Error(`Packer can't be created for protocol version ${version}`);
38+
}
39+
return new packerConstructor(chunker);
40+
}
41+
42+
function createUnpackerForProtocolVersion(version, disableLosslessIntegers) {
43+
const unpackerConstructor = UNPACKER_CONSTRUCTORS_BY_VERSION[version];
44+
if (!unpackerConstructor) {
45+
throw new Error(`Unpacker can't be created for protocol version ${version}`);
46+
}
47+
return new unpackerConstructor(disableLosslessIntegers);
48+
}
49+
50+
export default {
51+
createLatestPacker: createLatestPacker,
52+
createLatestUnpacker: createLatestUnpacker,
53+
createPackerForProtocolVersion: createPackerForProtocolVersion,
54+
createUnpackerForProtocolVersion: createUnpackerForProtocolVersion
55+
};

src/v1/internal/packstream.js renamed to src/v1/internal/packstream-v1.js

Lines changed: 85 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import utf8 from './utf8';
2020
import Integer, {int, isInt} from '../integer';
2121
import {newError} from './../error';
2222
import {Chunker} from './chunking';
23+
import {Node, Path, PathSegment, Relationship, UnboundRelationship} from '../graph-types';
2324

2425
const TINY_STRING = 0x80;
2526
const TINY_LIST = 0x90;
@@ -48,6 +49,11 @@ const MAP_32 = 0xDA;
4849
const STRUCT_8 = 0xDC;
4950
const STRUCT_16 = 0xDD;
5051

52+
const NODE = 0x4E;
53+
const RELATIONSHIP = 0x52;
54+
const UNBOUND_RELATIONSHIP = 0x72;
55+
const PATH = 0x50;
56+
5157
/**
5258
* A Structure have a signature and fields.
5359
* @access private
@@ -322,10 +328,6 @@ class Unpacker {
322328
* @param {boolean} disableLosslessIntegers if this unpacker should convert all received integers to native JS numbers.
323329
*/
324330
constructor(disableLosslessIntegers = false) {
325-
// Higher level layers can specify how to map structs to higher-level objects.
326-
// If we receive a struct that has a signature that does not have a mapper,
327-
// we simply return a Structure object.
328-
this.structMappers = {};
329331
this._disableLosslessIntegers = disableLosslessIntegers;
330332
}
331333

@@ -504,17 +506,88 @@ class Unpacker {
504506
}
505507

506508
_unpackStructWithSize(size, buffer) {
507-
let signature = buffer.readUInt8();
508-
let mapper = this.structMappers[signature];
509-
if (mapper) {
510-
return mapper(this, buffer);
509+
const signature = buffer.readUInt8();
510+
if (signature == NODE) {
511+
return this._unpackNode(buffer);
512+
} else if (signature == RELATIONSHIP) {
513+
return this._unpackRelationship(buffer);
514+
} else if (signature == UNBOUND_RELATIONSHIP) {
515+
return this._unpackUnboundRelationship(buffer);
516+
} else if (signature == PATH) {
517+
return this._unpackPath(buffer);
511518
} else {
512-
let value = new Structure(signature, []);
513-
for (let i = 0; i < size; i++) {
514-
value.fields.push(this.unpack(buffer));
519+
return this._unpackUnknownStruct(signature, size, buffer);
520+
}
521+
}
522+
523+
_unpackNode(buffer) {
524+
return new Node(
525+
this.unpack(buffer), // Identity
526+
this.unpack(buffer), // Labels
527+
this.unpack(buffer) // Properties
528+
);
529+
}
530+
531+
_unpackRelationship(buffer) {
532+
return new Relationship(
533+
this.unpack(buffer), // Identity
534+
this.unpack(buffer), // Start Node Identity
535+
this.unpack(buffer), // End Node Identity
536+
this.unpack(buffer), // Type
537+
this.unpack(buffer) // Properties
538+
);
539+
}
540+
541+
_unpackUnboundRelationship(buffer) {
542+
return new UnboundRelationship(
543+
this.unpack(buffer), // Identity
544+
this.unpack(buffer), // Type
545+
this.unpack(buffer) // Properties
546+
);
547+
}
548+
549+
_unpackPath(buffer) {
550+
const nodes = this.unpack(buffer);
551+
const rels = this.unpack(buffer);
552+
const sequence = this.unpack(buffer);
553+
554+
const segments = [];
555+
let prevNode = nodes[0];
556+
557+
for (let i = 0; i < sequence.length; i += 2) {
558+
const nextNode = nodes[sequence[i + 1]];
559+
let relIndex = sequence[i];
560+
let rel;
561+
562+
if (relIndex > 0) {
563+
rel = rels[relIndex - 1];
564+
if (rel instanceof UnboundRelationship) {
565+
// To avoid duplication, relationships in a path do not contain
566+
// information about their start and end nodes, that's instead
567+
// inferred from the path sequence. This is us inferring (and,
568+
// for performance reasons remembering) the start/end of a rel.
569+
rels[relIndex - 1] = rel = rel.bind(prevNode.identity, nextNode.identity);
570+
}
571+
} else {
572+
rel = rels[-relIndex - 1];
573+
if (rel instanceof UnboundRelationship) {
574+
// See above
575+
rels[-relIndex - 1] = rel = rel.bind(nextNode.identity, prevNode.identity);
576+
}
515577
}
516-
return value;
578+
// Done hydrating one path segment.
579+
segments.push(new PathSegment(prevNode, rel, nextNode));
580+
prevNode = nextNode;
581+
}
582+
return new Path(nodes[0], nodes[nodes.length - 1], segments);
583+
}
584+
585+
_unpackUnknownStruct(signature, size, buffer) {
586+
const result = new Structure(signature, []);
587+
for (let i = 0; i < size; i++) {
588+
result.fields.push(this.unpack(buffer));
517589
}
590+
return result;
518591
}
519592

520593
}

0 commit comments

Comments
 (0)