Skip to content

Commit b286190

Browse files
authored
GH-1352: ConsumerCustomizer Improvements
- pass in the listener id, if available - narrow to a specific interface to aid Boot auto configuration - add null check * Apply Suggestion to add @FuntionalInterface
1 parent 3004e1d commit b286190

File tree

4 files changed

+49
-13
lines changed

4 files changed

+49
-13
lines changed

spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/config/StreamRabbitListenerContainerFactory.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package org.springframework.rabbit.stream.config;
1818

1919
import java.lang.reflect.Method;
20-
import java.util.function.Consumer;
2120

2221
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
2322
import org.springframework.amqp.rabbit.config.BaseRabbitListenerContainerFactory;
@@ -26,11 +25,11 @@
2625
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpoint;
2726
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
2827
import org.springframework.lang.Nullable;
28+
import org.springframework.rabbit.stream.listener.ConsumerCustomizer;
2929
import org.springframework.rabbit.stream.listener.StreamListenerContainer;
3030
import org.springframework.rabbit.stream.listener.adapter.StreamMessageListenerAdapter;
3131
import org.springframework.util.Assert;
3232

33-
import com.rabbitmq.stream.ConsumerBuilder;
3433
import com.rabbitmq.stream.Environment;
3534

3635
/**
@@ -47,7 +46,7 @@ public class StreamRabbitListenerContainerFactory
4746

4847
private boolean nativeListener;
4948

50-
private Consumer<ConsumerBuilder> consumerCustomizer;
49+
private ConsumerCustomizer consumerCustomizer;
5150

5251
private ContainerCustomizer<StreamListenerContainer> containerCustomizer;
5352

@@ -72,7 +71,7 @@ public void setNativeListener(boolean nativeListener) {
7271
* Customize the consumer builder before it is built.
7372
* @param consumerCustomizer the customizer.
7473
*/
75-
public void setConsumerCustomizer(java.util.function.Consumer<ConsumerBuilder> consumerCustomizer) {
74+
public void setConsumerCustomizer(ConsumerCustomizer consumerCustomizer) {
7675
this.consumerCustomizer = consumerCustomizer;
7776
}
7877

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.rabbit.stream.listener;
18+
19+
import java.util.function.BiConsumer;
20+
21+
import com.rabbitmq.stream.ConsumerBuilder;
22+
23+
/**
24+
* Customizer for {@link ConsumerBuilder}.
25+
*
26+
* @author Gary Russell
27+
* @since 2.4
28+
*
29+
*/
30+
@FunctionalInterface
31+
public interface ConsumerCustomizer extends BiConsumer<String, ConsumerBuilder> {
32+
}

spring-rabbit-stream/src/main/java/org/springframework/rabbit/stream/listener/StreamListenerContainer.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public class StreamListenerContainer implements MessageListenerContainer, BeanNa
5050

5151
private StreamMessageConverter messageConverter;
5252

53-
private java.util.function.Consumer<ConsumerBuilder> consumerCustomizer = c -> { };
53+
private ConsumerCustomizer consumerCustomizer = (id, con) -> { };
5454

5555
private Consumer consumer;
5656

@@ -112,7 +112,8 @@ public void setMessageConverter(StreamMessageConverter messageConverter) {
112112
* Customize the consumer builder before it is built.
113113
* @param consumerCustomizer the customizer.
114114
*/
115-
public synchronized void setConsumerCustomizer(java.util.function.Consumer<ConsumerBuilder> consumerCustomizer) {
115+
public synchronized void setConsumerCustomizer(ConsumerCustomizer consumerCustomizer) {
116+
Assert.notNull(consumerCustomizer, "'consumerCustomizer' cannot be null");
116117
this.consumerCustomizer = consumerCustomizer;
117118
}
118119

@@ -167,7 +168,7 @@ public synchronized boolean isRunning() {
167168
@Override
168169
public synchronized void start() {
169170
if (this.consumer == null) {
170-
this.consumerCustomizer.accept(this.builder);
171+
this.consumerCustomizer.accept(getListenerId(), this.builder);
171172
this.consumer = this.builder.build();
172173
}
173174
}

spring-rabbit-stream/src/test/java/org/springframework/rabbit/stream/listener/RabbitListenerTests.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,15 +61,14 @@ public class RabbitListenerTests extends AbstractIntegrationTests {
6161

6262
@Test
6363
void simple(@Autowired RabbitTemplate template) throws InterruptedException {
64-
6564
template.convertAndSend("test.stream.queue1", "foo");
6665
assertThat(this.config.latch1.await(10, TimeUnit.SECONDS)).isTrue();
6766
assertThat(this.config.received).isEqualTo("foo");
67+
assertThat(this.config.id).isEqualTo("test");
6868
}
6969

7070
@Test
7171
void nativeMsg(@Autowired RabbitTemplate template) throws InterruptedException {
72-
7372
template.convertAndSend("test.stream.queue2", "foo");
7473
assertThat(this.config.latch2.await(10, TimeUnit.SECONDS)).isTrue();
7574
assertThat(this.config.receivedNative).isNotNull();
@@ -97,6 +96,8 @@ public static class Config {
9796

9897
volatile Context context;
9998

99+
volatile String id;
100+
100101
@Bean
101102
Environment environment() {
102103
return Environment.builder()
@@ -140,13 +141,16 @@ void listen(String in) {
140141
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {
141142
StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
142143
factory.setNativeListener(true);
143-
factory.setConsumerCustomizer(builder -> builder.name("myConsumer")
144-
.offset(OffsetSpecification.first())
145-
.manualTrackingStrategy());
144+
factory.setConsumerCustomizer((id, builder) -> {
145+
builder.name("myConsumer")
146+
.offset(OffsetSpecification.first())
147+
.manualTrackingStrategy();
148+
this.id = id;
149+
});
146150
return factory;
147151
}
148152

149-
@RabbitListener(queues = "test.stream.queue2", containerFactory = "nativeFactory")
153+
@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
150154
void nativeMsg(Message in, Context context) {
151155
this.receivedNative = in;
152156
this.context = context;

0 commit comments

Comments
 (0)