Skip to content

Commit 7295322

Browse files
stIncMalevbabaninjyemin
authored
Allow configuring the monitoring protocol to use; use the polling protocol in a FaaS environment by default (#1313)
JAVA-4936 --------- Co-authored-by: Viacheslav Babanin <[email protected]> Co-authored-by: Jeff Yemin <[email protected]>
1 parent d85982d commit 7295322

32 files changed

+1611
-80
lines changed

driver-core/src/main/com/mongodb/ConnectionString.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,15 @@
1818

1919
import com.mongodb.connection.ClusterSettings;
2020
import com.mongodb.connection.ConnectionPoolSettings;
21+
import com.mongodb.connection.ServerMonitoringMode;
22+
import com.mongodb.connection.ServerSettings;
2123
import com.mongodb.connection.SocketSettings;
2224
import com.mongodb.event.ConnectionCheckOutStartedEvent;
2325
import com.mongodb.event.ConnectionCheckedInEvent;
2426
import com.mongodb.event.ConnectionCheckedOutEvent;
2527
import com.mongodb.event.ConnectionCreatedEvent;
2628
import com.mongodb.event.ConnectionReadyEvent;
29+
import com.mongodb.internal.connection.ServerMonitoringModeUtil;
2730
import com.mongodb.internal.diagnostics.logging.Logger;
2831
import com.mongodb.internal.diagnostics.logging.Loggers;
2932
import com.mongodb.internal.dns.DefaultDnsResolver;
@@ -111,6 +114,13 @@
111114
* <ul>
112115
* <li>{@code heartbeatFrequencyMS=ms}: The frequency that the driver will attempt to determine the current state of each server in the
113116
* cluster.</li>
117+
* <li>{@code serverMonitoringMode=enum}: The server monitoring mode, which defines the monitoring protocol to use. Enumerated values:
118+
* <ul>
119+
* <li>{@code stream};</li>
120+
* <li>{@code poll};</li>
121+
* <li>{@code auto} - the default.</li>
122+
* </ul>
123+
* </li>
114124
* </ul>
115125
* <p>Replica set configuration:</p>
116126
* <ul>
@@ -307,6 +317,7 @@ public class ConnectionString {
307317
private Integer serverSelectionTimeout;
308318
private Integer localThreshold;
309319
private Integer heartbeatFrequency;
320+
private ServerMonitoringMode serverMonitoringMode;
310321
private String applicationName;
311322
private List<MongoCompressor> compressorList;
312323
private UuidRepresentation uuidRepresentation;
@@ -529,6 +540,7 @@ public ConnectionString(final String connectionString, @Nullable final DnsClient
529540
GENERAL_OPTIONS_KEYS.add("serverselectiontimeoutms");
530541
GENERAL_OPTIONS_KEYS.add("localthresholdms");
531542
GENERAL_OPTIONS_KEYS.add("heartbeatfrequencyms");
543+
GENERAL_OPTIONS_KEYS.add("servermonitoringmode");
532544
GENERAL_OPTIONS_KEYS.add("retrywrites");
533545
GENERAL_OPTIONS_KEYS.add("retryreads");
534546

@@ -665,6 +677,9 @@ private void translateOptions(final Map<String, List<String>> optionsMap) {
665677
case "heartbeatfrequencyms":
666678
heartbeatFrequency = parseInteger(value, "heartbeatfrequencyms");
667679
break;
680+
case "servermonitoringmode":
681+
serverMonitoringMode = ServerMonitoringModeUtil.fromString(value);
682+
break;
668683
case "appname":
669684
applicationName = value;
670685
break;
@@ -1623,6 +1638,20 @@ public Integer getHeartbeatFrequency() {
16231638
return heartbeatFrequency;
16241639
}
16251640

1641+
/**
1642+
* The server monitoring mode, which defines the monitoring protocol to use.
1643+
* <p>
1644+
* Default is {@link ServerMonitoringMode#AUTO}.</p>
1645+
*
1646+
* @return The {@link ServerMonitoringMode}, or {@code null} if unset and the default is to be used.
1647+
* @see ServerSettings#getServerMonitoringMode()
1648+
* @since 5.1
1649+
*/
1650+
@Nullable
1651+
public ServerMonitoringMode getServerMonitoringMode() {
1652+
return serverMonitoringMode;
1653+
}
1654+
16261655
/**
16271656
* Gets the logical name of the application. The application name may be used by the client to identify the application to the server,
16281657
* for use in server logs, slow query logs, and profile collection.
@@ -1704,6 +1733,7 @@ public boolean equals(final Object o) {
17041733
&& Objects.equals(serverSelectionTimeout, that.serverSelectionTimeout)
17051734
&& Objects.equals(localThreshold, that.localThreshold)
17061735
&& Objects.equals(heartbeatFrequency, that.heartbeatFrequency)
1736+
&& Objects.equals(serverMonitoringMode, that.serverMonitoringMode)
17071737
&& Objects.equals(applicationName, that.applicationName)
17081738
&& Objects.equals(compressorList, that.compressorList)
17091739
&& Objects.equals(uuidRepresentation, that.uuidRepresentation)
@@ -1717,7 +1747,7 @@ public int hashCode() {
17171747
writeConcern, retryWrites, retryReads, readConcern, minConnectionPoolSize, maxConnectionPoolSize, maxWaitTime,
17181748
maxConnectionIdleTime, maxConnectionLifeTime, maxConnecting, connectTimeout, socketTimeout, sslEnabled,
17191749
sslInvalidHostnameAllowed, requiredReplicaSetName, serverSelectionTimeout, localThreshold, heartbeatFrequency,
1720-
applicationName, compressorList, uuidRepresentation, srvServiceName, srvMaxHosts, proxyHost, proxyPort,
1721-
proxyUsername, proxyPassword);
1750+
serverMonitoringMode, applicationName, compressorList, uuidRepresentation, srvServiceName, srvMaxHosts, proxyHost,
1751+
proxyPort, proxyUsername, proxyPassword);
17221752
}
17231753
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.mongodb.connection;
17+
18+
import com.mongodb.event.ClusterListener;
19+
import com.mongodb.event.ServerHeartbeatFailedEvent;
20+
import com.mongodb.event.ServerHeartbeatStartedEvent;
21+
import com.mongodb.event.ServerHeartbeatSucceededEvent;
22+
import com.mongodb.event.ServerListener;
23+
24+
/**
25+
* The server monitoring mode, which defines the monitoring protocol to use.
26+
*
27+
* @see <a href="https://www.mongodb.com/docs/drivers/java/sync/current/fundamentals/monitoring/#server-discovery-and-monitoring-events">
28+
* server discovery and monitoring (SDAM)</a>
29+
* @since 5.1
30+
*/
31+
public enum ServerMonitoringMode {
32+
/**
33+
* Use the streaming protocol when the server supports it or fall back to the polling protocol otherwise.
34+
* When the streaming protocol comes into play,
35+
* {@link ServerHeartbeatStartedEvent#isAwaited()}, {@link ServerHeartbeatSucceededEvent#isAwaited()},
36+
* {@link ServerHeartbeatFailedEvent#isAwaited()} return {@code true} for new events.
37+
* <p>
38+
* The streaming protocol uses long polling for server monitoring, and is intended to reduce the delay between a server change
39+
* that warrants a new event for {@link ServerListener}/{@link ClusterListener},
40+
* and that event being emitted, as well as the related housekeeping work being done.</p>
41+
*/
42+
STREAM(),
43+
/**
44+
* Use the polling protocol.
45+
*/
46+
POLL(),
47+
/**
48+
* Behave the same as {@link #POLL} if running in a FaaS environment, otherwise behave as {@link #STREAM}.
49+
* This is the default.
50+
*/
51+
AUTO()
52+
}

driver-core/src/main/com/mongodb/connection/ServerSettings.java

Lines changed: 49 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import java.util.ArrayList;
2626
import java.util.List;
27+
import java.util.Objects;
2728
import java.util.concurrent.TimeUnit;
2829

2930
import static com.mongodb.assertions.Assertions.notNull;
@@ -38,6 +39,7 @@
3839
public class ServerSettings {
3940
private final long heartbeatFrequencyMS;
4041
private final long minHeartbeatFrequencyMS;
42+
private final ServerMonitoringMode serverMonitoringMode;
4143
private final List<ServerListener> serverListeners;
4244
private final List<ServerMonitorListener> serverMonitorListeners;
4345

@@ -68,6 +70,7 @@ public static Builder builder(final ServerSettings serverSettings) {
6870
public static final class Builder {
6971
private long heartbeatFrequencyMS = 10000;
7072
private long minHeartbeatFrequencyMS = 500;
73+
private ServerMonitoringMode serverMonitoringMode = ServerMonitoringMode.AUTO;
7174
private List<ServerListener> serverListeners = new ArrayList<>();
7275
private List<ServerMonitorListener> serverMonitorListeners = new ArrayList<>();
7376

@@ -87,6 +90,7 @@ public Builder applySettings(final ServerSettings serverSettings) {
8790
notNull("serverSettings", serverSettings);
8891
heartbeatFrequencyMS = serverSettings.heartbeatFrequencyMS;
8992
minHeartbeatFrequencyMS = serverSettings.minHeartbeatFrequencyMS;
93+
serverMonitoringMode = serverSettings.serverMonitoringMode;
9094
serverListeners = new ArrayList<>(serverSettings.serverListeners);
9195
serverMonitorListeners = new ArrayList<>(serverSettings.serverMonitorListeners);
9296
return this;
@@ -117,6 +121,20 @@ public Builder minHeartbeatFrequency(final long minHeartbeatFrequency, final Tim
117121
return this;
118122
}
119123

124+
/**
125+
* Sets the server monitoring mode, which defines the monitoring protocol to use.
126+
* The default value is {@link ServerMonitoringMode#AUTO}.
127+
*
128+
* @param serverMonitoringMode The {@link ServerMonitoringMode}.
129+
* @return {@code this}.
130+
* @see #getServerMonitoringMode()
131+
* @since 5.1
132+
*/
133+
public Builder serverMonitoringMode(final ServerMonitoringMode serverMonitoringMode) {
134+
this.serverMonitoringMode = notNull("serverMonitoringMode", serverMonitoringMode);
135+
return this;
136+
}
137+
120138
/**
121139
* Add a server listener.
122140
*
@@ -181,6 +199,10 @@ public Builder applyConnectionString(final ConnectionString connectionString) {
181199
if (heartbeatFrequency != null) {
182200
heartbeatFrequencyMS = heartbeatFrequency;
183201
}
202+
ServerMonitoringMode serverMonitoringMode = connectionString.getServerMonitoringMode();
203+
if (serverMonitoringMode != null) {
204+
this.serverMonitoringMode = serverMonitoringMode;
205+
}
184206
return this;
185207
}
186208

@@ -215,6 +237,19 @@ public long getMinHeartbeatFrequency(final TimeUnit timeUnit) {
215237
return timeUnit.convert(minHeartbeatFrequencyMS, TimeUnit.MILLISECONDS);
216238
}
217239

240+
/**
241+
* Gets the server monitoring mode, which defines the monitoring protocol to use.
242+
* The default value is {@link ServerMonitoringMode#AUTO}.
243+
*
244+
* @return The {@link ServerMonitoringMode}.
245+
* @see Builder#serverMonitoringMode(ServerMonitoringMode)
246+
* @see ConnectionString#getServerMonitoringMode()
247+
* @since 5.1
248+
*/
249+
public ServerMonitoringMode getServerMonitoringMode() {
250+
return serverMonitoringMode;
251+
}
252+
218253
/**
219254
* Gets the server listeners. The default value is an empty list.
220255
*
@@ -243,40 +278,30 @@ public boolean equals(final Object o) {
243278
if (o == null || getClass() != o.getClass()) {
244279
return false;
245280
}
246-
247-
ServerSettings that = (ServerSettings) o;
248-
249-
if (heartbeatFrequencyMS != that.heartbeatFrequencyMS) {
250-
return false;
251-
}
252-
if (minHeartbeatFrequencyMS != that.minHeartbeatFrequencyMS) {
253-
return false;
254-
}
255-
256-
if (!serverListeners.equals(that.serverListeners)) {
257-
return false;
258-
}
259-
if (!serverMonitorListeners.equals(that.serverMonitorListeners)) {
260-
return false;
261-
}
262-
263-
return true;
281+
final ServerSettings that = (ServerSettings) o;
282+
return heartbeatFrequencyMS == that.heartbeatFrequencyMS
283+
&& minHeartbeatFrequencyMS == that.minHeartbeatFrequencyMS
284+
&& serverMonitoringMode == that.serverMonitoringMode
285+
&& Objects.equals(serverListeners, that.serverListeners)
286+
&& Objects.equals(serverMonitorListeners, that.serverMonitorListeners);
264287
}
265288

266289
@Override
267290
public int hashCode() {
268-
int result = (int) (heartbeatFrequencyMS ^ (heartbeatFrequencyMS >>> 32));
269-
result = 31 * result + (int) (minHeartbeatFrequencyMS ^ (minHeartbeatFrequencyMS >>> 32));
270-
result = 31 * result + serverListeners.hashCode();
271-
result = 31 * result + serverMonitorListeners.hashCode();
272-
return result;
291+
return Objects.hash(
292+
heartbeatFrequencyMS,
293+
minHeartbeatFrequencyMS,
294+
serverMonitoringMode,
295+
serverListeners,
296+
serverMonitorListeners);
273297
}
274298

275299
@Override
276300
public String toString() {
277301
return "ServerSettings{"
278302
+ "heartbeatFrequencyMS=" + heartbeatFrequencyMS
279303
+ ", minHeartbeatFrequencyMS=" + minHeartbeatFrequencyMS
304+
+ ", serverMonitoringMode=" + serverMonitoringMode
280305
+ ", serverListeners='" + serverListeners + '\''
281306
+ ", serverMonitorListeners='" + serverMonitorListeners + '\''
282307
+ '}';
@@ -285,6 +310,7 @@ public String toString() {
285310
ServerSettings(final Builder builder) {
286311
heartbeatFrequencyMS = builder.heartbeatFrequencyMS;
287312
minHeartbeatFrequencyMS = builder.minHeartbeatFrequencyMS;
313+
serverMonitoringMode = builder.serverMonitoringMode;
288314
serverListeners = unmodifiableList(builder.serverListeners);
289315
serverMonitorListeners = unmodifiableList(builder.serverMonitorListeners);
290316
}

driver-core/src/main/com/mongodb/event/ServerHeartbeatFailedEvent.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.mongodb.event;
1818

1919
import com.mongodb.connection.ConnectionId;
20+
import com.mongodb.connection.ServerMonitoringMode;
2021

2122
import java.util.concurrent.TimeUnit;
2223

@@ -77,6 +78,7 @@ public long getElapsedTime(final TimeUnit timeUnit) {
7778
* to the server and the time that the server waited before sending a response.
7879
*
7980
* @return whether the response was awaited
81+
* @see ServerMonitoringMode#STREAM
8082
* @since 4.1
8183
* @mongodb.server.release 4.4
8284
*/

driver-core/src/main/com/mongodb/event/ServerHeartbeatStartedEvent.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.mongodb.event;
1818

1919
import com.mongodb.connection.ConnectionId;
20+
import com.mongodb.connection.ServerMonitoringMode;
2021

2122
import static com.mongodb.assertions.Assertions.notNull;
2223

@@ -27,14 +28,30 @@
2728
*/
2829
public final class ServerHeartbeatStartedEvent {
2930
private final ConnectionId connectionId;
31+
private final boolean awaited;
3032

3133
/**
3234
* Construct an instance.
3335
*
3436
* @param connectionId the non-null connnectionId
37+
* @param awaited {@code true} if and only if the heartbeat is for an awaitable `hello` / legacy hello.
38+
* @since 5.1
3539
*/
36-
public ServerHeartbeatStartedEvent(final ConnectionId connectionId) {
40+
public ServerHeartbeatStartedEvent(final ConnectionId connectionId, final boolean awaited) {
3741
this.connectionId = notNull("connectionId", connectionId);
42+
this.awaited = awaited;
43+
}
44+
45+
/**
46+
* Construct an instance.
47+
*
48+
* @param connectionId the non-null connnectionId
49+
* @deprecated Prefer {@link #ServerHeartbeatStartedEvent(ConnectionId, boolean)}.
50+
* If this constructor is used then {@link #isAwaited()} is {@code false}.
51+
*/
52+
@Deprecated
53+
public ServerHeartbeatStartedEvent(final ConnectionId connectionId) {
54+
this(connectionId, false);
3855
}
3956

4057
/**
@@ -46,12 +63,24 @@ public ConnectionId getConnectionId() {
4663
return connectionId;
4764
}
4865

66+
/**
67+
* Gets whether the heartbeat is for an awaitable `hello` / legacy hello.
68+
*
69+
* @return {@code true} if and only if the heartbeat is for an awaitable `hello` / legacy hello.
70+
* @see ServerMonitoringMode#STREAM
71+
* @since 5.1
72+
*/
73+
public boolean isAwaited() {
74+
return awaited;
75+
}
76+
4977
@Override
5078
public String toString() {
5179
return "ServerHeartbeatStartedEvent{"
5280
+ "connectionId=" + connectionId
5381
+ ", server=" + connectionId.getServerId().getAddress()
5482
+ ", clusterId=" + connectionId.getServerId().getClusterId()
83+
+ ", awaited=" + awaited
5584
+ "} " + super.toString();
5685
}
5786
}

driver-core/src/main/com/mongodb/event/ServerHeartbeatSucceededEvent.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.mongodb.event;
1818

1919
import com.mongodb.connection.ConnectionId;
20+
import com.mongodb.connection.ServerMonitoringMode;
2021
import org.bson.BsonDocument;
2122

2223
import java.util.concurrent.TimeUnit;
@@ -87,6 +88,7 @@ public long getElapsedTime(final TimeUnit timeUnit) {
8788
* to the server and the time that the server waited before sending a response.
8889
*
8990
* @return whether the response was awaited
91+
* @see ServerMonitoringMode#STREAM
9092
* @since 4.1
9193
* @mongodb.server.release 4.4
9294
*/

driver-core/src/main/com/mongodb/internal/connection/DefaultClusterFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public Cluster createCluster(final ClusterSettings originalClusterSettings, fina
110110
connectionPoolSettings, internalConnectionPoolSettings,
111111
streamFactory, heartbeatStreamFactory, credential, loggerSettings, commandListener, applicationName,
112112
mongoDriverInformation != null ? mongoDriverInformation : MongoDriverInformation.builder().build(), compressorList,
113-
serverApi);
113+
serverApi, FaasEnvironment.getFaasEnvironment() != FaasEnvironment.UNKNOWN);
114114

115115
if (clusterSettings.getMode() == ClusterConnectionMode.SINGLE) {
116116
return new SingleServerCluster(clusterId, clusterSettings, serverFactory);

0 commit comments

Comments
 (0)