
Commit 45be8f0

GH-1766 - Use next topic's delay in retry topic (#1767)
Fixes #1766
1 parent 99ff5cf commit 45be8f0

2 files changed: 16 additions & 24 deletions

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.java

Lines changed: 2 additions & 1 deletion
@@ -144,7 +144,8 @@ private long getNextExecutionTimestamp(ConsumerRecord<?, ?> consumerRecord, Exce
     long originalTimestamp = new BigInteger(originalTimestampHeader).longValue();
     long failureTimestamp = getFailureTimestamp(e);
     long nextExecutionTimestamp = failureTimestamp + this.destinationTopicResolver
-        .getDestinationTopicByName(consumerRecord.topic()).getDestinationDelay();
+        .resolveDestinationTopic(consumerRecord.topic(), getAttempts(consumerRecord), e, originalTimestamp)
+        .getDestinationDelay();
     LOGGER.debug(() -> String.format("FailureTimestamp: %s, Original timestamp: %s, nextExecutionTimestamp: %s",
         failureTimestamp, originalTimestamp, nextExecutionTimestamp));
     return nextExecutionTimestamp;

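In effect, the next execution timestamp is now computed from the delay of the topic the record is being forwarded to (resolved from the current topic, the attempt count, the exception, and the original timestamp) rather than from the delay of the topic the record was consumed from. Below is a minimal, self-contained sketch of that calculation; Destination and resolveNextDestination are hypothetical stand-ins for illustration only, not the spring-kafka DestinationTopic / DestinationTopicResolver API.

// Minimal sketch of the corrected calculation (hypothetical names, not the spring-kafka API).
import java.time.Instant;

public class NextExecutionTimestampSketch {

    // Stand-in for a resolved retry destination and its configured back-off delay.
    record Destination(String name, long delayMillis) { }

    // Stand-in resolver: picks the next retry topic from the current topic and the attempt count
    // (exponential back-off assumed purely for the example: 1s, 2s, 4s, ...).
    static Destination resolveNextDestination(String currentTopic, int attempts) {
        return new Destination(currentTopic + "-retry-" + attempts, 1000L * (1L << attempts));
    }

    // The spirit of this commit: add the NEXT topic's delay to the failure timestamp,
    // instead of the delay of the topic the failed record was consumed from.
    static long nextExecutionTimestamp(String currentTopic, int attempts, long failureTimestamp) {
        return failureTimestamp + resolveNextDestination(currentTopic, attempts).delayMillis();
    }

    public static void main(String[] args) {
        long failedAt = Instant.now().toEpochMilli();
        System.out.println(nextExecutionTimestamp("orders", 2, failedAt)); // failure time + 4s
    }
}
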
spring-kafka/src/test/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactoryTests.java

Lines changed: 14 additions & 23 deletions
@@ -79,9 +79,6 @@ class DeadLetterPublishingRecovererFactoryTests {
     @Mock
     private KafkaOperations<?, ?> kafkaOperations;
 
-    @Mock
-    private KafkaOperations<?, ?> kafkaOperations2;
-
     @Mock
     private ListenableFuture<?> listenableFuture;
 
@@ -106,11 +103,10 @@ void shouldSendMessage() {
         given(destinationTopic.isNoOpsTopic()).willReturn(false);
         given(destinationTopic.getDestinationName()).willReturn(testRetryTopic);
         given(destinationTopic.getDestinationPartitions()).willReturn(3);
-        given(destinationTopicResolver.getDestinationTopicByName(testTopic)).willReturn(destinationTopic);
         given(destinationTopicResolver.getDestinationTopicByName(testRetryTopic)).willReturn(destinationTopic);
         given(destinationTopic.getDestinationDelay()).willReturn(1000L);
-        willReturn(this.kafkaOperations2).given(destinationTopic).getKafkaOperations();
-        given(kafkaOperations2.send(any(ProducerRecord.class))).willReturn(listenableFuture);
+        willReturn(this.kafkaOperations).given(destinationTopic).getKafkaOperations();
+        given(kafkaOperations.send(any(ProducerRecord.class))).willReturn(listenableFuture);
         this.consumerRecord.headers().add(RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP, originalTimestampBytes);
 
         DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(this.destinationTopicResolver);
@@ -120,7 +116,7 @@ void shouldSendMessage() {
         deadLetterPublishingRecoverer.accept(this.consumerRecord, e);
 
         // then
-        then(kafkaOperations2).should(times(1)).send(producerRecordCaptor.capture());
+        then(kafkaOperations).should(times(1)).send(producerRecordCaptor.capture());
         ProducerRecord producerRecord = producerRecordCaptor.getValue();
         assertThat(producerRecord.topic()).isEqualTo(testRetryTopic);
         assertThat(producerRecord.value()).isEqualTo(value);
@@ -151,9 +147,8 @@ void shouldIncreaseAttempts() {
         given(destinationTopic.getDestinationName()).willReturn(testRetryTopic);
         given(destinationTopic.getDestinationPartitions()).willReturn(1);
         given(destinationTopicResolver.getDestinationTopicByName(testRetryTopic)).willReturn(destinationTopic);
-        given(destinationTopicResolver.getDestinationTopicByName(testTopic)).willReturn(destinationTopic);
-        willReturn(kafkaOperations2).given(destinationTopic).getKafkaOperations();
-        given(kafkaOperations2.send(any(ProducerRecord.class))).willReturn(listenableFuture);
+        willReturn(kafkaOperations).given(destinationTopic).getKafkaOperations();
+        given(kafkaOperations.send(any(ProducerRecord.class))).willReturn(listenableFuture);
 
         DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(this.destinationTopicResolver);
 
@@ -162,7 +157,7 @@ void shouldIncreaseAttempts() {
         deadLetterPublishingRecoverer.accept(consumerRecord, e);
 
         // then
-        then(kafkaOperations2).should(times(1)).send(producerRecordCaptor.capture());
+        then(kafkaOperations).should(times(1)).send(producerRecordCaptor.capture());
         ProducerRecord producerRecord = producerRecordCaptor.getValue();
         Header attemptsHeader = producerRecord.headers().lastHeader(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS);
         assertThat(attemptsHeader).isNotNull();
@@ -182,10 +177,8 @@ void shouldAddOriginalTimestampHeader() {
         given(destinationTopic.getDestinationName()).willReturn(testRetryTopic);
         given(destinationTopic.getDestinationPartitions()).willReturn(1);
         given(destinationTopicResolver.getDestinationTopicByName(testRetryTopic)).willReturn(destinationTopic);
-        long nextExecutionTimestamp = this.nowTimestamp + destinationTopic.getDestinationDelay();
-        given(destinationTopicResolver.getDestinationTopicByName(testTopic)).willReturn(destinationTopic);
-        willReturn(this.kafkaOperations2).given(destinationTopic).getKafkaOperations();
-        given(kafkaOperations2.send(any(ProducerRecord.class))).willReturn(listenableFuture);
+        willReturn(this.kafkaOperations).given(destinationTopic).getKafkaOperations();
+        given(kafkaOperations.send(any(ProducerRecord.class))).willReturn(listenableFuture);
 
         DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(this.destinationTopicResolver);
 
@@ -194,7 +187,7 @@ void shouldAddOriginalTimestampHeader() {
         deadLetterPublishingRecoverer.accept(consumerRecord, e);
 
         // then
-        then(kafkaOperations2).should(times(1)).send(producerRecordCaptor.capture());
+        then(kafkaOperations).should(times(1)).send(producerRecordCaptor.capture());
         ProducerRecord producerRecord = producerRecordCaptor.getValue();
         Header originalTimestampHeader = producerRecord.headers().lastHeader(RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP);
         assertThat(originalTimestampHeader).isNotNull();
@@ -215,10 +208,8 @@ void shouldNotReplaceOriginalTimestampHeader() {
         given(destinationTopic.getDestinationName()).willReturn(testRetryTopic);
         given(destinationTopic.getDestinationPartitions()).willReturn(1);
         given(destinationTopicResolver.getDestinationTopicByName(testRetryTopic)).willReturn(destinationTopic);
-        long nextExecutionTimestamp = this.nowTimestamp + destinationTopic.getDestinationDelay();
-        given(destinationTopicResolver.getDestinationTopicByName(testTopic)).willReturn(destinationTopic);
-        willReturn(this.kafkaOperations2).given(destinationTopic).getKafkaOperations();
-        given(kafkaOperations2.send(any(ProducerRecord.class))).willReturn(listenableFuture);
+        willReturn(this.kafkaOperations).given(destinationTopic).getKafkaOperations();
+        given(kafkaOperations.send(any(ProducerRecord.class))).willReturn(listenableFuture);
 
         DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(this.destinationTopicResolver);
 
@@ -227,7 +218,7 @@ void shouldNotReplaceOriginalTimestampHeader() {
         deadLetterPublishingRecoverer.accept(consumerRecord, e);
 
         // then
-        then(kafkaOperations2).should(times(1)).send(producerRecordCaptor.capture());
+        then(kafkaOperations).should(times(1)).send(producerRecordCaptor.capture());
         ProducerRecord producerRecord = producerRecordCaptor.getValue();
         Header originalTimestampHeader = producerRecord.headers().lastHeader(RetryTopicHeaders.DEFAULT_HEADER_ORIGINAL_TIMESTAMP);
         assertThat(originalTimestampHeader).isNotNull();
@@ -249,7 +240,7 @@ void shouldNotSendMessageIfNoOpsDestination() {
         deadLetterPublishingRecoverer.accept(this.consumerRecord, e);
 
         // then
-        then(kafkaOperations2).should(times(0)).send(any(ProducerRecord.class));
+        then(kafkaOperations).should(times(0)).send(any(ProducerRecord.class));
     }
 
     @Test
@@ -264,7 +255,7 @@ void shouldThrowIfKafkaBackoffException() {
             .isThrownBy(() -> deadLetterPublishingRecoverer.accept(this.consumerRecord, e));
 
         // then
-        then(kafkaOperations2).should(times(0)).send(any(ProducerRecord.class));
+        then(kafkaOperations).should(times(0)).send(any(ProducerRecord.class));
     }
 
     @Test

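The test changes are mostly mechanical: the redundant kafkaOperations2 mock is dropped in favor of the existing kafkaOperations mock, and the now-unneeded getDestinationTopicByName(testTopic) stubbings are removed. For readers unfamiliar with the BDDMockito style used throughout, here is a minimal, self-contained sketch of the same stub / capture / verify pattern; Sender and Forwarder are hypothetical stand-ins, not the spring-kafka types.

// Hypothetical BDDMockito sketch of the stub-and-verify pattern used in these tests.
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

class SingleMockSendSketchTests {

    // Stand-ins: Sender plays the role of KafkaOperations, Forwarder the recoverer under test.
    interface Sender { boolean send(String record); }

    record Forwarder(Sender sender) {
        void accept(String record) { sender.send(record); }
    }

    @Test
    void shouldSendThroughTheSingleMock() {
        // given - one mock suffices; mirrors given(kafkaOperations.send(...)).willReturn(...)
        Sender sender = mock(Sender.class);
        given(sender.send("payload")).willReturn(true);

        // when
        new Forwarder(sender).accept("payload");

        // then - capture the forwarded record and verify exactly one send, as the tests above do
        ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
        then(sender).should(times(1)).send(captor.capture());
        Assertions.assertThat(captor.getValue()).isEqualTo("payload");
    }
}
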