Allow consumer to stop processing messages upon closing #41

Merged
merged 1 commit into from Aug 7, 2019

Conversation


@ghost ghost commented Aug 23, 2017

Based on issue #10, this patch updates the SQSMessageConsumerPrefetch to throw an exception when the consumer is closed, rather than returning null. This allows Spring JMS's AsyncMessageListenerInvoker to stop requesting messages from the consumer once the SQSMessageConsumer has been closed.
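The essence of the change can be shown with a small, self-contained model. This is a sketch of the pattern only; the real patch lives in SQSMessageConsumerPrefetch and uses the JMS exception types rather than the stand-ins here:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified stand-in for the patched consumer: once close() has been
// called, receive() fails fast instead of returning null, so a polling
// caller (e.g. Spring's AsyncMessageListenerInvoker) stops its loop.
class ClosableConsumer {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final BlockingQueue<String> prefetched = new LinkedBlockingQueue<>();

    void deliver(String message) {
        prefetched.offer(message);
    }

    String receive() {
        if (closed.get()) {
            // Patched behaviour: signal "consumer closed" instead of null
            throw new IllegalStateException("Consumer closed");
        }
        return prefetched.poll(); // null only means "no message available"
    }

    void close() {
        closed.set(true);
    }
}
```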

ghost commented Aug 28, 2017

This fix has been tested in a production environment, and the problem has not reproduced in a week.

ghost commented Sep 11, 2017

Fix has been working stably in a large-scale production environment for 3 weeks.

@thwinkle

I can confirm that we have been running into the same issue, and so far the fix is working well for us as well. It would be great if this could be merged into a new version soon.

thwinkle commented Oct 5, 2017

Unfortunately I have to report that the issue is not resolved for us: after a few days of running fine, the problem occurred again. @mhuffman-r7, how is your experience? Did this fix solve it permanently for you, or did you have to make further changes? Monitoring via JMX, we can see the threads hanging at SQSMessageConsumerPrefetch.java:484 even though there are messages in the queue.
[screenshot: JMX thread dump showing threads parked at SQSMessageConsumerPrefetch.java:484]

ghost commented Oct 5, 2017

@thwinkle We haven't observed any additional problems. The trace you posted looks like a somewhat different problem, as it's blocking rather than looping (and it's in the prefetch).

@thwinkle

@mhuffman-r7 you are right. Digging further into the details, we found that the issue was a CredentialsProvider that did not properly override the refresh() method, so it stopped working after a couple of hours. It took us a while to find this because, due to autoscaling, instances were swapped out periodically.

reecefenwick commented Apr 5, 2018

@mhuffman-r7 what's the latest status for you? Is everything still going well?

Any chance you could share how you configure your container factory?

I'm not really sure what the best settings are for high concurrency and high volumes, but this is what we have (simplified):

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    final SQSConnectionFactory connectionFactory = SQSConnectionFactory.builder()
            .withRegion(Region.getRegion(Regions.AP_SOUTHEAST_2))
            .withAWSCredentialsProvider(customCredentialsProvider)
            .withClientConfiguration(clientConfiguration)
            .withNumberOfMessagesToPrefetch(0)
            .build();

    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
    cachingConnectionFactory.setTargetConnectionFactory(connectionFactory);
    cachingConnectionFactory.setReconnectOnException(true);

    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    // Pass the caching wrapper so setReconnectOnException(true) takes effect;
    // passing the raw connectionFactory would leave the wrapper unused.
    factory.setConnectionFactory(cachingConnectionFactory);
    factory.setErrorHandler(getErrorHandler());
    factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    factory.setCacheLevel(CACHE_AUTO); // We don't actually set this, but AUTO is the default AFAIK
    return factory;
}

ghost commented Apr 5, 2018

@reecefenwick Our approach isn't much different. We do enable concurrency for our use case and, as a consequence, use the UNORDERED_ACKNOWLEDGE session acknowledgement mode. Since we hook our queues up with DLQs, this works well for us. I have not personally experimented with cache level settings.
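A minimal sketch of that concurrency and acknowledgement setup, assuming the SQSSession.UNORDERED_ACKNOWLEDGE constant from amazon-sqs-java-messaging-lib (the concurrency range is illustrative):

```java
// Sketch: concurrent consumers with UNORDERED_ACKNOWLEDGE, so each
// message is acknowledged individually, independent of arrival order.
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrency("3-10"); // illustrative range, tune per workload
factory.setSessionAcknowledgeMode(SQSSession.UNORDERED_ACKNOWLEDGE);
```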

Unfortunately this project looks dormant these days, so I have no idea when this PR will be accepted.

reecefenwick commented Apr 5, 2018

Cheers. It is a shame to see this sitting here so long; at this point we will likely fork as well.

We also set a concurrency level (and use DLQs), just not at the factory level.

We have dynamic SQS listener needs, so I implement a JmsListenerConfigurer in spring and register them like so:

for (Executor executor : executors) {
    // executorName, executorQueue, executorConcurrency and
    // executionRequestListener are derived from the current executor
    // (details elided in this simplified snippet)
    final SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
    endpoint.setId(executorName);
    endpoint.setDestination(executorQueue);
    endpoint.setConcurrency(executorConcurrency);
    endpoint.setMessageListener(executionRequestListener);
    registrar.registerEndpoint(endpoint);
}
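For context, a registration loop like this typically lives inside a JmsListenerConfigurer implementation; a minimal sketch (the class and field names here are hypothetical):

```java
// Sketch: Spring calls configureJmsListeners() at startup, which lets us
// register endpoints programmatically instead of via @JmsListener.
@Configuration
public class DynamicSqsListenerConfig implements JmsListenerConfigurer {

    @Override
    public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
        // build one SimpleJmsListenerEndpoint per dynamically discovered
        // queue and call registrar.registerEndpoint(endpoint) for each,
        // as in the loop shown above
    }
}
```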

Perhaps our setup avoids the concurrency issues in the RangedAcknowledger.java used for CLIENT_ACKNOWLEDGE.

rkass commented Nov 8, 2018

@mhuffman-r7 are you still using this solution? Has it been working? We're considering adopting it, as we're currently seeing this issue.

ghost commented Nov 8, 2018

@rkass Yes, we've been using this in a high-volume production environment for over a year. Unfortunately, it appears this repository has gone dormant.

robin-aws commented Aug 7, 2019

This looks like a good bug fix to me: the spec only requires that concurrent receive calls return null, not future calls made after closing. I'm accepting it and will backfill an explicit unit test for this behaviour after the fact.

Apologies for the silence on this repo. Better late than never. :)

@robin-aws robin-aws merged commit 04b2881 into awslabs:master Aug 7, 2019

ghost commented Aug 11, 2019

@mhuffman-r7, Thanks for the fix.
@robin-aws When can we get this fix in a release? The new release 1.0.7 doesn't contain this fix.
