Skip to content

Commit d909f8b

Browse files
Split DefaultExceptionHandler into strict and forgiving versions
Currently DefaultExceptionHandler will close the channel when an unhandled consumer or listener exception occurs. Not everybody agrees this is a great idea. This commit introduces a forgiving version and keeps DefaultExceptionHandler strict. For 3.6.0 we will make DefaultExceptionHandler forgiving.
1 parent 76d85cc commit d909f8b

File tree

4 files changed

+242
-118
lines changed

4 files changed

+242
-118
lines changed

src/com/rabbitmq/client/impl/DefaultExceptionHandler.java

Lines changed: 3 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -16,123 +16,11 @@
1616

1717
package com.rabbitmq.client.impl;
1818

19-
import java.io.IOException;
20-
import java.net.ConnectException;
21-
import java.util.concurrent.TimeoutException;
22-
23-
import com.rabbitmq.client.AMQP;
24-
import com.rabbitmq.client.AlreadyClosedException;
25-
import com.rabbitmq.client.Channel;
26-
import com.rabbitmq.client.Connection;
27-
import com.rabbitmq.client.Consumer;
2819
import com.rabbitmq.client.ExceptionHandler;
29-
import com.rabbitmq.client.TopologyRecoveryException;
3020

3121
/**
32-
* Default implementation of {@link com.rabbitmq.client.ExceptionHandler} used by {@link AMQConnection}.
22+
* Default implementation of {@link com.rabbitmq.client.ExceptionHandler}
23+
* used by {@link AMQConnection}.
3324
*/
34-
public class DefaultExceptionHandler implements ExceptionHandler {
35-
public void handleUnexpectedConnectionDriverException(Connection conn, Throwable exception) {
36-
// TODO: Log this somewhere, just in case we have a bug like
37-
// 16272 where exceptions aren't being propagated properly
38-
// again.
39-
40-
//System.err.println("DefaultExceptionHandler:");
41-
//exception.printStackTrace();
42-
}
43-
44-
public void handleReturnListenerException(Channel channel, Throwable exception) {
45-
handleChannelKiller(channel, exception, "ReturnListener.handleReturn");
46-
}
47-
48-
public void handleFlowListenerException(Channel channel, Throwable exception) {
49-
handleChannelKiller(channel, exception, "FlowListener.handleFlow");
50-
}
51-
52-
public void handleConfirmListenerException(Channel channel, Throwable exception) {
53-
handleChannelKiller(channel, exception, "ConfirmListener.handle{N,A}ck");
54-
}
55-
56-
public void handleBlockedListenerException(Connection connection, Throwable exception) {
57-
handleConnectionKiller(connection, exception, "BlockedListener");
58-
}
59-
60-
public void handleConsumerException(Channel channel, Throwable exception,
61-
Consumer consumer, String consumerTag,
62-
String methodName)
63-
{
64-
handleChannelKiller(channel, exception, "Consumer " + consumer
65-
+ " (" + consumerTag + ")"
66-
+ " method " + methodName
67-
+ " for channel " + channel);
68-
}
69-
70-
/**
71-
* @since 3.3.0
72-
*/
73-
public void handleConnectionRecoveryException(Connection conn, Throwable exception) {
74-
// ignore java.net.ConnectException as those are
75-
// expected during recovery and will only produce noisy
76-
// traces
77-
if (exception instanceof ConnectException) {
78-
// no-op
79-
} else {
80-
System.err.println("Caught an exception during connection recovery!");
81-
exception.printStackTrace(System.err);
82-
}
83-
}
84-
85-
/**
86-
* @since 3.3.0
87-
*/
88-
public void handleChannelRecoveryException(Channel ch, Throwable exception) {
89-
System.err.println("Caught an exception when recovering channel " + ch.getChannelNumber());
90-
exception.printStackTrace(System.err);
91-
}
92-
93-
/**
94-
* @since 3.3.0
95-
*/
96-
public void handleTopologyRecoveryException(Connection conn, Channel ch, TopologyRecoveryException exception) {
97-
System.err.println("Caught an exception when recovering topology " + exception.getMessage());
98-
exception.printStackTrace(System.err);
99-
}
100-
101-
protected void handleChannelKiller(Channel channel, Throwable exception, String what) {
102-
// TODO: log the exception
103-
System.err.println("DefaultExceptionHandler: " + what + " threw an exception for channel "
104-
+ channel + ":");
105-
exception.printStackTrace();
106-
try {
107-
channel.close(AMQP.REPLY_SUCCESS, "Closed due to exception from " + what);
108-
} catch (AlreadyClosedException ace) {
109-
// noop
110-
} catch (TimeoutException ace) {
111-
// noop
112-
} catch (IOException ioe) {
113-
// TODO: log the failure
114-
System.err.println("Failure during close of channel " + channel + " after " + exception
115-
+ ":");
116-
ioe.printStackTrace();
117-
channel.getConnection().abort(AMQP.INTERNAL_ERROR, "Internal error closing channel for " + what);
118-
}
119-
}
120-
121-
protected void handleConnectionKiller(Connection connection, Throwable exception, String what) {
122-
// TODO: log the exception
123-
System.err.println("DefaultExceptionHandler: " + what + " threw an exception for connection "
124-
+ connection + ":");
125-
exception.printStackTrace();
126-
try {
127-
connection.close(AMQP.REPLY_SUCCESS, "Closed due to exception from " + what);
128-
} catch (AlreadyClosedException ace) {
129-
// noop
130-
} catch (IOException ioe) {
131-
// TODO: log the failure
132-
System.err.println("Failure during close of connection " + connection + " after " + exception
133-
+ ":");
134-
ioe.printStackTrace();
135-
connection.abort(AMQP.INTERNAL_ERROR, "Internal error closing connection for " + what);
136-
}
137-
}
25+
public class DefaultExceptionHandler extends StrictExceptionHandler implements ExceptionHandler {
13826
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License
4+
// at http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
// the License for the specific language governing rights and
9+
// limitations under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developer of the Original Code is GoPivotal, Inc.
14+
// Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
15+
//
16+
17+
package com.rabbitmq.client.impl;
18+
19+
import java.io.IOException;
20+
import java.net.ConnectException;
21+
import java.util.concurrent.TimeoutException;
22+
23+
import com.rabbitmq.client.AMQP;
24+
import com.rabbitmq.client.AlreadyClosedException;
25+
import com.rabbitmq.client.Channel;
26+
import com.rabbitmq.client.Connection;
27+
import com.rabbitmq.client.Consumer;
28+
import com.rabbitmq.client.ExceptionHandler;
29+
import com.rabbitmq.client.TopologyRecoveryException;
30+
31+
/**
32+
* An implementation of {@link com.rabbitmq.client.ExceptionHandler} that does not
33+
* close channels on unhandled consumer and listener exception.
34+
*
35+
* Used by {@link AMQConnection}.
36+
*
37+
* @see ExceptionHandler
38+
* @see com.rabbitmq.client.ConnectionFactory#setExceptionHandler(com.rabbitmq.client.ExceptionHandler)
39+
*/
40+
public class ForgivingExceptionHandler implements ExceptionHandler {
41+
public void handleUnexpectedConnectionDriverException(Connection conn, Throwable exception) {
42+
// TODO: Log this somewhere, just in case we have a bug like
43+
// 16272 where exceptions aren't being propagated properly
44+
// again.
45+
46+
//System.err.println("DefaultExceptionHandler:");
47+
//exception.printStackTrace();
48+
}
49+
50+
public void handleReturnListenerException(Channel channel, Throwable exception) {
51+
handleChannelKiller(channel, exception, "ReturnListener.handleReturn");
52+
}
53+
54+
public void handleFlowListenerException(Channel channel, Throwable exception) {
55+
handleChannelKiller(channel, exception, "FlowListener.handleFlow");
56+
}
57+
58+
public void handleConfirmListenerException(Channel channel, Throwable exception) {
59+
handleChannelKiller(channel, exception, "ConfirmListener.handle{N,A}ck");
60+
}
61+
62+
public void handleBlockedListenerException(Connection connection, Throwable exception) {
63+
handleConnectionKiller(connection, exception, "BlockedListener");
64+
}
65+
66+
public void handleConsumerException(Channel channel, Throwable exception,
67+
Consumer consumer, String consumerTag,
68+
String methodName)
69+
{
70+
handleChannelKiller(channel, exception, "Consumer " + consumer
71+
+ " (" + consumerTag + ")"
72+
+ " method " + methodName
73+
+ " for channel " + channel);
74+
}
75+
76+
/**
77+
* @since 3.3.0
78+
*/
79+
public void handleConnectionRecoveryException(Connection conn, Throwable exception) {
80+
// ignore java.net.ConnectException as those are
81+
// expected during recovery and will only produce noisy
82+
// traces
83+
if (exception instanceof ConnectException) {
84+
// no-op
85+
} else {
86+
System.err.println("Caught an exception during connection recovery!");
87+
exception.printStackTrace(System.err);
88+
}
89+
}
90+
91+
/**
92+
* @since 3.3.0
93+
*/
94+
public void handleChannelRecoveryException(Channel ch, Throwable exception) {
95+
System.err.println("Caught an exception when recovering channel " + ch.getChannelNumber());
96+
exception.printStackTrace(System.err);
97+
}
98+
99+
/**
100+
* @since 3.3.0
101+
*/
102+
public void handleTopologyRecoveryException(Connection conn, Channel ch, TopologyRecoveryException exception) {
103+
System.err.println("Caught an exception when recovering topology " + exception.getMessage());
104+
exception.printStackTrace(System.err);
105+
}
106+
107+
protected void handleChannelKiller(Channel channel, Throwable exception, String what) {
108+
System.err.println(this.getClass().getName() + ": " + what + " threw an exception for channel "
109+
+ channel + ":");
110+
exception.printStackTrace();
111+
}
112+
113+
protected void handleConnectionKiller(Connection connection, Throwable exception, String what) {
114+
// TODO: log the exception
115+
System.err.println("DefaultExceptionHandler: " + what + " threw an exception for connection "
116+
+ connection + ":");
117+
exception.printStackTrace();
118+
try {
119+
connection.close(AMQP.REPLY_SUCCESS, "Closed due to exception from " + what);
120+
} catch (AlreadyClosedException ace) {
121+
// noop
122+
} catch (IOException ioe) {
123+
// TODO: log the failure
124+
System.err.println("Failure during close of connection " + connection + " after " + exception
125+
+ ":");
126+
ioe.printStackTrace();
127+
connection.abort(AMQP.INTERNAL_ERROR, "Internal error closing connection for " + what);
128+
}
129+
}
130+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// The contents of this file are subject to the Mozilla Public License
2+
// Version 1.1 (the "License"); you may not use this file except in
3+
// compliance with the License. You may obtain a copy of the License
4+
// at http://www.mozilla.org/MPL/
5+
//
6+
// Software distributed under the License is distributed on an "AS IS"
7+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
8+
// the License for the specific language governing rights and
9+
// limitations under the License.
10+
//
11+
// The Original Code is RabbitMQ.
12+
//
13+
// The Initial Developer of the Original Code is GoPivotal, Inc.
14+
// Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
15+
//
16+
17+
package com.rabbitmq.client.impl;
18+
19+
import java.io.IOException;
20+
import java.net.ConnectException;
21+
import java.util.concurrent.TimeoutException;
22+
23+
import com.rabbitmq.client.AMQP;
24+
import com.rabbitmq.client.AlreadyClosedException;
25+
import com.rabbitmq.client.Channel;
26+
import com.rabbitmq.client.Connection;
27+
import com.rabbitmq.client.Consumer;
28+
import com.rabbitmq.client.ExceptionHandler;
29+
import com.rabbitmq.client.TopologyRecoveryException;
30+
31+
/**
32+
* An implementation of {@link com.rabbitmq.client.ExceptionHandler} that does not
33+
* close channels on unhandled consumer exception.
34+
*
35+
* Used by {@link AMQConnection}.
36+
*
37+
* @see ExceptionHandler
38+
* @see com.rabbitmq.client.ConnectionFactory#setExceptionHandler(ExceptionHandler)
39+
*/
40+
public class StrictExceptionHandler extends ForgivingExceptionHandler implements ExceptionHandler {
41+
public void handleReturnListenerException(Channel channel, Throwable exception) {
42+
handleChannelKiller(channel, exception, "ReturnListener.handleReturn");
43+
}
44+
45+
public void handleFlowListenerException(Channel channel, Throwable exception) {
46+
handleChannelKiller(channel, exception, "FlowListener.handleFlow");
47+
}
48+
49+
public void handleConfirmListenerException(Channel channel, Throwable exception) {
50+
handleChannelKiller(channel, exception, "ConfirmListener.handle{N,A}ck");
51+
}
52+
53+
public void handleBlockedListenerException(Connection connection, Throwable exception) {
54+
handleConnectionKiller(connection, exception, "BlockedListener");
55+
}
56+
57+
public void handleConsumerException(Channel channel, Throwable exception,
58+
Consumer consumer, String consumerTag,
59+
String methodName)
60+
{
61+
handleChannelKiller(channel, exception, "Consumer " + consumer
62+
+ " (" + consumerTag + ")"
63+
+ " method " + methodName
64+
+ " for channel " + channel);
65+
}
66+
67+
protected void handleChannelKiller(Channel channel, Throwable exception, String what) {
68+
System.err.println(this.getClass().getName() + ": " + what + " threw an exception for channel "
69+
+ channel + ":");
70+
exception.printStackTrace();
71+
try {
72+
channel.close(AMQP.REPLY_SUCCESS, "Closed due to exception from " + what);
73+
} catch (AlreadyClosedException ace) {
74+
// noop
75+
} catch (TimeoutException ace) {
76+
// noop
77+
} catch (IOException ioe) {
78+
// TODO: log the failure
79+
System.err.println("Failure during close of channel " + channel + " after " + exception
80+
+ ":");
81+
ioe.printStackTrace();
82+
channel.getConnection().abort(AMQP.INTERNAL_ERROR, "Internal error closing channel for " + what);
83+
}
84+
}
85+
}

test/src/com/rabbitmq/client/test/functional/ExceptionHandling.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import com.rabbitmq.client.Envelope;
1010
import com.rabbitmq.client.ExceptionHandler;
1111
import com.rabbitmq.client.impl.DefaultExceptionHandler;
12+
import com.rabbitmq.client.impl.ForgivingExceptionHandler;
1213
import junit.framework.TestCase;
1314

1415
import java.io.IOException;
@@ -23,15 +24,34 @@ private ConnectionFactory newConnectionFactory(ExceptionHandler eh) {
2324
return cf;
2425
}
2526

26-
public void testHandleConsumerException() throws IOException, InterruptedException, TimeoutException {
27+
public void testDefaultConsumerHandleConsumerException() throws IOException, InterruptedException, TimeoutException {
2728
final CountDownLatch latch = new CountDownLatch(1);
28-
final DefaultExceptionHandler eh = new DefaultExceptionHandler() {
29+
final ExceptionHandler eh = new DefaultExceptionHandler() {
2930
@Override
3031
public void handleConsumerException(Channel channel, Throwable exception, Consumer consumer, String consumerTag, String methodName) {
3132
super.handleConsumerException(channel, exception, consumer, consumerTag, methodName);
3233
latch.countDown();
3334
}
3435
};
36+
37+
testConsumerHandleConsumerException(eh, latch, true);
38+
}
39+
40+
public void testForgivingConsumerHandleConsumerException() throws IOException, InterruptedException, TimeoutException {
41+
final CountDownLatch latch = new CountDownLatch(1);
42+
final ExceptionHandler eh = new ForgivingExceptionHandler() {
43+
@Override
44+
public void handleConsumerException(Channel channel, Throwable exception, Consumer consumer, String consumerTag, String methodName) {
45+
super.handleConsumerException(channel, exception, consumer, consumerTag, methodName);
46+
latch.countDown();
47+
}
48+
};
49+
50+
testConsumerHandleConsumerException(eh, latch, false);
51+
}
52+
53+
protected void testConsumerHandleConsumerException(ExceptionHandler eh, CountDownLatch latch, boolean expectChannelClose)
54+
throws InterruptedException, TimeoutException, IOException {
3555
ConnectionFactory cf = newConnectionFactory(eh);
3656
assertEquals(cf.getExceptionHandler(), eh);
3757
Connection conn = cf.newConnection();
@@ -47,7 +67,8 @@ public void handleDelivery(String consumerTag, Envelope envelope,
4767
});
4868
ch.basicPublish("", q, null, "".getBytes());
4969
wait(latch);
50-
assertFalse(ch.isOpen());
70+
71+
assertEquals(!expectChannelClose, ch.isOpen());
5172
}
5273

5374
public void testNullExceptionHandler() {

0 commit comments

Comments
 (0)