Skip to content

Commit 69c14d2

Browse files
committed
Document locator connection count setting
1 parent e07cd04 commit 69c14d2

File tree

6 files changed

+51
-12
lines changed

6 files changed

+51
-12
lines changed

src/docs/asciidoc/api.adoc

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,10 @@ Used as a prefix for connection names.
234234
|Contract to change resolved node address to connect to.
235235
|Pass-through (no-op)
236236

237+
|`locatorConnectionCount`
238+
|Number of locator connections to maintain (for metadata search)
239+
|The smaller of the number of URIs and 3.
240+
237241
|`tls`
238242
|Configuration helper for TLS.
239243
|TLS is enabled if a `rabbitmq-stream+tls` URI is provided.
@@ -293,8 +297,12 @@ include::{test-examples}/EnvironmentUsage.java[tag=address-resolver]
293297
<1> Set the load balancer address
294298
<2> Use load balancer address for initial connection
295299
<3> Ignore metadata hints, always use load balancer
300+
<4> Set the number of locator connections to maintain
296301

297-
The blog post covers the https://www.rabbitmq.com/blog/2021/07/23/connecting-to-streams/#client-workaround-with-a-load-balancer[underlying details of this workaround].
302+
Note the example above sets the number of locator connections the environment maintains.
303+
Locator connections are used to perform infrastructure-related operations (e.g. looking up the topology of a stream to find an appropriate node to connect to).
304+
The environment uses the number of passed-in URIs to choose an appropriate default number and will pick 1 in this case, which may be too low for a cluster deployment.
305+
This is why it is recommended to set the value explicitly, 3 being a good default.
298306

299307
==== Managing Streams
300308

src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import com.rabbitmq.stream.compression.Compression;
1818
import com.rabbitmq.stream.compression.CompressionCodecFactory;
19+
import com.rabbitmq.stream.impl.StreamEnvironmentBuilder;
1920
import com.rabbitmq.stream.metrics.MetricsCollector;
2021
import com.rabbitmq.stream.sasl.CredentialsProvider;
2122
import com.rabbitmq.stream.sasl.SaslConfiguration;
@@ -62,14 +63,15 @@ public interface EnvironmentBuilder {
6263
* An {@link AddressResolver} to potentially change resolved node address to connect to.
6364
*
6465
* <p>Applications can use this abstraction to make sure connection attempts ignore metadata hints
65-
* and always go to a single point like a load balancer.
66+
* and always go to a single point like a load balancer. Consider setting {@link
67+
* #locatorConnectionCount(int)} when using a load balancer.
6668
*
6769
* <p>The default implementation does not perform any logic, it just returns the passed-in
6870
* address.
6971
*
7072
* <p><i>The default implementation is overridden automatically if the following conditions are
7173
* met: the host to connect to is <code>localhost</code>, the user is <code>guest</code>, and no
72-
* address resolver has been provided. The client will then always tries to connect to <code>
74+
* address resolver has been provided. The client will then always try to connect to <code>
7375
* localhost</code> to facilitate local development. Just provide a pass-through address resolver
7476
* to avoid this behavior, e.g.:</i>
7577
*
@@ -79,10 +81,11 @@ public interface EnvironmentBuilder {
7981
* .build();
8082
* </pre>
8183
*
82-
* @param addressResolver
84+
* @param addressResolver the address resolver
8385
* @return this builder instance
8486
* @see <a href="https://blog.rabbitmq.com/posts/2021/07/connecting-to-streams/">"Connecting to
8587
* Streams" blog post</a>
88+
* @see #locatorConnectionCount(int)
8689
*/
8790
EnvironmentBuilder addressResolver(AddressResolver addressResolver);
8891

@@ -395,6 +398,27 @@ EnvironmentBuilder topologyUpdateBackOffDelayPolicy(
395398
*/
396399
EnvironmentBuilder forceLeaderForProducers(boolean forceLeader);
397400

401+
/**
402+
* Set the expected number of "locator" connections to maintain.
403+
*
404+
* <p>Locator connections are used to perform infrastructure-related operations (e.g. looking up
405+
* the topology of a stream to find an appropriate node to connect to).
406+
*
407+
* <p>It is recommended to maintain 2 to 3 locator connections. The environment uses the smaller
408+
* of the number of passed-in URIs and 3 by default (see {@link #uris(List)}).
409+
*
410+
* <p>The number of locator connections should be explicitly set when a load balancer is used, as
411+
* the environment cannot know the number of cluster nodes in this case (the only URI set is the
412+
* one of the load balancer).
413+
*
414+
* @param locatorConnectionCount number of expected locator connections
415+
* @return this builder instance
416+
* @see #uris(List)
417+
* @see #addressResolver(AddressResolver)
418+
* @since 0.21.0
419+
*/
420+
StreamEnvironmentBuilder locatorConnectionCount(int locatorConnectionCount);
421+
398422
/**
399423
* Create the {@link Environment} instance.
400424
*

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,12 @@ class StreamEnvironment implements Environment {
182182

183183
this.addressResolver = addressResolverToUse;
184184

185-
int locatorCount = Math.max(this.addresses.size(), expectedLocatorCount);
185+
int locatorCount;
186+
if (expectedLocatorCount > 0) {
187+
locatorCount = expectedLocatorCount;
188+
} else {
189+
locatorCount = Math.min(this.addresses.size(), 3);
190+
}
186191
LOGGER.debug("Using {} locator connection(s)", locatorCount);
187192

188193
List<Locator> lctrs =

src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public class StreamEnvironmentBuilder implements EnvironmentBuilder {
7070
private ObservationCollector<?> observationCollector = ObservationCollector.NO_OP;
7171
private Duration producerNodeRetryDelay = Duration.ofMillis(500);
7272
private Duration consumerNodeRetryDelay = Duration.ofMillis(1000);
73-
private int locatorCount = 1;
73+
private int locatorConnectionCount = -1;
7474

7575
public StreamEnvironmentBuilder() {}
7676

@@ -316,8 +316,9 @@ StreamEnvironmentBuilder consumerNodeRetryDelay(Duration consumerNodeRetryDelay)
316316
return this;
317317
}
318318

319-
StreamEnvironmentBuilder locatorCount(int locatorCount) {
320-
this.locatorCount = locatorCount;
319+
@Override
320+
public StreamEnvironmentBuilder locatorConnectionCount(int locatorCount) {
321+
this.locatorConnectionCount = locatorCount;
321322
return this;
322323
}
323324

@@ -356,7 +357,7 @@ public Environment build() {
356357
this.forceLeaderForProducers,
357358
this.producerNodeRetryDelay,
358359
this.consumerNodeRetryDelay,
359-
this.locatorCount);
360+
this.locatorConnectionCount);
360361
}
361362

362363
static final class DefaultTlsConfiguration implements TlsConfiguration {

src/test/java/com/rabbitmq/stream/docs/EnvironmentUsage.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ void addressResolver() throws Exception {
9797
.host(entryPoint.host()) // <2>
9898
.port(entryPoint.port()) // <2>
9999
.addressResolver(address -> entryPoint) // <3>
100+
.locatorConnectionCount(3) // <4>
100101
.build();
101102
// end::address-resolver[]
102103
}

src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,10 +112,10 @@ void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws Interru
112112
environmentBuilder
113113
.host(LOAD_BALANCER_ADDRESS.host())
114114
.port(LOAD_BALANCER_ADDRESS.port())
115-
.addressResolver(addr -> LOAD_BALANCER_ADDRESS);
115+
.addressResolver(addr -> LOAD_BALANCER_ADDRESS)
116+
.forceLeaderForProducers(forceLeader)
117+
.locatorConnectionCount(URIS.size());
116118
Duration nodeRetryDelay = Duration.ofMillis(100);
117-
environmentBuilder.forceLeaderForProducers(forceLeader);
118-
((StreamEnvironmentBuilder) environmentBuilder).locatorCount(URIS.size());
119119
// to make the test faster
120120
((StreamEnvironmentBuilder) environmentBuilder).producerNodeRetryDelay(nodeRetryDelay);
121121
((StreamEnvironmentBuilder) environmentBuilder).consumerNodeRetryDelay(nodeRetryDelay);

0 commit comments

Comments
 (0)