Skip to content

Commit c8b028a

Browse files
committed
Introduce streaming ismaster monitoring protocol
* DefaultServerMonitor implements the new streaming isMaster protocol, if available, to detect topology changes sooner. This capability was added in MongoDB release 4.4. * This requires the ability to increase the read timeout on a per-read basis. Since the Stream class is public, this has to be done carefully in order to avoid using the streaming protocol with Stream implementations that don't have this ability. All the built-in Stream implementations have had this ability added, so in practice it should not happen unless an application has created its own Stream implementation (unlikely). * Implement the new server discovery and monitoring specification integration tests * Fix some remaining bugs in error handling JAVA-3626
1 parent 665cc9b commit c8b028a

File tree

61 files changed

+3355
-366
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+3355
-366
lines changed

config/checkstyle-exclude.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
<suppress checks="Regexp" files="Tour"/>
2929

3030
<suppress checks="MethodLength" files="PojoRoundTripTest"/>
31+
<suppress checks="MethodLength" files="AbstractUnifiedTest"/>
3132

3233
<suppress checks="JavadocPackage" files="com[\\/]mongodb[\\/][^\\/]*\.java"/>
3334
<suppress checks="JavadocPackage" files="com[\\/]mongodb[\\/]client[\\/][^\\/]*\.java"/>

driver-core/src/main/com/mongodb/connection/Stream.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,42 @@ public interface Stream extends BufferProvider{
6060
*/
6161
ByteBuf read(int numBytes) throws IOException;
6262

63+
/**
64+
* Gets whether this implementation supports specifying an additional timeout for read operations
65+
* <p>
66+
* The default is to not support specifying an additional timeout
67+
* </p>
68+
*
69+
* @return true if this implementation supports specifying an additional timeouts for reads operations
70+
* @see #read(int, int)
71+
* @since 4.1
72+
*/
73+
default boolean supportsAdditionalTimeout() {
74+
return false;
75+
}
76+
77+
/**
78+
* Read from the stream, blocking until the requested number of bytes have been read. If supported by the implementation,
79+
* adds the given additional timeout to the configured timeout for the stream.
80+
* <p>
81+
* This method should not be called unless {@link #supportsAdditionalTimeout()} returns true.
82+
* </p>
83+
* <p>
84+
* The default behavior is to throw an {@link UnsupportedOperationException}
85+
* </p>
86+
*
87+
* @param numBytes The number of bytes to read into the returned byte buffer
88+
* @param additionalTimeout additional timeout in milliseconds to add to the configured timeout
89+
* @return a byte buffer filled with number of bytes requested
90+
* @throws IOException if there are problems reading from the stream
91+
* @throws UnsupportedOperationException if this implementation does not support additional timeouts
92+
* @see #supportsAdditionalTimeout()
93+
* @since 4.1
94+
*/
95+
default ByteBuf read(int numBytes, int additionalTimeout) throws IOException {
96+
throw new UnsupportedOperationException();
97+
}
98+
6399
/**
64100
* Write each buffer in the list to the stream in order, asynchronously. This method should return immediately, and invoke the given
65101
* callback on completion.

driver-core/src/main/com/mongodb/connection/TlsChannelStreamFactoryFactory.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,11 @@ private static class TlsChannelStream extends AsynchronousChannelStream implemen
196196
this.selectorMonitor = selectorMonitor;
197197
}
198198

199+
@Override
200+
public boolean supportsAdditionalTimeout() {
201+
return true;
202+
}
203+
199204
@Override
200205
public void openAsync(final AsyncCompletionHandler<Void> handler) {
201206
isTrue("unopened", getChannel() == null);

driver-core/src/main/com/mongodb/connection/netty/NettyStream.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,18 @@ public void write(final List<ByteBuf> buffers) throws IOException {
175175

176176
@Override
177177
public ByteBuf read(final int numBytes) throws IOException {
178+
return read(numBytes, 0);
179+
}
180+
181+
@Override
182+
public boolean supportsAdditionalTimeout() {
183+
return true;
184+
}
185+
186+
@Override
187+
public ByteBuf read(final int numBytes, final int additionalTimeout) throws IOException {
178188
FutureAsyncCompletionHandler<ByteBuf> future = new FutureAsyncCompletionHandler<ByteBuf>();
179-
readAsync(numBytes, future);
189+
readAsync(numBytes, future, additionalTimeout);
180190
return future.get();
181191
}
182192

@@ -201,7 +211,11 @@ public void operationComplete(final ChannelFuture future) throws Exception {
201211

202212
@Override
203213
public void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf> handler) {
204-
scheduleReadTimeout();
214+
readAsync(numBytes, handler, 0);
215+
}
216+
217+
private void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf> handler, final int additionalTimeout) {
218+
scheduleReadTimeout(additionalTimeout);
205219
ByteBuf buffer = null;
206220
Throwable exceptionResult = null;
207221
synchronized (this) {
@@ -431,15 +445,18 @@ public void operationComplete(final ChannelFuture future) {
431445
}
432446
}
433447

434-
private void scheduleReadTimeout() {
435-
adjustTimeout(false);
448+
private void scheduleReadTimeout(final int additionalTimeout) {
449+
adjustTimeout(false, additionalTimeout);
436450
}
437451

438452
private void disableReadTimeout() {
439-
adjustTimeout(true);
453+
adjustTimeout(true, 0);
440454
}
441455

442-
private void adjustTimeout(final boolean disable) {
456+
private void adjustTimeout(final boolean disable, final int additionalTimeout) {
457+
if (isClosed) {
458+
return;
459+
}
443460
ChannelHandler timeoutHandler = channel.pipeline().get(READ_HANDLER_NAME);
444461
if (timeoutHandler != null) {
445462
final ReadTimeoutHandler readTimeoutHandler = (ReadTimeoutHandler) timeoutHandler;
@@ -459,12 +476,12 @@ public void run() {
459476
}
460477
} else {
461478
if (executor.inEventLoop()) {
462-
readTimeoutHandler.scheduleTimeout(handlerContext);
479+
readTimeoutHandler.scheduleTimeout(handlerContext, additionalTimeout);
463480
} else {
464481
executor.submit(new Runnable() {
465482
@Override
466483
public void run() {
467-
readTimeoutHandler.scheduleTimeout(handlerContext);
484+
readTimeoutHandler.scheduleTimeout(handlerContext, additionalTimeout);
468485
}
469486
});
470487
}

driver-core/src/main/com/mongodb/connection/netty/ReadTimeoutHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,10 @@ final class ReadTimeoutHandler extends ChannelInboundHandlerAdapter {
4040
this.readTimeout = readTimeout;
4141
}
4242

43-
void scheduleTimeout(final ChannelHandlerContext ctx) {
43+
void scheduleTimeout(final ChannelHandlerContext ctx, final int additionalTimeout) {
4444
isTrue("Handler called from the eventLoop", ctx.channel().eventLoop().inEventLoop());
4545
if (timeout == null) {
46-
timeout = ctx.executor().schedule(new ReadTimeoutTask(ctx), readTimeout, TimeUnit.MILLISECONDS);
46+
timeout = ctx.executor().schedule(new ReadTimeoutTask(ctx), readTimeout + additionalTimeout, TimeUnit.MILLISECONDS);
4747
}
4848
}
4949

driver-core/src/main/com/mongodb/event/ServerHeartbeatFailedEvent.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
public final class ServerHeartbeatFailedEvent {
3232
private final ConnectionId connectionId;
3333
private final long elapsedTimeNanos;
34+
private final boolean awaited;
3435
private final Throwable throwable;
3536

3637
/**
@@ -39,9 +40,26 @@ public final class ServerHeartbeatFailedEvent {
3940
* @param connectionId the non-null connectionId
4041
* @param elapsedTimeNanos the non-negative elapsed time in nanoseconds
4142
* @param throwable the non-null exception that caused the failure
43+
* @deprecated Prefer {@link #ServerHeartbeatFailedEvent(ConnectionId, long, boolean, Throwable)}
4244
*/
45+
@Deprecated
4346
public ServerHeartbeatFailedEvent(final ConnectionId connectionId, final long elapsedTimeNanos, final Throwable throwable) {
47+
this(connectionId, elapsedTimeNanos, false, throwable);
48+
}
49+
50+
/**
51+
* Construct an instance.
52+
*
53+
* @param connectionId the non-null connectionId
54+
* @param elapsedTimeNanos the non-negative elapsed time in nanoseconds
55+
* @param awaited true if the response was awaited
56+
* @param throwable the non-null exception that caused the failure
57+
* @since 4.1
58+
*/
59+
public ServerHeartbeatFailedEvent(final ConnectionId connectionId, final long elapsedTimeNanos, final boolean awaited,
60+
final Throwable throwable) {
4461
this.connectionId = notNull("connectionId", connectionId);
62+
this.awaited = awaited;
4563
isTrueArgument("elapsed time is not negative", elapsedTimeNanos >= 0);
4664
this.elapsedTimeNanos = elapsedTimeNanos;
4765
this.throwable = notNull("throwable", throwable);
@@ -67,6 +85,18 @@ public long getElapsedTime(final TimeUnit timeUnit) {
6785
return timeUnit.convert(elapsedTimeNanos, TimeUnit.NANOSECONDS);
6886
}
6987

88+
/**
89+
* Gets whether the heartbeat was awaited. If true, then {@link #getElapsedTime(TimeUnit)} reflects the sum of the round trip time
90+
* to the server and the time that the server waited before sending a response.
91+
*
92+
* @return whether the response was awaited
93+
* @since 4.1
94+
* @mongodb.server.release 4.4
95+
*/
96+
public boolean isAwaited() {
97+
return awaited;
98+
}
99+
70100
/**
71101
* Gets the exceptions that caused the failure
72102
*
@@ -81,6 +111,7 @@ public String toString() {
81111
return "ServerHeartbeatFailedEvent{"
82112
+ "connectionId=" + connectionId
83113
+ ", elapsedTimeNanos=" + elapsedTimeNanos
114+
+ ", awaited=" + awaited
84115
+ ", throwable=" + throwable
85116
+ "} " + super.toString();
86117
}

driver-core/src/main/com/mongodb/event/ServerHeartbeatSucceededEvent.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,19 +33,37 @@ public final class ServerHeartbeatSucceededEvent {
3333
private final ConnectionId connectionId;
3434
private final BsonDocument reply;
3535
private final long elapsedTimeNanos;
36+
private final boolean awaited;
3637

3738
/**
3839
* Construct an instance.
3940
*
4041
* @param connectionId the non-null connectionId
4142
* @param reply the non-null reply to an isMaster command
4243
* @param elapsedTimeNanos the non-negative elapsed time in nanoseconds
44+
* @deprecated Prefer {@link #ServerHeartbeatSucceededEvent(ConnectionId, BsonDocument, long, boolean)}
4345
*/
46+
@Deprecated
4447
public ServerHeartbeatSucceededEvent(final ConnectionId connectionId, final BsonDocument reply, final long elapsedTimeNanos) {
48+
this(connectionId, reply, elapsedTimeNanos, false);
49+
}
50+
51+
/**
52+
* Construct an instance.
53+
*
54+
* @param connectionId the non-null connectionId
55+
* @param reply the non-null reply to an isMaster command
56+
* @param elapsedTimeNanos the non-negative elapsed time in nanoseconds
57+
* @param awaited true if the response was awaited
58+
* @since 4.1
59+
*/
60+
public ServerHeartbeatSucceededEvent(final ConnectionId connectionId, final BsonDocument reply, final long elapsedTimeNanos,
61+
final boolean awaited) {
4562
this.connectionId = notNull("connectionId", connectionId);
4663
this.reply = notNull("reply", reply);
4764
isTrueArgument("elapsed time is not negative", elapsedTimeNanos >= 0);
4865
this.elapsedTimeNanos = elapsedTimeNanos;
66+
this.awaited = awaited;
4967
}
5068

5169
/**
@@ -77,12 +95,25 @@ public long getElapsedTime(final TimeUnit timeUnit) {
7795
return timeUnit.convert(elapsedTimeNanos, TimeUnit.NANOSECONDS);
7896
}
7997

98+
/**
99+
* Gets whether the heartbeat was awaited. If true, then {@link #getElapsedTime(TimeUnit)} reflects the sum of the round trip time
100+
* to the server and the time that the server waited before sending a response.
101+
*
102+
* @return whether the response was awaited
103+
* @since 4.1
104+
* @mongodb.server.release 4.4
105+
*/
106+
public boolean isAwaited() {
107+
return awaited;
108+
}
109+
80110
@Override
81111
public String toString() {
82112
return "ServerHeartbeatSucceededEvent{"
83113
+ "connectionId=" + connectionId
84114
+ ", reply=" + reply
85115
+ ", elapsedTimeNanos=" + elapsedTimeNanos
116+
+ ", awaited=" + awaited
86117
+ "} ";
87118
}
88119
}

driver-core/src/main/com/mongodb/internal/async/client/AsyncMongoClient.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818

1919
import com.mongodb.ClientSessionOptions;
2020
import com.mongodb.annotations.Immutable;
21+
import com.mongodb.connection.ClusterDescription;
22+
import com.mongodb.connection.ClusterSettings;
23+
import com.mongodb.event.ClusterListener;
2124
import com.mongodb.internal.async.SingleResultCallback;
2225
import org.bson.Document;
2326
import org.bson.conversions.Bson;
@@ -233,4 +236,19 @@ public interface AsyncMongoClient extends Closeable {
233236
<TResult> AsyncChangeStreamIterable<TResult> watch(AsyncClientSession clientSession, List<? extends Bson> pipeline,
234237
Class<TResult> resultClass);
235238

239+
/**
240+
* Gets the current cluster description.
241+
*
242+
* <p>
243+
* This method will not block, meaning that it may return a {@link ClusterDescription} whose {@code clusterType} is unknown
244+
* and whose {@link com.mongodb.connection.ServerDescription}s are all in the connecting state. If the application requires
245+
* notifications after the driver has connected to a member of the cluster, it should register a {@link ClusterListener} via
246+
* the {@link ClusterSettings} in {@link com.mongodb.MongoClientSettings}.
247+
* </p>
248+
*
249+
* @return the current cluster description
250+
* @see ClusterSettings.Builder#addClusterListener(ClusterListener)
251+
* @see com.mongodb.MongoClientSettings.Builder#applyToClusterSettings(com.mongodb.Block)
252+
*/
253+
ClusterDescription getClusterDescription();
236254
}

driver-core/src/main/com/mongodb/internal/async/client/AsyncMongoClientImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.mongodb.MongoClientException;
2323
import com.mongodb.MongoClientSettings;
2424
import com.mongodb.ReadPreference;
25+
import com.mongodb.connection.ClusterDescription;
2526
import com.mongodb.diagnostics.logging.Logger;
2627
import com.mongodb.diagnostics.logging.Loggers;
2728
import com.mongodb.internal.async.SingleResultCallback;
@@ -211,6 +212,11 @@ public <TResult> AsyncChangeStreamIterable<TResult> watch(final AsyncClientSessi
211212
return createChangeStreamIterable(clientSession, pipeline, resultClass);
212213
}
213214

215+
@Override
216+
public ClusterDescription getClusterDescription() {
217+
return cluster.getCurrentDescription();
218+
}
219+
214220
private <TResult> AsyncChangeStreamIterable<TResult> createChangeStreamIterable(@Nullable final AsyncClientSession clientSession,
215221
final List<? extends Bson> pipeline,
216222
final Class<TResult> resultClass) {

driver-core/src/main/com/mongodb/internal/connection/AbstractMultiServerCluster.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package com.mongodb.internal.connection;
1818

1919
import com.mongodb.MongoException;
20-
import com.mongodb.MongoNotPrimaryException;
2120
import com.mongodb.ServerAddress;
2221
import com.mongodb.connection.ClusterConnectionMode;
2322
import com.mongodb.connection.ClusterDescription;
@@ -30,7 +29,6 @@
3029
import com.mongodb.event.ClusterDescriptionChangedEvent;
3130
import com.mongodb.event.ServerDescriptionChangedEvent;
3231
import com.mongodb.event.ServerListener;
33-
import org.bson.BsonDocument;
3432
import org.bson.types.ObjectId;
3533

3634
import java.util.ArrayList;
@@ -283,7 +281,7 @@ private boolean handleReplicaSetMemberChanged(final ServerDescription newDescrip
283281
setVersion, electionId,
284282
maxSetVersion, maxElectionId));
285283
}
286-
addressToServerTupleMap.get(newDescription.getAddress()).server.invalidate();
284+
addressToServerTupleMap.get(newDescription.getAddress()).server.resetToConnecting();
287285
return false;
288286
}
289287

@@ -370,7 +368,7 @@ private void invalidateOldPrimaries(final ServerAddress newPrimary) {
370368
if (LOGGER.isInfoEnabled()) {
371369
LOGGER.info(format("Rediscovering type of existing primary %s", serverTuple.description.getAddress()));
372370
}
373-
serverTuple.server.invalidate(new MongoNotPrimaryException(new BsonDocument(), serverTuple.description.getAddress()));
371+
serverTuple.server.invalidate();
374372
}
375373
}
376374
}

driver-core/src/main/com/mongodb/internal/connection/AsynchronousChannelStream.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,18 @@ public void failed(final Throwable t) {
105105

106106
@Override
107107
public void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf> handler) {
108+
readAsync(numBytes, 0, handler);
109+
}
110+
111+
private void readAsync(final int numBytes, final int additionalTimeout, final AsyncCompletionHandler<ByteBuf> handler) {
108112
ByteBuf buffer = bufferProvider.getBuffer(numBytes);
109-
channel.read(buffer.asNIO(), settings.getReadTimeout(MILLISECONDS), MILLISECONDS, null,
110-
new BasicCompletionHandler(buffer, handler));
113+
114+
int timeout = settings.getReadTimeout(MILLISECONDS);
115+
if (timeout > 0 && additionalTimeout > 0) {
116+
timeout += additionalTimeout;
117+
}
118+
119+
channel.read(buffer.asNIO(), timeout, MILLISECONDS, null, new BasicCompletionHandler(buffer, handler));
111120
}
112121

113122
@Override
@@ -131,6 +140,18 @@ public ByteBuf read(final int numBytes) throws IOException {
131140
return handler.getRead();
132141
}
133142

143+
@Override
144+
public boolean supportsAdditionalTimeout() {
145+
return true;
146+
}
147+
148+
@Override
149+
public ByteBuf read(final int numBytes, final int additionalTimeout) throws IOException {
150+
FutureAsyncCompletionHandler<ByteBuf> handler = new FutureAsyncCompletionHandler<ByteBuf>();
151+
readAsync(numBytes, additionalTimeout, handler);
152+
return handler.getRead();
153+
}
154+
134155
@Override
135156
public ServerAddress getAddress() {
136157
return serverAddress;

0 commit comments

Comments
 (0)