Skip to content

Commit 73f7f35

Browse files
Merge pull request #191 from rabbitmq/prevent-socket-write-blocking
Prevent deadlock/hanging on JDK socket write
2 parents 7fc783b + bad2dd4 commit 73f7f35

File tree

5 files changed

+94
-16
lines changed

5 files changed

+94
-16
lines changed

src/main/java/com/rabbitmq/client/ConnectionFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -642,7 +642,7 @@ public MetricsCollector getMetricsCollector() {
642642
}
643643

644644
protected FrameHandlerFactory createFrameHandlerFactory() throws IOException {
645-
return new FrameHandlerFactory(connectionTimeout, factory, socketConf, isSSL());
645+
return new FrameHandlerFactory(connectionTimeout, factory, socketConf, isSSL(), this.shutdownExecutor);
646646
}
647647

648648
/**

src/main/java/com/rabbitmq/client/impl/AMQConnection.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ final class Copyright {
4545
public class AMQConnection extends ShutdownNotifierComponent implements Connection, NetworkConnection {
4646

4747
private static final Logger LOGGER = LoggerFactory.getLogger(AMQConnection.class);
48+
// we want socket write and channel shutdown timeouts to kick in after
49+
// the heartbeat one, so we use a value of 105% of the effective heartbeat timeout
50+
public static final double CHANNEL_SHUTDOWN_TIMEOUT_MULTIPLIER = 1.05;
4851

4952
private final ExecutorService consumerWorkServiceExecutor;
5053
private final ScheduledExecutorService heartbeatExecutor;
@@ -393,6 +396,7 @@ public void start()
393396
protected ChannelManager instantiateChannelManager(int channelMax, ThreadFactory threadFactory) {
394397
ChannelManager result = new ChannelManager(this._workService, channelMax, threadFactory, this.metricsCollector);
395398
result.setShutdownExecutor(this.shutdownExecutor);
399+
result.setChannelShutdownTimeout((int) ((requestedHeartbeat * CHANNEL_SHUTDOWN_TIMEOUT_MULTIPLIER) * 1000));
396400
return result;
397401
}
398402

src/main/java/com/rabbitmq/client/impl/ChannelManager.java

Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,26 +15,28 @@
1515

1616
package com.rabbitmq.client.impl;
1717

18+
import com.rabbitmq.client.ConnectionFactory;
19+
import com.rabbitmq.client.MetricsCollector;
20+
import com.rabbitmq.client.NoOpMetricsCollector;
21+
import com.rabbitmq.client.ShutdownSignalException;
22+
import com.rabbitmq.utility.IntAllocator;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
1826
import java.io.IOException;
1927
import java.util.HashMap;
2028
import java.util.HashSet;
2129
import java.util.Map;
2230
import java.util.Set;
23-
import java.util.concurrent.CountDownLatch;
24-
import java.util.concurrent.ExecutorService;
25-
import java.util.concurrent.Executors;
26-
import java.util.concurrent.ThreadFactory;
27-
import java.util.concurrent.TimeUnit;
28-
29-
import com.rabbitmq.client.NoOpMetricsCollector;
30-
import com.rabbitmq.client.ShutdownSignalException;
31-
import com.rabbitmq.client.MetricsCollector;
32-
import com.rabbitmq.utility.IntAllocator;
31+
import java.util.concurrent.*;
3332

3433
/**
3534
* Manages a set of channels, indexed by channel number (<code><b>1.._channelMax</b></code>).
3635
*/
3736
public class ChannelManager {
37+
38+
private static final Logger LOGGER = LoggerFactory.getLogger(ChannelManager.class);
39+
3840
/** Monitor for <code>_channelMap</code> and <code>channelNumberAllocator</code> */
3941
private final Object monitor = new Object();
4042
/** Mapping from <code><b>1.._channelMax</b></code> to {@link ChannelN} instance */
@@ -50,6 +52,8 @@ public class ChannelManager {
5052
private ExecutorService shutdownExecutor;
5153
private final ThreadFactory threadFactory;
5254

55+
private int channelShutdownTimeout = (int) ((ConnectionFactory.DEFAULT_HEARTBEAT * 1.05) * 1000);
56+
5357
protected final MetricsCollector metricsCollector;
5458

5559
public int getChannelMax(){
@@ -97,14 +101,33 @@ public ChannelN getChannel(int channelNumber) {
97101
* Handle shutdown. All the managed {@link com.rabbitmq.client.Channel Channel}s are shut down.
98102
* @param signal reason for shutdown
99103
*/
100-
public void handleSignal(ShutdownSignalException signal) {
104+
public void handleSignal(final ShutdownSignalException signal) {
101105
Set<ChannelN> channels;
102106
synchronized(this.monitor) {
103107
channels = new HashSet<ChannelN>(_channelMap.values());
104108
}
105-
for (ChannelN channel : channels) {
109+
110+
for (final ChannelN channel : channels) {
106111
releaseChannelNumber(channel);
107-
channel.processShutdownSignal(signal, true, true);
112+
// async shutdown if possible
113+
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/194
114+
Runnable channelShutdownRunnable = new Runnable() {
115+
@Override
116+
public void run() {
117+
channel.processShutdownSignal(signal, true, true);
118+
}
119+
};
120+
if(this.shutdownExecutor == null) {
121+
channelShutdownRunnable.run();
122+
} else {
123+
Future<?> channelShutdownTask = this.shutdownExecutor.submit(channelShutdownRunnable);
124+
try {
125+
channelShutdownTask.get(channelShutdownTimeout, TimeUnit.MILLISECONDS);
126+
} catch (Exception e) {
127+
LOGGER.warn("Couldn't properly close channel {} on shutdown after waiting for {} ms", channel.getChannelNumber(), channelShutdownTimeout);
128+
channelShutdownTask.cancel(true);
129+
}
130+
}
108131
shutdownSet.add(channel.getShutdownLatch());
109132
channel.notifyListeners();
110133
}
@@ -225,4 +248,16 @@ public ExecutorService getShutdownExecutor() {
225248
public void setShutdownExecutor(ExecutorService shutdownExecutor) {
226249
this.shutdownExecutor = shutdownExecutor;
227250
}
251+
252+
/**
253+
* Set the shutdown timeout for channels.
254+
* This is the amount of time the manager waits for a channel to
255+
* shut down before giving up.
256+
* Works only when the {@code shutdownExecutor} property is set.
257+
* Defaults to {@link com.rabbitmq.client.ConnectionFactory#DEFAULT_HEARTBEAT} seconds plus 5%, converted to milliseconds.
258+
* @param channelShutdownTimeout shutdown timeout in milliseconds
259+
*/
260+
public void setChannelShutdownTimeout(int channelShutdownTimeout) {
261+
this.channelShutdownTimeout = channelShutdownTimeout;
262+
}
228263
}

src/main/java/com/rabbitmq/client/impl/FrameHandlerFactory.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,25 @@
2323
import java.io.IOException;
2424
import java.net.InetSocketAddress;
2525
import java.net.Socket;
26+
import java.util.concurrent.ExecutorService;
2627

2728
public class FrameHandlerFactory {
2829
private final int connectionTimeout;
2930
private final SocketFactory factory;
3031
private final SocketConfigurator configurator;
32+
private final ExecutorService shutdownExecutor;
3133
private final boolean ssl;
3234

3335
public FrameHandlerFactory(int connectionTimeout, SocketFactory factory, SocketConfigurator configurator, boolean ssl) {
36+
this(connectionTimeout, factory, configurator, ssl, null);
37+
}
38+
39+
public FrameHandlerFactory(int connectionTimeout, SocketFactory factory, SocketConfigurator configurator, boolean ssl, ExecutorService shutdownExecutor) {
3440
this.connectionTimeout = connectionTimeout;
3541
this.factory = factory;
3642
this.configurator = configurator;
3743
this.ssl = ssl;
44+
this.shutdownExecutor = shutdownExecutor;
3845
}
3946

4047
public FrameHandler create(Address addr) throws IOException {
@@ -55,7 +62,7 @@ public FrameHandler create(Address addr) throws IOException {
5562

5663
public FrameHandler create(Socket sock) throws IOException
5764
{
58-
return new SocketFrameHandler(sock);
65+
return new SocketFrameHandler(sock, this.shutdownExecutor);
5966
}
6067

6168
private static void quietTrySocketClose(Socket socket) {

src/main/java/com/rabbitmq/client/impl/SocketFrameHandler.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.net.InetAddress;
2424
import java.net.Socket;
2525
import java.net.SocketException;
26+
import java.util.concurrent.*;
2627

2728
import com.rabbitmq.client.AMQP;
2829

@@ -34,6 +35,11 @@ public class SocketFrameHandler implements FrameHandler {
3435
/** The underlying socket */
3536
private final Socket _socket;
3637

38+
/**
39+
* Optional {@link ExecutorService} for final flush.
40+
*/
41+
private final ExecutorService _shutdownExecutor;
42+
3743
/** Socket's inputstream - data from the broker - synchronized on */
3844
private final DataInputStream _inputStream;
3945

@@ -47,7 +53,15 @@ public class SocketFrameHandler implements FrameHandler {
4753
* @param socket the socket to use
4854
*/
4955
public SocketFrameHandler(Socket socket) throws IOException {
56+
this(socket, null);
57+
}
58+
59+
/**
60+
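* @param socket the socket to use
* @param shutdownExecutor optional {@link ExecutorService} used for the final flush on close; may be {@code null}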
61+
*/
62+
public SocketFrameHandler(Socket socket, ExecutorService shutdownExecutor) throws IOException {
5063
_socket = socket;
64+
_shutdownExecutor = shutdownExecutor;
5165

5266
_inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
5367
_outputStream = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));
@@ -152,7 +166,25 @@ public void flush() throws IOException {
152166
@SuppressWarnings("unused")
153167
public void close() {
154168
try { _socket.setSoLinger(true, SOCKET_CLOSING_TIMEOUT); } catch (Exception _e) {}
155-
try { flush(); } catch (Exception _e) {}
169+
// async flush if possible
170+
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/194
171+
Callable<Void> flushCallable = new Callable<Void>() {
172+
@Override
173+
public Void call() throws Exception {
174+
flush();
175+
return null;
176+
}
177+
};
178+
try {
179+
if(this._shutdownExecutor == null) {
180+
flushCallable.call();
181+
} else {
182+
Future<Void> flushTask = this._shutdownExecutor.submit(flushCallable);
183+
flushTask.get(SOCKET_CLOSING_TIMEOUT, TimeUnit.SECONDS);
184+
}
185+
} catch(Exception e) {
186+
187+
}
156188
try { _socket.close(); } catch (Exception _e) {}
157189
}
158190
}

0 commit comments

Comments
 (0)