|
28 | 28 | import java.util.UUID;
|
29 | 29 | import java.util.concurrent.CountDownLatch;
|
30 | 30 | import java.util.concurrent.TimeUnit;
|
| 31 | +import java.util.concurrent.atomic.AtomicLong; |
31 | 32 |
|
32 | 33 | import org.junit.Test;
|
33 | 34 |
|
|
40 | 41 | class RejectingConsumer extends DefaultConsumer {
|
41 | 42 | private CountDownLatch latch;
|
42 | 43 | private Map<String, Object> headers;
|
| 44 | + private AtomicLong counter; |
43 | 45 |
|
44 | 46 | public RejectingConsumer(Channel channel, CountDownLatch latch) {
|
45 | 47 | super(channel);
|
46 | 48 | this.latch = latch;
|
| 49 | + this.counter = new AtomicLong(latch.getCount()); |
47 | 50 | }
|
48 | 51 |
|
49 | 52 | @Override
|
50 | 53 | public void handleDelivery(String consumerTag, Envelope envelope,
|
51 | 54 | AMQP.BasicProperties properties, byte[] body)
|
52 | 55 | throws IOException {
|
| 56 | + |
53 | 57 | if(this.latch.getCount() > 0) {
|
54 | 58 | this.getChannel().basicReject(envelope.getDeliveryTag(), false);
|
55 | 59 | } else {
|
56 | 60 | if(this.getChannel().isOpen()) {
|
57 | 61 | this.getChannel().basicAck(envelope.getDeliveryTag(), false);
|
58 | 62 | }
|
59 | 63 | }
|
60 |
| - this.headers = properties.getHeaders(); |
| 64 | + if(this.counter.decrementAndGet() == 0) { |
| 65 | + // get headers only when the message has been redelivered |
| 66 | + // the expected number of times. |
| 67 | + // it looks like the message can be redelivered because |
| 68 | + // of the reject when handleDelivery isn't done yet or |
| 69 | + // before the latch releases the main thread. There's then |
| 70 | + // an additional delivery and the checks (transiently) fail. |
| 71 | + this.headers = properties.getHeaders(); |
| 72 | + } |
61 | 73 | latch.countDown();
|
62 | 74 | }
|
63 | 75 |
|
@@ -88,7 +100,7 @@ public class XDeathHeaderGrowth extends BrokerTestCase {
|
88 | 100 | this.channel.queueBind(q3, x2, "");
|
89 | 101 |
|
90 | 102 | final String qz = "issues.rabbitmq-server-78.destination";
|
91 |
| - declareTransientQueue(qz, argumentsForDeadLetteringTo(x3)); |
| 103 | + declareTransientQueue(qz, argumentsForDeadLetteringWithoutTtlTo(x3)); |
92 | 104 | this.channel.queueBind(qz, x3, "");
|
93 | 105 |
|
94 | 106 | CountDownLatch latch = new CountDownLatch(10);
|
@@ -143,7 +155,7 @@ private void cleanUpQueues(String... qs) throws IOException {
|
143 | 155 | this.channel.queueBind(q2, x1, "");
|
144 | 156 |
|
145 | 157 | final String qz = "issues.rabbitmq-server-152.destination";
|
146 |
| - declareTransientQueue(qz, argumentsForDeadLetteringTo(x2)); |
| 158 | + declareTransientQueue(qz, argumentsForDeadLetteringWithoutTtlTo(x2)); |
147 | 159 | this.channel.queueBind(qz, x2, "");
|
148 | 160 |
|
149 | 161 | CountDownLatch latch = new CountDownLatch(10);
|
@@ -215,11 +227,17 @@ private Map<String, Object> argumentsForDeadLetteringTo(String dlx) {
|
215 | 227 | return argumentsForDeadLetteringTo(dlx, 1);
|
216 | 228 | }
|
217 | 229 |
|
| 230 | + private Map<String, Object> argumentsForDeadLetteringWithoutTtlTo(String dlx) { |
| 231 | + return argumentsForDeadLetteringTo(dlx, -1); |
| 232 | + } |
| 233 | + |
218 | 234 | private Map<String, Object> argumentsForDeadLetteringTo(String dlx, int ttl) {
|
219 | 235 | Map<String, Object> m = new HashMap<String, Object>();
|
220 | 236 | m.put("x-dead-letter-exchange", dlx);
|
221 | 237 | m.put("x-dead-letter-routing-key", "some-routing-key");
|
222 |
| - m.put("x-message-ttl", ttl); |
| 238 | + if(ttl > 0) { |
| 239 | + m.put("x-message-ttl", ttl); |
| 240 | + } |
223 | 241 | return m;
|
224 | 242 | }
|
225 | 243 | }
|
0 commit comments