Skip to content

Commit 15e771a

Browse files
garyrussell authored and artembilan committed
GH-2003: Fix RetryableTopic Delivery Attempts Hdr
Resolves #2003 **cherry-pick to 2.7.x**
1 parent df4a5b1 commit 15e771a

File tree

2 files changed

+56
-9
lines changed

2 files changed

+56
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.retrytopic;
1818

1919
import java.math.BigInteger;
20+
import java.nio.ByteBuffer;
2021
import java.time.Instant;
2122
import java.util.HashSet;
2223
import java.util.LinkedHashSet;
@@ -191,17 +192,28 @@ protected TopicPartition resolveTopicPartition(final ConsumerRecord<?, ?> cr, fi
191192

192193
private int getAttempts(ConsumerRecord<?, ?> consumerRecord) {
193194
Header header = consumerRecord.headers().lastHeader(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS);
194-
return header != null
195-
? header.value()[0]
196-
: 1;
195+
if (header != null) {
196+
byte[] value = header.value();
197+
if (value.length == 1) { // backwards compatibility
198+
return value[0];
199+
}
200+
else if (value.length == 4) {
201+
return ByteBuffer.wrap(value).getInt();
202+
}
203+
else {
204+
LOGGER.debug(() -> "Unexpected size for " + RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS + " header: "
205+
+ value.length);
206+
}
207+
}
208+
return 1;
197209
}
198210

199211
private Headers addHeaders(ConsumerRecord<?, ?> consumerRecord, Exception e, int attempts) {
200212
Headers headers = new RecordHeaders();
201213
byte[] originalTimestampHeader = getOriginalTimestampHeaderBytes(consumerRecord);
202214
headers.add(RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP, originalTimestampHeader);
203215
headers.add(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS,
204-
BigInteger.valueOf(attempts + 1).toByteArray());
216+
ByteBuffer.wrap(new byte[4]).putInt(attempts + 1).array());
205217
headers.add(RetryTopicHeaders.DEFAULT_HEADER_BACKOFF_TIMESTAMP,
206218
BigInteger.valueOf(getNextExecutionTimestamp(consumerRecord, e, originalTimestampHeader))
207219
.toByteArray());

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactoryTests.java

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import static org.mockito.Mockito.times;
2626

2727
import java.math.BigInteger;
28+
import java.nio.ByteBuffer;
2829
import java.time.Clock;
2930
import java.time.Instant;
3031
import java.time.LocalDateTime;
@@ -127,22 +128,22 @@ void shouldSendMessage() {
127128
// assert headers
128129
Header attemptsHeader = producerRecord.headers().lastHeader(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS);
129130
assertThat(attemptsHeader).isNotNull();
130-
assertThat(attemptsHeader.value()[0]).isEqualTo(Integer.valueOf(2).byteValue());
131+
assertThat(ByteBuffer.wrap(attemptsHeader.value()).getInt()).isEqualTo(2);
131132
Header timestampHeader = producerRecord.headers().lastHeader(RetryTopicHeaders.DEFAULT_HEADER_BACKOFF_TIMESTAMP);
132133
assertThat(timestampHeader).isNotNull();
133134
assertThat(new BigInteger(timestampHeader.value()).longValue()).isEqualTo(failureTimestamp + 1000L);
134135
}
135136

136137
@Test
137-
void shouldIncreaseAttempts() {
138+
void shouldIncreaseAttemptsInLegacyHeader() {
138139

139140
// setup
140141
RuntimeException e = new RuntimeException();
141142
ConsumerRecord consumerRecord = new ConsumerRecord(testTopic, 0, 0, key, value);
142-
consumerRecord.headers().add(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, BigInteger.valueOf(1).toByteArray());
143+
consumerRecord.headers().add(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS, BigInteger.valueOf(127).toByteArray());
143144
consumerRecord.headers().add(RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP, this.originalTimestampBytes);
144145

145-
given(destinationTopicResolver.resolveDestinationTopic(testTopic, 1, e, originalTimestamp))
146+
given(destinationTopicResolver.resolveDestinationTopic(testTopic, 127, e, originalTimestamp))
146147
.willReturn(destinationTopic);
147148
given(destinationTopic.isNoOpsTopic()).willReturn(false);
148149
given(destinationTopic.getDestinationName()).willReturn(testRetryTopic);
@@ -161,7 +162,41 @@ void shouldIncreaseAttempts() {
161162
ProducerRecord producerRecord = producerRecordCaptor.getValue();
162163
Header attemptsHeader = producerRecord.headers().lastHeader(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS);
163164
assertThat(attemptsHeader).isNotNull();
164-
assertThat(attemptsHeader.value()[0]).isEqualTo(Integer.valueOf(2).byteValue());
165+
assertThat(attemptsHeader.value().length).isEqualTo(4); // handled a legacy one byte header ok
166+
assertThat(ByteBuffer.wrap(attemptsHeader.value()).getInt()).isEqualTo(128);
167+
}
168+
169+
@Test
170+
void shouldIncreaseAttemptsInNewHeader() {
171+
172+
// setup
173+
RuntimeException e = new RuntimeException();
174+
ConsumerRecord consumerRecord = new ConsumerRecord(testTopic, 0, 0, key, value);
175+
consumerRecord.headers().add(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS,
176+
ByteBuffer.wrap(new byte[4]).putInt(127).array());
177+
consumerRecord.headers().add(RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP, this.originalTimestampBytes);
178+
179+
given(destinationTopicResolver.resolveDestinationTopic(testTopic, 127, e, originalTimestamp))
180+
.willReturn(destinationTopic);
181+
given(destinationTopic.isNoOpsTopic()).willReturn(false);
182+
given(destinationTopic.getDestinationName()).willReturn(testRetryTopic);
183+
given(destinationTopicResolver.getDestinationTopicByName(testRetryTopic)).willReturn(destinationTopic);
184+
willReturn(kafkaOperations).given(destinationTopic).getKafkaOperations();
185+
given(kafkaOperations.send(any(ProducerRecord.class))).willReturn(listenableFuture);
186+
187+
DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(this.destinationTopicResolver);
188+
189+
// when
190+
DeadLetterPublishingRecoverer deadLetterPublishingRecoverer = factory.create();
191+
deadLetterPublishingRecoverer.accept(consumerRecord, e);
192+
193+
// then
194+
then(kafkaOperations).should(times(1)).send(producerRecordCaptor.capture());
195+
ProducerRecord producerRecord = producerRecordCaptor.getValue();
196+
Header attemptsHeader = producerRecord.headers().lastHeader(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS);
197+
assertThat(attemptsHeader).isNotNull();
198+
assertThat(attemptsHeader.value().length).isEqualTo(4);
199+
assertThat(ByteBuffer.wrap(attemptsHeader.value()).getInt()).isEqualTo(128);
165200
}
166201

167202
@Test

0 commit comments

Comments
 (0)