Skip to content

Commit 7333fa2

Browse files
committed
Track RPC client and server instances in connection
1 parent 83aba18 commit 7333fa2

File tree

4 files changed

+78
-14
lines changed

4 files changed

+78
-14
lines changed

src/main/java/com/rabbitmq/model/amqp/AmqpConnection.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ class AmqpConnection extends ResourceBase implements Connection {
6767
private volatile Session nativeSession;
6868
private final List<AmqpPublisher> publishers = new CopyOnWriteArrayList<>();
6969
private final List<AmqpConsumer> consumers = new CopyOnWriteArrayList<>();
70+
private final List<RpcClient> rpcClients = new CopyOnWriteArrayList<>();
71+
private final List<RpcServer> rpcServers = new CopyOnWriteArrayList<>();
7072
private final TopologyListener topologyListener;
7173
private volatile EntityRecovery entityRecovery;
7274
private final Future<?> recoveryLoop;
@@ -177,13 +179,18 @@ public void close() {
177179
}
178180
}
179181
this.closeManagement();
182+
for (RpcClient rpcClient : this.rpcClients) {
183+
rpcClient.close();
184+
}
185+
for (RpcServer rpcServer : this.rpcServers) {
186+
rpcServer.close();
187+
}
180188
for (AmqpPublisher publisher : this.publishers) {
181189
publisher.close();
182190
}
183191
for (AmqpConsumer consumer : this.consumers) {
184192
consumer.close();
185193
}
186-
187194
try {
188195
this.nativeConnection.close();
189196
} catch (Exception e) {
@@ -517,6 +524,26 @@ void removeConsumer(AmqpConsumer consumer) {
517524
this.topologyListener.consumerDeleted(consumer.id(), consumer.address());
518525
}
519526

527+
RpcClient createRpcClient(RpcSupport.AmqpRpcClientBuilder builder) {
528+
RpcClient rpcClient = new AmqpRpcClient(builder);
529+
this.rpcClients.add(rpcClient);
530+
return rpcClient;
531+
}
532+
533+
void removeRpcClient(RpcClient rpcClient) {
534+
this.rpcClients.remove(rpcClient);
535+
}
536+
537+
RpcServer createRpcServer(RpcSupport.AmqpRpcServerBuilder builder) {
538+
RpcServer rpcServer = new AmqpRpcServer(builder);
539+
this.rpcServers.add(rpcServer);
540+
return rpcServer;
541+
}
542+
543+
void removeRpcServer(RpcServer rpcServer) {
544+
this.rpcServers.remove(rpcServer);
545+
}
546+
520547
private void changeStateOfPublishers(State newState, Throwable failure) {
521548
this.changeStateOfResources(this.publishers, newState, failure);
522549
}

src/main/java/com/rabbitmq/model/amqp/AmqpRpcClient.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,34 +22,42 @@
2222
import java.util.UUID;
2323
import java.util.concurrent.CompletableFuture;
2424
import java.util.concurrent.ConcurrentHashMap;
25+
import java.util.concurrent.atomic.AtomicBoolean;
2526
import java.util.concurrent.atomic.AtomicLong;
2627
import java.util.function.BiFunction;
2728
import java.util.function.Function;
2829
import java.util.function.Supplier;
30+
import org.slf4j.Logger;
31+
import org.slf4j.LoggerFactory;
2932

3033
class AmqpRpcClient implements RpcClient {
3134

35+
private static final Logger LOGGER = LoggerFactory.getLogger(AmqpRpcClient.class);
36+
3237
private static final Publisher.Callback NO_OP_CALLBACK = ctx -> {};
3338

39+
private final AmqpConnection connection;
3440
private final Publisher publisher;
3541
private final Consumer consumer;
3642
private final Map<Object, CompletableFuture<Message>> outstandingRequests =
3743
new ConcurrentHashMap<>();
3844
private final Supplier<Object> correlationIdSupplier;
3945
private final BiFunction<Message, Object, Message> requestPostProcessor;
4046
private final Function<Message, Object> correlationIdExtractor;
47+
private final AtomicBoolean closed = new AtomicBoolean(false);
4148

4249
AmqpRpcClient(RpcSupport.AmqpRpcClientBuilder builder) {
43-
AmqpConnection connection = builder.connection();
50+
this.connection = builder.connection();
4451

45-
AmqpPublisherBuilder publisherBuilder = (AmqpPublisherBuilder) connection.publisherBuilder();
52+
AmqpPublisherBuilder publisherBuilder =
53+
(AmqpPublisherBuilder) this.connection.publisherBuilder();
4654
((DefaultAddressBuilder<?>) builder.requestAddress()).copyTo(publisherBuilder.addressBuilder());
4755
this.publisher = publisherBuilder.build();
4856

4957
String replyTo = builder.replyToQueue();
5058
if (replyTo == null) {
5159
Management.QueueInfo queueInfo =
52-
connection.management().queue().exclusive(true).autoDelete(true).declare();
60+
this.connection.management().queue().exclusive(true).autoDelete(true).declare();
5361
replyTo = queueInfo.name();
5462
}
5563
if (builder.correlationIdExtractor() == null) {
@@ -58,7 +66,7 @@ class AmqpRpcClient implements RpcClient {
5866
this.correlationIdExtractor = builder.correlationIdExtractor();
5967
}
6068
this.consumer =
61-
connection
69+
this.connection
6270
.consumerBuilder()
6371
.queue(replyTo)
6472
.messageHandler(
@@ -114,7 +122,18 @@ public CompletableFuture<Message> publish(Message message) {
114122

115123
@Override
116124
public void close() {
117-
this.publisher.close();
118-
this.consumer.close();
125+
if (this.closed.compareAndSet(false, true)) {
126+
this.connection.removeRpcClient(this);
127+
try {
128+
this.publisher.close();
129+
} catch (Exception e) {
130+
LOGGER.warn("Error while closing RPC client publisher: {}", e.getMessage());
131+
}
132+
try {
133+
this.consumer.close();
134+
} catch (Exception e) {
135+
LOGGER.warn("Error while closing RPC client consumer: {}", e.getMessage());
136+
}
137+
}
119138
}
120139
}

src/main/java/com/rabbitmq/model/amqp/AmqpRpcServer.java

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,23 +21,30 @@
2121
import com.rabbitmq.model.Message;
2222
import com.rabbitmq.model.Publisher;
2323
import com.rabbitmq.model.RpcServer;
24+
import java.util.concurrent.atomic.AtomicBoolean;
2425
import java.util.function.BiFunction;
2526
import java.util.function.Function;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
2629

2730
class AmqpRpcServer implements RpcServer {
2831

32+
private static final Logger LOGGER = LoggerFactory.getLogger(AmqpRpcServer.class);
33+
2934
private static final Publisher.Callback NO_OP_CALLBACK = ctx -> {};
3035

36+
private final AmqpConnection connection;
3137
private final Publisher publisher;
3238
private final Consumer consumer;
3339
private final Function<Message, Object> correlationIdExtractor;
3440
private final BiFunction<Message, Object, Message> replyPostProcessor;
41+
private final AtomicBoolean closed = new AtomicBoolean(false);
3542

3643
AmqpRpcServer(RpcSupport.AmqpRpcServerBuilder builder) {
37-
AmqpConnection connection = builder.connection();
44+
this.connection = builder.connection();
3845
Handler handler = builder.handler();
3946

40-
this.publisher = connection.publisherBuilder().build();
47+
this.publisher = this.connection.publisherBuilder().build();
4148

4249
Context context =
4350
new Context() {
@@ -62,7 +69,7 @@ public Message message(byte[] body) {
6269
this.replyPostProcessor = builder.replyPostProcessor();
6370
}
6471
this.consumer =
65-
connection
72+
this.connection
6673
.consumerBuilder()
6774
.queue(builder.requestQueue())
6875
.messageHandler(
@@ -81,7 +88,18 @@ public Message message(byte[] body) {
8188

8289
@Override
8390
public void close() {
84-
this.consumer.close();
85-
this.publisher.close();
91+
if (this.closed.compareAndSet(false, true)) {
92+
this.connection.removeRpcServer(this);
93+
try {
94+
this.consumer.close();
95+
} catch (Exception e) {
96+
LOGGER.warn("Error while closing RPC server consumer: {}", e.getMessage());
97+
}
98+
try {
99+
this.publisher.close();
100+
} catch (Exception e) {
101+
LOGGER.warn("Error while closing RPC server publisher: {}", e.getMessage());
102+
}
103+
}
86104
}
87105
}

src/main/java/com/rabbitmq/model/amqp/RpcSupport.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ Function<Message, Object> correlationIdExtractor() {
7878

7979
@Override
8080
public RpcClient build() {
81-
return new AmqpRpcClient(this);
81+
return this.connection.createRpcClient(this);
8282
}
8383

8484
AmqpConnection connection() {
@@ -168,7 +168,7 @@ public RpcServerBuilder replyPostProcessor(
168168

169169
@Override
170170
public RpcServer build() {
171-
return new AmqpRpcServer(this);
171+
return this.connection.createRpcServer(this);
172172
}
173173

174174
AmqpConnection connection() {

0 commit comments

Comments
 (0)