@@ -46,14 +46,14 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
46
46
47
47
private static final Logger LOGGER = LoggerFactory .getLogger (AMQChannel .class );
48
48
49
- protected static final int NO_RPC_TIMEOUT = 0 ;
49
+ static final int NO_RPC_TIMEOUT = 0 ;
50
50
51
51
/**
52
52
* Protected; used instead of synchronizing on the channel itself,
53
53
* so that clients can themselves use the channel to synchronize
54
54
* on.
55
55
*/
56
- protected final Object _channelMutex = new Object ();
56
+ final Object _channelMutex = new Object ();
57
57
58
58
/** The connection this channel is associated with. */
59
59
private final AMQConnection _connection ;
@@ -68,10 +68,10 @@ public abstract class AMQChannel extends ShutdownNotifierComponent {
68
68
private RpcWrapper _activeRpc = null ;
69
69
70
70
/** Whether transmission of content-bearing methods should be blocked */
71
- protected volatile boolean _blockContent = false ;
71
+ volatile boolean _blockContent = false ;
72
72
73
73
/** Timeout for RPC calls */
74
- protected final int _rpcTimeout ;
74
+ final int _rpcTimeout ;
75
75
76
76
private final boolean _checkRpcResponseType ;
77
77
@@ -107,7 +107,7 @@ public int getChannelNumber() {
107
107
* @param frame the incoming frame
108
108
* @throws IOException if an error is encountered
109
109
*/
110
- public void handleFrame (Frame frame ) throws IOException {
110
+ void handleFrame (Frame frame ) throws IOException {
111
111
AMQCommand command = _command ;
112
112
if (command .handleFrame (frame )) { // a complete command has rolled off the assembly line
113
113
_command = new AMQCommand (); // prepare for the next one
@@ -126,9 +126,7 @@ public static IOException wrap(ShutdownSignalException ex) {
126
126
}
127
127
128
128
public static IOException wrap (ShutdownSignalException ex , String message ) {
129
- IOException ioe = new IOException (message );
130
- ioe .initCause (ex );
131
- return ioe ;
129
+ return new IOException (message , ex );
132
130
}
133
131
134
132
/**
@@ -148,7 +146,7 @@ public AMQCommand exnWrappingRpc(Method m)
148
146
}
149
147
}
150
148
151
- public CompletableFuture <Command > exnWrappingAsyncRpc (Method m )
149
+ CompletableFuture <Command > exnWrappingAsyncRpc (Method m )
152
150
throws IOException
153
151
{
154
152
try {
@@ -167,7 +165,7 @@ public CompletableFuture<Command> exnWrappingAsyncRpc(Method m)
167
165
* @throws IOException if there's any problem
168
166
*
169
167
* @param command the incoming command
170
- * @throws IOException
168
+ * @throws IOException when operation is interrupted by an I/O exception
171
169
*/
172
170
public void handleCompleteInboundCommand (AMQCommand command ) throws IOException {
173
171
// First, offer the command to the asynchronous-command
@@ -208,7 +206,7 @@ public void enqueueRpc(RpcContinuation k)
208
206
doEnqueueRpc (() -> new RpcContinuationRpcWrapper (k ));
209
207
}
210
208
211
- public void enqueueAsyncRpc (Method method , CompletableFuture <Command > future ) {
209
+ private void enqueueAsyncRpc (Method method , CompletableFuture <Command > future ) {
212
210
doEnqueueRpc (() -> new CompletableFutureRpcWrapper (method , future ));
213
211
}
214
212
@@ -230,7 +228,7 @@ private void doEnqueueRpc(Supplier<RpcWrapper> rpcWrapperSupplier) {
230
228
}
231
229
}
232
230
233
- public boolean isOutstandingRpc ()
231
+ boolean isOutstandingRpc ()
234
232
{
235
233
synchronized (_channelMutex ) {
236
234
return (_activeRpc != null );
@@ -251,7 +249,7 @@ protected void markRpcFinished() {
251
249
// no-op
252
250
}
253
251
254
- public void ensureIsOpen ()
252
+ private void ensureIsOpen ()
255
253
throws AlreadyClosedException
256
254
{
257
255
if (!isOpen ()) {
@@ -308,7 +306,7 @@ private void cleanRpcChannelState() {
308
306
}
309
307
310
308
/** Cleans RPC channel state after a timeout and wraps the TimeoutException in a ChannelContinuationTimeoutException */
311
- protected ChannelContinuationTimeoutException wrapTimeoutException (final Method m , final TimeoutException e ) {
309
+ ChannelContinuationTimeoutException wrapTimeoutException (final Method m , final TimeoutException e ) {
312
310
cleanRpcChannelState ();
313
311
return new ChannelContinuationTimeoutException (e , this , this ._channelNumber , m );
314
312
}
@@ -343,7 +341,7 @@ public void rpc(Method m, RpcContinuation k)
343
341
}
344
342
}
345
343
346
- public void quiescingRpc (Method m , RpcContinuation k )
344
+ void quiescingRpc (Method m , RpcContinuation k )
347
345
throws IOException
348
346
{
349
347
synchronized (_channelMutex ) {
@@ -352,7 +350,7 @@ public void quiescingRpc(Method m, RpcContinuation k)
352
350
}
353
351
}
354
352
355
- public void asyncRpc (Method m , CompletableFuture <Command > future )
353
+ private void asyncRpc (Method m , CompletableFuture <Command > future )
356
354
throws IOException
357
355
{
358
356
synchronized (_channelMutex ) {
@@ -361,7 +359,7 @@ public void asyncRpc(Method m, CompletableFuture<Command> future)
361
359
}
362
360
}
363
361
364
- public void quiescingAsyncRpc (Method m , CompletableFuture <Command > future )
362
+ private void quiescingAsyncRpc (Method m , CompletableFuture <Command > future )
365
363
throws IOException
366
364
{
367
365
synchronized (_channelMutex ) {
@@ -409,33 +407,33 @@ public void processShutdownSignal(ShutdownSignalException signal,
409
407
}
410
408
}
411
409
412
- public void notifyOutstandingRpc (ShutdownSignalException signal ) {
410
+ void notifyOutstandingRpc (ShutdownSignalException signal ) {
413
411
RpcWrapper k = nextOutstandingRpc ();
414
412
if (k != null ) {
415
413
k .shutdown (signal );
416
414
}
417
415
}
418
416
419
- public void transmit (Method m ) throws IOException {
417
+ protected void transmit (Method m ) throws IOException {
420
418
synchronized (_channelMutex ) {
421
419
transmit (new AMQCommand (m ));
422
420
}
423
421
}
424
422
425
- public void transmit (AMQCommand c ) throws IOException {
423
+ void transmit (AMQCommand c ) throws IOException {
426
424
synchronized (_channelMutex ) {
427
425
ensureIsOpen ();
428
426
quiescingTransmit (c );
429
427
}
430
428
}
431
429
432
- public void quiescingTransmit (Method m ) throws IOException {
430
+ void quiescingTransmit (Method m ) throws IOException {
433
431
synchronized (_channelMutex ) {
434
432
quiescingTransmit (new AMQCommand (m ));
435
433
}
436
434
}
437
435
438
- public void quiescingTransmit (AMQCommand c ) throws IOException {
436
+ private void quiescingTransmit (AMQCommand c ) throws IOException {
439
437
synchronized (_channelMutex ) {
440
438
if (c .getMethod ().hasContent ()) {
441
439
while (_blockContent ) {
@@ -468,16 +466,16 @@ public interface RpcContinuation {
468
466
}
469
467
470
468
public static abstract class BlockingRpcContinuation <T > implements RpcContinuation {
471
- public final BlockingValueOrException <T , ShutdownSignalException > _blocker =
472
- new BlockingValueOrException <T , ShutdownSignalException >();
469
+ final BlockingValueOrException <T , ShutdownSignalException > _blocker =
470
+ new BlockingValueOrException <>();
473
471
474
472
protected final Method request ;
475
473
476
- public BlockingRpcContinuation () {
474
+ BlockingRpcContinuation () {
477
475
request = null ;
478
476
}
479
477
480
- public BlockingRpcContinuation (final Method request ) {
478
+ BlockingRpcContinuation (final Method request ) {
481
479
this .request = request ;
482
480
}
483
481
@@ -496,7 +494,7 @@ public T getReply() throws ShutdownSignalException
496
494
return _blocker .uninterruptibleGetValue ();
497
495
}
498
496
499
- public T getReply (int timeout )
497
+ T getReply (int timeout )
500
498
throws ShutdownSignalException , TimeoutException
501
499
{
502
500
return _blocker .uninterruptibleGetValue (timeout );
@@ -509,7 +507,7 @@ public boolean canHandleReply(AMQCommand command) {
509
507
510
508
public abstract T transformReply (AMQCommand command );
511
509
512
- public static boolean isResponseCompatibleWithRequest (Method request , Method response ) {
510
+ static boolean isResponseCompatibleWithRequest (Method request , Method response ) {
513
511
// make a best effort attempt to ensure the reply was intended for this rpc request
514
512
// Ideally each rpc request would tag an id on it that could be returned and referenced on its reply.
515
513
// But because that would be a very large undertaking to add passively this logic at least protects against ClassCastExceptions
@@ -570,11 +568,11 @@ public static class SimpleBlockingRpcContinuation
570
568
extends BlockingRpcContinuation <AMQCommand >
571
569
{
572
570
573
- public SimpleBlockingRpcContinuation () {
571
+ SimpleBlockingRpcContinuation () {
574
572
super ();
575
573
}
576
574
577
- public SimpleBlockingRpcContinuation (final Method method ) {
575
+ SimpleBlockingRpcContinuation (final Method method ) {
578
576
super (method );
579
577
}
580
578
0 commit comments