Skip to content

Commit 2e0a6ba

Browse files
committed
Add test for publisher with moving quorum queue
1 parent 59f0b9d commit 2e0a6ba

18 files changed

+461
-409
lines changed

src/test/java/com/rabbitmq/client/amqp/impl/AddressFormatTest.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import static com.rabbitmq.client.amqp.Management.ExchangeType.DIRECT;
2121
import static com.rabbitmq.client.amqp.Management.ExchangeType.FANOUT;
22-
import static com.rabbitmq.client.amqp.impl.TestUtils.assertThat;
2322
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2423

2524
import com.rabbitmq.client.amqp.*;
@@ -52,7 +51,7 @@ void exchangeKeyInAddress(TestInfo info) {
5251
failedLatch.countDown();
5352
}
5453
});
55-
assertThat(failedLatch).completes();
54+
Assertions.assertThat(failedLatch).completes();
5655

5756
CountDownLatch consumeLatch = new CountDownLatch(1);
5857
connection
@@ -67,7 +66,7 @@ void exchangeKeyInAddress(TestInfo info) {
6766

6867
management.binding().sourceExchange(e).key(k).destinationQueue(q).bind();
6968
publisher.publish(publisher.message(), ctx -> {});
70-
assertThat(consumeLatch).completes();
69+
Assertions.assertThat(consumeLatch).completes();
7170
} finally {
7271
management.queueDeletion().delete(q);
7372
management.exchangeDeletion().delete(e);
@@ -93,7 +92,7 @@ void exchangeInAddress(TestInfo info) {
9392
failedLatch.countDown();
9493
}
9594
});
96-
assertThat(failedLatch).completes();
95+
Assertions.assertThat(failedLatch).completes();
9796

9897
CountDownLatch consumeLatch = new CountDownLatch(1);
9998
connection
@@ -108,7 +107,7 @@ void exchangeInAddress(TestInfo info) {
108107

109108
management.binding().sourceExchange(e).destinationQueue(q).bind();
110109
publisher.publish(publisher.message(), ctx -> {});
111-
assertThat(consumeLatch).completes();
110+
Assertions.assertThat(consumeLatch).completes();
112111
} finally {
113112
management.queueDeletion().delete(q);
114113
management.exchangeDeletion().delete(e);
@@ -136,7 +135,7 @@ void queueInTargetAddress(TestInfo info) {
136135
.build();
137136

138137
publisher.publish(publisher.message(), ctx -> {});
139-
assertThat(consumeLatch).completes();
138+
Assertions.assertThat(consumeLatch).completes();
140139
} finally {
141140
management.queueDeletion().delete(q);
142141
}
@@ -170,7 +169,7 @@ void exchangeKeyInToField(TestInfo info) {
170169
failedLatch.countDown();
171170
}
172171
});
173-
assertThat(failedLatch).completes();
172+
Assertions.assertThat(failedLatch).completes();
174173

175174
CountDownLatch consumeLatch = new CountDownLatch(2);
176175
connection
@@ -185,7 +184,7 @@ void exchangeKeyInToField(TestInfo info) {
185184

186185
publisher.publish(publisher.message().toAddress().exchange(e).key(k).message(), ctx -> {});
187186
publisher.publish(publisher.message().toAddress().queue(q).message(), ctx -> {});
188-
assertThat(consumeLatch).completes();
187+
Assertions.assertThat(consumeLatch).completes();
189188
} finally {
190189
management.queueDeletion().delete(q);
191190
management.exchangeDeletion().delete(e);
@@ -220,18 +219,18 @@ void noToFieldDoesNotCloseAllConnectionPublishers() {
220219
})
221220
.build();
222221
p1.publish(p1.message().toAddress().queue(q).message(), ctx -> {});
223-
assertThat(sync).completes();
222+
Assertions.assertThat(sync).completes();
224223

225224
sync.reset();
226225
Publisher p2 = connection.publisherBuilder().queue(q).build();
227226
p2.publish(p2.message(), ctx -> {});
228-
assertThat(sync).completes();
227+
Assertions.assertThat(sync).completes();
229228

230229
p1.publish(p1.message(), ctx -> {});
231230

232231
sync.reset();
233232
p2.publish(p2.message(), ctx -> {});
234-
assertThat(sync).completes();
233+
Assertions.assertThat(sync).completes();
235234
} finally {
236235
consumer.close();
237236
}

src/test/java/com/rabbitmq/client/amqp/impl/AlarmsTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ void alarmShouldBlockPublisher(String alarmType) throws Exception {
4343
Sync publishSync = sync(messageCount);
4444
range(0, messageCount)
4545
.forEach(ignored -> publisher.publish(publisher.message(), ctx -> publishSync.down()));
46-
assertThat(publishSync).completes();
46+
Assertions.assertThat(publishSync).completes();
4747
Sync consumeSync = sync(messageCount);
4848
try (AutoCloseable ignored = alarm(alarmType)) {
4949
Sync publishTimeoutSync = sync();
@@ -55,7 +55,7 @@ void alarmShouldBlockPublisher(String alarmType) throws Exception {
5555
if (e.getCause() instanceof ClientSendTimedOutException) publishTimeoutSync.down();
5656
}
5757
});
58-
assertThat(publishTimeoutSync).completes();
58+
Assertions.assertThat(publishTimeoutSync).completes();
5959
connection
6060
.consumerBuilder()
6161
.queue(q)
@@ -65,7 +65,7 @@ void alarmShouldBlockPublisher(String alarmType) throws Exception {
6565
consumeSync.down();
6666
})
6767
.build();
68-
assertThat(consumeSync).completes();
68+
Assertions.assertThat(consumeSync).completes();
6969
publishSync.reset(messageCount);
7070
consumeSync.reset(messageCount);
7171

@@ -78,8 +78,8 @@ void alarmShouldBlockPublisher(String alarmType) throws Exception {
7878

7979
range(0, messageCount)
8080
.forEach(ignored -> publisher.publish(publisher.message(), ctx -> publishSync.down()));
81-
assertThat(publishSync).completes();
82-
assertThat(consumeSync).completes();
81+
Assertions.assertThat(publishSync).completes();
82+
Assertions.assertThat(consumeSync).completes();
8383
}
8484

8585
private static AutoCloseable alarm(String type) throws Exception {

src/test/java/com/rabbitmq/client/amqp/impl/AmqpConnectionRecoveryTest.java

Lines changed: 25 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@
2020
import static com.rabbitmq.client.amqp.Publisher.Status.ACCEPTED;
2121
import static com.rabbitmq.client.amqp.Resource.State.OPEN;
2222
import static com.rabbitmq.client.amqp.Resource.State.RECOVERING;
23-
import static com.rabbitmq.client.amqp.impl.TestUtils.*;
24-
import static com.rabbitmq.client.amqp.impl.TestUtils.CountDownLatchConditions.completed;
23+
import static com.rabbitmq.client.amqp.impl.Assertions.assertThat;
2524
import static com.rabbitmq.client.amqp.impl.TestUtils.name;
2625
import static com.rabbitmq.client.amqp.impl.TestUtils.waitAtMost;
2726
import static java.time.Duration.ofMillis;
@@ -34,9 +33,7 @@
3433
import com.rabbitmq.client.amqp.metrics.NoOpMetricsCollector;
3534
import java.util.*;
3635
import java.util.concurrent.ConcurrentHashMap;
37-
import java.util.concurrent.CountDownLatch;
3836
import java.util.concurrent.atomic.AtomicInteger;
39-
import java.util.concurrent.atomic.AtomicReference;
4037
import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
4138
import org.junit.jupiter.api.*;
4239
import org.junit.jupiter.params.ParameterizedTest;
@@ -80,18 +77,18 @@ static void afterAll() {
8077
void connectionShouldRecoverAfterClosingIt(boolean isolateResources, TestInfo info) {
8178
String q = name(info);
8279
String connectionName = UUID.randomUUID().toString();
83-
Map<Resource.State, CountDownLatch> stateLatches = new ConcurrentHashMap<>();
84-
stateLatches.put(RECOVERING, new CountDownLatch(1));
85-
stateLatches.put(OPEN, new CountDownLatch(2));
80+
Map<Resource.State, TestUtils.Sync> stateSync = new ConcurrentHashMap<>();
81+
stateSync.put(RECOVERING, TestUtils.sync(1));
82+
stateSync.put(OPEN, TestUtils.sync(2));
8683
AmqpConnectionBuilder builder =
8784
(AmqpConnectionBuilder)
8885
new AmqpConnectionBuilder(environment)
8986
.name(connectionName)
9087
.isolateResources(isolateResources)
9188
.listeners(
9289
context -> {
93-
if (stateLatches.containsKey(context.currentState())) {
94-
stateLatches.get(context.currentState()).countDown();
90+
if (stateSync.containsKey(context.currentState())) {
91+
stateSync.get(context.currentState()).down();
9592
}
9693
})
9794
.recovery()
@@ -103,14 +100,14 @@ void connectionShouldRecoverAfterClosingIt(boolean isolateResources, TestInfo in
103100
c.management().queue().name(q).declare();
104101
AtomicInteger consumerOpenCount = new AtomicInteger(0);
105102
Collection<UUID> receivedMessageIds = Collections.synchronizedList(new ArrayList<>());
106-
AtomicReference<CountDownLatch> consumeLatch = new AtomicReference<>(new CountDownLatch(1));
103+
TestUtils.Sync consumeSync = TestUtils.sync();
107104
c.consumerBuilder()
108105
.queue(q)
109106
.messageHandler(
110107
(context, message) -> {
111108
context.accept();
112109
receivedMessageIds.add(message.messageIdAsUuid());
113-
consumeLatch.get().countDown();
110+
consumeSync.down();
114111
})
115112
.listeners(
116113
context -> {
@@ -119,7 +116,7 @@ void connectionShouldRecoverAfterClosingIt(boolean isolateResources, TestInfo in
119116
}
120117
})
121118
.build();
122-
AtomicReference<CountDownLatch> publishLatch = new AtomicReference<>(new CountDownLatch(1));
119+
TestUtils.Sync publishSync = TestUtils.sync();
123120
AtomicInteger publisherOpenCount = new AtomicInteger(0);
124121
Publisher p =
125122
c.publisherBuilder()
@@ -136,34 +133,34 @@ void connectionShouldRecoverAfterClosingIt(boolean isolateResources, TestInfo in
136133
context -> {
137134
if (context.status() == ACCEPTED) {
138135
publishedMessageIds.add(context.message().messageIdAsUuid());
139-
publishLatch.get().countDown();
136+
publishSync.down();
140137
}
141138
};
142139
p.publish(p.message().messageId(UUID.randomUUID()), outboundMessageCallback);
143140
assertThat(publisherOpenCount).hasValue(1);
144-
assertThat(publishLatch).is(CountDownLatchReferenceConditions.completed());
141+
assertThat(publishSync).completes();
145142

146143
assertThat(consumerOpenCount).hasValue(1);
147-
assertThat(consumeLatch).is(CountDownLatchReferenceConditions.completed());
144+
assertThat(consumeSync).completes();
148145
assertThat(receivedMessageIds)
149146
.hasSameSizeAs(publishedMessageIds)
150147
.containsAll(publishedMessageIds);
151148

152-
consumeLatch.set(new CountDownLatch(1));
149+
consumeSync.reset();
153150

154151
Cli.closeConnection(connectionName);
155-
assertThat(stateLatches.get(RECOVERING)).is(completed());
156-
assertThat(stateLatches.get(OPEN)).is(completed());
152+
assertThat(stateSync.get(RECOVERING)).completes();
153+
assertThat(stateSync.get(OPEN)).completes();
157154
assertThat(connectionAttemptCount).hasValue(2);
158155
waitAtMost(() -> consumerOpenCount.get() == 2);
159156
waitAtMost(() -> publisherOpenCount.get() == 2);
160157

161-
publishLatch.set(new CountDownLatch(1));
158+
publishSync.reset();
162159
p.publish(p.message().messageId(UUID.randomUUID()), outboundMessageCallback);
163-
assertThat(publishLatch).is(CountDownLatchReferenceConditions.completed());
160+
assertThat(publishSync).completes();
164161
assertThat(publishedMessageIds).hasSize(2);
165162

166-
assertThat(consumeLatch).is(CountDownLatchReferenceConditions.completed());
163+
assertThat(consumeSync).completes();
167164
assertThat(receivedMessageIds)
168165
.hasSameSizeAs(publishedMessageIds)
169166
.containsAll(publishedMessageIds);
@@ -179,18 +176,18 @@ void connectionShouldRecoverAfterClosingIt(boolean isolateResources, TestInfo in
179176
void connectionShouldRecoverAfterBrokerStopStart(boolean isolateResources, TestInfo info) {
180177
String q = name(info);
181178
String connectionName = UUID.randomUUID().toString();
182-
Map<Resource.State, CountDownLatch> stateLatches = new ConcurrentHashMap<>();
183-
stateLatches.put(RECOVERING, new CountDownLatch(1));
184-
stateLatches.put(OPEN, new CountDownLatch(2));
179+
Map<Resource.State, TestUtils.Sync> stateSync = new ConcurrentHashMap<>();
180+
stateSync.put(RECOVERING, TestUtils.sync(1));
181+
stateSync.put(OPEN, TestUtils.sync(2));
185182
AmqpConnectionBuilder builder =
186183
(AmqpConnectionBuilder)
187184
new AmqpConnectionBuilder(environment)
188185
.name(connectionName)
189186
.isolateResources(isolateResources)
190187
.listeners(
191188
context -> {
192-
if (stateLatches.containsKey(context.currentState())) {
193-
stateLatches.get(context.currentState()).countDown();
189+
if (stateSync.containsKey(context.currentState())) {
190+
stateSync.get(context.currentState()).down();
194191
}
195192
})
196193
.recovery()
@@ -201,7 +198,7 @@ void connectionShouldRecoverAfterBrokerStopStart(boolean isolateResources, TestI
201198
c.management().queue().name(q).autoDelete(true).exclusive(true).declare();
202199
try {
203200
Cli.stopBroker();
204-
assertThat(stateLatches.get(RECOVERING)).is(completed());
201+
assertThat(stateSync.get(RECOVERING)).completes();
205202
stream(
206203
new ThrowingCallable[] {
207204
() -> c.management().queue().name(q).exclusive(true).declare(),
@@ -216,7 +213,7 @@ void connectionShouldRecoverAfterBrokerStopStart(boolean isolateResources, TestI
216213
} finally {
217214
Cli.startBroker();
218215
}
219-
assertThat(stateLatches.get(OPEN)).is(completed());
216+
assertThat(stateSync.get(OPEN)).completes();
220217
assertThat(connectionAttemptCount).hasValueGreaterThan(1);
221218
c.management().queue().name(q).autoDelete(true).exclusive(true).declare();
222219
}

0 commit comments

Comments
 (0)