Skip to content

Commit e3aafb8

Browse files
committed
Add alarm test
Publishers are blocked when an alarm is triggered, but consumers are not.
1 parent a427638 commit e3aafb8

File tree

3 files changed

+218
-4
lines changed

3 files changed

+218
-4
lines changed
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
// If you have any questions regarding licensing, please contact us at
17+
18+
package com.rabbitmq.model.amqp;
19+
20+
import static com.rabbitmq.model.amqp.TestUtils.*;
21+
import static java.util.stream.IntStream.range;
22+
23+
import com.rabbitmq.model.Connection;
24+
import com.rabbitmq.model.Environment;
25+
import com.rabbitmq.model.Publisher;
26+
import org.junit.jupiter.api.*;
27+
import org.junit.jupiter.params.ParameterizedTest;
28+
import org.junit.jupiter.params.provider.ValueSource;
29+
30+
public class AlarmsTest {
31+
32+
static Environment environment;
33+
Connection connection;
34+
35+
@BeforeAll
36+
static void initAll() {
37+
environment = environmentBuilder().build();
38+
}
39+
40+
@BeforeEach
41+
void init() {
42+
this.connection = environment.connectionBuilder().build();
43+
}
44+
45+
@AfterEach
46+
void tearDown() {
47+
this.connection.close();
48+
}
49+
50+
@AfterAll
51+
static void tearDownAll() {
52+
environment.close();
53+
}
54+
55+
@ParameterizedTest
56+
@ValueSource(strings = {"disk", "memory"})
57+
void alarmShouldBlockPublisher(String alarmType) throws Exception {
58+
String q = connection.management().queue().exclusive(true).declare().name();
59+
Publisher publisher = connection.publisherBuilder().queue(q).build();
60+
int messageCount = 100;
61+
Sync publishSync = sync(messageCount);
62+
range(0, messageCount)
63+
.forEach(ignored -> publisher.publish(publisher.message(), ctx -> publishSync.down()));
64+
assertThat(publishSync).completes();
65+
publishSync.reset(messageCount + 1);
66+
Sync consumeSync = sync(messageCount);
67+
try (AutoCloseable ignored = alarm(alarmType)) {
68+
new Thread(() -> publisher.publish(publisher.message(), ctx -> publishSync.down())).start();
69+
connection
70+
.consumerBuilder()
71+
.queue(q)
72+
.messageHandler(
73+
(ctx, msg) -> {
74+
ctx.accept();
75+
consumeSync.down();
76+
})
77+
.build();
78+
assertThat(consumeSync).completes();
79+
consumeSync.reset(messageCount + 1);
80+
}
81+
82+
range(0, messageCount)
83+
.forEach(ignored -> publisher.publish(publisher.message(), ctx -> publishSync.down()));
84+
assertThat(publishSync).completes();
85+
assertThat(consumeSync).completes();
86+
}
87+
88+
private static AutoCloseable alarm(String type) throws Exception {
89+
if ("disk".equals(type)) {
90+
return Cli.diskAlarm();
91+
} else if ("memory".equals(type)) {
92+
return Cli.memoryAlarm();
93+
} else {
94+
throw new IllegalArgumentException();
95+
}
96+
}
97+
}

src/test/java/com/rabbitmq/model/amqp/Cli.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.ArrayList;
2525
import java.util.Arrays;
2626
import java.util.List;
27+
import java.util.concurrent.Callable;
2728
import java.util.function.Predicate;
2829
import java.util.regex.Matcher;
2930
import java.util.regex.Pattern;
@@ -199,6 +200,54 @@ static QueueInfo queueInfo(String q) {
199200
return listQueues().stream().filter(info -> q.equals(info.name())).findFirst().get();
200201
}
201202

203+
public static AutoCloseable diskAlarm() throws Exception {
204+
return new CallableAutoCloseable(
205+
() -> {
206+
setDiskAlarm();
207+
return null;
208+
},
209+
() -> {
210+
clearDiskAlarm();
211+
return null;
212+
});
213+
}
214+
215+
public static AutoCloseable memoryAlarm() throws Exception {
216+
return new CallableAutoCloseable(
217+
() -> {
218+
setMemoryAlarm();
219+
return null;
220+
},
221+
() -> {
222+
clearMemoryAlarm();
223+
return null;
224+
});
225+
}
226+
227+
private static void setDiskAlarm() {
228+
setResourceAlarm("disk");
229+
}
230+
231+
private static void clearDiskAlarm() {
232+
clearResourceAlarm("disk");
233+
}
234+
235+
private static void setMemoryAlarm() {
236+
setResourceAlarm("memory");
237+
}
238+
239+
static void clearMemoryAlarm() {
240+
clearResourceAlarm("memory");
241+
}
242+
243+
private static void setResourceAlarm(String source) {
244+
rabbitmqctl("eval 'rabbit_alarm:set_alarm({{resource_limit, " + source + ", node()}, []}).'");
245+
}
246+
247+
private static void clearResourceAlarm(String source) {
248+
rabbitmqctl("eval 'rabbit_alarm:clear_alarm({resource_limit, " + source + ", node()}).'");
249+
}
250+
202251
private static class ConnectionInfo {
203252
private final String pid;
204253
private final int peerPort;
@@ -325,4 +374,19 @@ private static String extractConnectionName(String clientProperties) {
325374
static String hostname() {
326375
return executeCommand("hostname").output();
327376
}
377+
378+
private static final class CallableAutoCloseable implements AutoCloseable {
379+
380+
private final Callable<Void> end;
381+
382+
private CallableAutoCloseable(Callable<Void> start, Callable<Void> end) throws Exception {
383+
this.end = end;
384+
start.call();
385+
}
386+
387+
@Override
388+
public void close() throws Exception {
389+
this.end.call();
390+
}
391+
}
328392
}

src/test/java/com/rabbitmq/model/amqp/TestUtils.java

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,9 @@
4949
import org.junit.jupiter.api.extension.ExecutionCondition;
5050
import org.junit.jupiter.api.extension.ExtendWith;
5151
import org.junit.jupiter.api.extension.ExtensionContext;
52-
import org.slf4j.Logger;
53-
import org.slf4j.LoggerFactory;
5452

5553
public abstract class TestUtils {
5654

57-
private static final Logger LOGGER = LoggerFactory.getLogger(TestUtils.class);
58-
5955
private static final Duration DEFAULT_CONDITION_TIMEOUT = Duration.ofSeconds(10);
6056

6157
private TestUtils() {}
@@ -140,6 +136,34 @@ static QueueInfoAssert assertThat(Management.QueueInfo queueInfo) {
140136
return new QueueInfoAssert(queueInfo);
141137
}
142138

139+
static SyncAssert assertThat(Sync sync) {
140+
return new SyncAssert(sync);
141+
}
142+
143+
static class SyncAssert extends AbstractObjectAssert<SyncAssert, Sync> {
144+
145+
private SyncAssert(Sync sync) {
146+
super(sync, SyncAssert.class);
147+
}
148+
149+
SyncAssert completes() {
150+
return this.completes(DEFAULT_CONDITION_TIMEOUT);
151+
}
152+
153+
SyncAssert completes(Duration timeout) {
154+
try {
155+
boolean completed = actual.await(timeout);
156+
if (!completed) {
157+
fail("Sync timed out after %d ms", timeout.toMillis());
158+
}
159+
} catch (InterruptedException e) {
160+
Thread.interrupted();
161+
throw new RuntimeException(e);
162+
}
163+
return this;
164+
}
165+
}
166+
143167
static CountDownLatchAssert assertThat(AtomicReference<CountDownLatch> reference) {
144168
return new CountDownLatchAssert(reference);
145169
}
@@ -510,6 +534,14 @@ private QueueInfoAssert flag(String label, boolean expected, boolean actual) {
510534
}
511535
}
512536

537+
static Sync sync() {
538+
return sync(1);
539+
}
540+
541+
static Sync sync(int count) {
542+
return new Sync(count);
543+
}
544+
513545
private static class CloseableResourceWrapper<T>
514546
implements ExtensionContext.Store.CloseableResource {
515547

@@ -530,4 +562,25 @@ public void close() {
530562
this.closing.accept(this.resource);
531563
}
532564
}
565+
566+
static class Sync {
567+
568+
private final AtomicReference<CountDownLatch> latch = new AtomicReference<>();
569+
570+
private Sync(int count) {
571+
this.latch.set(new CountDownLatch(count));
572+
}
573+
574+
void down() {
575+
this.latch.get().countDown();
576+
}
577+
578+
private boolean await(Duration timeout) throws InterruptedException {
579+
return this.latch.get().await(timeout.toMillis(), TimeUnit.MILLISECONDS);
580+
}
581+
582+
void reset(int count) {
583+
this.latch.set(new CountDownLatch(count));
584+
}
585+
}
533586
}

0 commit comments

Comments
 (0)