Skip to content

Commit 2412cbd

Browse files
stIncMalekatcharov
andauthored
Direct retries to another mongos if one is available (#1367)
JAVA-4254 --------- Co-authored-by: Maxim Katcharov <[email protected]>
1 parent 01ba99d commit 2412cbd

31 files changed

+1086
-166
lines changed

driver-core/src/main/com/mongodb/connection/ClusterSettings.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -468,16 +468,18 @@ public String getRequiredReplicaSetName() {
468468
*
469469
* <p>The server selector augments the normal server selection rules applied by the driver when determining
470470
* which server to send an operation to. At the point that it's called by the driver, the
471-
* {@link com.mongodb.connection.ClusterDescription} which is passed to it contains a list of
472-
* {@link com.mongodb.connection.ServerDescription} instances which satisfy either the configured {@link com.mongodb.ReadPreference}
473-
* for any read operation or ones that can take writes (e.g. a standalone, mongos, or replica set primary).
471+
* {@link ClusterDescription} which is passed to it {@linkplain ClusterDescription#getServerDescriptions() contains} a list of
472+
* {@link ServerDescription} instances which satisfy either the configured {@link com.mongodb.ReadPreference}
473+
* for any read operation or ones that can take writes (e.g. a standalone, mongos, or replica set primary),
474+
* barring those corresponding to servers that the driver considers unavailable or potentially problematic.
474475
* </p>
475476
* <p>The server selector can then filter the {@code ServerDescription} list using whatever criteria that is required by the
476477
* application.</p>
477-
* <p>After this selector executes, two additional selectors are applied by the driver:</p>
478+
* <p>After this selector executes, three additional selectors are applied by the driver:</p>
478479
* <ul>
479480
* <li>select from within the latency window</li>
480-
* <li>select a random server from those remaining</li>
481+
* <li>select at most two random servers from those remaining</li>
482+
* <li>select the one with fewer outstanding concurrent operations</li>
481483
* </ul>
482484
* <p>To skip the latency window selector, an application can:</p>
483485
* <ul>
@@ -486,6 +488,7 @@ public String getRequiredReplicaSetName() {
486488
* </ul>
487489
*
488490
* @return the server selector, which may be null
491+
* @see Builder#serverSelector(ServerSelector)
489492
*/
490493
@Nullable
491494
public ServerSelector getServerSelector() {

driver-core/src/main/com/mongodb/internal/async/function/RetryState.java

Lines changed: 34 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -78,32 +78,33 @@ public RetryState() {
7878
* which is usually synchronous code.
7979
*
8080
* @param attemptException The exception produced by the most recent attempt.
81-
* It is passed to the {@code retryPredicate} and to the {@code exceptionTransformer}.
82-
* @param exceptionTransformer A function that chooses which exception to preserve as a prospective failed result of the associated
83-
* retryable activity and may also transform or mutate the exceptions.
84-
* The choice is between
81+
* It is passed to the {@code retryPredicate} and to the {@code onAttemptFailureOperator}.
82+
* @param onAttemptFailureOperator The action that is called once per failed attempt before (in the happens-before order) the
83+
* {@code retryPredicate}, regardless of whether the {@code retryPredicate} is called.
84+
* This action is allowed to have side effects.
85+
* <p>
86+
* It also has to choose which exception to preserve as a prospective failed result of the associated retryable activity.
87+
* The {@code onAttemptFailureOperator} may mutate its arguments, choose from the arguments, or return a different exception,
88+
* but it must return a {@code @}{@link NonNull} value.
89+
* The choice is between</p>
8590
* <ul>
8691
* <li>the previously chosen exception or {@code null} if none has been chosen
87-
* (the first argument of the {@code exceptionTransformer})</li>
88-
* <li>and the exception from the most recent attempt (the second argument of the {@code exceptionTransformer}).</li>
92+
* (the first argument of the {@code onAttemptFailureOperator})</li>
93+
* <li>and the exception from the most recent attempt (the second argument of the {@code onAttemptFailureOperator}).</li>
8994
* </ul>
90-
* The {@code exceptionTransformer} may either choose from its arguments, or return a different exception, a.k.a. transform,
91-
* but it must return a {@code @}{@link NonNull} value.
92-
* The {@code exceptionTransformer} is called once before (in the happens-before order) the {@code retryPredicate},
93-
* regardless of whether the {@code retryPredicate} is called. The result of the {@code exceptionTransformer} does not affect
94-
* what exception is passed to the {@code retryPredicate}.
95+
* The result of the {@code onAttemptFailureOperator} does not affect the exception passed to the {@code retryPredicate}.
9596
* @param retryPredicate {@code true} iff another attempt needs to be made. The {@code retryPredicate} is called not more than once
9697
* per attempt and only if all the following is true:
9798
* <ul>
98-
* <li>{@code exceptionTransformer} completed normally;</li>
99+
* <li>{@code onAttemptFailureOperator} completed normally;</li>
99100
* <li>the most recent attempt is not the {@linkplain #isLastAttempt() last} one.</li>
100101
* </ul>
101102
* The {@code retryPredicate} accepts this {@link RetryState} and the exception from the most recent attempt,
102103
* and may mutate the exception. The {@linkplain RetryState} advances to represent the state of a new attempt
103104
* after (in the happens-before order) testing the {@code retryPredicate}, and only if the predicate completes normally.
104105
* @throws RuntimeException Iff any of the following is true:
105106
* <ul>
106-
* <li>the {@code exceptionTransformer} completed abruptly;</li>
107+
* <li>the {@code onAttemptFailureOperator} completed abruptly;</li>
107108
* <li>the most recent attempt is the {@linkplain #isLastAttempt() last} one;</li>
108109
* <li>the {@code retryPredicate} completed abruptly;</li>
109110
* <li>the {@code retryPredicate} is {@code false}.</li>
@@ -112,10 +113,10 @@ public RetryState() {
112113
* i.e., the caller must not do any more attempts.
113114
* @see #advanceOrThrow(Throwable, BinaryOperator, BiPredicate)
114115
*/
115-
void advanceOrThrow(final RuntimeException attemptException, final BinaryOperator<Throwable> exceptionTransformer,
116+
void advanceOrThrow(final RuntimeException attemptException, final BinaryOperator<Throwable> onAttemptFailureOperator,
116117
final BiPredicate<RetryState, Throwable> retryPredicate) throws RuntimeException {
117118
try {
118-
doAdvanceOrThrow(attemptException, exceptionTransformer, retryPredicate, true);
119+
doAdvanceOrThrow(attemptException, onAttemptFailureOperator, retryPredicate, true);
119120
} catch (RuntimeException | Error unchecked) {
120121
throw unchecked;
121122
} catch (Throwable checked) {
@@ -129,18 +130,19 @@ void advanceOrThrow(final RuntimeException attemptException, final BinaryOperato
129130
*
130131
* @see #advanceOrThrow(RuntimeException, BinaryOperator, BiPredicate)
131132
*/
132-
void advanceOrThrow(final Throwable attemptException, final BinaryOperator<Throwable> exceptionTransformer,
133+
void advanceOrThrow(final Throwable attemptException, final BinaryOperator<Throwable> onAttemptFailureOperator,
133134
final BiPredicate<RetryState, Throwable> retryPredicate) throws Throwable {
134-
doAdvanceOrThrow(attemptException, exceptionTransformer, retryPredicate, false);
135+
doAdvanceOrThrow(attemptException, onAttemptFailureOperator, retryPredicate, false);
135136
}
136137

137138
/**
138139
* @param onlyRuntimeExceptions {@code true} iff the method must expect {@link #exception} and {@code attemptException} to be
139140
* {@link RuntimeException}s and must not explicitly handle other {@link Throwable} types, of which only {@link Error} is possible
140141
* as {@link RetryState} does not have any source of {@link Exception}s.
142+
* @param onAttemptFailureOperator See {@link #advanceOrThrow(RuntimeException, BinaryOperator, BiPredicate)}.
141143
*/
142144
private void doAdvanceOrThrow(final Throwable attemptException,
143-
final BinaryOperator<Throwable> exceptionTransformer,
145+
final BinaryOperator<Throwable> onAttemptFailureOperator,
144146
final BiPredicate<RetryState, Throwable> retryPredicate,
145147
final boolean onlyRuntimeExceptions) throws Throwable {
146148
assertTrue(attempt() < attempts);
@@ -149,7 +151,7 @@ private void doAdvanceOrThrow(final Throwable attemptException,
149151
assertTrue(isRuntime(attemptException));
150152
}
151153
assertTrue(!isFirstAttempt() || exception == null);
152-
Throwable newlyChosenException = transformException(exception, attemptException, onlyRuntimeExceptions, exceptionTransformer);
154+
Throwable newlyChosenException = callOnAttemptFailureOperator(exception, attemptException, onlyRuntimeExceptions, onAttemptFailureOperator);
153155
if (isLastAttempt()) {
154156
exception = newlyChosenException;
155157
throw exception;
@@ -167,27 +169,31 @@ private void doAdvanceOrThrow(final Throwable attemptException,
167169

168170
/**
169171
* @param onlyRuntimeExceptions See {@link #doAdvanceOrThrow(Throwable, BinaryOperator, BiPredicate, boolean)}.
172+
* @param onAttemptFailureOperator See {@link #advanceOrThrow(RuntimeException, BinaryOperator, BiPredicate)}.
170173
*/
171-
private static Throwable transformException(@Nullable final Throwable previouslyChosenException, final Throwable attemptException,
172-
final boolean onlyRuntimeExceptions, final BinaryOperator<Throwable> exceptionTransformer) {
174+
private static Throwable callOnAttemptFailureOperator(
175+
@Nullable final Throwable previouslyChosenException,
176+
final Throwable attemptException,
177+
final boolean onlyRuntimeExceptions,
178+
final BinaryOperator<Throwable> onAttemptFailureOperator) {
173179
if (onlyRuntimeExceptions && previouslyChosenException != null) {
174180
assertTrue(isRuntime(previouslyChosenException));
175181
}
176182
Throwable result;
177183
try {
178-
result = assertNotNull(exceptionTransformer.apply(previouslyChosenException, attemptException));
184+
result = assertNotNull(onAttemptFailureOperator.apply(previouslyChosenException, attemptException));
179185
if (onlyRuntimeExceptions) {
180186
assertTrue(isRuntime(result));
181187
}
182-
} catch (Throwable exceptionTransformerException) {
183-
if (onlyRuntimeExceptions && !isRuntime(exceptionTransformerException)) {
184-
throw exceptionTransformerException;
188+
} catch (Throwable onAttemptFailureOperatorException) {
189+
if (onlyRuntimeExceptions && !isRuntime(onAttemptFailureOperatorException)) {
190+
throw onAttemptFailureOperatorException;
185191
}
186192
if (previouslyChosenException != null) {
187-
exceptionTransformerException.addSuppressed(previouslyChosenException);
193+
onAttemptFailureOperatorException.addSuppressed(previouslyChosenException);
188194
}
189-
exceptionTransformerException.addSuppressed(attemptException);
190-
throw exceptionTransformerException;
195+
onAttemptFailureOperatorException.addSuppressed(attemptException);
196+
throw onAttemptFailureOperatorException;
191197
}
192198
return result;
193199
}

driver-core/src/main/com/mongodb/internal/async/function/RetryingAsyncCallbackSupplier.java

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -41,31 +41,34 @@
4141
public final class RetryingAsyncCallbackSupplier<R> implements AsyncCallbackSupplier<R> {
4242
private final RetryState state;
4343
private final BiPredicate<RetryState, Throwable> retryPredicate;
44-
private final BinaryOperator<Throwable> failedResultTransformer;
44+
private final BinaryOperator<Throwable> onAttemptFailureOperator;
4545
private final AsyncCallbackSupplier<R> asyncFunction;
4646

4747
/**
4848
* @param state The {@link RetryState} to be deemed as initial for the purpose of the new {@link RetryingAsyncCallbackSupplier}.
49-
* @param failedResultTransformer A function that chooses which failed result of the {@code asyncFunction} to preserve as a prospective
50-
* failed result of this {@link RetryingAsyncCallbackSupplier} and may also transform or mutate the exceptions.
51-
* The choice is between
49+
* @param onAttemptFailureOperator The action that is called once per failed attempt before (in the happens-before order) the
50+
* {@code retryPredicate}, regardless of whether the {@code retryPredicate} is called.
51+
* This action is allowed to have side effects.
52+
* <p>
53+
* It also has to choose which exception to preserve as a prospective failed result of this {@link RetryingAsyncCallbackSupplier}.
54+
* The {@code onAttemptFailureOperator} may mutate its arguments, choose from the arguments, or return a different exception,
55+
* but it must return a {@code @}{@link NonNull} value.
56+
* The choice is between</p>
5257
* <ul>
5358
* <li>the previously chosen failed result or {@code null} if none has been chosen
54-
* (the first argument of the {@code failedResultTransformer})</li>
55-
* <li>and the failed result from the most recent attempt (the second argument of the {@code failedResultTransformer}).</li>
59+
* (the first argument of the {@code onAttemptFailureOperator})</li>
60+
* <li>and the failed result from the most recent attempt (the second argument of the {@code onAttemptFailureOperator}).</li>
5661
* </ul>
57-
* The {@code failedResultTransformer} may either choose from its arguments, or return a different exception, a.k.a. transform,
58-
* but it must return a {@code @}{@link NonNull} value.
59-
* If it completes abruptly, then the {@code asyncFunction} cannot be retried and the exception thrown by
60-
* the {@code failedResultTransformer} is used as a failed result of this {@link RetryingAsyncCallbackSupplier}.
61-
* The {@code failedResultTransformer} is called before (in the happens-before order) the {@code retryPredicate}.
62-
* The result of the {@code failedResultTransformer} does not affect what exception is passed to the {@code retryPredicate}.
62+
* The result of the {@code onAttemptFailureOperator} does not affect the exception passed to the {@code retryPredicate}.
63+
* <p>
64+
* If {@code onAttemptFailureOperator} completes abruptly, then the {@code asyncFunction} cannot be retried and the exception thrown by
65+
* the {@code onAttemptFailureOperator} is used as a failed result of this {@link RetryingAsyncCallbackSupplier}.</p>
6366
* @param retryPredicate {@code true} iff another attempt needs to be made. If it completes abruptly,
6467
* then the {@code asyncFunction} cannot be retried and the exception thrown by the {@code retryPredicate}
6568
* is used as a failed result of this {@link RetryingAsyncCallbackSupplier}. The {@code retryPredicate} is called not more than once
6669
* per attempt and only if all the following is true:
6770
* <ul>
68-
* <li>{@code failedResultTransformer} completed normally;</li>
71+
* <li>{@code onAttemptFailureOperator} completed normally;</li>
6972
* <li>the most recent attempt is not the {@linkplain RetryState#isLastAttempt() last} one.</li>
7073
* </ul>
7174
* The {@code retryPredicate} accepts this {@link RetryState} and the exception from the most recent attempt,
@@ -75,12 +78,12 @@ public final class RetryingAsyncCallbackSupplier<R> implements AsyncCallbackSupp
7578
*/
7679
public RetryingAsyncCallbackSupplier(
7780
final RetryState state,
78-
final BinaryOperator<Throwable> failedResultTransformer,
81+
final BinaryOperator<Throwable> onAttemptFailureOperator,
7982
final BiPredicate<RetryState, Throwable> retryPredicate,
8083
final AsyncCallbackSupplier<R> asyncFunction) {
8184
this.state = state;
8285
this.retryPredicate = retryPredicate;
83-
this.failedResultTransformer = failedResultTransformer;
86+
this.onAttemptFailureOperator = onAttemptFailureOperator;
8487
this.asyncFunction = asyncFunction;
8588
}
8689

@@ -113,7 +116,7 @@ private class RetryingCallback implements SingleResultCallback<R> {
113116
public void onResult(@Nullable final R result, @Nullable final Throwable t) {
114117
if (t != null) {
115118
try {
116-
state.advanceOrThrow(t, failedResultTransformer, retryPredicate);
119+
state.advanceOrThrow(t, onAttemptFailureOperator, retryPredicate);
117120
} catch (Throwable failedResult) {
118121
wrapped.onResult(null, failedResult);
119122
return;

driver-core/src/main/com/mongodb/internal/async/function/RetryingSyncSupplier.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,26 +37,26 @@
3737
public final class RetryingSyncSupplier<R> implements Supplier<R> {
3838
private final RetryState state;
3939
private final BiPredicate<RetryState, Throwable> retryPredicate;
40-
private final BinaryOperator<Throwable> failedResultTransformer;
40+
private final BinaryOperator<Throwable> onAttemptFailureOperator;
4141
private final Supplier<R> syncFunction;
4242

4343
/**
4444
* See {@link RetryingAsyncCallbackSupplier#RetryingAsyncCallbackSupplier(RetryState, BinaryOperator, BiPredicate, AsyncCallbackSupplier)}
4545
* for the documentation of the parameters.
4646
*
47-
* @param failedResultTransformer Even though the {@code failedResultTransformer} accepts {@link Throwable},
47+
* @param onAttemptFailureOperator Even though the {@code onAttemptFailureOperator} accepts {@link Throwable},
4848
* only {@link RuntimeException}s are passed to it.
4949
* @param retryPredicate Even though the {@code retryPredicate} accepts {@link Throwable},
5050
* only {@link RuntimeException}s are passed to it.
5151
*/
5252
public RetryingSyncSupplier(
5353
final RetryState state,
54-
final BinaryOperator<Throwable> failedResultTransformer,
54+
final BinaryOperator<Throwable> onAttemptFailureOperator,
5555
final BiPredicate<RetryState, Throwable> retryPredicate,
5656
final Supplier<R> syncFunction) {
5757
this.state = state;
5858
this.retryPredicate = retryPredicate;
59-
this.failedResultTransformer = failedResultTransformer;
59+
this.onAttemptFailureOperator = onAttemptFailureOperator;
6060
this.syncFunction = syncFunction;
6161
}
6262

@@ -66,10 +66,10 @@ public R get() {
6666
try {
6767
return syncFunction.get();
6868
} catch (RuntimeException attemptException) {
69-
state.advanceOrThrow(attemptException, failedResultTransformer, retryPredicate);
69+
state.advanceOrThrow(attemptException, onAttemptFailureOperator, retryPredicate);
7070
} catch (Exception attemptException) {
7171
// wrap potential sneaky / Kotlin exceptions
72-
state.advanceOrThrow(new RuntimeException(attemptException), failedResultTransformer, retryPredicate);
72+
state.advanceOrThrow(new RuntimeException(attemptException), onAttemptFailureOperator, retryPredicate);
7373
}
7474
}
7575
}

driver-core/src/main/com/mongodb/internal/connection/AbstractMultiServerCluster.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,11 @@
3131

3232
import java.util.ArrayList;
3333
import java.util.Collection;
34+
import java.util.HashMap;
3435
import java.util.HashSet;
3536
import java.util.Iterator;
3637
import java.util.List;
38+
import java.util.Map;
3739
import java.util.Set;
3840
import java.util.concurrent.ConcurrentHashMap;
3941
import java.util.concurrent.ConcurrentMap;
@@ -123,14 +125,13 @@ public void close() {
123125
}
124126

125127
@Override
126-
public ClusterableServer getServer(final ServerAddress serverAddress) {
128+
public ServersSnapshot getServersSnapshot() {
127129
isTrue("is open", !isClosed());
128-
129-
ServerTuple serverTuple = addressToServerTupleMap.get(serverAddress);
130-
if (serverTuple == null) {
131-
return null;
132-
}
133-
return serverTuple.server;
130+
Map<ServerAddress, ServerTuple> nonAtomicSnapshot = new HashMap<>(addressToServerTupleMap);
131+
return serverAddress -> {
132+
ServerTuple serverTuple = nonAtomicSnapshot.get(serverAddress);
133+
return serverTuple == null ? null : serverTuple.server;
134+
};
134135
}
135136

136137
void onChange(final Collection<ServerAddress> newHosts) {

0 commit comments

Comments
 (0)