Skip to content

Commit f3cfa19

Browse files
garyrussell authored and artembilan committed
GH-2525: Fix Paused Partition Resume (Rebalance)
Resolves #2525. If a paused partition is revoked and re-assigned during a retry topic pause delay, the partition is not resumed because it no longer exists in the `pausedPartitions` field. When a paused partition is re-paused after a rebalance, it must be re-added to `pausedPartitions` so that it is not filtered out by the resume logic. **cherry-pick to 2.9.x**
1 parent 425f837 commit f3cfa19

File tree

2 files changed

+89
-1
lines changed

2 files changed

+89
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -3596,6 +3596,7 @@ private void repauseIfNeeded(Collection<TopicPartition> partitions) {
35963596
ListenerConsumer.this.consumer.pause(toRepause);
35973597
ListenerConsumer.this.logger.debug(() -> "Paused consumption from: " + toRepause);
35983598
publishConsumerPausedEvent(toRepause, "Re-paused after rebalance");
3599+
ListenerConsumer.this.pausedPartitions.addAll(toRepause);
35993600
}
36003601
this.revoked.removeAll(toRepause);
36013602
ListenerConsumer.this.pausedPartitions.removeAll(this.revoked);

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 87 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2824,6 +2824,93 @@ public void rePausePartitionAfterRebalance() throws Exception {
28242824
container.stop();
28252825
}
28262826

2827+
@SuppressWarnings({ "unchecked" })
2828+
@Test
2829+
public void resumePartitionAfterRevokeAndReAssign() throws Exception {
2830+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
2831+
Consumer<Integer, String> consumer = mock(Consumer.class);
2832+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
2833+
AtomicBoolean first = new AtomicBoolean(true);
2834+
TopicPartition tp0 = new TopicPartition("foo", 0);
2835+
TopicPartition tp1 = new TopicPartition("foo", 1);
2836+
given(consumer.assignment()).willReturn(Set.of(tp0, tp1));
2837+
final CountDownLatch pauseLatch1 = new CountDownLatch(1);
2838+
final CountDownLatch suspendConsumerThread = new CountDownLatch(1);
2839+
Set<TopicPartition> pausedParts = ConcurrentHashMap.newKeySet();
2840+
Thread testThread = Thread.currentThread();
2841+
AtomicBoolean paused = new AtomicBoolean();
2842+
willAnswer(i -> {
2843+
pausedParts.clear();
2844+
pausedParts.addAll(i.getArgument(0));
2845+
if (!Thread.currentThread().equals(testThread)) {
2846+
paused.set(true);
2847+
}
2848+
return null;
2849+
}).given(consumer).pause(any());
2850+
given(consumer.paused()).willReturn(pausedParts);
2851+
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
2852+
if (paused.get()) {
2853+
pauseLatch1.countDown();
2854+
// hold up the consumer thread while we revoke/assign partitions on the test thread
2855+
suspendConsumerThread.await(10, TimeUnit.SECONDS);
2856+
}
2857+
Thread.sleep(50);
2858+
return ConsumerRecords.empty();
2859+
});
2860+
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
2861+
Collection<String> foos = new ArrayList<>();
2862+
foos.add("foo");
2863+
willAnswer(inv -> {
2864+
rebal.set(inv.getArgument(1));
2865+
rebal.get().onPartitionsAssigned(Set.of(tp0, tp1));
2866+
return null;
2867+
}).given(consumer).subscribe(eq(foos), any(ConsumerRebalanceListener.class));
2868+
final CountDownLatch resumeLatch = new CountDownLatch(1);
2869+
willAnswer(i -> {
2870+
pausedParts.removeAll(i.getArgument(0));
2871+
resumeLatch.countDown();
2872+
return null;
2873+
}).given(consumer).resume(any());
2874+
ContainerProperties containerProps = new ContainerProperties("foo");
2875+
containerProps.setGroupId("grp");
2876+
containerProps.setAckMode(AckMode.RECORD);
2877+
containerProps.setClientId("clientId");
2878+
containerProps.setIdleEventInterval(100L);
2879+
containerProps.setMessageListener((MessageListener) rec -> { });
2880+
containerProps.setMissingTopicsFatal(false);
2881+
KafkaMessageListenerContainer<Integer, String> container =
2882+
new KafkaMessageListenerContainer<>(cf, containerProps);
2883+
container.start();
2884+
container.pausePartition(tp0);
2885+
container.pausePartition(tp1);
2886+
assertThat(pauseLatch1.await(10, TimeUnit.SECONDS)).isTrue();
2887+
assertThat(pausedParts).hasSize(2)
2888+
.contains(tp0, tp1);
2889+
rebal.get().onPartitionsRevoked(Set.of(tp0, tp1));
2890+
rebal.get().onPartitionsAssigned(Collections.singleton(tp0));
2891+
rebal.get().onPartitionsRevoked(Set.of(tp0));
2892+
rebal.get().onPartitionsAssigned(Set.of(tp0, tp1));
2893+
assertThat(pausedParts).hasSize(2)
2894+
.contains(tp0, tp1);
2895+
assertThat(container).extracting("listenerConsumer")
2896+
.extracting("pausedPartitions")
2897+
.asInstanceOf(InstanceOfAssertFactories.collection(TopicPartition.class))
2898+
.hasSize(2)
2899+
.contains(tp0, tp1);
2900+
assertThat(container)
2901+
.extracting("pauseRequestedPartitions")
2902+
.asInstanceOf(InstanceOfAssertFactories.collection(TopicPartition.class))
2903+
.hasSize(2)
2904+
.contains(tp0, tp1);
2905+
container.resumePartition(tp0);
2906+
container.resumePartition(tp1);
2907+
suspendConsumerThread.countDown();
2908+
assertThat(resumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
2909+
assertThat(pausedParts).hasSize(0);
2910+
verify(consumer).resume(List.of(tp0, tp1));
2911+
container.stop();
2912+
}
2913+
28272914
@SuppressWarnings({ "unchecked", "rawtypes" })
28282915
@Test
28292916
public void testInitialSeek() throws Exception {

0 commit comments

Comments
 (0)