Skip to content

Make ServerSelectionLoggingTest pass when run against a load balanced cluster, or when using Unix domain socket #1243

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera
try {
CountDownLatch currentPhase = phase.get();
ClusterDescription curDescription = description;
logServerSelectionStarted(operationContext, serverSelector, curDescription);
logServerSelectionStarted(clusterId, operationContext, serverSelector, curDescription);
ServerSelector compositeServerSelector = getCompositeServerSelector(serverSelector);
ServerTuple serverTuple = selectServer(compositeServerSelector, curDescription);

Expand All @@ -137,7 +137,8 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera
}

if (serverTuple != null) {
logServerSelectionSucceeded(operationContext, serverTuple.getServerDescription().getAddress(), serverSelector, curDescription);
logServerSelectionSucceeded(
clusterId, operationContext, serverTuple.getServerDescription().getAddress(), serverSelector, curDescription);
return serverTuple;
}

Expand All @@ -148,7 +149,7 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera
}

if (!selectionWaitingLogged) {
logServerSelectionWaiting(operationContext, remainingTimeNanos, serverSelector, curDescription);
logServerSelectionWaiting(clusterId, operationContext, remainingTimeNanos, serverSelector, curDescription);
selectionWaitingLogged = true;
}

Expand Down Expand Up @@ -178,7 +179,7 @@ public void selectServerAsync(final ServerSelector serverSelector, final Operati
CountDownLatch currentPhase = phase.get();
ClusterDescription currentDescription = description;

logServerSelectionStarted(operationContext, serverSelector, currentDescription);
logServerSelectionStarted(clusterId, operationContext, serverSelector, currentDescription);
ServerSelectionRequest request = new ServerSelectionRequest(operationContext, serverSelector, getCompositeServerSelector(serverSelector),
getMaxWaitTimeNanos(), callback);

Expand Down Expand Up @@ -277,13 +278,14 @@ private boolean handleServerSelectionRequest(final ServerSelectionRequest reques

ServerTuple serverTuple = selectServer(request.compositeSelector, description);
if (serverTuple != null) {
logServerSelectionSucceeded(
request.operationContext, serverTuple.getServerDescription().getAddress(), request.originalSelector, description);
logServerSelectionSucceeded(clusterId, request.operationContext, serverTuple.getServerDescription().getAddress(),
request.originalSelector, description);
request.onResult(serverTuple, null);
return true;
}
if (prevPhase == null) {
logServerSelectionWaiting(request.operationContext, request.getRemainingTime(), request.originalSelector, description);
logServerSelectionWaiting(
clusterId, request.operationContext, request.getRemainingTime(), request.originalSelector, description);
}
}

Expand Down Expand Up @@ -360,7 +362,7 @@ private MongoIncompatibleDriverException createAndLogIncompatibleException(
final ServerSelector serverSelector,
final ClusterDescription clusterDescription) {
MongoIncompatibleDriverException exception = createIncompatibleException(clusterDescription);
logServerSelectionFailed(operationContext, exception, serverSelector, clusterDescription);
logServerSelectionFailed(clusterId, operationContext, exception, serverSelector, clusterDescription);
return exception;
}

Expand Down Expand Up @@ -390,7 +392,7 @@ private MongoException createAndLogTimeoutException(
MongoTimeoutException exception = new MongoTimeoutException(format(
"Timed out while waiting for a server that matches %s. Client view of cluster state is %s",
serverSelector, clusterDescription.getShortDescription()));
logServerSelectionFailed(operationContext, exception, serverSelector, clusterDescription);
logServerSelectionFailed(clusterId, operationContext, exception, serverSelector, clusterDescription);
return exception;
}

Expand Down Expand Up @@ -498,7 +500,8 @@ public void run() {
}
}

private void logServerSelectionStarted(
static void logServerSelectionStarted(
final ClusterId clusterId,
final OperationContext operationContext,
final ServerSelector serverSelector,
final ClusterDescription clusterDescription) {
Expand All @@ -514,7 +517,8 @@ private void logServerSelectionStarted(
}
}

private void logServerSelectionWaiting(
private static void logServerSelectionWaiting(
final ClusterId clusterId,
final OperationContext operationContext,
@Nullable
final Long remainingTimeNanos,
Expand All @@ -534,7 +538,8 @@ private void logServerSelectionWaiting(
}
}

private void logServerSelectionFailed(
private static void logServerSelectionFailed(
final ClusterId clusterId,
final OperationContext operationContext,
final MongoException failure,
final ServerSelector serverSelector,
Expand All @@ -558,7 +563,8 @@ private void logServerSelectionFailed(
}
}

private void logServerSelectionSucceeded(
static void logServerSelectionSucceeded(
final ClusterId clusterId,
final OperationContext operationContext,
final ServerAddress serverAddress,
final ServerSelector serverSelector,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import static com.mongodb.assertions.Assertions.isTrue;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.connection.ServerConnectionState.CONNECTING;
import static com.mongodb.internal.connection.BaseCluster.logServerSelectionStarted;
import static com.mongodb.internal.connection.BaseCluster.logServerSelectionSucceeded;
import static com.mongodb.internal.event.EventListenerHelper.singleClusterListener;
import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException;
import static java.lang.String.format;
Expand Down Expand Up @@ -204,7 +206,11 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera
if (srvRecordResolvedToMultipleHosts) {
throw createResolvedToMultipleHostsException();
}
return new ServerTuple(assertNotNull(server), description.getServerDescriptions().get(0));
ClusterDescription curDescription = description;
logServerSelectionStarted(clusterId, operationContext, serverSelector, curDescription);
ServerTuple serverTuple = new ServerTuple(assertNotNull(server), curDescription.getServerDescriptions().get(0));
logServerSelectionSucceeded(clusterId, operationContext, serverTuple.getServerDescription().getAddress(), serverSelector, curDescription);
return serverTuple;
}


Expand Down Expand Up @@ -238,7 +244,8 @@ public void selectServerAsync(final ServerSelector serverSelector, final Operati
return;
}

ServerSelectionRequest serverSelectionRequest = new ServerSelectionRequest(getMaxWaitTimeNanos(), callback);
ServerSelectionRequest serverSelectionRequest = new ServerSelectionRequest(
operationContext, serverSelector, getMaxWaitTimeNanos(), callback);
if (initializationCompleted) {
handleServerSelectionRequest(serverSelectionRequest);
} else {
Expand Down Expand Up @@ -288,7 +295,13 @@ private void handleServerSelectionRequest(final ServerSelectionRequest serverSel
if (srvRecordResolvedToMultipleHosts) {
serverSelectionRequest.onError(createResolvedToMultipleHostsException());
} else {
serverSelectionRequest.onSuccess(new ServerTuple(assertNotNull(server), description.getServerDescriptions().get(0)));
ClusterDescription curDescription = description;
logServerSelectionStarted(
clusterId, serverSelectionRequest.operationContext, serverSelectionRequest.serverSelector, curDescription);
ServerTuple serverTuple = new ServerTuple(assertNotNull(server), curDescription.getServerDescriptions().get(0));
logServerSelectionSucceeded(clusterId, serverSelectionRequest.operationContext,
serverTuple.getServerDescription().getAddress(), serverSelectionRequest.serverSelector, curDescription);
serverSelectionRequest.onSuccess(serverTuple);
}
}

Expand Down Expand Up @@ -391,11 +404,18 @@ public void run() {
}

private static final class ServerSelectionRequest {
private final OperationContext operationContext;
private final ServerSelector serverSelector;
private final long maxWaitTimeNanos;
private final long startTimeNanos = System.nanoTime();
private final SingleResultCallback<ServerTuple> callback;

private ServerSelectionRequest(final long maxWaitTimeNanos, final SingleResultCallback<ServerTuple> callback) {
private ServerSelectionRequest(
final OperationContext operationContext,
final ServerSelector serverSelector,
final long maxWaitTimeNanos, final SingleResultCallback<ServerTuple> callback) {
this.operationContext = operationContext;
this.serverSelector = serverSelector;
this.maxWaitTimeNanos = maxWaitTimeNanos;
this.callback = callback;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,10 +313,12 @@ private static boolean serverDescriptionChangedEventMatches(final BsonDocument e
return true;
}
String newType = expectedEventContents.getDocument("newDescription").getString("type").getValue();
//noinspection SwitchStatementWithTooFewBranches
switch (newType) {
case "Unknown":
return event.getNewDescription().getType() == ServerType.UNKNOWN;
case "LoadBalancer": {
return event.getNewDescription().getType() == ServerType.LOAD_BALANCER;
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And does it? :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, not initially. Hopefully, together with the new logging code it will.

Copy link
Member Author

@stIncMale stIncMale Nov 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, now it worked.

default:
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

package com.mongodb.client.unified;

import com.mongodb.Function;
import com.mongodb.MongoCommandException;
import com.mongodb.internal.ExceptionUtils.MongoCommandExceptionUtils;
import com.mongodb.internal.logging.LogMessage;
import com.mongodb.lang.Nullable;
import org.bson.BsonArray;
import org.bson.BsonBoolean;
import org.bson.BsonDocument;
Expand All @@ -44,19 +46,20 @@ final class LogMatcher {
this.context = context;
}

void assertLogMessageEquality(final String client, final BsonArray expectedMessages, final List<LogMessage> actualMessages) {
void assertLogMessageEquality(final String client, final BsonArray expectedMessages, final List<LogMessage> actualMessages,
final Iterable<Tweak> tweaks) {
context.push(ContextElement.ofLogMessages(client, expectedMessages, actualMessages));

assertEquals(context.getMessage("Number of log messages must be the same"), expectedMessages.size(), actualMessages.size());

for (int i = 0; i < expectedMessages.size(); i++) {
BsonDocument expectedMessageAsDocument = expectedMessages.get(i).asDocument().clone();
// `LogMessage.Entry.Name.OPERATION` is not supported, therefore we skip matching its value
BsonValue expectedDataDocument = expectedMessageAsDocument.get("data");
if (expectedDataDocument != null) {
expectedDataDocument.asDocument().remove(LogMessage.Entry.Name.OPERATION.getValue());
BsonDocument expectedMessage = expectedMessages.get(i).asDocument().clone();
for (Tweak tweak : tweaks) {
expectedMessage = tweak.apply(expectedMessage);
}
if (expectedMessage != null) {
valueMatcher.assertValuesMatch(expectedMessage, asDocument(actualMessages.get(i)));
}
valueMatcher.assertValuesMatch(expectedMessageAsDocument, asDocument(actualMessages.get(i)));
}

context.pop();
Expand Down Expand Up @@ -108,4 +111,27 @@ private static BsonValue asBsonValue(final Object value) {
}
}

interface Tweak extends Function<BsonDocument, BsonDocument> {
/**
* @param expectedMessage May be {@code null}, in which case the method simply returns {@code null}.
* This method may mutate {@code expectedMessage}.
* @return {@code null} iff matching {@code expectedMessage} with the actual message must be skipped.
*/
@Nullable
BsonDocument apply(@Nullable BsonDocument expectedMessage);

static Tweak skip(final LogMessage.Entry.Name name) {
return expectedMessage -> {
if (expectedMessage == null) {
return null;
} else {
BsonDocument expectedData = expectedMessage.getDocument("data", null);
if (expectedData != null) {
expectedData.remove(name.getValue());
}
return expectedMessage;
}
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoNamespace;
import com.mongodb.ReadPreference;
import com.mongodb.UnixServerAddress;
import com.mongodb.internal.logging.LogMessage;
import com.mongodb.logging.TestLoggingInterceptor;
import com.mongodb.WriteConcern;
import com.mongodb.client.ClientSession;
Expand Down Expand Up @@ -72,6 +74,7 @@
import static com.mongodb.client.Fixture.getMongoClient;
import static com.mongodb.client.Fixture.getMongoClientSettings;
import static com.mongodb.client.unified.RunOnRequirementsMatcher.runOnRequirementsMet;
import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -242,7 +245,14 @@ public void shouldPassAllOutcomes() {
}

if (definition.containsKey("expectLogMessages")) {
compareLogMessages(rootContext, definition);
ArrayList<LogMatcher.Tweak> tweaks = new ArrayList<>(singletonList(
// `LogMessage.Entry.Name.OPERATION` is not supported, therefore we skip matching its value
LogMatcher.Tweak.skip(LogMessage.Entry.Name.OPERATION)));
if (getMongoClientSettings().getClusterSettings()
.getHosts().stream().anyMatch(serverAddress -> serverAddress instanceof UnixServerAddress)) {
tweaks.add(LogMatcher.Tweak.skip(LogMessage.Entry.Name.SERVER_PORT));
}
Comment on lines +251 to +254
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

compareLogMessages(rootContext, definition, tweaks);
}
}

Expand All @@ -266,14 +276,15 @@ private void compareEvents(final UnifiedTestContext context, final BsonDocument
}
}

private void compareLogMessages(final UnifiedTestContext rootContext, final BsonDocument definition) {
private void compareLogMessages(final UnifiedTestContext rootContext, final BsonDocument definition,
final Iterable<LogMatcher.Tweak> tweaks) {
for (BsonValue cur : definition.getArray("expectLogMessages")) {
BsonDocument curLogMessagesForClient = cur.asDocument();
String clientId = curLogMessagesForClient.getString("client").getValue();
TestLoggingInterceptor loggingInterceptor =
entities.getClientLoggingInterceptor(clientId);
rootContext.getLogMatcher().assertLogMessageEquality(clientId, curLogMessagesForClient.getArray("messages"),
loggingInterceptor.getMessages());
loggingInterceptor.getMessages(), tweaks);
}
}

Expand Down