Skip to content

Commit f736bd9

Browse files
garyrussellartembilan
authored andcommitted
Prepare for GH-1480
Deprecate existing ARP method ready for the new method in 2.7. **Merge to 2.6.x only**
1 parent 5609205 commit f736bd9

File tree

5 files changed

+62
-27
lines changed

5 files changed

+62
-27
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/AfterRollbackProcessor.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2020 the original author or authors.
2+
* Copyright 2018-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@
2222
import org.apache.kafka.clients.consumer.ConsumerRecord;
2323

2424
import org.springframework.kafka.listener.ContainerProperties.EOSMode;
25+
import org.springframework.lang.Nullable;
2526

2627
/**
2728
* Invoked by a listener container with remaining, unprocessed, records
@@ -58,7 +59,7 @@ public interface AfterRollbackProcessor<K, V> {
5859
* @since 2.2
5960
* @see #isProcessInTransaction()
6061
* @deprecated in favor of
61-
* {@link #process(List, Consumer, Exception, boolean, ContainerProperties.EOSMode)}.
62+
* {@link #process(List, Consumer, MessageListenerContainer, Exception, boolean, ContainerProperties.EOSMode)}.
6263
*/
6364
@Deprecated
6465
void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer, Exception exception, boolean recoverable);
@@ -81,14 +82,42 @@ public interface AfterRollbackProcessor<K, V> {
8182
* @param recoverable the recoverable.
8283
* @param eosMode the {@link EOSMode}.
8384
* @since 2.5.3
85+
* @deprecated in favor of
86+
* {@link #process(List, Consumer, MessageListenerContainer, Exception, boolean, EOSMode)}.
8487
* @see #isProcessInTransaction()
8588
*/
89+
@Deprecated
8690
default void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer, Exception exception,
8791
boolean recoverable, EOSMode eosMode) {
8892

8993
process(records, consumer, exception, recoverable);
9094
}
9195

96+
/**
97+
* Process the remaining records. Recoverable will be true if the container is
98+
* processing individual records; this allows the processor to recover (skip) the
99+
* failed record rather than re-seeking it. This is not possible with a batch listener
100+
* since only the listener itself knows which record in the batch keeps failing.
101+
* IMPORTANT: If invoked in a transaction when the listener was invoked with a single
102+
* record, the transaction id will be based on the container group.id and the
103+
* topic/partition of the failed record, to avoid issues with zombie fencing. So,
104+
* generally, only its offset should be sent to the transaction. For other behavior
105+
* the process method should manage its own transaction.
106+
* @param records the records.
107+
* @param consumer the consumer.
108+
* @param container the container or parent container.
109+
* @param exception the exception
110+
* @param recoverable the recoverable.
111+
* @param eosMode the {@link EOSMode}.
112+
* @since 2.6.6
113+
* @see #isProcessInTransaction()
114+
*/
115+
default void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
116+
@Nullable MessageListenerContainer container, Exception exception, boolean recoverable, EOSMode eosMode) {
117+
118+
process(records, consumer, exception, recoverable, eosMode);
119+
}
120+
92121
/**
93122
* Optional method to clear thread state; will be called just before a consumer
94123
* thread terminates.
@@ -100,13 +129,13 @@ default void clearThreadState() {
100129

101130
/**
102131
* Return true to invoke
103-
* {@link #process(List, Consumer, Exception, boolean, ContainerProperties.EOSMode)}
132+
* {@link #process(List, Consumer, MessageListenerContainer, Exception, boolean, ContainerProperties.EOSMode)}
104133
* in a new transaction. Because the container cannot infer the desired behavior, the
105134
* processor is responsible for sending the offset to the transaction if it decides to
106135
* skip the failing record.
107136
* @return true to run in a transaction; default false.
108137
* @since 2.2.5
109-
* @see #process(List, Consumer, Exception, boolean, ContainerProperties.EOSMode)
138+
* @see #process(List, Consumer, MessageListenerContainer, Exception, boolean, ContainerProperties.EOSMode)
110139
*/
111140
default boolean isProcessInTransaction() {
112141
return false;

spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2020 the original author or authors.
2+
* Copyright 2018-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -141,12 +141,13 @@ private void checkConfig() {
141141
public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer, Exception exception,
142142
boolean recoverable) {
143143

144-
process(records, consumer, exception, recoverable, EOSMode.ALPHA);
144+
process(records, consumer, null, exception, recoverable, EOSMode.ALPHA);
145145
}
146146

147147
@SuppressWarnings({ "unchecked", "rawtypes" })
148148
@Override
149-
public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer, Exception exception,
149+
public void process(List<ConsumerRecord<K, V>> records, Consumer<K, V> consumer,
150+
@Nullable MessageListenerContainer container, Exception exception,
150151
boolean recoverable, @Nullable EOSMode eosMode) {
151152

152153
if (SeekUtils.doSeeks(((List) records), consumer, exception, recoverable,
@@ -177,15 +178,15 @@ public boolean isProcessInTransaction() {
177178

178179
/**
179180
* {@inheritDoc} Set to true and the container will run the
180-
* {@link #process(List, Consumer, Exception, boolean, ContainerProperties.EOSMode)}
181+
* {@link #process(List, Consumer, MessageListenerContainer, Exception, boolean, ContainerProperties.EOSMode)}
181182
* method in a transaction and, if a record is skipped and recovered, we will send its
182183
* offset to the transaction. Requires a {@link KafkaOperations}.
183184
* @param commitRecovered true to process in a transaction.
184185
* @since 2.3
185186
* @deprecated in favor of
186187
* {@link #DefaultAfterRollbackProcessor(BiConsumer, BackOff, KafkaOperations, boolean)}.
187188
* @see #isProcessInTransaction()
188-
* @see #process(List, Consumer, Exception, boolean, ContainerProperties.EOSMode)
189+
* @see #process(List, Consumer, MessageListenerContainer, Exception, boolean, ContainerProperties.EOSMode)
189190
*/
190191
@Deprecated
191192
@Override

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1615,12 +1615,14 @@ private void batchAfterRollback(final ConsumerRecords<K, V> records,
16151615
RuntimeException rollbackException = decorateException(e);
16161616
try {
16171617
if (recordList == null) {
1618-
afterRollbackProcessorToUse.process(createRecordList(records), this.consumer, rollbackException,
1619-
false, this.eosMode);
1618+
afterRollbackProcessorToUse.process(createRecordList(records), this.consumer,
1619+
KafkaMessageListenerContainer.this.thisOrParentContainer,
1620+
rollbackException, false, this.eosMode);
16201621
}
16211622
else {
1622-
afterRollbackProcessorToUse.process(recordList, this.consumer, rollbackException, false,
1623-
this.eosMode);
1623+
afterRollbackProcessorToUse.process(recordList, this.consumer,
1624+
KafkaMessageListenerContainer.this.thisOrParentContainer,
1625+
rollbackException, false, this.eosMode);
16241626
}
16251627
}
16261628
catch (KafkaException ke) {
@@ -1891,15 +1893,17 @@ private void recordAfterRollback(Iterator<ConsumerRecord<K, V>> iterator, final
18911893

18921894
@Override
18931895
protected void doInTransactionWithoutResult(TransactionStatus status) {
1894-
afterRollbackProcessorToUse.process(unprocessed, ListenerConsumer.this.consumer, e, true,
1896+
afterRollbackProcessorToUse.process(unprocessed, ListenerConsumer.this.consumer,
1897+
KafkaMessageListenerContainer.this.thisOrParentContainer, e, true,
18951898
ListenerConsumer.this.eosMode);
18961899
}
18971900

18981901
});
18991902
}
19001903
else {
19011904
try {
1902-
afterRollbackProcessorToUse.process(unprocessed, this.consumer, e, true, this.eosMode);
1905+
afterRollbackProcessorToUse.process(unprocessed, this.consumer,
1906+
KafkaMessageListenerContainer.this.thisOrParentContainer, e, true, this.eosMode);
19031907
}
19041908
catch (KafkaException ke) {
19051909
ke.selfLog("AfterRollbackProcessor threw an exception", this.logger);

spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2020 the original author or authors.
2+
* Copyright 2019-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -74,18 +74,18 @@ void testClassifier() {
7474
@SuppressWarnings("unchecked")
7575
Consumer<String, String> consumer = mock(Consumer.class);
7676
given(consumer.groupMetadata()).willReturn(new ConsumerGroupMetadata("foo"));
77-
processor.process(records, consumer, illegalState, true, EOSMode.ALPHA);
78-
processor.process(records, consumer, new DeserializationException("intended", null, false, illegalState), true,
79-
EOSMode.ALPHA);
77+
processor.process(records, consumer, null, illegalState, true, EOSMode.ALPHA);
78+
processor.process(records, consumer, null, new DeserializationException("intended", null, false, illegalState),
79+
true, EOSMode.ALPHA);
8080
verify(template).sendOffsetsToTransaction(anyMap());
8181
verify(template, never()).sendOffsetsToTransaction(anyMap(), any(ConsumerGroupMetadata.class));
8282
assertThat(recovered.get()).isSameAs(record1);
8383
processor.addNotRetryableExceptions(IllegalStateException.class);
8484
recovered.set(null);
8585
recovererShouldFail.set(true);
86-
processor.process(records, consumer, illegalState, true, EOSMode.ALPHA);
86+
processor.process(records, consumer, null, illegalState, true, EOSMode.ALPHA);
8787
verify(template, times(1)).sendOffsetsToTransaction(anyMap()); // recovery failed
88-
processor.process(records, consumer, illegalState, true, EOSMode.BETA);
88+
processor.process(records, consumer, null, illegalState, true, EOSMode.BETA);
8989
verify(template, times(1)).sendOffsetsToTransaction(anyMap(), any(ConsumerGroupMetadata.class));
9090
assertThat(recovered.get()).isSameAs(record1);
9191
InOrder inOrder = inOrder(consumer);
@@ -120,12 +120,12 @@ void testBatchBackOff() {
120120
@SuppressWarnings("unchecked")
121121
Consumer<String, String> consumer = mock(Consumer.class);
122122
given(consumer.groupMetadata()).willReturn(new ConsumerGroupMetadata("foo"));
123-
processor.process(records, consumer, illegalState, false, EOSMode.BETA);
124-
processor.process(records, consumer, illegalState, false, EOSMode.BETA);
123+
processor.process(records, consumer, null, illegalState, false, EOSMode.BETA);
124+
processor.process(records, consumer, null, illegalState, false, EOSMode.BETA);
125125
verify(backOff, times(2)).start();
126126
verify(execution.get(), times(2)).nextBackOff();
127127
processor.clearThreadState();
128-
processor.process(records, consumer, illegalState, false, EOSMode.BETA);
128+
processor.process(records, consumer, null, illegalState, false, EOSMode.BETA);
129129
verify(backOff, times(3)).start();
130130
}
131131

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2020 the original author or authors.
2+
* Copyright 2017-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -810,7 +810,8 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
810810
verify(afterRollbackProcessor, times(4)).isProcessInTransaction();
811811
ArgumentCaptor<Exception> captor = ArgumentCaptor.forClass(Exception.class);
812812
verify(afterRollbackProcessor, never()).process(any(), any(), captor.capture(), anyBoolean());
813-
verify(afterRollbackProcessor, times(4)).process(any(), any(), captor.capture(), anyBoolean(), any());
813+
verify(afterRollbackProcessor, never()).process(any(), any(), captor.capture(), anyBoolean(), any());
814+
verify(afterRollbackProcessor, times(4)).process(any(), any(), any(), captor.capture(), anyBoolean(), any());
814815
assertThat(captor.getValue()).isInstanceOf(ListenerExecutionFailedException.class)
815816
.extracting(ex -> ((ListenerExecutionFailedException) ex).getGroupId())
816817
.isEqualTo("groupInARBP");
@@ -943,7 +944,7 @@ void testNoAfterRollbackWhenFenced() throws Exception {
943944
inOrder.verifyNoMoreInteractions();
944945
assertThat(deliveryCount.get()).isEqualTo(1);
945946

946-
verify(arp, never()).process(any(), any(), any(), anyBoolean(), any());
947+
verify(arp, never()).process(any(), any(), any(), any(), anyBoolean(), any());
947948

948949
assertThat(KafkaTestUtils.getPropertyValue(container,
949950
"listenerConsumer.transactionTemplate.timeout", Integer.class))

0 commit comments

Comments
 (0)