@@ -79,11 +79,13 @@ class DefaultServerMonitor implements ServerMonitor {
79
79
private final ServerApi serverApi ;
80
80
private final boolean faas ;
81
81
private final ServerSettings serverSettings ;
82
- private final ServerMonitorRunnable monitor ;
83
- private final Thread monitorThread ;
84
- private final RoundTripTimeRunnable roundTripTimeMonitor ;
82
+ private final ServerMonitor monitor ;
83
+ /**
84
+ * Must be guarded by {@link #lock}.
85
+ */
86
+ @ Nullable
87
+ private RoundTripTimeMonitor roundTripTimeMonitor ;
85
88
private final ExponentiallyWeightedMovingAverage averageRoundTripTime = new ExponentiallyWeightedMovingAverage (0.2 );
86
- private final Thread roundTripTimeMonitorThread ;
87
89
private final Lock lock = new ReentrantLock ();
88
90
private final Condition condition = lock .newCondition ();
89
91
private volatile boolean isClosed ;
@@ -102,20 +104,26 @@ class DefaultServerMonitor implements ServerMonitor {
102
104
this .serverApi = serverApi ;
103
105
this .faas = faas ;
104
106
this .sdamProvider = sdamProvider ;
105
- monitor = new ServerMonitorRunnable ();
106
- monitorThread = new Thread (monitor , "cluster-" + this .serverId .getClusterId () + "-" + this .serverId .getAddress ());
107
- monitorThread .setDaemon (true );
108
- roundTripTimeMonitor = new RoundTripTimeRunnable ();
109
- roundTripTimeMonitorThread = new Thread (roundTripTimeMonitor ,
110
- "cluster-rtt-" + this .serverId .getClusterId () + "-" + this .serverId .getAddress ());
111
- roundTripTimeMonitorThread .setDaemon (true );
107
+ monitor = new ServerMonitor ();
108
+ roundTripTimeMonitor = null ;
112
109
isClosed = false ;
113
110
}
114
111
115
112
@ Override
116
113
public void start () {
117
- monitorThread .start ();
118
- roundTripTimeMonitorThread .start ();
114
+ monitor .start ();
115
+ }
116
+
117
+ private void ensureRoundTripTimeMonitorStarted () {
118
+ lock .lock ();
119
+ try {
120
+ if (roundTripTimeMonitor == null ) {
121
+ roundTripTimeMonitor = new RoundTripTimeMonitor ();
122
+ roundTripTimeMonitor .start ();
123
+ }
124
+ } finally {
125
+ lock .unlock ();
126
+ }
119
127
}
120
128
121
129
@ Override
@@ -124,24 +132,35 @@ public void connect() {
124
132
}
125
133
126
134
@ Override
135
+ @ SuppressWarnings ("try" )
127
136
public void close () {
128
137
isClosed = true ;
129
- monitor .close ();
130
- monitorThread .interrupt ();
131
- roundTripTimeMonitor .close ();
132
- roundTripTimeMonitorThread .interrupt ();
138
+ withLock (lock , () -> {
139
+ //noinspection EmptyTryBlock
140
+ try (ServerMonitor ignoredAutoClosed = monitor ;
141
+ RoundTripTimeMonitor ignoredAutoClose2 = roundTripTimeMonitor ) {
142
+ // we are automatically closing resources here
143
+ }
144
+ });
133
145
}
134
146
135
147
@ Override
136
148
public void cancelCurrentCheck () {
137
149
monitor .cancelCurrentCheck ();
138
150
}
139
151
140
- class ServerMonitorRunnable implements Runnable {
152
+ class ServerMonitor extends Thread implements AutoCloseable {
141
153
private volatile InternalConnection connection = null ;
142
154
private volatile boolean currentCheckCancelled ;
143
155
144
- void close () {
156
+ ServerMonitor () {
157
+ super ("cluster-" + serverId .getClusterId () + "-" + serverId .getAddress ());
158
+ setDaemon (true );
159
+ }
160
+
161
+ @ Override
162
+ public void close () {
163
+ interrupt ();
145
164
InternalConnection connection = this .connection ;
146
165
if (connection != null ) {
147
166
connection .close ();
@@ -155,6 +174,10 @@ public void run() {
155
174
while (!isClosed ) {
156
175
ServerDescription previousServerDescription = currentServerDescription ;
157
176
currentServerDescription = lookupServerDescription (currentServerDescription );
177
+ boolean shouldStreamResponses = shouldStreamResponses (currentServerDescription );
178
+ if (shouldStreamResponses ) {
179
+ ensureRoundTripTimeMonitorStarted ();
180
+ }
158
181
159
182
if (isClosed ) {
160
183
continue ;
@@ -169,7 +192,7 @@ public void run() {
169
192
logStateChange (previousServerDescription , currentServerDescription );
170
193
sdamProvider .get ().update (currentServerDescription );
171
194
172
- if ((shouldStreamResponses ( currentServerDescription ) && currentServerDescription .getType () != UNKNOWN )
195
+ if ((shouldStreamResponses && currentServerDescription .getType () != UNKNOWN )
173
196
|| (connection != null && connection .hasMoreToCome ())
174
197
|| (currentServerDescription .getException () instanceof MongoSocketException
175
198
&& previousServerDescription .getType () != UNKNOWN )) {
@@ -202,16 +225,17 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
202
225
if (LOGGER .isDebugEnabled ()) {
203
226
LOGGER .debug (format ("Checking status of %s" , serverId .getAddress ()));
204
227
}
228
+ boolean shouldStreamResponses = shouldStreamResponses (currentServerDescription );
205
229
serverMonitorListener .serverHearbeatStarted (new ServerHeartbeatStartedEvent (
206
- connection .getDescription ().getConnectionId (), shouldStreamResponses ( currentServerDescription ) ));
230
+ connection .getDescription ().getConnectionId (), shouldStreamResponses ));
207
231
208
232
long start = System .nanoTime ();
209
233
try {
210
234
SessionContext sessionContext = NoOpSessionContext .INSTANCE ;
211
235
if (!connection .hasMoreToCome ()) {
212
236
BsonDocument helloDocument = new BsonDocument (getHandshakeCommandName (currentServerDescription ), new BsonInt32 (1 ))
213
237
.append ("helloOk" , BsonBoolean .TRUE );
214
- if (shouldStreamResponses ( currentServerDescription ) ) {
238
+ if (shouldStreamResponses ) {
215
239
helloDocument .append ("topologyVersion" , assertNotNull (currentServerDescription .getTopologyVersion ()).asDocument ());
216
240
helloDocument .append ("maxAwaitTimeMS" , new BsonInt64 (serverSettings .getHeartbeatFrequency (MILLISECONDS )));
217
241
}
@@ -221,23 +245,26 @@ private ServerDescription lookupServerDescription(final ServerDescription curren
221
245
}
222
246
223
247
BsonDocument helloResult ;
224
- if (shouldStreamResponses ( currentServerDescription ) ) {
248
+ if (shouldStreamResponses ) {
225
249
helloResult = connection .receive (new BsonDocumentCodec (), sessionContext ,
226
250
Math .toIntExact (serverSettings .getHeartbeatFrequency (MILLISECONDS )));
227
251
} else {
228
252
helloResult = connection .receive (new BsonDocumentCodec (), sessionContext );
229
253
}
230
254
231
255
long elapsedTimeNanos = System .nanoTime () - start ;
256
+ if (!shouldStreamResponses ) {
257
+ averageRoundTripTime .addSample (elapsedTimeNanos );
258
+ }
232
259
serverMonitorListener .serverHeartbeatSucceeded (
233
260
new ServerHeartbeatSucceededEvent (connection .getDescription ().getConnectionId (), helloResult ,
234
- elapsedTimeNanos , shouldStreamResponses ( currentServerDescription ) ));
261
+ elapsedTimeNanos , shouldStreamResponses ));
235
262
236
263
return createServerDescription (serverId .getAddress (), helloResult , averageRoundTripTime .getAverage ());
237
264
} catch (Exception e ) {
238
265
serverMonitorListener .serverHeartbeatFailed (
239
266
new ServerHeartbeatFailedEvent (connection .getDescription ().getConnectionId (), System .nanoTime () - start ,
240
- shouldStreamResponses ( currentServerDescription ) , e ));
267
+ shouldStreamResponses , e ));
241
268
throw e ;
242
269
}
243
270
} catch (Throwable t ) {
@@ -399,10 +426,17 @@ static boolean shouldLogStageChange(final ServerDescription previous, final Serv
399
426
}
400
427
401
428
402
- private class RoundTripTimeRunnable implements Runnable {
429
+ private class RoundTripTimeMonitor extends Thread implements AutoCloseable {
403
430
private volatile InternalConnection connection = null ;
404
431
405
- void close () {
432
+ RoundTripTimeMonitor () {
433
+ super ("cluster-rtt-" + serverId .getClusterId () + "-" + serverId .getAddress ());
434
+ setDaemon (true );
435
+ }
436
+
437
+ @ Override
438
+ public void close () {
439
+ interrupt ();
406
440
InternalConnection connection = this .connection ;
407
441
if (connection != null ) {
408
442
connection .close ();
0 commit comments