Skip to content

Commit 4d2b9b8

Browse files
authored
GH-1362: Enable Capture of Tx Synchronization Fail
Resolves #1362 **cherry-pick to 2.3.x** * Add missing doc changes. * Fix test to reset flag.
1 parent 4c19d7c commit 4d2b9b8

File tree

4 files changed

+116
-2
lines changed

4 files changed

+116
-2
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.amqp.rabbit.connection;
18+
19+
import org.springframework.amqp.AmqpException;
20+
21+
/**
22+
* Represents a failure to commit or rollback when performing afterCompletion
23+
* after the primary transaction completes.
24+
*
25+
* @author Gary Russell
26+
* @since 2.4
27+
*/
28+
public class AfterCompletionFailedException extends AmqpException {
29+
30+
private static final long serialVersionUID = 1L;
31+
32+
private final int syncStatus;
33+
34+
/**
35+
* Construct an instance with the provided properties.
36+
* @param syncStatus the synchronization status.
37+
* @param cause the cause.
38+
*/
39+
public AfterCompletionFailedException(int syncStatus, Throwable cause) {
40+
super(cause);
41+
this.syncStatus = syncStatus;
42+
}
43+
44+
/**
45+
* Return the synchronization status.
46+
* @return the status.
47+
*/
48+
public int getSyncStatus() {
49+
return this.syncStatus;
50+
}
51+
52+
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/ConnectionFactoryUtils.java

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.amqp.rabbit.connection;
1818

1919
import java.io.IOException;
20+
import java.util.function.Consumer;
2021

2122
import org.springframework.amqp.AmqpIOException;
2223
import org.springframework.lang.Nullable;
@@ -42,6 +43,10 @@
4243
*/
4344
public final class ConnectionFactoryUtils {
4445

46+
private static final ThreadLocal<AfterCompletionFailedException> failures = new ThreadLocal<>();
47+
48+
private static boolean captureAfterCompletionExceptions;
49+
4550
private ConnectionFactoryUtils() {
4651
}
4752

@@ -181,11 +186,42 @@ public static RabbitResourceHolder bindResourceToTransaction(RabbitResourceHolde
181186
resourceHolder.setSynchronizedWithTransaction(true);
182187
if (TransactionSynchronizationManager.isSynchronizationActive()) {
183188
TransactionSynchronizationManager.registerSynchronization(new RabbitResourceSynchronization(resourceHolder,
184-
connectionFactory));
189+
connectionFactory, ConnectionFactoryUtils::completionFailed));
185190
}
186191
return resourceHolder;
187192
}
188193

194+
private static void completionFailed(AfterCompletionFailedException ex) {
195+
if (captureAfterCompletionExceptions) {
196+
failures.set(ex);
197+
}
198+
}
199+
200+
/**
201+
* Call this method to enable capturing {@link AfterCompletionFailedException}s
202+
* when using transaction synchronization. Exceptions are stored in a {@link ThreadLocal}
203+
* which must be cleared by calling {@link #checkAfterCompletion()} after the transaction
204+
* has completed.
205+
* @param enable true to enable capture.
206+
*/
207+
public static void enableAfterCompletionFailureCapture(boolean enable) {
208+
captureAfterCompletionExceptions = enable;
209+
}
210+
211+
/**
212+
* When using transaction synchronization, call this method after the transaction commits to
213+
* verify that the RabbitMQ transaction committed.
214+
* @throws AfterCompletionFailedException if synchronization failed.
215+
* @since 2.3.10
216+
*/
217+
public static void checkAfterCompletion() {
218+
AfterCompletionFailedException ex = failures.get();
219+
if (ex != null) {
220+
failures.remove();
221+
throw ex;
222+
}
223+
}
224+
189225
public static void registerDeliveryTag(ConnectionFactory connectionFactory, Channel channel, Long tag) {
190226

191227
Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
@@ -318,9 +354,14 @@ private static final class RabbitResourceSynchronization extends
318354

319355
private final RabbitResourceHolder resourceHolder;
320356

321-
RabbitResourceSynchronization(RabbitResourceHolder resourceHolder, Object resourceKey) {
357+
private final Consumer<AfterCompletionFailedException> afterCompletionCallback;
358+
359+
RabbitResourceSynchronization(RabbitResourceHolder resourceHolder, Object resourceKey,
360+
Consumer<AfterCompletionFailedException> afterCompletionCallback) {
361+
322362
super(resourceHolder, resourceKey);
323363
this.resourceHolder = resourceHolder;
364+
this.afterCompletionCallback = afterCompletionCallback;
324365
}
325366

326367
@Override
@@ -338,6 +379,9 @@ public void afterCompletion(int status) {
338379
this.resourceHolder.rollbackAll();
339380
}
340381
}
382+
catch (RuntimeException ex) {
383+
this.afterCompletionCallback.accept(new AfterCompletionFailedException(status, ex));
384+
}
341385
finally {
342386
if (this.resourceHolder.isReleaseAfterCompletion()) {
343387
this.resourceHolder.setSynchronizedWithTransaction(false);

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplateTests.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.amqp.rabbit.core;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2021
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
2122
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2223
import static org.assertj.core.api.Assertions.fail;
@@ -62,8 +63,10 @@
6263
import org.springframework.amqp.core.ReceiveAndReplyCallback;
6364
import org.springframework.amqp.core.ReturnedMessage;
6465
import org.springframework.amqp.rabbit.connection.AbstractRoutingConnectionFactory;
66+
import org.springframework.amqp.rabbit.connection.AfterCompletionFailedException;
6567
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
6668
import org.springframework.amqp.rabbit.connection.ChannelProxy;
69+
import org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils;
6770
import org.springframework.amqp.rabbit.connection.PublisherCallbackChannel;
6871
import org.springframework.amqp.rabbit.connection.RabbitUtils;
6972
import org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory;
@@ -616,6 +619,7 @@ void resourcesClearedAfterTxFails() throws IOException, TimeoutException {
616619

617620
@Test
618621
void resourcesClearedAfterTxFailsWithSync() throws IOException, TimeoutException {
622+
ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true);
619623
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
620624
Connection mockConnection = mock(Connection.class);
621625
Channel mockChannel = mock(Channel.class);
@@ -638,6 +642,9 @@ void resourcesClearedAfterTxFailsWithSync() throws IOException, TimeoutException
638642
assertThatIllegalStateException()
639643
.isThrownBy(() -> (TransactionSynchronizationManager.getSynchronizations()).isEmpty())
640644
.withMessage("Transaction synchronization is not active");
645+
assertThatExceptionOfType(AfterCompletionFailedException.class)
646+
.isThrownBy(() -> ConnectionFactoryUtils.checkAfterCompletion());
647+
ConnectionFactoryUtils.enableAfterCompletionFailureCapture(false);
641648
}
642649

643650
@SuppressWarnings("serial")

src/reference/asciidoc/amqp.adoc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5670,6 +5670,17 @@ If you prefer XML configuration, you can declare the following bean in your XML
56705670
----
56715671
====
56725672

5673+
[[tx-sync]]
5674+
===== Transaction Synchronization
5675+
5676+
Synchronizing a RabbitMQ transaction with some other (e.g. DBMS) transaction provides "Best Effort One Phase Commit" semantics.
5677+
It is possible that the RabbitMQ transaction fails to commit during the after completion phase of transaction synchronization.
5678+
This is logged by the `spring-tx` infrastructure as an error, but no exception is thrown to the calling code.
5679+
Starting with version 2.3.10, you can call `ConnectionUtils.checkAfterCompletion()` after the transaction has committed on the same thread that processed the transaction.
5680+
It will simply return if no exception occurred; otherwise it will throw an `AfterCompletionFailedException` which will have a property representing the synchronization status of the completion.
5681+
5682+
Enable this feature by calling `ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true)`; this is a global flag and applies to all threads.
5683+
56735684
[[containerAttributes]]
56745685
==== Message Listener Container Configuration
56755686

0 commit comments

Comments
 (0)