Skip to content

Commit d0383cb

Browse files
committed
Force ClientFutureFactory to "progressive"
1 parent 534db74 commit d0383cb

File tree

2 files changed

+26
-11
lines changed

2 files changed

+26
-11
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpEnvironment.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.atomic.AtomicBoolean;
2727
import java.util.concurrent.atomic.AtomicLong;
2828
import org.apache.qpid.protonj2.client.*;
29+
import org.apache.qpid.protonj2.client.futures.ClientFutureFactory;
2930

3031
class AmqpEnvironment implements Environment {
3132

@@ -55,7 +56,7 @@ class AmqpEnvironment implements Environment {
5556
this.id = ID_SEQUENCE.getAndIncrement();
5657
connectionSettings.copyTo(this.connectionSettings);
5758
this.connectionSettings.consolidate();
58-
ClientOptions clientOptions = new ClientOptions();
59+
ClientOptions clientOptions = new ClientOptions().futureType(ClientFutureFactory.PROGRESSIVE);
5960
this.client = Client.create(clientOptions);
6061

6162
if (executorService == null) {

src/test/java/com/rabbitmq/client/amqp/impl/RecoveryClusterTest.java

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import org.slf4j.LoggerFactory;
4343

4444
@DisabledIfNotCluster
45-
@Disabled
4645
public class RecoveryClusterTest {
4746

4847
private static final Logger LOGGER = LoggerFactory.getLogger(RecoveryClusterTest.class);
@@ -60,20 +59,21 @@ public class RecoveryClusterTest {
6059
@BeforeAll
6160
static void initAll() {
6261
nodes = Cli.nodes();
62+
LOGGER.info("Available processor(s): {}", Runtime.getRuntime().availableProcessors());
6363
}
6464

6565
@BeforeEach
6666
void init(TestInfo info) {
6767
environment =
6868
new AmqpEnvironmentBuilder().connectionSettings().uris(URIS).environmentBuilder().build();
6969
this.connection =
70-
(AmqpConnection)
71-
environment
72-
.connectionBuilder()
73-
.recovery()
74-
.backOffDelayPolicy(BACK_OFF_DELAY_POLICY)
75-
.connectionBuilder()
76-
.build();
70+
connection(
71+
b ->
72+
b.name("c-management")
73+
.recovery()
74+
.backOffDelayPolicy(BACK_OFF_DELAY_POLICY)
75+
.connectionBuilder()
76+
.build());
7777
this.management = connection.management();
7878
this.testInfo = info;
7979
}
@@ -92,22 +92,36 @@ void clusterRestart() {
9292
List<ConsumerState> consumerStates = Collections.emptyList();
9393
try {
9494
qqNames.forEach(n -> management.queue(n).type(Management.QueueType.QUORUM).declare());
95+
AtomicInteger counter = new AtomicInteger(0);
9596
consumerStates =
9697
qqNames.stream()
9798
.map(
9899
n ->
99100
new ConsumerState(
100101
n,
101-
connection(b -> b.affinity().queue(n).operation(CONSUME).connection())))
102+
connection(
103+
b ->
104+
b.name("consumer-" + counter.getAndIncrement())
105+
.affinity()
106+
.queue(n)
107+
.operation(CONSUME)
108+
.connection())))
102109
.collect(toList());
103110

111+
counter.set(0);
104112
publisherStates =
105113
qqNames.stream()
106114
.map(
107115
n ->
108116
new PublisherState(
109117
n,
110-
connection(b -> b.affinity().queue(n).operation(PUBLISH).connection())))
118+
connection(
119+
b ->
120+
b.name("publisher-" + counter.getAndIncrement())
121+
.affinity()
122+
.queue(n)
123+
.operation(PUBLISH)
124+
.connection())))
111125
.collect(toList());
112126

113127
publisherStates.forEach(PublisherState::start);

0 commit comments

Comments
 (0)