Skip to content

Commit bad2dd4

Browse files
Merge branch 'master' into prevent-socket-write-blocking
2 parents 3d824dc + 7fc783b commit bad2dd4

File tree

4 files changed

+29
-8
lines changed

4 files changed

+29
-8
lines changed

src/test/java/com/rabbitmq/client/test/functional/ConnectionRecovery.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Arrays;
2727
import java.util.List;
2828
import java.util.UUID;
29+
import java.util.concurrent.CopyOnWriteArrayList;
2930
import java.util.concurrent.CountDownLatch;
3031
import java.util.concurrent.TimeUnit;
3132
import java.util.concurrent.TimeoutException;
@@ -145,7 +146,7 @@ public class ConnectionRecovery extends BrokerTestCase {
145146

146147
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/135
147148
@Test public void thatShutdownHooksOnConnectionFireBeforeRecoveryStarts() throws IOException, InterruptedException {
148-
final List<String> events = new ArrayList<String>();
149+
final List<String> events = new CopyOnWriteArrayList<String>();
149150
final CountDownLatch latch = new CountDownLatch(1);
150151
connection.addShutdownListener(new ShutdownListener() {
151152
public void shutdownCompleted(ShutdownSignalException cause) {

src/test/java/com/rabbitmq/client/test/functional/DeadLetterExchange.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,7 @@ private void sleep(long millis) {
583583
publication time + TTL + latency */
584584
private void checkPromptArrival(QueueingConsumer c,
585585
int count, long latency) throws Exception {
586-
long epsilon = TTL / 20;
586+
long epsilon = TTL / 15;
587587
for (int i = 0; i < count; i++) {
588588
Delivery d = c.nextDelivery(TTL + TTL + latency + epsilon);
589589
assertNotNull("message #" + i + " did not expire", d);

src/test/java/com/rabbitmq/client/test/functional/TTLHandling.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ protected void releaseResources() throws IOException {
156156
closeChannel();
157157
openChannel();
158158

159-
Thread.sleep(150);
159+
Thread.sleep(110);
160160
expectBodyAndRemainingMessages(MSG[1], 1);
161161
expectBodyAndRemainingMessages(MSG[2], 0);
162162
}

src/test/java/com/rabbitmq/client/test/server/XDeathHeaderGrowth.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import java.util.UUID;
2929
import java.util.concurrent.CountDownLatch;
3030
import java.util.concurrent.TimeUnit;
31+
import java.util.concurrent.atomic.AtomicInteger;
32+
import java.util.concurrent.atomic.AtomicLong;
3133

3234
import org.junit.Test;
3335

@@ -39,25 +41,37 @@
3941

4042
class RejectingConsumer extends DefaultConsumer {
4143
private CountDownLatch latch;
42-
private Map<String, Object> headers;
44+
private volatile Map<String, Object> headers;
45+
private AtomicLong counter;
46+
4347

4448
public RejectingConsumer(Channel channel, CountDownLatch latch) {
4549
super(channel);
4650
this.latch = latch;
51+
this.counter = new AtomicLong(latch.getCount());
4752
}
4853

4954
@Override
5055
public void handleDelivery(String consumerTag, Envelope envelope,
5156
AMQP.BasicProperties properties, byte[] body)
5257
throws IOException {
58+
5359
if(this.latch.getCount() > 0) {
5460
this.getChannel().basicReject(envelope.getDeliveryTag(), false);
5561
} else {
5662
if(this.getChannel().isOpen()) {
5763
this.getChannel().basicAck(envelope.getDeliveryTag(), false);
5864
}
5965
}
60-
this.headers = properties.getHeaders();
66+
if(this.counter.decrementAndGet() == 0) {
67+
// get headers only when the message has been redelivered
68+
// the expected number of times.
69+
// it looks like the message can be redelivered because
70+
// of the reject when handleDelivery isn't done yet or
71+
// before the latch releases the main thread. There's then
72+
// an additional delivery and the checks (transiently) fail.
73+
this.headers = properties.getHeaders();
74+
}
6175
latch.countDown();
6276
}
6377

@@ -88,7 +102,7 @@ public class XDeathHeaderGrowth extends BrokerTestCase {
88102
this.channel.queueBind(q3, x2, "");
89103

90104
final String qz = "issues.rabbitmq-server-78.destination";
91-
declareTransientQueue(qz, argumentsForDeadLetteringTo(x3));
105+
declareTransientQueue(qz, argumentsForDeadLetteringWithoutTtlTo(x3));
92106
this.channel.queueBind(qz, x3, "");
93107

94108
CountDownLatch latch = new CountDownLatch(10);
@@ -143,7 +157,7 @@ private void cleanUpQueues(String... qs) throws IOException {
143157
this.channel.queueBind(q2, x1, "");
144158

145159
final String qz = "issues.rabbitmq-server-152.destination";
146-
declareTransientQueue(qz, argumentsForDeadLetteringTo(x2));
160+
declareTransientQueue(qz, argumentsForDeadLetteringWithoutTtlTo(x2));
147161
this.channel.queueBind(qz, x2, "");
148162

149163
CountDownLatch latch = new CountDownLatch(10);
@@ -215,11 +229,17 @@ private Map<String, Object> argumentsForDeadLetteringTo(String dlx) {
215229
return argumentsForDeadLetteringTo(dlx, 1);
216230
}
217231

232+
private Map<String, Object> argumentsForDeadLetteringWithoutTtlTo(String dlx) {
233+
return argumentsForDeadLetteringTo(dlx, -1);
234+
}
235+
218236
private Map<String, Object> argumentsForDeadLetteringTo(String dlx, int ttl) {
219237
Map<String, Object> m = new HashMap<String, Object>();
220238
m.put("x-dead-letter-exchange", dlx);
221239
m.put("x-dead-letter-routing-key", "some-routing-key");
222-
m.put("x-message-ttl", ttl);
240+
if(ttl > 0) {
241+
m.put("x-message-ttl", ttl);
242+
}
223243
return m;
224244
}
225245
}

0 commit comments

Comments
 (0)