-
Notifications
You must be signed in to change notification settings - Fork 148
Allow consumer to stop processing messages upon closing #41
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
This fix has been tested in a production environment, and the problem has not reproduced in a week. |
Fix has been working stably in a large-scale production environment for 3 weeks. |
I can confirm tha twe have been running into the same issue and until now the fix is working wee for us as well. It would be great if that can be merged into a new version soon. |
@thwinkle We haven't observed any additional problems. The trace you gave seems a bit different of a problem as its blocking rather than looping (and its in the prefetch). |
@mhuffman-r7 you are right, when further digging into details we found out that there was an issue with a CredentialsProvider which did not properly overwrite the refresh() method, therefore it stopped working after a couple of hours. It took us a while to find that out as due to autoscaling instances got swapped out periodically. |
@mhuffman-r7 whats the latest status for you? All going good still? Any chance you could share how you configure your container factory? I'm not really sure what the best settings are for high concurrency + volumes, but this is what we have below (simplified) : @Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
final SQSConnectionFactory connectionFactory = SQSConnectionFactory.builder()
.withRegion(Region.getRegion(Regions.AP_SOUTHEAST_2))
.withAWSCredentialsProvider(customCredentialsProvider)
.withClientConfiguration(clientConfiguration)
.withNumberOfMessagesToPrefetch(0)
.build();
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setTargetConnectionFactory(connectionFactory);
cachingConnectionFactory.setReconnectOnException(true);
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setErrorHandler(getErrorHandler());
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
factory.setCacheLevel(CACHE_AUTO); // We don-t actually set this, but AUTO is the default AFAIK
return factory;
} |
@reecefenwick We don't use a much different approach. We do enable concurrency for our use case, and as a consequence enable Unfortunately this project is looking dormant these days so I have no idea when this PR will be accepted. |
Cheers, it is a shame to see this sitting here so long, we will likely fork as well at this point We also set a concurrency level (and use DLQs), just not at the factory level. We have dynamic SQS listener needs, so I implement a JmsListenerConfigurer in spring and register them like so: for (Executor executor : executors) {
final SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setId(executorName);
endpoint.setDestination(executorQueue);
endpoint.setConcurrency(executorConcurrency);
endpoint.setMessageListener(executionRequestListener);
registrar.registerEndpoint(endpoint);
} Perhaps our setup gets around the concurrency issues of the |
@mhuffman-r7 are you still using this solution? has it been working? We're considering utilizing it as we're seeing this issue currently |
@rkass Yes, we've been using this in a high-volume production environment for over a year. Unfortunately it appears as if this repository is gone dormant. |
This looks like a good bug fix to me, and the spec is specific about only having concurrent receive calls return null, not future calls after closing. I'm accepting it and will backfill an explicit unit test for the behaviour after the fact. Apologies for the silence on this repo. Better late than never. :) |
@mhuffman-r7, Thanks for the fix. |
Based on issue #10 this patch updates the
SQSMessageConsumerPrefetch
to throw an exception when the consumer is closed rather than returningnull
. This allows for theAsyncMessageListenerInvoker
in Spring JMS to stop requesting the consumer receive messages when theSQSMessageConsumer
has been closed.