Skip to content

Commit 89b6ffc

Browse files
authored
GH-1612: Option: Producer Fenced: Stop Container
Resolves #1612 **cherry-pick to 2.5.x** * * Add @SInCE to javadocs; retain route cause of `StopAfterFenceException`. * * Add reason to `ConsumerStoppedEvent`. Resolves #1618 Also provide access to the actual container that stopped the consumer, for example to allow restarting after stopping due to a producer fenced exception. * * Add `@Nullable`s. * * Test Polishing.
1 parent 078474e commit 89b6ffc

File tree

5 files changed

+196
-36
lines changed

5 files changed

+196
-36
lines changed

spring-kafka/src/main/java/org/springframework/kafka/event/ConsumerStoppedEvent.java

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2019 the original author or authors.
2+
* Copyright 2018-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.kafka.event;
1818

19+
import org.springframework.lang.Nullable;
20+
1921
/**
2022
* An event published when a consumer is stopped. While it is best practice to use
2123
* stateless listeners, you can consume this event to clean up any thread-based resources
@@ -30,6 +32,30 @@ public class ConsumerStoppedEvent extends KafkaEvent {
3032

3133
private static final long serialVersionUID = 1L;
3234

35+
public enum Reason {
36+
37+
/**
38+
* The consumer was stopped because the container was stopped.
39+
*/
40+
NORMAL,
41+
42+
/**
43+
* The transactional producer was fenced anf the container
44+
* {@code stopContainerWhenFenced} property is true.
45+
*/
46+
FENCED,
47+
48+
/**
49+
* A {@link java.lang.Error} was thrown.
50+
*/
51+
ERROR
52+
53+
}
54+
55+
private final Reason reason;
56+
57+
private final Object container;
58+
3359
/**
3460
* Construct an instance with the provided source.
3561
* @param source the container.
@@ -45,13 +71,52 @@ public ConsumerStoppedEvent(Object source) {
4571
* @param container the container or the parent container if the container is a child.
4672
* @since 2.2.1
4773
*/
74+
@Deprecated
4875
public ConsumerStoppedEvent(Object source, Object container) {
76+
this(source, container, null, Reason.NORMAL);
77+
}
78+
79+
/**
80+
* Construct an instance with the provided source and container.
81+
* @param source the container instance that generated the event.
82+
* @param container the container or the parent container if the container is a child.
83+
* @param childContainer the child container, or null.
84+
* @param reason the reason.
85+
* @since 2.5.8
86+
*/
87+
public ConsumerStoppedEvent(Object source, Object container, @Nullable Object childContainer,
88+
Reason reason) {
89+
4990
super(source, container);
91+
this.container = childContainer;
92+
this.reason = reason;
93+
}
94+
95+
/**
96+
* Return the reason why the consumer was stopped.
97+
* @return the reason.
98+
* @since 2.5.8
99+
*/
100+
public Reason getReason() {
101+
return this.reason;
102+
}
103+
104+
/**
105+
* Return the container that the Consumer belonged to.
106+
* @param <T> the container type.
107+
* @return the container.
108+
* @since 2.5.8
109+
*/
110+
@SuppressWarnings("unchecked")
111+
public <T> T getContainer() {
112+
return this.container == null ? (T) getSource() : (T) this.container;
50113
}
51114

52115
@Override
53116
public String toString() {
54-
return "ConsumerStoppedEvent [source=" + getSource() + "]";
117+
return "ConsumerStoppedEvent [source=" + getSource()
118+
+ (this.container == null ? "" : (", container=" + this.container))
119+
+ ", reason=" + this.reason + "]";
55120
}
56121

57122
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,8 @@ public enum EOSMode {
254254

255255
private TransactionDefinition transactionDefinition;
256256

257+
private boolean stopContainerWhenFenced;
258+
257259
/**
258260
* Create properties for a container that will subscribe to the specified topics.
259261
* @param topics the topics.
@@ -745,6 +747,30 @@ public void setAdviceChain(Advice... adviceChain) {
745747
}
746748
}
747749

750+
/**
751+
* When true, the container will stop after a
752+
* {@link org.apache.kafka.common.errors.ProducerFencedException}.
753+
* @return the stopContainerWhenFenced
754+
* @since 2.5.8
755+
*/
756+
public boolean isStopContainerWhenFenced() {
757+
return this.stopContainerWhenFenced;
758+
}
759+
760+
/**
761+
* Set to true to stop the container when a
762+
* {@link org.apache.kafka.common.errors.ProducerFencedException} is thrown.
763+
* Currently, there is no way to determine if such an exception is thrown due to a
764+
* rebalance Vs. a timeout. We therefore cannot call the after rollback processor. The
765+
* best solution is to ensure that the {@code transaction.timeout.ms} is large enough
766+
* so that transactions don't time out.
767+
* @param stopContainerWhenFenced true to stop the container.
768+
* @since 2.5.8
769+
*/
770+
public void setStopContainerWhenFenced(boolean stopContainerWhenFenced) {
771+
this.stopContainerWhenFenced = stopContainerWhenFenced;
772+
}
773+
748774
private void adviseListenerIfNeeded() {
749775
if (!CollectionUtils.isEmpty(this.adviceChain)) {
750776
if (AopUtils.isAopProxy(this.messageListener)) {

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
import org.springframework.kafka.event.ConsumerStartedEvent;
8181
import org.springframework.kafka.event.ConsumerStartingEvent;
8282
import org.springframework.kafka.event.ConsumerStoppedEvent;
83+
import org.springframework.kafka.event.ConsumerStoppedEvent.Reason;
8384
import org.springframework.kafka.event.ConsumerStoppingEvent;
8485
import org.springframework.kafka.event.ListenerContainerIdleEvent;
8586
import org.springframework.kafka.event.ListenerContainerNoLongerIdleEvent;
@@ -401,9 +402,15 @@ private void publishConsumerStoppingEvent(Consumer<?, ?> consumer) {
401402
}
402403
}
403404

404-
private void publishConsumerStoppedEvent() {
405+
private void publishConsumerStoppedEvent(@Nullable Throwable throwable) {
405406
if (getApplicationEventPublisher() != null) {
406-
getApplicationEventPublisher().publishEvent(new ConsumerStoppedEvent(this, this.thisOrParentContainer));
407+
getApplicationEventPublisher().publishEvent(new ConsumerStoppedEvent(this, this.thisOrParentContainer,
408+
this.thisOrParentContainer.equals(this) ? null : this,
409+
throwable instanceof Error
410+
? Reason.ERROR
411+
: throwable instanceof StopAfterFenceException
412+
? Reason.FENCED
413+
: Reason.NORMAL));
407414
}
408415
}
409416

@@ -1076,20 +1083,20 @@ public void run() {
10761083
+ "' has been fenced");
10771084
break;
10781085
}
1079-
catch (Exception e) {
1080-
handleConsumerException(e);
1081-
}
1082-
catch (Error e) { // NOSONAR - rethrown
1086+
catch (StopAfterFenceException | Error e) { // NOSONAR - rethrown
10831087
Runnable runnable = KafkaMessageListenerContainer.this.emergencyStop;
10841088
if (runnable != null) {
10851089
runnable.run();
10861090
}
10871091
this.logger.error(e, "Stopping container due to an Error");
1088-
wrapUp();
1092+
wrapUp(e);
10891093
throw e;
10901094
}
1095+
catch (Exception e) {
1096+
handleConsumerException(e);
1097+
}
10911098
}
1092-
wrapUp();
1099+
wrapUp(null);
10931100
}
10941101

10951102
private void setupSeeks() {
@@ -1331,7 +1338,7 @@ private void idleBetweenPollIfNecessary() {
13311338
}
13321339
}
13331340

1334-
private void wrapUp() {
1341+
private void wrapUp(@Nullable Throwable throwable) {
13351342
KafkaUtils.clearConsumerGroupId();
13361343
if (this.micrometerHolder != null) {
13371344
this.micrometerHolder.destroy();
@@ -1370,7 +1377,7 @@ private void wrapUp() {
13701377
this.consumerSeekAwareListener.unregisterSeekCallback();
13711378
}
13721379
this.logger.info(() -> getGroupId() + ": Consumer stopped");
1373-
publishConsumerStoppedEvent();
1380+
publishConsumerStoppedEvent(throwable);
13741381
}
13751382

13761383
/**
@@ -1540,6 +1547,9 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
15401547
this.logger.error(e, "Producer or '"
15411548
+ ConsumerConfig.GROUP_INSTANCE_ID_CONFIG
15421549
+ "' fenced during transaction");
1550+
if (this.containerProperties.isStopContainerWhenFenced()) {
1551+
throw new StopAfterFenceException("Container stopping due to fencing", e);
1552+
}
15431553
}
15441554
catch (RuntimeException e) {
15451555
this.logger.error(e, "Transaction rolled back");
@@ -1806,6 +1816,9 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
18061816
}
18071817
catch (ProducerFencedException | FencedInstanceIdException e) {
18081818
this.logger.error(e, "Producer or 'group.instance.id' fenced during transaction");
1819+
if (this.containerProperties.isStopContainerWhenFenced()) {
1820+
throw new StopAfterFenceException("Container stopping due to fencing", e);
1821+
}
18091822
break;
18101823
}
18111824
catch (RuntimeException e) {
@@ -2093,12 +2106,7 @@ else if (this.producer != null
20932106
this.acks.add(record);
20942107
}
20952108
if (this.producer != null) {
2096-
try {
2097-
sendOffsetsToTransaction();
2098-
}
2099-
catch (Exception e) {
2100-
this.logger.error(e, "Send offsets to transaction failed");
2101-
}
2109+
sendOffsetsToTransaction();
21022110
}
21032111
}
21042112

@@ -2848,4 +2856,13 @@ public void onSuccess(Object result) {
28482856

28492857
}
28502858

2859+
@SuppressWarnings("serial")
2860+
private static class StopAfterFenceException extends KafkaException {
2861+
2862+
StopAfterFenceException(String message, Throwable t) {
2863+
super(message, t);
2864+
}
2865+
2866+
}
2867+
28512868
}

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 60 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
import org.springframework.kafka.core.ProducerFactory;
8585
import org.springframework.kafka.core.ProducerFactoryUtils;
8686
import org.springframework.kafka.event.ConsumerStoppedEvent;
87+
import org.springframework.kafka.event.ConsumerStoppedEvent.Reason;
8788
import org.springframework.kafka.event.ListenerContainerIdleEvent;
8889
import org.springframework.kafka.listener.ContainerProperties.AckMode;
8990
import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
@@ -172,15 +173,29 @@ public void testConsumeAndProduceTransactionKTM_BETA() throws Exception {
172173
testConsumeAndProduceTransactionGuts(false, false, AckMode.RECORD, EOSMode.BETA);
173174
}
174175

176+
@Test
177+
public void testConsumeAndProduceTransactionStopWhenFenced() throws Exception {
178+
testConsumeAndProduceTransactionGuts(false, false, AckMode.RECORD, EOSMode.BETA, true);
179+
}
180+
175181
@SuppressWarnings({ "rawtypes", "unchecked" })
176182
private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handleError, AckMode ackMode,
177183
EOSMode eosMode) throws Exception {
178184

185+
testConsumeAndProduceTransactionGuts(chained, handleError, ackMode, eosMode, false);
186+
}
187+
188+
@SuppressWarnings({ "rawtypes", "unchecked" })
189+
private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handleError, AckMode ackMode,
190+
EOSMode eosMode, boolean stopWhenFenced) throws Exception {
191+
179192
Consumer consumer = mock(Consumer.class);
193+
AtomicBoolean assigned = new AtomicBoolean();
180194
final TopicPartition topicPartition = new TopicPartition("foo", 0);
181195
willAnswer(i -> {
182196
((ConsumerRebalanceListener) i.getArgument(1))
183197
.onPartitionsAssigned(Collections.singletonList(topicPartition));
198+
assigned.set(true);
184199
return null;
185200
}).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class));
186201
ConsumerRecords records = new ConsumerRecords(Collections.singletonMap(topicPartition,
@@ -199,6 +214,14 @@ private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handl
199214
ConsumerFactory cf = mock(ConsumerFactory.class);
200215
willReturn(consumer).given(cf).createConsumer("group", "", null, KafkaTestUtils.defaultPropertyOverrides());
201216
Producer producer = mock(Producer.class);
217+
if (stopWhenFenced) {
218+
willAnswer(inv -> {
219+
if (assigned.get()) {
220+
throw new ProducerFencedException("fenced");
221+
}
222+
return null;
223+
}).given(producer).sendOffsetsToTransaction(any(), any(ConsumerGroupMetadata.class));
224+
}
202225
given(producer.send(any(), any())).willReturn(new SettableListenableFuture<>());
203226
final CountDownLatch closeLatch = new CountDownLatch(2);
204227
willAnswer(i -> {
@@ -224,6 +247,7 @@ private void testConsumeAndProduceTransactionGuts(boolean chained, boolean handl
224247
props.setTransactionManager(ptm);
225248
props.setAssignmentCommitOption(AssignmentCommitOption.ALWAYS);
226249
props.setEosMode(eosMode);
250+
props.setStopContainerWhenFenced(stopWhenFenced);
227251
ConsumerGroupMetadata consumerGroupMetadata = new ConsumerGroupMetadata("group");
228252
given(consumer.groupMetadata()).willReturn(consumerGroupMetadata);
229253
final KafkaTemplate template = new KafkaTemplate(pf);
@@ -260,6 +284,14 @@ public void onMessage(Object data) {
260284
if (handleError) {
261285
container.setErrorHandler((e, data) -> { });
262286
}
287+
CountDownLatch stopEventLatch = new CountDownLatch(1);
288+
AtomicReference<ConsumerStoppedEvent> stopEvent = new AtomicReference<>();
289+
container.setApplicationEventPublisher(event -> {
290+
if (event instanceof ConsumerStoppedEvent) {
291+
stopEvent.set((ConsumerStoppedEvent) event);
292+
stopEventLatch.countDown();
293+
}
294+
});
263295
container.start();
264296
assertThat(closeLatch.await(10, TimeUnit.SECONDS)).isTrue();
265297
InOrder inOrder = inOrder(producer);
@@ -272,27 +304,37 @@ public void onMessage(Object data) {
272304
inOrder.verify(producer).sendOffsetsToTransaction(Collections.singletonMap(topicPartition,
273305
new OffsetAndMetadata(0)), consumerGroupMetadata);
274306
}
275-
inOrder.verify(producer).commitTransaction();
276-
inOrder.verify(producer).close(any());
277-
inOrder.verify(producer).beginTransaction();
278-
ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
279-
inOrder.verify(producer).send(captor.capture(), any(Callback.class));
280-
assertThat(captor.getValue()).isEqualTo(new ProducerRecord("bar", "baz"));
281-
if (eosMode.equals(EOSMode.ALPHA)) {
282-
inOrder.verify(producer).sendOffsetsToTransaction(Collections.singletonMap(topicPartition,
283-
new OffsetAndMetadata(1)), "group");
307+
if (stopWhenFenced) {
308+
assertThat(stopEventLatch.await(10, TimeUnit.SECONDS)).isTrue();
309+
assertThat(stopEvent.get().getReason()).isEqualTo(Reason.FENCED);
284310
}
285311
else {
286-
inOrder.verify(producer).sendOffsetsToTransaction(Collections.singletonMap(topicPartition,
287-
new OffsetAndMetadata(1)), consumerGroupMetadata);
312+
inOrder.verify(producer).commitTransaction();
313+
inOrder.verify(producer).close(any());
314+
inOrder.verify(producer).beginTransaction();
315+
ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
316+
inOrder.verify(producer).send(captor.capture(), any(Callback.class));
317+
assertThat(captor.getValue()).isEqualTo(new ProducerRecord("bar", "baz"));
318+
if (eosMode.equals(EOSMode.ALPHA)) {
319+
inOrder.verify(producer).sendOffsetsToTransaction(Collections.singletonMap(topicPartition,
320+
new OffsetAndMetadata(1)), "group");
321+
}
322+
else {
323+
inOrder.verify(producer).sendOffsetsToTransaction(Collections.singletonMap(topicPartition,
324+
new OffsetAndMetadata(1)), consumerGroupMetadata);
325+
}
326+
inOrder.verify(producer).commitTransaction();
327+
inOrder.verify(producer).close(any());
328+
container.stop();
329+
verify(pf, times(2)).createProducer(isNull());
330+
verifyNoMoreInteractions(producer);
331+
assertThat(transactionalIds.get(0)).isEqualTo("group.foo.0");
332+
assertThat(transactionalIds.get(0)).isEqualTo("group.foo.0");
333+
assertThat(stopEventLatch.await(10, TimeUnit.SECONDS)).isTrue();
334+
assertThat(stopEvent.get().getReason()).isEqualTo(Reason.NORMAL);
288335
}
289-
inOrder.verify(producer).commitTransaction();
290-
inOrder.verify(producer).close(any());
291-
container.stop();
292-
verify(pf, times(2)).createProducer(isNull());
293-
verifyNoMoreInteractions(producer);
294-
assertThat(transactionalIds.get(0)).isEqualTo("group.foo.0");
295-
assertThat(transactionalIds.get(0)).isEqualTo("group.foo.0");
336+
MessageListenerContainer stoppedContainer = stopEvent.get().getContainer();
337+
assertThat(stoppedContainer).isSameAs(container);
296338
}
297339

298340
@SuppressWarnings({ "rawtypes", "unchecked" })

0 commit comments

Comments
 (0)