Skip to content

Commit ca34db1

Browse files
garyrussellartembilan
authored andcommitted
GH-1181: Fix memory leak with user correlation
Resolves #1181
1 parent 838b88a commit ca34db1

File tree

2 files changed

+16
-14
lines changed

2 files changed

+16
-14
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1929,25 +1929,19 @@ private Message doSendAndReceiveWithDirect(String exchange, String routingKey, M
19291929
private Message doSendAndReceiveAsListener(final String exchange, final String routingKey, final Message message,
19301930
final CorrelationData correlationData, Channel channel) throws Exception { // NOSONAR
19311931
final PendingReply pendingReply = new PendingReply();
1932-
String messageTag = String.valueOf(this.messageTagProvider.incrementAndGet());
1932+
String messageTag = null;
19331933
if (this.userCorrelationId) {
1934-
String correlationId;
19351934
if (this.correlationKey != null) {
1936-
correlationId = (String) message.getMessageProperties().getHeaders().get(this.correlationKey);
1935+
messageTag = (String) message.getMessageProperties().getHeaders().get(this.correlationKey);
19371936
}
19381937
else {
1939-
correlationId = message.getMessageProperties().getCorrelationId();
1940-
}
1941-
if (correlationId == null) {
1942-
this.replyHolder.put(messageTag, pendingReply);
1943-
}
1944-
else {
1945-
this.replyHolder.put(correlationId, pendingReply);
1938+
messageTag = message.getMessageProperties().getCorrelationId();
19461939
}
19471940
}
1948-
else {
1949-
this.replyHolder.put(messageTag, pendingReply);
1941+
if (messageTag == null) {
1942+
messageTag = String.valueOf(this.messageTagProvider.incrementAndGet());
19501943
}
1944+
this.replyHolder.put(messageTag, pendingReply);
19511945
saveAndSetProperties(message, pendingReply, messageTag);
19521946

19531947
if (logger.isDebugEnabled()) {

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateIntegrationTests.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -659,6 +659,7 @@ public void testSendInExternalTransactionWithRollback() throws Exception {
659659
assertThat(result).isEqualTo(null);
660660
}
661661

662+
@SuppressWarnings("unchecked")
662663
@Test
663664
public void testAtomicSendAndReceive() throws Exception {
664665
final CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
@@ -689,10 +690,12 @@ public void testAtomicSendAndReceive() throws Exception {
689690
// Message was consumed so nothing left on queue
690691
reply = template.receive();
691692
assertThat(reply).isEqualTo(null);
693+
assertThat(TestUtils.getPropertyValue(template, "replyHolder", Map.class)).hasSize(0);
692694
template.stop();
693695
cachingConnectionFactory.destroy();
694696
}
695697

698+
@SuppressWarnings("unchecked")
696699
@Test
697700
public void testAtomicSendAndReceiveUserCorrelation() throws Exception {
698701
final CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
@@ -732,6 +735,7 @@ public void testAtomicSendAndReceiveUserCorrelation() throws Exception {
732735
// Message was consumed so nothing left on queue
733736
reply = template.receive();
734737
assertThat(reply).isEqualTo(null);
738+
assertThat(TestUtils.getPropertyValue(template, "replyHolder", Map.class)).hasSize(0);
735739
template.stop();
736740
container.stop();
737741
cachingConnectionFactory.destroy();
@@ -1330,6 +1334,7 @@ public void testSendAndReceiveNeverFastWitReplyQueue() {
13301334
sendAndReceiveFastGuts(true, true, false);
13311335
}
13321336

1337+
@SuppressWarnings("unchecked")
13331338
private void sendAndReceiveFastGuts(boolean tempQueue, boolean setDirectReplyToExplicitly, boolean expectUsedTemp) {
13341339
RabbitTemplate template = createSendAndReceiveRabbitTemplate(this.connectionFactory);
13351340
try {
@@ -1368,6 +1373,7 @@ public Message handleMessage(Message message) {
13681373
else {
13691374
assertThat(replyToWas.get()).startsWith(Address.AMQ_RABBITMQ_REPLY_TO);
13701375
}
1376+
assertThat(TestUtils.getPropertyValue(template, "replyHolder", Map.class)).hasSize(0);
13711377
}
13721378
catch (Exception e) {
13731379
assertThat(e.getCause().getCause().getMessage()).contains("404");
@@ -1378,6 +1384,7 @@ public Message handleMessage(Message message) {
13781384
}
13791385
}
13801386

1387+
@SuppressWarnings("unchecked")
13811388
@Test
13821389
public void testReplyCompressionWithContainer() {
13831390
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(
@@ -1406,6 +1413,7 @@ public String handleMessage(String message) {
14061413
GUnzipPostProcessor unzipper = new GUnzipPostProcessor();
14071414
reply = unzipper.postProcessMessage(reply);
14081415
assertThat(new String(reply.getBody())).isEqualTo("FOO");
1416+
assertThat(TestUtils.getPropertyValue(template, "replyHolder", Map.class)).hasSize(0);
14091417
}
14101418
finally {
14111419
template.stop();
@@ -1420,7 +1428,7 @@ protected RabbitTemplate createSendAndReceiveRabbitTemplate(ConnectionFactory co
14201428
}
14211429

14221430
@Test
1423-
public void testDegugLogOnPassiveDeclaration() {
1431+
public void testDebugLogOnPassiveDeclaration() {
14241432
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
14251433
Log logger = spy(TestUtils.getPropertyValue(connectionFactory, "logger", Log.class));
14261434
doReturn(true).when(logger).isDebugEnabled();

0 commit comments

Comments
 (0)