Skip to content

Commit 8723a98

Browse files
committed
Make publishing in performance tool able to recover
1 parent 157a85a commit 8723a98

File tree

2 files changed

+46
-6
lines changed

2 files changed

+46
-6
lines changed

src/test/java/com/rabbitmq/client/amqp/perf/AmqpPerfTest.java

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,13 @@
3535
import java.nio.charset.StandardCharsets;
3636
import java.util.concurrent.*;
3737
import java.util.concurrent.atomic.AtomicBoolean;
38+
import org.slf4j.Logger;
39+
import org.slf4j.LoggerFactory;
3840

3941
public class AmqpPerfTest {
4042

43+
private static final Logger LOGGER = LoggerFactory.getLogger(AmqpPerfTest.class);
44+
4145
/*
4246
./mvnw -q clean test-compile exec:java \
4347
-Dexec.mainClass=com.rabbitmq.client.amqp.perf.AmqpPerfTest \
@@ -86,6 +90,12 @@ public static void main(String[] args) throws IOException {
8690

8791
connection
8892
.consumerBuilder()
93+
.listeners(
94+
context -> {
95+
if (context.currentState() == Resource.State.RECOVERING) {
96+
LOGGER.info("Consumer is recovering...");
97+
}
98+
})
8999
.queue(q)
90100
.initialCredits(1000)
91101
.messageHandler(
@@ -103,7 +113,24 @@ public static void main(String[] args) throws IOException {
103113

104114
executorService.submit(
105115
() -> {
106-
Publisher publisher = connection.publisherBuilder().exchange(e).key(rk).build();
116+
AtomicBoolean shouldPublish = new AtomicBoolean(false);
117+
Publisher publisher =
118+
connection
119+
.publisherBuilder()
120+
.exchange(e)
121+
.key(rk)
122+
.listeners(
123+
context -> {
124+
if (context.currentState() == Resource.State.OPEN) {
125+
shouldPublish.set(true);
126+
} else {
127+
if (context.currentState() == Resource.State.RECOVERING) {
128+
LOGGER.info("Publisher is recovering...");
129+
}
130+
shouldPublish.set(false);
131+
}
132+
})
133+
.build();
107134
Publisher.Callback callback =
108135
context -> {
109136
try {
@@ -116,11 +143,23 @@ public static void main(String[] args) throws IOException {
116143
};
117144
int msgSize = 10;
118145
while (!Thread.currentThread().isInterrupted()) {
119-
long creationTime = System.currentTimeMillis();
120-
byte[] payload = new byte[msgSize];
121-
writeLong(payload, creationTime);
122-
Message message = publisher.message(payload);
123-
publisher.publish(message, callback);
146+
if (shouldPublish.get()) {
147+
long creationTime = System.currentTimeMillis();
148+
byte[] payload = new byte[msgSize];
149+
writeLong(payload, creationTime);
150+
try {
151+
Message message = publisher.message(payload);
152+
publisher.publish(message, callback);
153+
} catch (Exception ex) {
154+
LOGGER.info("Error while trying to publish: {}", ex.getMessage());
155+
}
156+
} else {
157+
try {
158+
Thread.sleep(1000L);
159+
} catch (InterruptedException ex) {
160+
Thread.interrupted();
161+
}
162+
}
124163
}
125164
});
126165
out.println("Prometheus endpoint started on http://localhost:" + monitoringPort + "/metrics");

src/test/resources/logback-test.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
<logger name="com.rabbitmq.client.amqp.impl.AmqpConsumer" level="warn" />
1313
<logger name="com.rabbitmq.client.amqp.impl.AmqpManagement" level="warn" />
1414
<logger name="org.apache.qpid" level="warn" />
15+
<logger name="com.rabbitmq.client.amqp.perf" level="info" />
1516

1617
<root level="warn">
1718
<appender-ref ref="STDOUT" />

0 commit comments

Comments
 (0)