19
19
20
20
import static com .rabbitmq .client .amqp .ConnectionSettings .Affinity .Operation .CONSUME ;
21
21
import static com .rabbitmq .client .amqp .ConnectionSettings .Affinity .Operation .PUBLISH ;
22
+ import static com .rabbitmq .client .amqp .Resource .State .OPEN ;
22
23
import static com .rabbitmq .client .amqp .impl .Assertions .assertThat ;
23
24
import static com .rabbitmq .client .amqp .impl .TestUtils .*;
25
+ import static java .lang .String .format ;
24
26
import static java .time .Duration .ofSeconds ;
25
27
import static java .util .stream .Collectors .toList ;
26
28
import static org .assertj .core .api .Assertions .assertThat ;
@@ -123,7 +125,9 @@ void clusterRestart() {
123
125
Cli .rebalance ();
124
126
LOGGER .info ("Rebalancing over." );
125
127
126
- waitAtMost (() -> connection .state () == Resource .State .OPEN );
128
+ waitAtMost (
129
+ () -> connection .state () == OPEN ,
130
+ () -> format ("Test connection state is %s, expecting %s" , connection .state (), OPEN ));
127
131
LOGGER .info ("Test connection has recovered" );
128
132
129
133
qqNames .forEach (
@@ -144,8 +148,8 @@ void clusterRestart() {
144
148
syncs .forEach (s -> assertThat (s ).completes ());
145
149
LOGGER .info ("Check consumers have recovered." );
146
150
147
- assertThat (publisherStates ).allMatch (s -> s .state () == Resource . State . OPEN );
148
- assertThat (consumerStates ).allMatch (s -> s .state () == Resource . State . OPEN );
151
+ assertThat (publisherStates ).allMatch (s -> s .state () == OPEN );
152
+ assertThat (consumerStates ).allMatch (s -> s .state () == OPEN );
149
153
150
154
System .out .println ("Queues:" );
151
155
qqNames .forEach (
@@ -217,7 +221,7 @@ void start() {
217
221
.newThread (
218
222
() -> {
219
223
while (!stopped .get () && !Thread .currentThread ().isInterrupted ()) {
220
- if (state .get () == Resource . State . OPEN ) {
224
+ if (state .get () == OPEN ) {
221
225
try {
222
226
this .limiter .acquire (1 );
223
227
this .publisher .publish (publisher .message (BODY ), callback );
0 commit comments