@@ -50,7 +50,7 @@ public class AMQConnection extends ShutdownNotifierComponent implements Connecti
50
50
private static final Logger LOGGER = LoggerFactory .getLogger (AMQConnection .class );
51
51
// we want socket write and channel shutdown timeouts to kick in after
52
52
// the heartbeat one, so we use a value of 105% of the effective heartbeat timeout
53
- public static final double CHANNEL_SHUTDOWN_TIMEOUT_MULTIPLIER = 1.05 ;
53
+ static final double CHANNEL_SHUTDOWN_TIMEOUT_MULTIPLIER = 1.05 ;
54
54
55
55
private final ExecutorService consumerWorkServiceExecutor ;
56
56
private final ScheduledExecutorService heartbeatExecutor ;
@@ -60,7 +60,7 @@ public class AMQConnection extends ShutdownNotifierComponent implements Connecti
60
60
private String id ;
61
61
62
62
private final List <RecoveryCanBeginListener > recoveryCanBeginListeners =
63
- Collections .synchronizedList (new ArrayList <RecoveryCanBeginListener >());
63
+ Collections .synchronizedList (new ArrayList <>());
64
64
65
65
private final ErrorOnWriteListener errorOnWriteListener ;
66
66
@@ -77,14 +77,14 @@ public class AMQConnection extends ShutdownNotifierComponent implements Connecti
77
77
* @see Connection#getClientProperties
78
78
*/
79
79
public static Map <String , Object > defaultClientProperties () {
80
- Map <String ,Object > props = new HashMap <String , Object >();
80
+ Map <String ,Object > props = new HashMap <>();
81
81
props .put ("product" , LongStringHelper .asLongString ("RabbitMQ" ));
82
82
props .put ("version" , LongStringHelper .asLongString (ClientVersion .VERSION ));
83
83
props .put ("platform" , LongStringHelper .asLongString ("Java" ));
84
84
props .put ("copyright" , LongStringHelper .asLongString (Copyright .COPYRIGHT ));
85
85
props .put ("information" , LongStringHelper .asLongString (Copyright .LICENSE ));
86
86
87
- Map <String , Object > capabilities = new HashMap <String , Object >();
87
+ Map <String , Object > capabilities = new HashMap <>();
88
88
capabilities .put ("publisher_confirms" , true );
89
89
capabilities .put ("exchange_exchange_bindings" , true );
90
90
capabilities .put ("basic.nack" , true );
@@ -117,7 +117,7 @@ public static Map<String, Object> defaultClientProperties() {
117
117
/** Object used for blocking main application thread when doing all the necessary
118
118
* connection shutdown operations
119
119
*/
120
- private final BlockingCell <Object > _appContinuation = new BlockingCell <Object >();
120
+ private final BlockingCell <Object > _appContinuation = new BlockingCell <>();
121
121
122
122
/** Flag indicating whether the client received Connection.Close message from the broker */
123
123
private volatile boolean _brokerInitiatedShutdown ;
@@ -137,7 +137,7 @@ public static Map<String, Object> defaultClientProperties() {
137
137
private final int handshakeTimeout ;
138
138
private final int shutdownTimeout ;
139
139
private final CredentialsProvider credentialsProvider ;
140
- private final Collection <BlockedListener > blockedListeners = new CopyOnWriteArrayList <BlockedListener >();
140
+ private final Collection <BlockedListener > blockedListeners = new CopyOnWriteArrayList <>();
141
141
protected final MetricsCollector metricsCollector ;
142
142
private final int channelRpcTimeout ;
143
143
private final boolean channelShouldCheckRpcResponseType ;
@@ -157,10 +157,10 @@ public static Map<String, Object> defaultClientProperties() {
157
157
private volatile Map <String , Object > _serverProperties ;
158
158
159
159
/**
160
- * Protected API - respond, in the driver thread, to a ShutdownSignal.
160
+ * Protected API - respond, in the main I/O loop thread, to a ShutdownSignal.
161
161
* @param channel the channel to disconnect
162
162
*/
163
- public final void disconnectChannel (ChannelN channel ) {
163
+ final void disconnectChannel (ChannelN channel ) {
164
164
ChannelManager cm = _channelManager ;
165
165
if (cm != null )
166
166
cm .releaseChannelNumber (channel );
@@ -367,15 +367,12 @@ public void start()
367
367
throw new PossibleAuthenticationFailureException (e );
368
368
}
369
369
} while (connTune == null );
370
- } catch (TimeoutException te ) {
370
+ } catch (TimeoutException | IOException te ) {
371
371
_frameHandler .close ();
372
372
throw te ;
373
373
} catch (ShutdownSignalException sse ) {
374
374
_frameHandler .close ();
375
375
throw AMQChannel .wrap (sse );
376
- } catch (IOException ioe ) {
377
- _frameHandler .close ();
378
- throw ioe ;
379
376
}
380
377
381
378
try {
@@ -509,7 +506,7 @@ public ThreadFactory getThreadFactory() {
509
506
510
507
@ Override
511
508
public Map <String , Object > getClientProperties () {
512
- return new HashMap <String , Object >(_clientProperties );
509
+ return new HashMap <>(_clientProperties );
513
510
}
514
511
515
512
@ Override
@@ -560,7 +557,7 @@ public Channel createChannel() throws IOException {
560
557
/**
561
558
* Public API - sends a frame directly to the broker.
562
559
*/
563
- public void writeFrame (Frame f ) throws IOException {
560
+ void writeFrame (Frame f ) throws IOException {
564
561
_frameHandler .writeFrame (f );
565
562
_heartbeatSender .signalActivity ();
566
563
}
@@ -755,6 +752,7 @@ public void addRecoveryCanBeginListener(RecoveryCanBeginListener fn) {
755
752
this .recoveryCanBeginListeners .add (fn );
756
753
}
757
754
755
+ @ SuppressWarnings ("unused" )
758
756
public void removeRecoveryCanBeginListener (RecoveryCanBeginListener fn ) {
759
757
this .recoveryCanBeginListeners .remove (fn );
760
758
}
@@ -842,7 +840,7 @@ public boolean processControlCommand(Command c) throws IOException
842
840
}
843
841
}
844
842
845
- public void handleConnectionClose (Command closeCommand ) {
843
+ private void handleConnectionClose (Command closeCommand ) {
846
844
ShutdownSignalException sse = shutdown (closeCommand .getMethod (), false , null , _inConnectionNegotiation );
847
845
try {
848
846
_channel0 .quiescingTransmit (new AMQP .Connection .CloseOk .Builder ().build ());
@@ -863,13 +861,13 @@ public void handleConnectionClose(Command closeCommand) {
863
861
}
864
862
}
865
863
866
- // same as ConnectionFactory.DEFAULT_SHUTDOWN_TIMEOUT
867
- private static long SOCKET_CLOSE_TIMEOUT = 10000 ;
868
-
869
864
private class SocketCloseWait implements Runnable {
865
+ // same as ConnectionFactory.DEFAULT_SHUTDOWN_TIMEOUT
866
+ private long SOCKET_CLOSE_TIMEOUT = 10000 ;
867
+
870
868
private final ShutdownSignalException cause ;
871
869
872
- public SocketCloseWait (ShutdownSignalException sse ) {
870
+ SocketCloseWait (ShutdownSignalException sse ) {
873
871
cause = sse ;
874
872
}
875
873
@@ -1055,12 +1053,9 @@ public AMQCommand transformReply(AMQCommand command) {
1055
1053
sse .initCause (cause );
1056
1054
throw sse ;
1057
1055
}
1058
- } catch (ShutdownSignalException sse ) {
1056
+ } catch (ShutdownSignalException | IOException sse ) {
1059
1057
if (!abort )
1060
1058
throw sse ;
1061
- } catch (IOException ioe ) {
1062
- if (!abort )
1063
- throw ioe ;
1064
1059
} finally {
1065
1060
if (sync ) _frameHandler .close ();
1066
1061
}
0 commit comments