Skip to content

Commit 9b49898

Browse files
committed
Merge branch 1.6 into 1.7
2 parents 7f3be63 + b33d284 commit 9b49898

File tree

13 files changed

+428
-41
lines changed

13 files changed

+428
-41
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/DirectConnection.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ private void writeResetMessageIfNeeded( ResponseHandler resetHandler, boolean is
199199
// auto-read could've been disabled, re-enable it to automatically receive response for RESET
200200
setAutoRead( true );
201201

202-
messageDispatcher.queue( resetHandler );
202+
messageDispatcher.enqueue( resetHandler );
203203
channel.writeAndFlush( ResetMessage.RESET, channel.voidPromise() );
204204
}
205205
} );
@@ -209,7 +209,7 @@ private void writeMessageInEventLoop( Message message, ResponseHandler handler,
209209
{
210210
channel.eventLoop().execute( () ->
211211
{
212-
messageDispatcher.queue( handler );
212+
messageDispatcher.enqueue( handler );
213213

214214
if ( flush )
215215
{
@@ -226,8 +226,8 @@ private void writeMessagesInEventLoop( Message message1, ResponseHandler handler
226226
{
227227
channel.eventLoop().execute( () ->
228228
{
229-
messageDispatcher.queue( handler1 );
230-
messageDispatcher.queue( handler2 );
229+
messageDispatcher.enqueue( handler1 );
230+
messageDispatcher.enqueue( handler2 );
231231

232232
channel.write( message1, channel.voidPromise() );
233233

driver/src/main/java/org/neo4j/driver/internal/async/inbound/InboundMessageDispatcher.java

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.neo4j.driver.internal.handlers.ResetResponseHandler;
2929
import org.neo4j.driver.internal.logging.ChannelActivityLogger;
3030
import org.neo4j.driver.internal.messaging.ResponseMessageHandler;
31+
import org.neo4j.driver.internal.spi.AutoReadManagingResponseHandler;
3132
import org.neo4j.driver.internal.spi.ResponseHandler;
3233
import org.neo4j.driver.internal.util.ErrorUtil;
3334
import org.neo4j.driver.v1.Logger;
@@ -47,13 +48,15 @@ public class InboundMessageDispatcher implements ResponseMessageHandler
4748
private Throwable currentError;
4849
private boolean fatalErrorOccurred;
4950

51+
private AutoReadManagingResponseHandler autoReadManagingHandler;
52+
5053
public InboundMessageDispatcher( Channel channel, Logging logging )
5154
{
5255
this.channel = requireNonNull( channel );
5356
this.log = new ChannelActivityLogger( channel, logging, getClass() );
5457
}
5558

56-
public void queue( ResponseHandler handler )
59+
public void enqueue( ResponseHandler handler )
5760
{
5861
if ( fatalErrorOccurred )
5962
{
@@ -62,6 +65,7 @@ public void queue( ResponseHandler handler )
6265
else
6366
{
6467
handlers.add( handler );
68+
updateAutoReadManagingHandlerIfNeeded( handler );
6569
}
6670
}
6771

@@ -74,7 +78,7 @@ public int queuedHandlersCount()
7478
public void handleSuccessMessage( Map<String,Value> meta )
7579
{
7680
log.debug( "S: SUCCESS %s", meta );
77-
ResponseHandler handler = handlers.remove();
81+
ResponseHandler handler = removeHandler();
7882
handler.onSuccess( meta );
7983
}
8084

@@ -109,10 +113,10 @@ public void handleFailureMessage( String code, String message )
109113
}
110114

111115
// write a RESET to "acknowledge" the failure
112-
queue( new ResetResponseHandler( this ) );
116+
enqueue( new ResetResponseHandler( this ) );
113117
channel.writeAndFlush( RESET, channel.voidPromise() );
114118

115-
ResponseHandler handler = handlers.remove();
119+
ResponseHandler handler = removeHandler();
116120
handler.onFailure( currentError );
117121
}
118122

@@ -121,7 +125,7 @@ public void handleIgnoredMessage()
121125
{
122126
log.debug( "S: IGNORED" );
123127

124-
ResponseHandler handler = handlers.remove();
128+
ResponseHandler handler = removeHandler();
125129

126130
Throwable error;
127131
if ( currentError != null )
@@ -145,7 +149,7 @@ public void handleFatalError( Throwable error )
145149

146150
while ( !handlers.isEmpty() )
147151
{
148-
ResponseHandler handler = handlers.remove();
152+
ResponseHandler handler = removeHandler();
149153
handler.onFailure( currentError );
150154
}
151155
}
@@ -165,4 +169,44 @@ public boolean fatalErrorOccurred()
165169
return fatalErrorOccurred;
166170
}
167171

172+
/**
173+
* <b>Visible for testing</b>
174+
*/
175+
AutoReadManagingResponseHandler autoReadManagingHandler()
176+
{
177+
return autoReadManagingHandler;
178+
}
179+
180+
private ResponseHandler removeHandler()
181+
{
182+
ResponseHandler handler = handlers.remove();
183+
if ( handler == autoReadManagingHandler )
184+
{
185+
// the auto-read managing handler is being removed
186+
// make sure this dispatcher does not hold on to a removed handler
187+
updateAutoReadManagingHandler( null );
188+
}
189+
return handler;
190+
}
191+
192+
private void updateAutoReadManagingHandlerIfNeeded( ResponseHandler handler )
193+
{
194+
if ( handler instanceof AutoReadManagingResponseHandler )
195+
{
196+
updateAutoReadManagingHandler( (AutoReadManagingResponseHandler) handler );
197+
}
198+
}
199+
200+
private void updateAutoReadManagingHandler( AutoReadManagingResponseHandler newHandler )
201+
{
202+
if ( autoReadManagingHandler != null )
203+
{
204+
// there already exists a handler that manages channel's auto-read
205+
// make it stop because new managing handler is being added and there should only be a single such handler
206+
autoReadManagingHandler.disableAutoReadManagement();
207+
// restore the default value of auto-read
208+
channel.config().setAutoRead( true );
209+
}
210+
autoReadManagingHandler = newHandler;
211+
}
168212
}

driver/src/main/java/org/neo4j/driver/internal/async/pool/NettyChannelHealthChecker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ private boolean hasBeenIdleForTooLong( Channel channel )
103103
private Future<Boolean> ping( Channel channel )
104104
{
105105
Promise<Boolean> result = channel.eventLoop().newPromise();
106-
messageDispatcher( channel ).queue( new PingResponseHandler( result, channel, log ) );
106+
messageDispatcher( channel ).enqueue( new PingResponseHandler( result, channel, log ) );
107107
channel.writeAndFlush( ResetMessage.RESET, channel.voidPromise() );
108108
return result;
109109
}

driver/src/main/java/org/neo4j/driver/internal/handlers/PullAllResponseHandler.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@
2727
import java.util.concurrent.CompletionStage;
2828

2929
import org.neo4j.driver.internal.InternalRecord;
30+
import org.neo4j.driver.internal.spi.AutoReadManagingResponseHandler;
3031
import org.neo4j.driver.internal.spi.Connection;
31-
import org.neo4j.driver.internal.spi.ResponseHandler;
3232
import org.neo4j.driver.internal.util.Futures;
3333
import org.neo4j.driver.internal.util.Iterables;
3434
import org.neo4j.driver.internal.util.MetadataExtractor;
@@ -44,7 +44,7 @@
4444
import static org.neo4j.driver.internal.util.Futures.completedWithNull;
4545
import static org.neo4j.driver.internal.util.Futures.failedFuture;
4646

47-
public abstract class PullAllResponseHandler implements ResponseHandler
47+
public abstract class PullAllResponseHandler implements AutoReadManagingResponseHandler
4848
{
4949
private static final Queue<Record> UNINITIALIZED_RECORDS = Iterables.emptyQueue();
5050

@@ -59,6 +59,7 @@ public abstract class PullAllResponseHandler implements ResponseHandler
5959
// initialized lazily when first record arrives
6060
private Queue<Record> records = UNINITIALIZED_RECORDS;
6161

62+
private boolean autoReadManagementEnabled = true;
6263
private boolean finished;
6364
private Throwable failure;
6465
private ResultSummary summary;
@@ -131,6 +132,12 @@ public synchronized void onRecord( Value[] fields )
131132
}
132133
}
133134

135+
@Override
136+
public synchronized void disableAutoReadManagement()
137+
{
138+
autoReadManagementEnabled = false;
139+
}
140+
134141
public synchronized CompletionStage<Record> peekAsync()
135142
{
136143
Record record = records.peek();
@@ -211,7 +218,7 @@ else if ( finished )
211218
// neither SUCCESS nor FAILURE message has arrived, register future to be notified when it arrives
212219
// future will be completed with null on SUCCESS and completed with Throwable on FAILURE
213220
// enable auto-read, otherwise we might not read SUCCESS/FAILURE if records are not consumed
214-
connection.enableAutoRead();
221+
enableAutoRead();
215222
failureFuture = new CompletableFuture<>();
216223
}
217224
return failureFuture;
@@ -236,7 +243,7 @@ private void enqueueRecord( Record record )
236243
// more than high watermark records are already queued, tell connection to stop auto-reading from network
237244
// this is needed to deal with slow consumers, we do not want to buffer all records in memory if they are
238245
// fetched from network faster than consumed
239-
connection.disableAutoRead();
246+
disableAutoRead();
240247
}
241248
}
242249

@@ -248,7 +255,7 @@ private Record dequeueRecord()
248255
{
249256
// less than low watermark records are now available in the buffer, tell connection to pre-fetch more
250257
// and populate queue with new records from network
251-
connection.enableAutoRead();
258+
enableAutoRead();
252259
}
253260

254261
return record;
@@ -321,4 +328,20 @@ private ResultSummary extractResultSummary( Map<String,Value> metadata )
321328
long resultAvailableAfter = runResponseHandler.resultAvailableAfter();
322329
return metadataExtractor.extractSummary( statement, connection, resultAvailableAfter, metadata );
323330
}
331+
332+
private void enableAutoRead()
333+
{
334+
if ( autoReadManagementEnabled )
335+
{
336+
connection.enableAutoRead();
337+
}
338+
}
339+
340+
private void disableAutoRead()
341+
{
342+
if ( autoReadManagementEnabled )
343+
{
344+
connection.disableAutoRead();
345+
}
346+
}
324347
}

driver/src/main/java/org/neo4j/driver/internal/messaging/v1/BoltProtocolV1.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public void initializeChannel( String userAgent, Map<String,Value> authToken, Ch
8484
InitMessage message = new InitMessage( userAgent, authToken );
8585
InitResponseHandler handler = new InitResponseHandler( channelInitializedPromise );
8686

87-
messageDispatcher( channel ).queue( handler );
87+
messageDispatcher( channel ).enqueue( handler );
8888
channel.writeAndFlush( message, channel.voidPromise() );
8989
}
9090

driver/src/main/java/org/neo4j/driver/internal/messaging/v3/BoltProtocolV3.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public void initializeChannel( String userAgent, Map<String,Value> authToken, Ch
8080
HelloMessage message = new HelloMessage( userAgent, authToken );
8181
HelloResponseHandler handler = new HelloResponseHandler( channelInitializedPromise );
8282

83-
messageDispatcher( channel ).queue( handler );
83+
messageDispatcher( channel ).enqueue( handler );
8484
channel.writeAndFlush( message, channel.voidPromise() );
8585
}
8686

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright (c) 2002-2018 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.spi;
20+
21+
import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher;
22+
23+
/**
24+
* A type of {@link ResponseHandler handler} that manages auto-read of the underlying connection using {@link Connection#enableAutoRead()} and
25+
* {@link Connection#disableAutoRead()}.
26+
* <p>
27+
* Implementations can use auto-read management to apply network-level backpressure when receiving a stream of records.
28+
* There should only be a single such handler active for a connection at one point in time. Otherwise, handlers can interfere and turn on/off auto-read
29+
* racing with each other. {@link InboundMessageDispatcher} is responsible for tracking these handlers and disabling auto-read management to maintain just
30+
* a single auto-read managing handler per connection.
31+
*/
32+
public interface AutoReadManagingResponseHandler extends ResponseHandler
33+
{
34+
/**
35+
* Tell this handler that it should stop changing auto-read setting for the connection.
36+
*/
37+
void disableAutoReadManagement();
38+
}

driver/src/test/java/org/neo4j/driver/internal/async/DirectConnectionTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -623,10 +623,10 @@ private static class ThreadTrackingInboundMessageDispatcher extends InboundMessa
623623
}
624624

625625
@Override
626-
public void queue( ResponseHandler handler )
626+
public void enqueue( ResponseHandler handler )
627627
{
628628
queueThreadNames.add( Thread.currentThread().getName() );
629-
super.queue( handler );
629+
super.enqueue( handler );
630630
}
631631

632632
}

driver/src/test/java/org/neo4j/driver/internal/async/HandshakeCompletedListenerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ private void testWritingOfInitializationMessage( int protocolVersion, Message ex
113113
listener.operationComplete( handshakeCompletedPromise );
114114
assertTrue( channel.finish() );
115115

116-
verify( messageDispatcher ).queue( any( handlerType ) );
116+
verify( messageDispatcher ).enqueue( any( handlerType ) );
117117
Object outboundMessage = channel.readOutbound();
118118
assertEquals( expectedMessage, outboundMessage );
119119
}

0 commit comments

Comments
 (0)