Skip to content

Add Client Metadata Update Support. #1708

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 27 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions buildSrc/src/main/kotlin/conventions/testing-base.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -105,3 +105,7 @@ testlogger {
showFailedStandardStreams = true
logLevel = LogLevel.LIFECYCLE
}

dependencies {
testImplementation(libs.assertj)
}
Comment on lines +109 to +111
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AssertJ has been used in this PR and added to test-base as it is a useful library that could be shared across all modules.

Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,11 @@ private ServerTuple(final ClusterableServer server, final ServerDescription desc
}
}

AbstractMultiServerCluster(final ClusterId clusterId, final ClusterSettings settings, final ClusterableServerFactory serverFactory) {
super(clusterId, settings, serverFactory);
AbstractMultiServerCluster(final ClusterId clusterId,
final ClusterSettings settings,
final ClusterableServerFactory serverFactory,
final ClientMetadata clientMetadata) {
super(clusterId, settings, serverFactory, clientMetadata);
isTrue("connection mode is multiple", settings.getMode() == MULTIPLE);
clusterType = settings.getRequiredClusterType();
replicaSetName = settings.getRequiredReplicaSetName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;

import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.isTrue;
Expand Down Expand Up @@ -101,26 +101,36 @@ abstract class BaseCluster implements Cluster {
private final ClusterListener clusterListener;
private final Deque<ServerSelectionRequest> waitQueue = new ConcurrentLinkedDeque<>();
private final ClusterClock clusterClock = new ClusterClock();
private final ClientMetadata clientMetadata;
private Thread waitQueueHandler;

private volatile boolean isClosed;
private volatile ClusterDescription description;

BaseCluster(final ClusterId clusterId, final ClusterSettings settings, final ClusterableServerFactory serverFactory) {
BaseCluster(final ClusterId clusterId,
final ClusterSettings settings,
final ClusterableServerFactory serverFactory,
final ClientMetadata clientMetadata) {
this.clusterId = notNull("clusterId", clusterId);
this.settings = notNull("settings", settings);
this.serverFactory = notNull("serverFactory", serverFactory);
this.clusterListener = singleClusterListener(settings);
clusterListener.clusterOpening(new ClusterOpeningEvent(clusterId));
description = new ClusterDescription(settings.getMode(), ClusterType.UNKNOWN, Collections.emptyList(),
this.clusterListener.clusterOpening(new ClusterOpeningEvent(clusterId));
this.description = new ClusterDescription(settings.getMode(), ClusterType.UNKNOWN, Collections.emptyList(),
settings, serverFactory.getSettings());
this.clientMetadata = clientMetadata;
}

@Override
public ClusterClock getClock() {
return clusterClock;
}

@Override
public ClientMetadata getClientMetadata() {
return clientMetadata;
}

@Override
public ServerTuple selectServer(final ServerSelector serverSelector, final OperationContext operationContext) {
isTrue("open", !isClosed());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.mongodb.internal.connection;

import com.mongodb.MongoDriverInformation;
import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;

import java.util.concurrent.locks.ReentrantReadWriteLock;

import static com.mongodb.internal.Locks.withLock;
import static com.mongodb.internal.connection.ClientMetadataHelper.createClientMetadataDocument;
import static com.mongodb.internal.connection.ClientMetadataHelper.updateClientMetadataDocument;

/**
* Represents metadata of the current MongoClient.
*
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public class ClientMetadata {
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private BsonDocument clientMetadataBsonDocument;

public ClientMetadata(@Nullable final String applicationName, final MongoDriverInformation mongoDriverInformation) {
withLock(readWriteLock.writeLock(), () -> {
this.clientMetadataBsonDocument = createClientMetadataDocument(applicationName, mongoDriverInformation);
});
}

/**
* Returns mutable BsonDocument that represents the client metadata.
*/
public BsonDocument getBsonDocument() {
return withLock(readWriteLock.readLock(), () -> clientMetadataBsonDocument);
}

public void append(final MongoDriverInformation mongoDriverInformation) {
withLock(readWriteLock.writeLock(), () ->
this.clientMetadataBsonDocument = updateClientMetadataDocument(clientMetadataBsonDocument.clone(), mongoDriverInformation)
);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

Expand Down Expand Up @@ -180,6 +181,44 @@ static boolean clientMetadataDocumentTooLarge(final BsonDocument document) {
return buffer.getPosition() > MAXIMUM_CLIENT_METADATA_ENCODED_SIZE;
}

/**
* Modifies the given client metadata document by appending the driver information.
* Driver name and version are appended atomically to the existing driver name and version if they do not exceed
* {@value MAXIMUM_CLIENT_METADATA_ENCODED_SIZE} bytes.
* <p>
* Platform is appended separately to the existing platform if it does not exceed {@value MAXIMUM_CLIENT_METADATA_ENCODED_SIZE} bytes.
*/
public static BsonDocument updateClientMetadataDocument(final BsonDocument clientMetadataDocument,
final MongoDriverInformation driverInformationToAppend) {
BsonDocument currentDriverInformation = clientMetadataDocument.getDocument("driver");

List<String> driverNamesToAppend = driverInformationToAppend.getDriverNames();
List<String> driverVersionsToAppend = driverInformationToAppend.getDriverVersions();
List<String> driverPlatformsToAppend = driverInformationToAppend.getDriverPlatforms();

List<String> updatedDriverNames = new ArrayList<>(driverNamesToAppend.size() + 1);
List<String> updatedDriverVersions = new ArrayList<>(driverVersionsToAppend.size() + 1);
List<String> updateDriverPlatforms = new ArrayList<>(driverPlatformsToAppend.size() + 1);

updatedDriverNames.add(currentDriverInformation.getString("name").getValue());
updatedDriverNames.addAll(driverNamesToAppend);

updatedDriverVersions.add(currentDriverInformation.getString("version").getValue());
updatedDriverVersions.addAll(driverVersionsToAppend);

updateDriverPlatforms.add(clientMetadataDocument.getString("platform").getValue());
updateDriverPlatforms.addAll(driverPlatformsToAppend);

tryWithLimit(clientMetadataDocument, d -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should tryWithLimit return a boolean indicating whether the document was added (i.e enough space) or not. The boolean will avoid subsequent call to tryWithLimit if it previously failed due to reached capacity

Copy link
Member Author

@vbabanin vbabanin May 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since fields have variable length, even if the driverName doesn’t fit, the platform field might still fit, so we need to attempt adding each field individually.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point 👍

putAtPath(d, "driver.name", listToString(updatedDriverNames));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tryWithLimit invokes the Consumer<BsonDocument> lambda twice, we could build the appended string outside the tryWithLimit

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a fair point. In this case, I’d choose to follow the existing pattern in ClientMetadataHelper to keep the code consistent and neat, as this pattern exists elsewhere in the class. Since updateMetadata is expected to be called most likely once per application lifecycle and isn’t on a hot path or GC-critical, I’d prefer to keep it as is for now. Let me know what you think.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for consistency

putAtPath(d, "driver.version", listToString(updatedDriverVersions));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we try in a dedicated tryWithLimit?

Copy link
Member Author

@vbabanin vbabanin May 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the spec, driver name and version must be updated atomically. If both can't fit, then both must be omitted - so splitting them in a tryWithLimit wouldn't comply with that requirement. We follow the same approach during initial metadata creation:

tryWithLimit(client, d -> {
putAtPath(d, "driver.name", listToString(fullDriverInfo.getDriverNames()));
putAtPath(d, "driver.version", listToString(fullDriverInfo.getDriverVersions()));
});

});
tryWithLimit(clientMetadataDocument, d -> {
putAtPath(d, "platform", listToString(updateDriverPlatforms));
});
return clientMetadataDocument;
}

public enum ContainerRuntime {
DOCKER("docker") {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public interface Cluster extends Closeable {
*/
ClusterClock getClock();

ClientMetadata getClientMetadata();

ServerTuple selectServer(ServerSelector serverSelector, OperationContext operationContext);

void selectServerAsync(ServerSelector serverSelector, OperationContext operationContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,27 +107,29 @@ public Cluster createCluster(final ClusterSettings originalClusterSettings, fina
InternalOperationContextFactory heartBeatOperationContextFactory =
new InternalOperationContextFactory(heartbeatTimeoutSettings, serverApi);

ClientMetadata clientMetadata = new ClientMetadata(
applicationName,
mongoDriverInformation != null ? mongoDriverInformation : MongoDriverInformation.builder().build());

if (clusterSettings.getMode() == ClusterConnectionMode.LOAD_BALANCED) {
ClusterableServerFactory serverFactory = new LoadBalancedClusterableServerFactory(serverSettings,
connectionPoolSettings, internalConnectionPoolSettings, streamFactory, credential, loggerSettings, commandListener,
applicationName, mongoDriverInformation != null ? mongoDriverInformation : MongoDriverInformation.builder().build(),
compressorList, serverApi, clusterOperationContextFactory);
return new LoadBalancedCluster(clusterId, clusterSettings, serverFactory, dnsSrvRecordMonitorFactory);
return new LoadBalancedCluster(clusterId, clusterSettings, serverFactory, clientMetadata, dnsSrvRecordMonitorFactory);
} else {
ClusterableServerFactory serverFactory = new DefaultClusterableServerFactory(serverSettings,
connectionPoolSettings, internalConnectionPoolSettings,
clusterOperationContextFactory, streamFactory, heartBeatOperationContextFactory, heartbeatStreamFactory, credential,
loggerSettings, commandListener, applicationName,
mongoDriverInformation != null ? mongoDriverInformation : MongoDriverInformation.builder().build(), compressorList,
loggerSettings, commandListener, compressorList,
serverApi, FaasEnvironment.getFaasEnvironment() != FaasEnvironment.UNKNOWN);

if (clusterSettings.getMode() == ClusterConnectionMode.SINGLE) {
return new SingleServerCluster(clusterId, clusterSettings, serverFactory);
return new SingleServerCluster(clusterId, clusterSettings, serverFactory, clientMetadata);
} else if (clusterSettings.getMode() == ClusterConnectionMode.MULTIPLE) {
if (clusterSettings.getSrvHost() == null) {
return new MultiServerCluster(clusterId, clusterSettings, serverFactory);
return new MultiServerCluster(clusterId, clusterSettings, serverFactory, clientMetadata);
} else {
return new DnsMultiServerCluster(clusterId, clusterSettings, serverFactory, dnsSrvRecordMonitorFactory);
return new DnsMultiServerCluster(clusterId, clusterSettings, serverFactory, clientMetadata, dnsSrvRecordMonitorFactory);
}
} else {
throw new UnsupportedOperationException("Unsupported cluster mode: " + clusterSettings.getMode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.mongodb.LoggerSettings;
import com.mongodb.MongoCompressor;
import com.mongodb.MongoCredential;
import com.mongodb.MongoDriverInformation;
import com.mongodb.ServerAddress;
import com.mongodb.ServerApi;
import com.mongodb.connection.ClusterConnectionMode;
Expand Down Expand Up @@ -50,8 +49,6 @@ public class DefaultClusterableServerFactory implements ClusterableServerFactory
private final MongoCredentialWithCache credential;
private final LoggerSettings loggerSettings;
private final CommandListener commandListener;
private final String applicationName;
private final MongoDriverInformation mongoDriverInformation;
private final List<MongoCompressor> compressorList;
@Nullable
private final ServerApi serverApi;
Expand All @@ -63,8 +60,7 @@ public DefaultClusterableServerFactory(
final InternalOperationContextFactory clusterOperationContextFactory, final StreamFactory streamFactory,
final InternalOperationContextFactory heartbeatOperationContextFactory, final StreamFactory heartbeatStreamFactory,
@Nullable final MongoCredential credential, final LoggerSettings loggerSettings,
@Nullable final CommandListener commandListener, @Nullable final String applicationName,
@Nullable final MongoDriverInformation mongoDriverInformation,
@Nullable final CommandListener commandListener,
final List<MongoCompressor> compressorList, @Nullable final ServerApi serverApi, final boolean isFunctionAsAServiceEnvironment) {
this.serverSettings = serverSettings;
this.connectionPoolSettings = connectionPoolSettings;
Expand All @@ -76,8 +72,6 @@ public DefaultClusterableServerFactory(
this.credential = credential == null ? null : new MongoCredentialWithCache(credential);
this.loggerSettings = loggerSettings;
this.commandListener = commandListener;
this.applicationName = applicationName;
this.mongoDriverInformation = mongoDriverInformation;
this.compressorList = compressorList;
this.serverApi = serverApi;
this.isFunctionAsAServiceEnvironment = isFunctionAsAServiceEnvironment;
Expand All @@ -88,15 +82,17 @@ public ClusterableServer create(final Cluster cluster, final ServerAddress serve
ServerId serverId = new ServerId(cluster.getClusterId(), serverAddress);
ClusterConnectionMode clusterMode = cluster.getSettings().getMode();
SameObjectProvider<SdamServerDescriptionManager> sdamProvider = SameObjectProvider.uninitialized();
ClientMetadata clientMetadata = cluster.getClientMetadata();

ServerMonitor serverMonitor = new DefaultServerMonitor(serverId, serverSettings,
// no credentials, compressor list, or command listener for the server monitor factory
new InternalStreamConnectionFactory(clusterMode, true, heartbeatStreamFactory, null, applicationName,
mongoDriverInformation, emptyList(), loggerSettings, null, serverApi),
new InternalStreamConnectionFactory(clusterMode, true, heartbeatStreamFactory, null, clientMetadata,
emptyList(), loggerSettings, null, serverApi),
clusterMode, serverApi, isFunctionAsAServiceEnvironment, sdamProvider, heartbeatOperationContextFactory);

ConnectionPool connectionPool = new DefaultConnectionPool(serverId,
new InternalStreamConnectionFactory(clusterMode, streamFactory, credential, applicationName,
mongoDriverInformation, compressorList, loggerSettings, commandListener, serverApi),
new InternalStreamConnectionFactory(clusterMode, streamFactory, credential, clientMetadata,
compressorList, loggerSettings, commandListener, serverApi),
connectionPoolSettings, internalConnectionPoolSettings, sdamProvider, clusterOperationContextFactory);
ServerListener serverListener = singleServerListener(serverSettings);
SdamServerDescriptionManager sdam = new DefaultSdamServerDescriptionManager(cluster, serverId, serverListener, serverMonitor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@ public final class DnsMultiServerCluster extends AbstractMultiServerCluster {
private final DnsSrvRecordMonitor dnsSrvRecordMonitor;
private volatile MongoException srvResolutionException;

public DnsMultiServerCluster(final ClusterId clusterId, final ClusterSettings settings, final ClusterableServerFactory serverFactory,
public DnsMultiServerCluster(final ClusterId clusterId, final ClusterSettings settings,
final ClusterableServerFactory serverFactory,
final ClientMetadata clientMetadata,
final DnsSrvRecordMonitorFactory dnsSrvRecordMonitorFactory) {
super(clusterId, settings, serverFactory);
super(clusterId, settings, serverFactory, clientMetadata);
dnsSrvRecordMonitor = dnsSrvRecordMonitorFactory.create(assertNotNull(settings.getSrvHost()), settings.getSrvServiceName(),
new DnsSrvRecordInitializer() {
private volatile boolean initialized;
Expand Down
Loading