Skip to content

Commit 86047d9

Browse files
committed
Start affinity support
1 parent 40243e0 commit 86047d9

14 files changed

+659
-57
lines changed

src/main/java/com/rabbitmq/client/amqp/ConnectionSettings.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public interface ConnectionSettings<T> {
4949

5050
TlsSettings<? extends T> tls();
5151

52+
Affinity<? extends T> affinity();
53+
5254
interface TlsSettings<T> {
5355

5456
TlsSettings<T> hostnameVerification();
@@ -61,4 +63,20 @@ interface TlsSettings<T> {
6163

6264
T connection();
6365
}
66+
67+
interface Affinity<T> {
68+
69+
Affinity<T> queue(String queue);
70+
71+
Affinity<T> operation(Operation operation);
72+
73+
Affinity<T> reuse(boolean reuse);
74+
75+
T connection();
76+
77+
enum Operation {
78+
PUBLISH,
79+
CONSUME
80+
}
81+
}
6482
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 134 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,16 @@
2424
import com.rabbitmq.client.amqp.impl.Utils.StopWatch;
2525
import com.rabbitmq.client.amqp.metrics.MetricsCollector;
2626
import java.time.Duration;
27-
import java.util.ArrayList;
28-
import java.util.LinkedHashMap;
29-
import java.util.List;
30-
import java.util.Map;
27+
import java.util.*;
3128
import java.util.concurrent.*;
3229
import java.util.concurrent.atomic.AtomicBoolean;
3330
import java.util.concurrent.atomic.AtomicLong;
3431
import java.util.concurrent.atomic.AtomicReference;
3532
import java.util.function.BiConsumer;
33+
import java.util.function.Function;
3634
import java.util.function.Predicate;
3735
import java.util.function.Supplier;
36+
import java.util.stream.Collectors;
3837
import org.apache.qpid.protonj2.client.ConnectionOptions;
3938
import org.apache.qpid.protonj2.client.DisconnectionEvent;
4039
import org.apache.qpid.protonj2.client.Session;
@@ -57,6 +56,7 @@ final class AmqpConnection extends ResourceBase implements Connection {
5756
private final AmqpManagement management;
5857
private volatile org.apache.qpid.protonj2.client.Connection nativeConnection;
5958
private volatile Address connectionAddress;
59+
private volatile String connectionNodename;
6060
private final AtomicBoolean closed = new AtomicBoolean(false);
6161
private volatile Session nativeSession;
6262
private final List<AmqpPublisher> publishers = new CopyOnWriteArrayList<>();
@@ -68,16 +68,15 @@ final class AmqpConnection extends ResourceBase implements Connection {
6868
private final Future<?> recoveryLoop;
6969
private final BlockingQueue<Runnable> recoveryRequestQueue;
7070
private final AtomicBoolean recoveringConnection = new AtomicBoolean(false);
71-
private final DefaultConnectionSettings<?> connectionSettings =
72-
DefaultConnectionSettings.instance();
73-
private Supplier<SessionHandler> sessionHandlerSupplier;
71+
private final DefaultConnectionSettings<?> connectionSettings;
72+
private final Supplier<SessionHandler> sessionHandlerSupplier;
73+
private final ConnectionUtils.ConnectionAffinity affinity;
7474

7575
AmqpConnection(AmqpConnectionBuilder builder) {
7676
super(builder.listeners());
7777
this.id = ID_SEQUENCE.getAndIncrement();
7878
this.environment = builder.environment();
79-
builder.connectionSettings().copyTo(this.connectionSettings);
80-
this.connectionSettings.consolidate();
79+
this.connectionSettings = builder.connectionSettings().consolidate();
8180
this.sessionHandlerSupplier =
8281
builder.isolateResources()
8382
? () -> new SessionHandler.SingleSessionSessionHandler(this)
@@ -116,8 +115,26 @@ final class AmqpConnection extends ResourceBase implements Connection {
116115
this.recoveryRequestQueue = null;
117116
this.recoveryLoop = null;
118117
}
119-
this.nativeConnection = connect(this.connectionSettings, builder.name(), disconnectHandler);
118+
this.affinity =
119+
this.connectionSettings.affinity().activated()
120+
? new ConnectionUtils.ConnectionAffinity(
121+
this.connectionSettings.affinity().queue(),
122+
this.connectionSettings.affinity().operation())
123+
: null;
124+
NativeConnectionWrapper ncw =
125+
connect(this.connectionSettings, builder.name(), disconnectHandler, null);
126+
this.nativeConnection = ncw.connection;
120127
this.management = createManagement();
128+
ncw =
129+
enforceAffinity(
130+
ncw,
131+
addrs -> connect(this.connectionSettings, builder.name(), disconnectHandler, addrs),
132+
this.management,
133+
this.affinity,
134+
this.environment.affinityCache());
135+
this.connectionAddress = ncw.address;
136+
this.connectionNodename = ncw.nodename;
137+
this.nativeConnection = ncw.connection;
121138
this.state(OPEN);
122139
this.environment.metricsCollector().openConnection();
123140
}
@@ -133,7 +150,7 @@ Management managementNoCheck() {
133150
return this.management;
134151
}
135152

136-
protected AmqpManagement createManagement() {
153+
AmqpManagement createManagement() {
137154
return new AmqpManagement(
138155
new AmqpManagementParameters(this).topologyListener(this.topologyListener));
139156
}
@@ -167,11 +184,11 @@ public void close() {
167184

168185
// internal API
169186

170-
private org.apache.qpid.protonj2.client.Connection connect(
187+
private NativeConnectionWrapper connect(
171188
DefaultConnectionSettings<?> connectionSettings,
172189
String name,
173-
BiConsumer<org.apache.qpid.protonj2.client.Connection, DisconnectionEvent>
174-
disconnectHandler) {
190+
BiConsumer<org.apache.qpid.protonj2.client.Connection, DisconnectionEvent> disconnectHandler,
191+
List<Address> addresses) {
175192

176193
ConnectionOptions connectionOptions = new ConnectionOptions();
177194
if (connectionSettings.credentialsProvider() instanceof UsernamePasswordCredentialsProvider) {
@@ -200,26 +217,69 @@ private org.apache.qpid.protonj2.client.Connection connect(
200217
sslOptions.sslContextOverride(tlsSettings.sslContext());
201218
sslOptions.verifyHost(tlsSettings.isHostnameVerification());
202219
}
203-
this.connectionAddress = connectionSettings.selectAddress();
220+
Address address = connectionSettings.selectAddress(addresses);
204221
StopWatch stopWatch = new StopWatch();
205222
try {
206223
LOGGER.debug("Connecting...");
207224
org.apache.qpid.protonj2.client.Connection connection =
208-
this.environment
209-
.client()
210-
.connect(
211-
this.connectionAddress.host(), this.connectionAddress.port(), connectionOptions);
225+
this.environment.client().connect(address.host(), address.port(), connectionOptions);
212226
ExceptionUtils.wrapGet(connection.openFuture());
213227
LOGGER.debug("Connection attempt succeeded");
214228
checkBrokerVersion(connection);
215-
return connection;
229+
return new NativeConnectionWrapper(connection, extractNode(connection), address);
216230
} catch (ClientException e) {
217231
throw ExceptionUtils.convert(e);
218232
} finally {
219233
LOGGER.debug("Connection attempt took {}", stopWatch.stop());
220234
}
221235
}
222236

237+
private static NativeConnectionWrapper enforceAffinity(
238+
NativeConnectionWrapper connectionWrapper,
239+
Function<List<Address>, NativeConnectionWrapper> connectionFactory,
240+
AmqpManagement management,
241+
ConnectionUtils.ConnectionAffinity affinity,
242+
ConnectionUtils.AffinityCache affinityCache) {
243+
if (connectionWrapper.nodename == null || affinity == null) {
244+
return connectionWrapper;
245+
} else {
246+
affinityCache.put(connectionWrapper.nodename, connectionWrapper.address);
247+
try {
248+
management.init();
249+
Management.QueueInfo info = management.queueInfo(affinity.queue());
250+
NativeConnectionWrapper pickedConnection = null;
251+
int attemptCount = 0;
252+
while (pickedConnection == null && ++attemptCount <= 5) {
253+
List<String> nodesWithAffinity = ConnectionUtils.findAffinity(affinity, info);
254+
LOGGER.debug("Currently connected to node {}", connectionWrapper.nodename);
255+
if (nodesWithAffinity.contains(connectionWrapper.nodename)) {
256+
LOGGER.debug("Affinity {} found with node {}", affinity, connectionWrapper.nodename);
257+
pickedConnection = connectionWrapper;
258+
} else {
259+
LOGGER.debug(
260+
"Affinity {} not found with node {}", affinity, connectionWrapper.nodename);
261+
connectionWrapper.connection.close();
262+
management.releaseResources();
263+
List<Address> addressHints =
264+
nodesWithAffinity.stream()
265+
.map(affinityCache::get)
266+
.filter(Objects::nonNull)
267+
.collect(Collectors.toList());
268+
connectionWrapper = connectionFactory.apply(addressHints);
269+
affinityCache.put(connectionWrapper.nodename, connectionWrapper.address);
270+
}
271+
}
272+
return pickedConnection;
273+
} catch (Exception e) {
274+
LOGGER.warn(
275+
"Cannot enforce affinity because of error when looking up queue '{}': {}",
276+
affinity.queue(),
277+
e.getMessage());
278+
return connectionWrapper;
279+
}
280+
}
281+
}
282+
223283
private static void checkBrokerVersion(org.apache.qpid.protonj2.client.Connection connection)
224284
throws ClientException {
225285
String version = (String) connection.properties().get("version");
@@ -231,6 +291,15 @@ private static void checkBrokerVersion(org.apache.qpid.protonj2.client.Connectio
231291
}
232292
}
233293

294+
private static String extractNode(org.apache.qpid.protonj2.client.Connection connection)
295+
throws ClientException {
296+
String node = (String) connection.properties().get("node");
297+
if (node == null) {
298+
throw new AmqpException("The broker node name is not available");
299+
}
300+
return node;
301+
}
302+
234303
TopologyListener createTopologyListener(AmqpConnectionBuilder builder) {
235304
TopologyListener topologyListener;
236305
if (builder.recoveryConfiguration().topology()) {
@@ -354,15 +423,12 @@ private org.apache.qpid.protonj2.client.Connection recoverNativeConnection(
354423
}
355424

356425
try {
357-
org.apache.qpid.protonj2.client.Connection result =
358-
connect(this.connectionSettings, connectionName, disconnectedHandlerReference.get());
359-
result.openFuture().get();
426+
NativeConnectionWrapper result =
427+
connect(
428+
this.connectionSettings, connectionName, disconnectedHandlerReference.get(), null);
429+
this.connectionAddress = result.address;
360430
LOGGER.debug("Reconnected to {}", this.currentConnectionLabel());
361-
return result;
362-
} catch (InterruptedException ex) {
363-
Thread.currentThread().interrupt();
364-
LOGGER.info("Thread interrupted while waiting for connection opening");
365-
throw ex;
431+
return result.connection;
366432
} catch (Exception ex) {
367433
LOGGER.info("Error while trying to recover connection", ex);
368434
if (!RECOVERY_PREDICATE.test(ex)) {
@@ -573,9 +639,22 @@ Address connectionAddress() {
573639
return this.connectionAddress;
574640
}
575641

642+
String connectionNodename() {
643+
return this.connectionNodename;
644+
}
645+
646+
ConnectionUtils.ConnectionAffinity affinity() {
647+
return this.affinity;
648+
}
649+
650+
long id() {
651+
return this.id;
652+
}
653+
576654
private void close(Throwable cause) {
577655
if (this.closed.compareAndSet(false, true)) {
578656
this.state(CLOSING, cause);
657+
this.environment.removeConnection(this);
579658
if (this.recoveryLoop != null) {
580659
this.recoveryLoop.cancel(true);
581660
}
@@ -613,4 +692,31 @@ private void close(Throwable cause) {
613692
public String toString() {
614693
return this.environment.toString() + "-" + this.id;
615694
}
695+
696+
private static class NativeConnectionWrapper {
697+
698+
private final org.apache.qpid.protonj2.client.Connection connection;
699+
private final String nodename;
700+
private final Address address;
701+
702+
private NativeConnectionWrapper(
703+
org.apache.qpid.protonj2.client.Connection connection, String nodename, Address address) {
704+
this.connection = connection;
705+
this.nodename = nodename;
706+
this.address = address;
707+
}
708+
}
709+
710+
@Override
711+
public boolean equals(Object o) {
712+
if (this == o) return true;
713+
if (o == null || getClass() != o.getClass()) return false;
714+
AmqpConnection that = (AmqpConnection) o;
715+
return id == that.id;
716+
}
717+
718+
@Override
719+
public int hashCode() {
720+
return Objects.hashCode(id);
721+
}
616722
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnectionBuilder.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,11 @@ public TlsSettings<? extends ConnectionBuilder> tls() {
9999
return this.connectionSettings.tls();
100100
}
101101

102+
@Override
103+
public DefaultConnectionSettings.DefaultAffinity<? extends ConnectionBuilder> affinity() {
104+
return this.connectionSettings.affinity();
105+
}
106+
102107
@Override
103108
public ConnectionBuilder listeners(Resource.StateListener... listeners) {
104109
if (listeners == null || listeners.length == 0) {
@@ -126,10 +131,16 @@ boolean isolateResources() {
126131

127132
@Override
128133
public Connection build() {
129-
// TODO copy the recovery configuration to keep the settings
130-
AmqpConnection connection = new AmqpConnection(this);
131-
this.environment.addConnection(connection);
132-
return connection;
134+
return this.environment.connection(this);
135+
}
136+
137+
void copyTo(AmqpConnectionBuilder copy) {
138+
this.connectionSettings.copyTo(copy.connectionSettings);
139+
this.recoveryConfiguration.copyTo(copy.recoveryConfiguration);
140+
copy.listeners(this.listeners.toArray(new Resource.StateListener[0]));
141+
copy.name(this.name);
142+
copy.topologyListener(this.topologyListener);
143+
copy.isolateResources(this.isolateResources);
133144
}
134145

135146
AmqpConnectionBuilder name(String name) {
@@ -211,6 +222,12 @@ boolean topology() {
211222
BackOffDelayPolicy backOffDelayPolicy() {
212223
return this.backOffDelayPolicy;
213224
}
225+
226+
void copyTo(RecoveryConfiguration copy) {
227+
copy.activated(this.activated);
228+
copy.topology(this.topology);
229+
copy.backOffDelayPolicy(this.backOffDelayPolicy);
230+
}
214231
}
215232

216233
static class AmqpConnectionBuilderConnectionSettings

0 commit comments

Comments
 (0)