Skip to content

Commit be0cc7c

Browse files
garyrussellartembilan
authored andcommitted
Add ListenerContainerNoLongerIdleEvent
1 parent 145240e commit be0cc7c

File tree

5 files changed

+153
-8
lines changed

5 files changed

+153
-8
lines changed
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.event;
18+
19+
import java.util.ArrayList;
20+
import java.util.Collection;
21+
import java.util.Collections;
22+
import java.util.List;
23+
24+
import org.apache.kafka.clients.consumer.Consumer;
25+
import org.apache.kafka.common.TopicPartition;
26+
27+
/**
28+
* An event that is emitted when a container is no longer idle if configured to publish
29+
* idle events.
30+
*
31+
* @author Gary Russell
32+
* @since 2.6.2
33+
*/
34+
public class ListenerContainerNoLongerIdleEvent extends KafkaEvent {
35+
36+
private static final long serialVersionUID = 1L;
37+
38+
private final long idleTime;
39+
40+
private final String listenerId;
41+
42+
private final List<TopicPartition> topicPartitions;
43+
44+
private transient Consumer<?, ?> consumer;
45+
46+
/**
47+
* Construct an instance with the provided arguments.
48+
* @param source the container instance that generated the event.
49+
* @param container the container or the parent container if the container is a child.
50+
* @param idleTime how long the container was idle.
51+
* @param id the container id.
52+
* @param topicPartitions the topics/partitions currently assigned.
53+
* @param consumer the consumer.
54+
*/
55+
public ListenerContainerNoLongerIdleEvent(Object source, Object container, long idleTime, String id,
56+
Collection<TopicPartition> topicPartitions, Consumer<?, ?> consumer) {
57+
58+
super(source, container);
59+
this.idleTime = idleTime;
60+
this.listenerId = id;
61+
this.topicPartitions = topicPartitions == null ? null : new ArrayList<>(topicPartitions);
62+
this.consumer = consumer;
63+
}
64+
65+
/**
66+
* The TopicPartitions the container is listening to.
67+
* @return the TopicPartition list.
68+
*/
69+
public Collection<TopicPartition> getTopicPartitions() {
70+
return this.topicPartitions == null ? null : Collections.unmodifiableList(this.topicPartitions);
71+
}
72+
73+
/**
74+
* How long the container was idle.
75+
* @return the time in milliseconds.
76+
*/
77+
public long getIdleTime() {
78+
return this.idleTime;
79+
}
80+
81+
/**
82+
* The id of the listener (if {@code @KafkaListener}) or the container bean name.
83+
* @return the id.
84+
*/
85+
public String getListenerId() {
86+
return this.listenerId;
87+
}
88+
89+
/**
90+
* Retrieve the consumer. Only populated if the listener is consumer-aware.
91+
* Allows the listener to resume a paused consumer.
92+
* @return the consumer.
93+
*/
94+
public Consumer<?, ?> getConsumer() {
95+
return this.consumer;
96+
}
97+
98+
@Override
99+
public String toString() {
100+
return "ListenerContainerNoLongerIdleEvent [idleTime="
101+
+ ((float) this.idleTime / 1000) + "s, listenerId=" + this.listenerId // NOSONAR magic #
102+
+ ", container=" + getSource()
103+
+ ", topicPartitions=" + this.topicPartitions + "]";
104+
}
105+
106+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import org.springframework.kafka.event.ConsumerStoppedEvent;
8282
import org.springframework.kafka.event.ConsumerStoppingEvent;
8383
import org.springframework.kafka.event.ListenerContainerIdleEvent;
84+
import org.springframework.kafka.event.ListenerContainerNoLongerIdleEvent;
8485
import org.springframework.kafka.event.NonResponsiveConsumerEvent;
8586
import org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback;
8687
import org.springframework.kafka.listener.ContainerProperties.AckMode;
@@ -358,6 +359,13 @@ private void publishIdleContainerEvent(long idleTime, Consumer<?, ?> consumer, b
358359
}
359360
}
360361

362+
private void publishNoLongerIdleContainerEvent(long idleTime, Consumer<?, ?> consumer) {
363+
if (getApplicationEventPublisher() != null) {
364+
getApplicationEventPublisher().publishEvent(new ListenerContainerNoLongerIdleEvent(this,
365+
this.thisOrParentContainer, idleTime, getBeanName(), getAssignedPartitions(), consumer));
366+
}
367+
}
368+
361369
private void publishNonResponsiveConsumerEvent(long timeSinceLastPoll, Consumer<?, ?> consumer) {
362370
if (getApplicationEventPublisher() != null) {
363371
getApplicationEventPublisher().publishEvent(
@@ -591,6 +599,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
591599

592600
private boolean commitRecovered;
593601

602+
private boolean wasIdle;
603+
594604
private volatile boolean consumerPaused;
595605

596606
private volatile Thread consumerThread;
@@ -1122,16 +1132,25 @@ protected void pollAndInvoke() {
11221132
debugRecords(records);
11231133
if (records != null && records.count() > 0) {
11241134
savePositionsIfNeeded(records);
1125-
if (this.containerProperties.getIdleEventInterval() != null) {
1126-
this.lastReceive = System.currentTimeMillis();
1127-
}
1135+
notIdle();
11281136
invokeListener(records);
11291137
}
11301138
else {
11311139
checkIdle();
11321140
}
11331141
}
11341142

1143+
private void notIdle() {
1144+
if (this.containerProperties.getIdleEventInterval() != null) {
1145+
long now = System.currentTimeMillis();
1146+
if (this.wasIdle) {
1147+
this.wasIdle = false;
1148+
publishNoLongerIdleContainerEvent(now - this.lastReceive, this.consumer);
1149+
}
1150+
this.lastReceive = now;
1151+
}
1152+
}
1153+
11351154
private void savePositionsIfNeeded(ConsumerRecords<K, V> records) {
11361155
if (this.fixTxOffsets) {
11371156
this.savedPositions.clear();
@@ -1274,6 +1293,7 @@ private void checkIdle() {
12741293
long now = System.currentTimeMillis();
12751294
if (now > this.lastReceive + this.containerProperties.getIdleEventInterval()
12761295
&& now > this.lastAlertAt + this.containerProperties.getIdleEventInterval()) {
1296+
this.wasIdle = true;
12771297
publishIdleContainerEvent(now - this.lastReceive, this.consumer, this.consumerPaused);
12781298
this.lastAlertAt = now;
12791299
if (this.consumerSeekAwareListener != null) {

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@
8989
import org.springframework.kafka.core.MicrometerProducerListener;
9090
import org.springframework.kafka.core.ProducerFactory;
9191
import org.springframework.kafka.event.ListenerContainerIdleEvent;
92+
import org.springframework.kafka.event.ListenerContainerNoLongerIdleEvent;
9293
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
9394
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
9495
import org.springframework.kafka.listener.ConsumerAwareErrorHandler;
@@ -247,13 +248,11 @@ public void testAnonymous() {
247248
public void testSimple() throws Exception {
248249
this.recordFilter.called = false;
249250
template.send("annotated1", 0, "foo");
250-
template.flush();
251251
assertThat(this.listener.latch1.await(60, TimeUnit.SECONDS)).isTrue();
252252
assertThat(this.config.globalErrorThrowable).isNotNull();
253253
assertThat(this.listener.receivedGroupId).isEqualTo("foo");
254254

255255
template.send("annotated2", 0, 123, "foo");
256-
template.flush();
257256
assertThat(this.listener.latch2.await(60, TimeUnit.SECONDS)).isTrue();
258257
assertThat(this.listener.key).isEqualTo(123);
259258
assertThat(this.listener.partition).isNotNull();
@@ -277,13 +276,11 @@ public void testSimple() throws Exception {
277276
.isEqualTo(2L);
278277

279278
template.send("annotated3", 0, "foo");
280-
template.flush();
281279
assertThat(this.listener.latch3.await(60, TimeUnit.SECONDS)).isTrue();
282280
assertThat(this.listener.capturedRecord.value()).isEqualTo("foo");
283281
assertThat(this.config.listen3Exception).isNotNull();
284282

285283
template.send("annotated4", 0, "foo");
286-
template.flush();
287284
assertThat(this.listener.latch4.await(60, TimeUnit.SECONDS)).isTrue();
288285
assertThat(this.listener.capturedRecord.value()).isEqualTo("foo");
289286
assertThat(this.listener.ack).isNotNull();
@@ -317,6 +314,10 @@ public void testSimple() throws Exception {
317314
assertThat(KafkaTestUtils.getPropertyValue(containers.get(0), "listenerConsumer.consumer.clientId"))
318315
.isEqualTo("clientIdViaProps3-0");
319316

317+
template.send("annotated4", 0, "foo");
318+
assertThat(this.listener.noLongerIdleEventLatch.await(60, TimeUnit.SECONDS)).isTrue();
319+
assertThat(this.listener.noLongerIdleEvent.getListenerId().startsWith("qux-"));
320+
320321
template.send("annotated5", 0, 0, "foo");
321322
template.send("annotated5", 1, 0, "bar");
322323
template.send("annotated6", 0, 0, "baz");
@@ -1637,6 +1638,8 @@ static class Listener implements ConsumerSeekAware {
16371638

16381639
final CountDownLatch eventLatch = new CountDownLatch(1);
16391640

1641+
final CountDownLatch noLongerIdleEventLatch = new CountDownLatch(1);
1642+
16401643
final CountDownLatch keyLatch = new CountDownLatch(1);
16411644

16421645
final AtomicBoolean reposition12 = new AtomicBoolean();
@@ -1667,6 +1670,8 @@ static class Listener implements ConsumerSeekAware {
16671670

16681671
volatile ListenerContainerIdleEvent event;
16691672

1673+
volatile ListenerContainerNoLongerIdleEvent noLongerIdleEvent;
1674+
16701675
volatile List<Integer> keys;
16711676

16721677
volatile List<Integer> partitions;
@@ -1757,6 +1762,12 @@ public void eventHandler(ListenerContainerIdleEvent event) {
17571762
eventLatch.countDown();
17581763
}
17591764

1765+
@EventListener(condition = "event.listenerId.startsWith('qux')")
1766+
public void eventHandler(ListenerContainerNoLongerIdleEvent event) {
1767+
this.noLongerIdleEvent = event;
1768+
noLongerIdleEventLatch.countDown();
1769+
}
1770+
17601771
@KafkaListener(id = "fiz", topicPartitions = {
17611772
@TopicPartition(topic = "annotated5", partitions = { "#{'${foo:0,1}'.split(',')}" }),
17621773
@TopicPartition(topic = "annotated6", partitions = "0",

src/reference/asciidoc/kafka.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2535,6 +2535,7 @@ The following Spring application events are published by listener containers and
25352535
This event might signal that the configured task executor has insufficient threads to support the containers it is used in and their concurrency.
25362536
An error message is also logged when this condition occurs.
25372537
* `ListenerContainerIdleEvent`: published when no messages have been received in `idleInterval` (if configured).
2538+
* `ListenerContainerNoLongerIdleEvent`: published when a record is consumed after previously publishing a `ListenerContainerIdleEvent`.
25382539
* `NonResponsiveConsumerEvent`: published when the consumer appears to be blocked in the `poll` method.
25392540
* `ConsumerPausedEvent`: published by each consumer when the container is paused.
25402541
* `ConsumerResumedEvent`: published by each consumer when the container is resumed.
@@ -2558,6 +2559,8 @@ For example, if the consumer's `pause()` method was previously called, it can `r
25582559
* `paused`: Whether the container is currently paused.
25592560
See <<pause-resume>> for more information.
25602561

2562+
The `ListenerContainerNoLongerIdleEvent` has the same properties, except `idleTime` and `paused`.
2563+
25612564
The `NonResponsiveConsumerEvent` has the following properties:
25622565

25632566
* `source`: The listener container instance that published the event.
@@ -2637,6 +2640,8 @@ You can modify this behavior by setting the `monitorInterval` (default 30 second
26372640
The `noPollThreshold` should be greater than `1.0` to avoid getting spurious events due to a race condition.
26382641
Receiving such an event lets you stop the containers, thus waking the consumer so that it can stop.
26392642

2643+
Starting with version 2.6.2, if a container has published a `ListenerContainerIdleEvent`, it will publish a `ListenerContainerNoLongerIdleEvent` when a record is subsequently received.
2644+
26402645
====== Event Consumption
26412646

26422647
You can capture these events by implementing `ApplicationListener` -- either a general listener or one narrowed to only receive this specific event.

src/reference/asciidoc/whats-new.adoc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ See <<seek-to-current>>, <<recovering-batch-eh>>, <<dead-letters>> and <<after-r
2020
You can now configure an `adviceChain` in the container properties.
2121
See <<container-props>> for more information.
2222

23-
==== @KafkaLisener Changes
23+
When the container is configured to publish `ListenerContainerIdleEvent` s, it now publishes a `ListenerContainerNoLongerIdleEvent` when a record is received after publishing an idle event.
24+
See <<events>> and <<idle-containers>> for more information.
25+
26+
==== @KafkaListener Changes
2427

2528
When using manual partition assignment, you can now specify a wildcard for determining which partitions should be reset to the initial offset.
2629
In addition, if the listener implements `ConsumerSeekAware`, `onPartitionsAssigned()` is called after the manual assignment.

0 commit comments

Comments
 (0)