
Add assertions to fail early on absent values using StreamMessageListenerContainer #2472

Closed
survivant opened this issue Dec 21, 2022 · 1 comment
Labels
type: enhancement A general enhancement

Comments


survivant commented Dec 21, 2022

I have a JUnit test that publishes and listens for messages on a Redis Stream. Sometimes when I start the code, I get an NPE. It looks like the polling thread starts receiving messages before the listener has been added. I found this thread with the same error as mine, and then ran my code in debug mode to track down the issue: redisson/redisson#4135


Here are my code and configs.

I have the same issue, but with Lettuce. Here is the code that I have.

logs

07:52:55.111 [SimpleAsyncTaskExecutor-1] ERROR o.s.d.r.s.DefaultStreamMessageListenerContainer$LoggingErrorHandler - Unexpected error occurred in scheduled task.
java.lang.NullPointerException: null
	at org.springframework.data.redis.stream.StreamPollTask.deserializeAndEmitRecords(StreamPollTask.java:177)
	at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:148)
	at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:132)
	at java.base/java.lang.Thread.run(Thread.java:829)
07:52:55.109 [SimpleAsyncTaskExecutor-2] INFO  c.e.demoredis.controller.StreamTest - Message received in JUNIT for group-consumer [group-a-consumer-a] [DemoMessage(id=0, message=message1, createdBy=user1, modifyBy=null)]
07:52:55.117 [pool-2-thread-2] INFO  c.e.demoredis.controller.StreamTest - Messages received

@Test
public void testStreamListener() {
    publishDemoMessage(1);

    var future = CompletableFuture.supplyAsync(() -> waitNotification(countDownLatch), executor)
            .thenAcceptAsync(release -> LOGGER.info("Messages received"), executor);

    // wait for the future to complete
    waitForFutureToComplete(future);

    assertEquals(0, countDownLatch.getCount());
}

private void publishDemoMessage(int count) {
    for (var i = 0; i < count; i++) {
        var message = DemoMessage.builder()
                .id(String.valueOf(movieID++))
                .message("message1")
                .createdBy("user1")
                .build();

        var record = StreamRecords.newRecord()
                .ofObject(message)
                .withStreamKey(KEY);

        var recordId = redisTemplate.opsForStream().add(record).block();
    }
}

@Configuration
@RequiredArgsConstructor
public class StreamRedisConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamRedisConfiguration.class);

    public static String STREAM_CHANNEL;
    public static String STREAM_GROUP;
    public static String STREAM_CONSUMER;

    private StreamListener<String, ObjectRecord<String, DemoMessage>> streamListener;

    @Value("${spring.redis.stream.channel}")
    private void setStreamChannel(String channel){
        STREAM_CHANNEL = channel;
    }

    @Value("${spring.redis.stream.group}")
    private void setStreamGroup(String group){
        STREAM_GROUP = group;
    }

    @Value("${spring.redis.stream.consumer}")
    private void setChannelConsumer(String consumer){
        STREAM_CONSUMER = consumer;
    }

    @Bean
    public StreamMessageListenerContainer<String, ObjectRecord<String, DemoMessage>> listenerContainer(RedisConnectionFactory redisConnectionFactory) {
        var options = StreamMessageListenerContainerOptions
                .builder()
                .pollTimeout(Duration.ofSeconds(1))
                .targetType(DemoMessage.class)
                .build();
        var listenerContainer = StreamMessageListenerContainer.create(redisConnectionFactory, options);

        return listenerContainer;
    }

    @Bean
    public Subscription subscription(RedisConnectionFactory redisConnectionFactory, ReactiveRedisTemplate<String, DemoMessage> reactiveRedisTemplate) {

        try {
            redisConnectionFactory.getConnection()
                    .xGroupCreate(STREAM_CHANNEL.getBytes(), STREAM_GROUP, ReadOffset.from("0-0"), true);
        } catch (RedisSystemException exception) {
            LOGGER.warn(exception.getCause().getMessage());
        }

        var listenerContainer = listenerContainer(redisConnectionFactory);

        var subscription = listenerContainer
                .receive(Consumer.from(STREAM_GROUP, STREAM_CONSUMER),
                StreamOffset.create(STREAM_CHANNEL, ReadOffset.lastConsumed())
                , streamListener);
        listenerContainer.start();
        return subscription;
    }
}
@Service
@RequiredArgsConstructor
public class RedisStreamMessageConsumer implements StreamListener<String, ObjectRecord<String, DemoMessage>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisStreamMessageConsumer.class);

    private final ReactiveRedisTemplate<String, String> redisTemplate;

    @Override
    public void onMessage(ObjectRecord<String, DemoMessage> record) {
        LOGGER.info("Stream Subscriber >> [{}]", record.getValue());
    }

}

my pom.xml

in my parent

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.6</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.comact.demo</groupId>
        <artifactId>redis-demo</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo-redis-service</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>service-watcher</name>
    <description>service-watcher</description>
    <properties>
        <java.version>11</java.version>
        <springdoc.version>1.6.12</springdoc.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>com.example</groupId>
            <artifactId>demo-redis-pojo</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springdoc</groupId>
            <artifactId>springdoc-openapi-ui</artifactId>
            <version>${springdoc.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springdoc</groupId>
            <artifactId>springdoc-openapi-webflux-ui</artifactId>
            <version>${springdoc.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>janino</artifactId>
            <version>3.1.8</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bootstrap</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.playtika.testcontainers/embedded-redis -->
        <dependency>
            <groupId>com.playtika.testcontainers</groupId>
            <artifactId>embedded-redis</artifactId>
            <version>2.2.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springdoc</groupId>
                <artifactId>springdoc-openapi-maven-plugin</artifactId>
                <version>1.4</version>
            </plugin>
        </plugins>
    </build>

</project>

and my config

spring:
  redis:
    host: ${embedded.redis.host}
    port: ${embedded.redis.port}
    password: ${embedded.redis.password}
    pubSub:
      channel: 'JUNIT-MESSAGES_CHANNEL'
    stream:
      channel: 'JUNIT-STREAM_CHANNEL'
      group: 'group'
      consumer: 'consumer'
spring-projects-issues added the status: waiting-for-triage label Dec 21, 2022
Member

mp911de commented Jan 2, 2023

It looks pretty much as if your streamListener was null.
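
For context: in the configuration posted above, the `streamListener` field is neither `final` nor annotated for injection, and Lombok's `@RequiredArgsConstructor` only generates constructor parameters for `final` and `@NonNull` fields, so the field is most likely still `null` when it is passed to `receive(...)`. The sketch below is a plain-Java illustration of why that surfaces late, on the polling thread, and how a null check at registration time moves the failure to the call site. `MiniContainer`, `receiveUnchecked`, `receive`, and `emit` are hypothetical names invented for this example; this is not the Spring Data Redis implementation, only the general fail-early idea under those assumptions.

```java
import java.util.Objects;
import java.util.function.Consumer;

public class FailEarlyDemo {

    /** Hypothetical stand-in for a listener container; not the Spring Data Redis class. */
    static final class MiniContainer<T> {
        private Consumer<T> listener;

        /** Unguarded registration: a null listener is accepted silently. */
        void receiveUnchecked(Consumer<T> listener) {
            this.listener = listener;
        }

        /** Guarded registration: a null listener is rejected at the call site. */
        void receive(Consumer<T> listener) {
            this.listener = Objects.requireNonNull(listener, "StreamListener must not be null");
        }

        /** Simulates the poll loop handing one record to the listener. */
        void emit(T record) {
            listener.accept(record); // throws NullPointerException if listener is null
        }
    }

    /** Returns true if the unguarded path only fails later, when a record is emitted. */
    static boolean failsLateWithoutCheck() {
        MiniContainer<String> container = new MiniContainer<>();
        container.receiveUnchecked(null); // no error here
        try {
            container.emit("message1");
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    /** Returns true if the guarded path fails immediately at registration. */
    static boolean failsEarlyWithCheck() {
        MiniContainer<String> container = new MiniContainer<>();
        try {
            container.receive(null);
            return false;
        } catch (NullPointerException e) {
            return "StreamListener must not be null".equals(e.getMessage());
        }
    }

    public static void main(String[] args) {
        System.out.println("fails late without check: " + failsLateWithoutCheck());
        System.out.println("fails early with check: " + failsEarlyWithCheck());
    }
}
```

On the user side, the corresponding change would presumably be to declare the `streamListener` field `final` (or inject it as a `@Bean` method parameter) so that Spring supplies the `RedisStreamMessageConsumer` bean before `receive(...)` is called.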

mp911de added the type: enhancement label and removed the status: waiting-for-triage label Jan 2, 2023
mp911de changed the title from "The doLoop method in the StreamPollTask class always reports a null pointer exception" to "Add assertions to fail early on absent values using StreamMessageListenerContainer" Jan 2, 2023
mp911de added this to the 2.7.7 (2021.2.7) milestone Jan 2, 2023
mp911de added a commit that referenced this issue Jan 2, 2023
mp911de closed this as completed in fb11f31 Jan 2, 2023
mp911de added a commit that referenced this issue Jan 2, 2023