Skip to content

Commit 8565dba

Browse files
committed
Add parameter to create a session per publisher/consumer
The option is not public for now. Name is temporary.
1 parent ea8ff52 commit 8565dba

File tree

9 files changed

+160
-52
lines changed

9 files changed

+160
-52
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.concurrent.atomic.AtomicReference;
3434
import java.util.function.BiConsumer;
3535
import java.util.function.Predicate;
36+
import java.util.function.Supplier;
3637
import org.apache.qpid.protonj2.client.ConnectionOptions;
3738
import org.apache.qpid.protonj2.client.DisconnectionEvent;
3839
import org.apache.qpid.protonj2.client.Session;
@@ -68,14 +69,18 @@ final class AmqpConnection extends ResourceBase implements Connection {
6869
private final AtomicBoolean recoveringConnection = new AtomicBoolean(false);
6970
private final DefaultConnectionSettings<?> connectionSettings =
7071
DefaultConnectionSettings.instance();
72+
private Supplier<SessionHandler> sessionHandlerSupplier;
7173

7274
AmqpConnection(AmqpConnectionBuilder builder) {
7375
super(builder.listeners());
7476
this.id = ID_SEQUENCE.getAndIncrement();
7577
this.environment = builder.environment();
7678
builder.connectionSettings().copyTo(this.connectionSettings);
7779
this.connectionSettings.consolidate();
78-
80+
this.sessionHandlerSupplier =
81+
builder.isolateResources()
82+
? () -> new SessionHandler.SingleSessionSessionHandler(this)
83+
: () -> new SessionHandler.ConnectionNativeSessionSessionHandler(this);
7984
BiConsumer<org.apache.qpid.protonj2.client.Connection, DisconnectionEvent> disconnectHandler;
8085
AmqpConnectionBuilder.AmqpRecoveryConfiguration recoveryConfiguration =
8186
builder.recoveryConfiguration();
@@ -491,6 +496,10 @@ ObservationCollector observationCollector() {
491496
return this.environment.observationCollector();
492497
}
493498

499+
SessionHandler createSessionHandler() {
500+
return this.sessionHandlerSupplier.get();
501+
}
502+
494503
Publisher createPublisher(AmqpPublisherBuilder builder) {
495504
// TODO copy the builder properties to create the publisher
496505
AmqpPublisher publisher = new AmqpPublisher(builder);

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnectionBuilder.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class AmqpConnectionBuilder implements ConnectionBuilder {
3333
private final List<Resource.StateListener> listeners = new ArrayList<>();
3434
private String name;
3535
private TopologyListener topologyListener;
36+
private boolean isolateResources = false;
3637

3738
AmqpConnectionBuilder(AmqpEnvironment environment) {
3839
this.environment = environment;
@@ -115,6 +116,15 @@ public RecoveryConfiguration recovery() {
115116
return this.recoveryConfiguration;
116117
}
117118

119+
AmqpConnectionBuilder isolateResources(boolean isolateResources) {
120+
this.isolateResources = isolateResources;
121+
return this;
122+
}
123+
124+
boolean isolateResources() {
125+
return isolateResources;
126+
}
127+
118128
@Override
119129
public Connection build() {
120130
// TODO copy the recovery configuration to keep the settings

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
7474
.subscribe(builder.queue(), builder.messageHandler());
7575
this.address = "/queue/" + builder.queue();
7676
this.connection = builder.connection();
77-
this.sessionHandler = new SessionHandler.ConnectionNativeSessionSessionHandler(this.connection);
77+
this.sessionHandler = this.connection.createSessionHandler();
7878
this.nativeReceiver = this.createNativeReceiver(this.sessionHandler.session(), this.address);
7979
this.initStateFromNativeReceiver(this.nativeReceiver);
8080
this.metricsCollector = this.connection.metricsCollector();

src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ final class AmqpPublisher extends ResourceBase implements Publisher {
6262
this.destinationSpec = builder.destination();
6363
this.connection = builder.connection();
6464
this.publishTimeout = builder.publishTimeout();
65-
this.sessionHandler = new SessionHandler.ConnectionNativeSessionSessionHandler(this.connection);
65+
this.sessionHandler = this.connection.createSessionHandler();
6666
this.sender = this.createSender(sessionHandler.session(), this.address, this.publishTimeout);
6767
this.metricsCollector = this.connection.metricsCollector();
6868
this.observationCollector = this.connection.observationCollector();

src/main/java/com/rabbitmq/client/amqp/impl/SessionHandler.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,13 @@
1717
1818
package com.rabbitmq.client.amqp.impl;
1919

20+
import java.util.concurrent.atomic.AtomicReference;
21+
import java.util.function.Supplier;
22+
import org.apache.qpid.protonj2.client.Connection;
2023
import org.apache.qpid.protonj2.client.Session;
24+
import org.apache.qpid.protonj2.client.exceptions.ClientException;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
2127

2228
interface SessionHandler extends AutoCloseable {
2329

@@ -48,4 +54,50 @@ public Session sessionNoCheck() {
4854
@Override
4955
public void close() {}
5056
}
57+
58+
class SingleSessionSessionHandler implements SessionHandler {
59+
60+
private static final Logger LOGGER = LoggerFactory.getLogger(SingleSessionSessionHandler.class);
61+
62+
private final Supplier<Connection> connection;
63+
private final AtomicReference<Session> session = new AtomicReference<>();
64+
65+
public SingleSessionSessionHandler(AmqpConnection connection) {
66+
this.connection = connection::nativeConnection;
67+
}
68+
69+
@Override
70+
public Session session() {
71+
closeCurrentSession();
72+
try {
73+
Session session = this.connection.get().openSession();
74+
this.session.set(ExceptionUtils.wrapGet(session.openFuture()));
75+
return this.session.get();
76+
} catch (ClientException e) {
77+
this.session.set(null);
78+
throw ExceptionUtils.convert(e);
79+
}
80+
}
81+
82+
@Override
83+
public Session sessionNoCheck() {
84+
return this.session();
85+
}
86+
87+
@Override
88+
public void close() {
89+
closeCurrentSession();
90+
}
91+
92+
private void closeCurrentSession() {
93+
Session previousSession = session.getAndSet(null);
94+
if (previousSession != null) {
95+
try {
96+
previousSession.close();
97+
} catch (RuntimeException e) {
98+
LOGGER.debug("Error while closing session: {}", e.getMessage());
99+
}
100+
}
101+
}
102+
}
51103
}

src/test/java/com/rabbitmq/client/amqp/impl/AmqpConnectionRecoveryTest.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
import java.util.concurrent.atomic.AtomicReference;
4040
import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
4141
import org.junit.jupiter.api.*;
42+
import org.junit.jupiter.params.ParameterizedTest;
43+
import org.junit.jupiter.params.provider.ValueSource;
4244

4345
@DisabledIfRabbitMqCtlNotSet
4446
public class AmqpConnectionRecoveryTest {
@@ -73,8 +75,10 @@ static void afterAll() {
7375
environment.close();
7476
}
7577

76-
@Test
77-
void connectionShouldRecoverAfterClosingIt(TestInfo info) throws Exception {
78+
@ParameterizedTest
79+
@ValueSource(booleans = {true, false})
80+
void connectionShouldRecoverAfterClosingIt(boolean isolateResources, TestInfo info)
81+
throws Exception {
7882
String q = name(info);
7983
String connectionName = UUID.randomUUID().toString();
8084
Map<Resource.State, CountDownLatch> stateLatches = new ConcurrentHashMap<>();
@@ -84,6 +88,7 @@ void connectionShouldRecoverAfterClosingIt(TestInfo info) throws Exception {
8488
(AmqpConnectionBuilder)
8589
new AmqpConnectionBuilder(environment)
8690
.name(connectionName)
91+
.isolateResources(isolateResources)
8792
.listeners(
8893
context -> {
8994
if (stateLatches.containsKey(context.currentState())) {
@@ -170,8 +175,9 @@ void connectionShouldRecoverAfterClosingIt(TestInfo info) throws Exception {
170175
}
171176
}
172177

173-
@Test
174-
void connectionShouldRecoverAfterBrokerStopStart(TestInfo info) {
178+
@ParameterizedTest
179+
@ValueSource(booleans = {true, false})
180+
void connectionShouldRecoverAfterBrokerStopStart(boolean isolateResources, TestInfo info) {
175181
String q = name(info);
176182
String connectionName = UUID.randomUUID().toString();
177183
Map<Resource.State, CountDownLatch> stateLatches = new ConcurrentHashMap<>();
@@ -181,6 +187,7 @@ void connectionShouldRecoverAfterBrokerStopStart(TestInfo info) {
181187
(AmqpConnectionBuilder)
182188
new AmqpConnectionBuilder(environment)
183189
.name(connectionName)
190+
.isolateResources(isolateResources)
184191
.listeners(
185192
context -> {
186193
if (stateLatches.containsKey(context.currentState())) {

src/test/java/com/rabbitmq/client/amqp/impl/RpcTest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import java.util.stream.IntStream;
3434
import org.junit.jupiter.api.*;
3535
import org.junit.jupiter.api.extension.ExtendWith;
36+
import org.junit.jupiter.params.ParameterizedTest;
37+
import org.junit.jupiter.params.provider.ValueSource;
3638

3739
@ExtendWith(AmqpTestInfrastructureExtension.class)
3840
public class RpcTest {
@@ -201,8 +203,9 @@ void rpcUseCorrelationIdRequestProperty() {
201203
}
202204
}
203205

204-
@Test
205-
void rpcShouldRecoverAfterConnectionIsClosed()
206+
@ParameterizedTest
207+
@ValueSource(booleans = {true, false})
208+
void rpcShouldRecoverAfterConnectionIsClosed(boolean isolateResources)
206209
throws ExecutionException, InterruptedException, TimeoutException {
207210
String clientConnectionName = UUID.randomUUID().toString();
208211
CountDownLatch clientConnectionLatch = new CountDownLatch(1);
@@ -213,6 +216,7 @@ void rpcShouldRecoverAfterConnectionIsClosed()
213216
Connection serverConnection =
214217
connectionBuilder()
215218
.name(serverConnectionName)
219+
.isolateResources(isolateResources)
216220
.listeners(recoveredListener(serverConnectionLatch))
217221
.recovery()
218222
.backOffDelayPolicy(backOffDelayPolicy)
@@ -222,6 +226,7 @@ void rpcShouldRecoverAfterConnectionIsClosed()
222226
try (Connection clientConnection =
223227
connectionBuilder()
224228
.name(clientConnectionName)
229+
.isolateResources(isolateResources)
225230
.listeners(recoveredListener(clientConnectionLatch))
226231
.recovery()
227232
.backOffDelayPolicy(backOffDelayPolicy)

src/test/java/com/rabbitmq/client/amqp/impl/TestUtils.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -93,24 +93,26 @@ public static Duration waitAtMost(
9393
public static Duration waitAtMost(
9494
Duration timeout, CallableBooleanSupplier condition, Supplier<String> message)
9595
throws Exception {
96+
long start = System.nanoTime();
9697
if (condition.getAsBoolean()) {
9798
return Duration.ZERO;
9899
}
99-
int waitTime = 100;
100-
int waitedTime = 0;
101-
int timeoutInMs = (int) timeout.toMillis();
100+
Duration waitTime = Duration.ofMillis(100);
101+
Duration waitedTime = Duration.ofNanos(System.nanoTime() - start);
102102
Exception exception = null;
103-
while (waitedTime <= timeoutInMs) {
104-
Thread.sleep(waitTime);
105-
waitedTime += waitTime;
103+
while (waitedTime.compareTo(timeout) <= 0) {
104+
Thread.sleep(waitTime.toMillis());
105+
waitedTime = waitedTime.plus(waitTime);
106+
start = System.nanoTime();
106107
try {
107108
if (condition.getAsBoolean()) {
108-
return Duration.ofMillis(waitedTime);
109+
return waitedTime;
109110
}
110111
exception = null;
111112
} catch (Exception e) {
112113
exception = e;
113114
}
115+
waitedTime = waitedTime.plus(Duration.ofNanos(System.nanoTime() - start));
114116
}
115117
String msg;
116118
if (message == null) {
@@ -123,7 +125,7 @@ public static Duration waitAtMost(
123125
} else {
124126
fail(msg, exception);
125127
}
126-
return Duration.ofMillis(waitedTime);
128+
return waitedTime;
127129
}
128130

129131
public static class CountDownLatchReferenceConditions {

0 commit comments

Comments
 (0)