Skip to content

Commit bd7656c

Browse files
garyrussellartembilan
authored andcommitted
GH-1309: Don't Bind Resource When Sync Not Active
Resolves #1309 If the `RabbitTemplate` was called from a `@TransactionalEventListener`, with phase `AFTER_COMMIT`, a transaction is "active", but synchronizations are already cleared. It is too late to synchronize this transaction. We end up with an orphaned `ResourceHolder` with pending transaction commits. Don't bind the resource holder if synchronization is not active. However, the proper solution, if users want to synchronize the rabbit transaction with the global transaction, is to use the `BEFORE_COMMIT` phase. See the discussion on the Github issue for more information. **cherry-pick to 2.2.x** * Fix since version. * Reset physical close required flag left over from another test. * Capture test results for all modules. # Conflicts: # .github/workflows/pr-build-workflow.yml # spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/ClientRecoveryCompatibilityTests.java
1 parent aa6ae4f commit bd7656c

File tree

3 files changed

+158
-19
lines changed

3 files changed

+158
-19
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConnectionFactoryUtils.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -147,7 +147,8 @@ private static RabbitResourceHolder doGetTransactionalResourceHolder(// NOSONAR
147147
}
148148
resourceHolderToUse.addChannel(channel, connection);
149149

150-
if (!resourceHolderToUse.equals(resourceHolder)) {
150+
if (!resourceHolderToUse.equals(resourceHolder)
151+
&& TransactionSynchronizationManager.isSynchronizationActive()) {
151152
bindResourceToTransaction(resourceHolderToUse, connectionFactory,
152153
resourceFactory.isSynchedLocalTransactionAllowed());
153154
}
@@ -171,6 +172,7 @@ public static void releaseResources(@Nullable RabbitResourceHolder resourceHolde
171172

172173
public static RabbitResourceHolder bindResourceToTransaction(RabbitResourceHolder resourceHolder,
173174
ConnectionFactory connectionFactory, boolean synched) {
175+
174176
if (TransactionSynchronizationManager.hasResource(connectionFactory)
175177
|| !TransactionSynchronizationManager.isActualTransactionActive() || !synched) {
176178
return (RabbitResourceHolder) TransactionSynchronizationManager.getResource(connectionFactory); // NOSONAR never null

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/ClientRecoveryCompatibilityTests.java

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package org.springframework.amqp.rabbit.connection;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20-
import static org.assertj.core.api.Assertions.fail;
20+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2121
import static org.mockito.ArgumentMatchers.any;
2222
import static org.mockito.ArgumentMatchers.anyString;
2323
import static org.mockito.Mockito.doAnswer;
@@ -43,6 +43,7 @@ public class ClientRecoveryCompatibilityTests {
4343

4444
@Test
4545
public void testDefeatRecovery() throws Exception {
46+
RabbitUtils.clearPhysicalCloseRequired(); // left over from some other test
4647
final Channel channel1 = mock(Channel.class);
4748
when(channel1.isOpen()).thenReturn(true);
4849
final Channel channel2 = mock(Channel.class);
@@ -57,37 +58,32 @@ public void testDefeatRecovery() throws Exception {
5758
ccf.setExecutor(mock(ExecutorService.class));
5859
Connection conn1 = ccf.createConnection();
5960
Channel channel = conn1.createChannel(false);
60-
verifyChannelIs(channel1, channel);
61+
ChannelProxy proxy = (ChannelProxy) channel;
62+
assertThat(proxy.getTargetChannel()).isSameAs(channel1);
6163
channel.close();
6264
conn1.close();
6365
Connection conn2 = ccf.createConnection();
6466
assertThat(conn2).isSameAs(conn1);
6567
channel = conn1.createChannel(false);
66-
verifyChannelIs(channel1, channel);
68+
proxy = (ChannelProxy) channel;
69+
assertThat(proxy.getTargetChannel()).isSameAs(channel1);
6770
channel.close();
6871
conn2.close();
6972

7073
when(rabbitConn.isOpen()).thenReturn(false).thenReturn(true);
7174
when(channel1.isOpen()).thenReturn(false);
72-
conn2 = ccf.createConnection();
73-
try {
74-
conn2.createChannel(false);
75-
fail("Expected AutoRecoverConnectionNotCurrentlyOpenException");
76-
}
77-
catch (AutoRecoverConnectionNotCurrentlyOpenException e) {
78-
assertThat(e.getMessage()).isEqualTo("Auto recovery connection is not currently open");
79-
}
75+
Connection conn3 = ccf.createConnection();
76+
assertThat(conn3).isSameAs(conn1);
77+
assertThatExceptionOfType(AutoRecoverConnectionNotCurrentlyOpenException.class).isThrownBy(() ->
78+
conn3.createChannel(false))
79+
.withMessage("Auto recovery connection is not currently open");
8080
channel = conn2.createChannel(false);
81-
verifyChannelIs(channel2, channel);
81+
proxy = (ChannelProxy) channel;
82+
assertThat(proxy.getTargetChannel()).isSameAs(channel2);
8283
channel.close();
8384

8485
verify(rabbitConn, never()).close();
8586
verify(channel1).close(); // physically closed to defeat recovery
8687
}
8788

88-
private void verifyChannelIs(Channel mockChannel, Channel channel) {
89-
ChannelProxy proxy = (ChannelProxy) channel;
90-
assertThat(proxy.getTargetChannel()).isSameAs(mockChannel);
91-
}
92-
9389
}
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.core;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.BDDMockito.given;
21+
import static org.mockito.Mockito.mock;
22+
import static org.mockito.Mockito.verify;
23+
24+
import java.io.IOException;
25+
import java.util.concurrent.atomic.AtomicBoolean;
26+
27+
import org.junit.jupiter.api.Test;
28+
29+
import org.springframework.amqp.rabbit.connection.Connection;
30+
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
31+
import org.springframework.beans.factory.annotation.Autowired;
32+
import org.springframework.context.ApplicationEventPublisher;
33+
import org.springframework.context.annotation.Bean;
34+
import org.springframework.context.annotation.Configuration;
35+
import org.springframework.test.annotation.DirtiesContext;
36+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
37+
import org.springframework.transaction.PlatformTransactionManager;
38+
import org.springframework.transaction.TransactionDefinition;
39+
import org.springframework.transaction.TransactionException;
40+
import org.springframework.transaction.annotation.EnableTransactionManagement;
41+
import org.springframework.transaction.annotation.Transactional;
42+
import org.springframework.transaction.event.TransactionPhase;
43+
import org.springframework.transaction.event.TransactionalEventListener;
44+
import org.springframework.transaction.support.AbstractPlatformTransactionManager;
45+
import org.springframework.transaction.support.DefaultTransactionStatus;
46+
47+
import com.rabbitmq.client.Channel;
48+
49+
/**
50+
* @author Gary Russell
51+
* @since 2.2.16
52+
*
53+
*/
54+
@SpringJUnitConfig
55+
@DirtiesContext
56+
public class TransactionalEventListenerTests {
57+
58+
@Test
59+
void txCommits(@Autowired Config config, @Autowired AtomicBoolean committed,
60+
@Autowired Channel channel) throws IOException {
61+
62+
config.publish();
63+
assertThat(committed.get()).isTrue();
64+
verify(channel).txCommit();
65+
}
66+
67+
@Configuration(proxyBeanMethods = false)
68+
@EnableTransactionManagement
69+
public static class Config {
70+
71+
@Autowired
72+
ApplicationEventPublisher publisher;
73+
74+
@Autowired
75+
RabbitTemplate template;
76+
77+
@Bean
78+
AtomicBoolean committed() {
79+
return new AtomicBoolean();
80+
}
81+
82+
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
83+
public void el(String in) {
84+
this.template.convertAndSend("test");
85+
}
86+
87+
@Bean
88+
RabbitTemplate template(ConnectionFactory cf) {
89+
RabbitTemplate template = new RabbitTemplate(cf);
90+
template.setChannelTransacted(true);
91+
return template;
92+
}
93+
94+
@Bean
95+
Channel channel() {
96+
return mock(Channel.class);
97+
}
98+
99+
@Bean
100+
ConnectionFactory cf(Channel channel) {
101+
ConnectionFactory cf = mock(ConnectionFactory.class);
102+
Connection conn = mock(Connection.class);
103+
given(conn.isOpen()).willReturn(true);
104+
given(cf.createConnection()).willReturn(conn);
105+
given(conn.createChannel(true)).willReturn(channel);
106+
given(channel.isOpen()).willReturn(true);
107+
return cf;
108+
}
109+
110+
@Transactional
111+
public void publish() {
112+
publisher.publishEvent("test");
113+
}
114+
115+
@Bean
116+
PlatformTransactionManager transactionManager(AtomicBoolean committed) {
117+
return new AbstractPlatformTransactionManager() {
118+
119+
@Override
120+
protected void doRollback(DefaultTransactionStatus status) throws TransactionException {
121+
}
122+
123+
@Override
124+
protected Object doGetTransaction() throws TransactionException {
125+
return new Object();
126+
}
127+
128+
@Override
129+
protected void doCommit(DefaultTransactionStatus status) throws TransactionException {
130+
committed.set(true);
131+
}
132+
133+
@Override
134+
protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
135+
}
136+
};
137+
}
138+
139+
}
140+
141+
}

0 commit comments

Comments
 (0)