Skip to content

Commit ea8ff52

Browse files
committed
Introduce abstraction to get session
Always return the connection session for now. Can be leveraged to create a session for each publisher and consumer.
1 parent 514bbd9 commit ea8ff52

File tree

3 files changed

+62
-6
lines changed

3 files changed

+62
-6
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
5555
private final AtomicBoolean paused = new AtomicBoolean(false);
5656
private final AtomicReference<CountDownLatch> echoedFlowAfterPauseLatch = new AtomicReference<>();
5757
private final MetricsCollector metricsCollector;
58+
private final SessionHandler sessionHandler;
5859
// native receiver internal state, accessed only in the native executor/scheduler
5960
private ProtonReceiver protonReceiver;
6061
private Scheduler protonExecutor;
@@ -72,9 +73,10 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
7273
.observationCollector()
7374
.subscribe(builder.queue(), builder.messageHandler());
7475
this.address = "/queue/" + builder.queue();
75-
this.nativeReceiver = createNativeReceiver(builder.connection().nativeSession(), this.address);
76-
this.initStateFromNativeReceiver(this.nativeReceiver);
7776
this.connection = builder.connection();
77+
this.sessionHandler = new SessionHandler.ConnectionNativeSessionSessionHandler(this.connection);
78+
this.nativeReceiver = this.createNativeReceiver(this.sessionHandler.session(), this.address);
79+
this.initStateFromNativeReceiver(this.nativeReceiver);
7880
this.metricsCollector = this.connection.metricsCollector();
7981
this.startReceivingLoop();
8082
this.state(OPEN);
@@ -192,7 +194,7 @@ private void startReceivingLoop() {
192194
}
193195

194196
void recoverAfterConnectionFailure() {
195-
this.nativeReceiver = createNativeReceiver(this.connection.nativeSession(false), this.address);
197+
this.nativeReceiver = createNativeReceiver(this.sessionHandler.sessionNoCheck(), this.address);
196198
this.initStateFromNativeReceiver(this.nativeReceiver);
197199
this.paused.set(false);
198200
startReceivingLoop();
@@ -211,6 +213,7 @@ private void close(Throwable cause) {
211213
}
212214
try {
213215
this.nativeReceiver.close();
216+
this.sessionHandler.close();
214217
} catch (Exception e) {
215218
LOGGER.warn("Error while closing receiver", e);
216219
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ final class AmqpPublisher extends ResourceBase implements Publisher {
5151
private final Function<Message, Tracker> publishCall;
5252
private final DefaultAddressBuilder.DestinationSpec destinationSpec;
5353
private final Duration publishTimeout;
54+
private final SessionHandler sessionHandler;
5455
private volatile ObservationCollector.ConnectionInfo connectionInfo;
5556

5657
AmqpPublisher(AmqpPublisherBuilder builder) {
@@ -61,8 +62,8 @@ final class AmqpPublisher extends ResourceBase implements Publisher {
6162
this.destinationSpec = builder.destination();
6263
this.connection = builder.connection();
6364
this.publishTimeout = builder.publishTimeout();
64-
this.sender =
65-
this.createSender(builder.connection().nativeSession(), this.address, this.publishTimeout);
65+
this.sessionHandler = new SessionHandler.ConnectionNativeSessionSessionHandler(this.connection);
66+
this.sender = this.createSender(sessionHandler.session(), this.address, this.publishTimeout);
6667
this.metricsCollector = this.connection.metricsCollector();
6768
this.observationCollector = this.connection.observationCollector();
6869
this.state(OPEN);
@@ -127,7 +128,7 @@ public void publish(Message message, Callback callback) {
127128
void recoverAfterConnectionFailure() {
128129
this.connectionInfo = new Utils.ObservationConnectionInfo(this.connection.connectionAddress());
129130
this.sender =
130-
this.createSender(this.connection.nativeSession(false), this.address, this.publishTimeout);
131+
this.createSender(this.sessionHandler.sessionNoCheck(), this.address, this.publishTimeout);
131132
}
132133

133134
@Override
@@ -162,6 +163,7 @@ private void close(Throwable cause) {
162163
this.connection.removePublisher(this);
163164
try {
164165
this.sender.close();
166+
this.sessionHandler.close();
165167
} catch (Exception e) {
166168
LOGGER.warn("Error while closing sender", e);
167169
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
// If you have any questions regarding licensing, please contact us at
17+
18+
package com.rabbitmq.client.amqp.impl;
19+
20+
import org.apache.qpid.protonj2.client.Session;
21+
22+
interface SessionHandler extends AutoCloseable {
23+
24+
Session session();
25+
26+
Session sessionNoCheck();
27+
28+
void close();
29+
30+
class ConnectionNativeSessionSessionHandler implements SessionHandler {
31+
32+
private final AmqpConnection connection;
33+
34+
ConnectionNativeSessionSessionHandler(AmqpConnection connection) {
35+
this.connection = connection;
36+
}
37+
38+
@Override
39+
public Session session() {
40+
return this.connection.nativeSession();
41+
}
42+
43+
@Override
44+
public Session sessionNoCheck() {
45+
return this.connection.nativeSession(false);
46+
}
47+
48+
@Override
49+
public void close() {}
50+
}
51+
}

0 commit comments

Comments
 (0)