1
1
/*
2
- * Copyright 2002-2019 the original author or authors.
2
+ * Copyright 2002-2020 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
44
44
import org .apache .commons .logging .Log ;
45
45
import org .apache .commons .logging .LogFactory ;
46
46
47
+ import org .springframework .beans .factory .BeanFactory ;
47
48
import org .springframework .context .Lifecycle ;
48
49
import org .springframework .core .convert .ConversionService ;
49
50
import org .springframework .expression .Expression ;
53
54
import org .springframework .integration .handler .ExpressionEvaluatingMessageProcessor ;
54
55
import org .springframework .integration .jms .util .JmsAdapterUtils ;
55
56
import org .springframework .integration .support .AbstractIntegrationMessageBuilder ;
57
+ import org .springframework .integration .util .JavaUtils ;
56
58
import org .springframework .jms .connection .ConnectionFactoryUtils ;
57
59
import org .springframework .jms .listener .DefaultMessageListenerContainer ;
58
60
import org .springframework .jms .support .JmsUtils ;
@@ -520,61 +522,67 @@ protected void doInit() {
520
522
"Exactly one of 'requestDestination', 'requestDestinationName', " +
521
523
"or 'requestDestinationExpression' is required." );
522
524
ConversionService conversionService = getConversionService ();
525
+ BeanFactory beanFactory = getBeanFactory ();
523
526
if (this .requestDestinationExpressionProcessor != null ) {
524
- this .requestDestinationExpressionProcessor .setBeanFactory (getBeanFactory () );
527
+ this .requestDestinationExpressionProcessor .setBeanFactory (beanFactory );
525
528
if (conversionService != null ) {
526
529
this .requestDestinationExpressionProcessor .setConversionService (conversionService );
527
530
}
528
531
}
529
532
if (this .replyDestinationExpressionProcessor != null ) {
530
- this .replyDestinationExpressionProcessor .setBeanFactory (getBeanFactory () );
533
+ this .replyDestinationExpressionProcessor .setBeanFactory (beanFactory );
531
534
if (conversionService != null ) {
532
535
this .replyDestinationExpressionProcessor .setConversionService (conversionService );
533
536
}
534
537
}
535
- /*
536
- * This is needed because there is no way to detect 2 or more gateways using the same reply queue
537
- * with no correlation key.
538
- */
539
- boolean hasAReplyDest = this .replyDestination != null || this .replyDestinationName != null
540
- || this .replyDestinationExpressionProcessor != null ;
541
- if (this .useReplyContainer && (this .correlationKey == null && hasAReplyDest )) {
542
- logger .warn ("The gateway cannot use a reply listener container with a specified " +
543
- "destination(Name/Expression) " +
544
- "without a 'correlation-key'; " +
545
- "a container will NOT be used; " +
546
- "to avoid this problem, set the 'correlation-key' attribute; " +
547
- "some consumers, including the Spring Integration <jms:inbound-gateway/>, " +
548
- "support the use of the value 'JMSCorrelationID' " +
549
- "for this purpose. Alternatively, do not specify a reply destination " +
550
- "and a temporary queue will be used for replies." );
551
- this .useReplyContainer = false ;
552
- }
553
- if (this .useReplyContainer ) {
554
- Assert .state (!"JMSCorrelationID*" .equals (this .correlationKey ),
555
- "Using an existing 'JMSCorrelationID' mapped from the 'requestMessage' ('JMSCorrelationID*') " +
556
- "can't be used when using a 'reply-container'" );
557
- GatewayReplyListenerContainer container = new GatewayReplyListenerContainer ();
558
- setContainerProperties (container );
559
- container .afterPropertiesSet ();
560
- this .replyContainer = container ;
561
- if (isAsync () && this .correlationKey == null ) {
562
- logger .warn ("'async=true' requires a correlationKey; ignored" );
563
- setAsync (false );
564
- }
538
+ initializeReplyContainer ();
539
+ this .initialized = true ;
540
+ }
541
+ }
542
+
543
+ private void initializeReplyContainer () {
544
+ /*
545
+ * This is needed because there is no way to detect 2 or more gateways using the same reply queue
546
+ * with no correlation key.
547
+ */
548
+ boolean hasAReplyDest = this .replyDestination != null || this .replyDestinationName != null
549
+ || this .replyDestinationExpressionProcessor != null ;
550
+ if (this .useReplyContainer && (this .correlationKey == null && hasAReplyDest )) {
551
+ logger .warn ("The gateway cannot use a reply listener container with a specified " +
552
+ "destination(Name/Expression) " +
553
+ "without a 'correlation-key'; " +
554
+ "a container will NOT be used; " +
555
+ "to avoid this problem, set the 'correlation-key' attribute; " +
556
+ "some consumers, including the Spring Integration <jms:inbound-gateway/>, " +
557
+ "support the use of the value 'JMSCorrelationID' " +
558
+ "for this purpose. Alternatively, do not specify a reply destination " +
559
+ "and a temporary queue will be used for replies." );
560
+ this .useReplyContainer = false ;
561
+ }
562
+ if (this .useReplyContainer ) {
563
+ Assert .state (!"JMSCorrelationID*" .equals (this .correlationKey ),
564
+ "Using an existing 'JMSCorrelationID' mapped from the 'requestMessage' ('JMSCorrelationID*') " +
565
+ "can't be used when using a 'reply-container'" );
566
+ GatewayReplyListenerContainer container = new GatewayReplyListenerContainer ();
567
+ setContainerProperties (container );
568
+ container .afterPropertiesSet ();
569
+ this .replyContainer = container ;
570
+ if (isAsync () && this .correlationKey == null ) {
571
+ logger .warn ("'async=true' requires a correlationKey; ignored" );
572
+ setAsync (false );
565
573
}
566
- else {
567
- if ( isAsync ()) {
568
- logger . warn ( "'async=true' is ignored when a reply container is not being used" );
569
- setAsync ( false );
570
- }
574
+ }
575
+ else {
576
+ if ( isAsync ()) {
577
+ logger . warn ( "'async=true' is ignored when a reply container is not being used" );
578
+ setAsync ( false );
571
579
}
572
- this .initialized = true ;
573
580
}
574
581
}
575
582
576
583
private void setContainerProperties (GatewayReplyListenerContainer container ) {
577
584
container .setConnectionFactory (this .connectionFactory );
585
+
578
586
if (this .replyDestination != null ) {
579
587
container .setDestination (this .replyDestination );
580
588
}
@@ -594,61 +602,57 @@ else if (StringUtils.hasText(this.replyDestinationName)) {
594
602
container .setMessageSelector (messageSelector );
595
603
}
596
604
container .setMessageListener (this );
597
- if (this .replyContainerProperties != null ) {
598
- if (this .replyContainerProperties .isSessionTransacted () != null ) {
599
- container .setSessionTransacted (this .replyContainerProperties .isSessionTransacted ());
600
- }
601
- if (this .replyContainerProperties .getCacheLevel () != null ) {
602
- container .setCacheLevel (this .replyContainerProperties .getCacheLevel ());
603
- }
604
- if (this .replyContainerProperties .getConcurrentConsumers () != null ) {
605
- container .setConcurrentConsumers (this .replyContainerProperties .getConcurrentConsumers ());
606
- }
607
- if (this .replyContainerProperties .getIdleConsumerLimit () != null ) {
608
- container .setIdleConsumerLimit (this .replyContainerProperties .getIdleConsumerLimit ());
609
- }
610
- if (this .replyContainerProperties .getIdleTaskExecutionLimit () != null ) {
611
- container .setIdleTaskExecutionLimit (this .replyContainerProperties .getIdleTaskExecutionLimit ());
612
- }
613
- if (this .replyContainerProperties .getMaxConcurrentConsumers () != null ) {
614
- container .setMaxConcurrentConsumers (this .replyContainerProperties .getMaxConcurrentConsumers ());
615
- }
616
- if (this .replyContainerProperties .getMaxMessagesPerTask () != null ) {
617
- container .setMaxMessagesPerTask (this .replyContainerProperties .getMaxMessagesPerTask ());
618
- }
619
- if (this .replyContainerProperties .getReceiveTimeout () != null ) {
620
- container .setReceiveTimeout (this .replyContainerProperties .getReceiveTimeout ());
621
- }
622
- if (this .replyContainerProperties .getRecoveryInterval () != null ) {
623
- container .setRecoveryInterval (this .replyContainerProperties .getRecoveryInterval ());
624
- }
625
- if (StringUtils .hasText (this .replyContainerProperties .getSessionAcknowledgeModeName ())) {
626
- Integer acknowledgeMode = JmsAdapterUtils .parseAcknowledgeMode (
627
- this .replyContainerProperties .getSessionAcknowledgeModeName ());
628
- if (acknowledgeMode != null ) {
629
- if (JmsAdapterUtils .SESSION_TRANSACTED == acknowledgeMode ) {
630
- container .setSessionTransacted (true );
631
- }
632
- else {
633
- container .setSessionAcknowledgeMode (acknowledgeMode );
634
- }
635
- }
636
- }
637
- else if (this .replyContainerProperties .getSessionAcknowledgeMode () != null ) {
638
- Integer sessionAcknowledgeMode = this .replyContainerProperties .getSessionAcknowledgeMode ();
639
- if (Session .SESSION_TRANSACTED == sessionAcknowledgeMode ) {
640
- container .setSessionTransacted (true );
641
- }
642
- else {
643
- container .setSessionAcknowledgeMode (sessionAcknowledgeMode );
644
- }
645
-
646
- }
605
+ applyReplyContainerProperties (container );
606
+ }
647
607
648
- if (this .replyContainerProperties .getTaskExecutor () != null ) {
649
- container .setTaskExecutor (this .replyContainerProperties .getTaskExecutor ());
650
- }
651
- else {
608
+ private void applyReplyContainerProperties (GatewayReplyListenerContainer container ) {
609
+ if (this .replyContainerProperties != null ) {
610
+ JavaUtils .INSTANCE
611
+ .acceptIfNotNull (this .replyContainerProperties .isSessionTransacted (),
612
+ container ::setSessionTransacted )
613
+ .acceptIfNotNull (this .replyContainerProperties .getCacheLevel (),
614
+ container ::setCacheLevel )
615
+ .acceptIfNotNull (this .replyContainerProperties .getConcurrentConsumers (),
616
+ container ::setConcurrentConsumers )
617
+ .acceptIfNotNull (this .replyContainerProperties .getIdleConsumerLimit (),
618
+ container ::setIdleConsumerLimit )
619
+ .acceptIfNotNull (this .replyContainerProperties .getIdleTaskExecutionLimit (),
620
+ container ::setIdleTaskExecutionLimit )
621
+ .acceptIfNotNull (this .replyContainerProperties .getMaxConcurrentConsumers (),
622
+ container ::setMaxConcurrentConsumers )
623
+ .acceptIfNotNull (this .replyContainerProperties .getMaxMessagesPerTask (),
624
+ container ::setMaxMessagesPerTask )
625
+ .acceptIfNotNull (this .replyContainerProperties .getReceiveTimeout (),
626
+ container ::setReceiveTimeout )
627
+ .acceptIfNotNull (this .replyContainerProperties .getRecoveryInterval (),
628
+ container ::setRecoveryInterval )
629
+ .acceptIfHasText (this .replyContainerProperties .getSessionAcknowledgeModeName (),
630
+ acknowledgeModeName -> {
631
+ Integer acknowledgeMode = JmsAdapterUtils .parseAcknowledgeMode (
632
+ this .replyContainerProperties .getSessionAcknowledgeModeName ());
633
+ if (acknowledgeMode != null ) {
634
+ if (JmsAdapterUtils .SESSION_TRANSACTED == acknowledgeMode ) {
635
+ container .setSessionTransacted (true );
636
+ }
637
+ else {
638
+ container .setSessionAcknowledgeMode (acknowledgeMode );
639
+ }
640
+ }
641
+ })
642
+ .acceptIfNotNull (this .replyContainerProperties .getSessionAcknowledgeMode (),
643
+ acknowledgeMode -> {
644
+ if (Session .SESSION_TRANSACTED == acknowledgeMode ) {
645
+ container .setSessionTransacted (true );
646
+ }
647
+ else {
648
+ container .setSessionAcknowledgeMode (acknowledgeMode );
649
+ }
650
+ })
651
+ .acceptIfNotNull (this .replyContainerProperties .getTaskExecutor (),
652
+ container ::setTaskExecutor );
653
+
654
+
655
+ if (this .replyContainerProperties .getTaskExecutor () == null ) {
652
656
// set the beanName so the default TE threads get a meaningful name
653
657
String containerBeanName = this .getComponentName ();
654
658
containerBeanName = ((!StringUtils .hasText (containerBeanName )
@@ -985,8 +989,8 @@ private javax.jms.Message doSendAndReceiveWithMessageIdCorrelation(Destination r
985
989
* If the replyTo is not temporary, and the connection is lost while waiting for a reply, reconnect for
986
990
* up to receiveTimeout.
987
991
*/
988
- private javax .jms .Message retryableReceiveReply (Session session , Destination replyTo , String messageSelector )
989
- throws JMSException {
992
+ private javax .jms .Message retryableReceiveReply (Session session , Destination replyTo , // NOSONAR
993
+ String messageSelector ) throws JMSException {
990
994
991
995
Connection consumerConnection = null ; //NOSONAR
992
996
Session consumerSession = session ;
@@ -996,17 +1000,15 @@ private javax.jms.Message retryableReceiveReply(Session session, Destination rep
996
1000
long replyTimeout = isTemporaryReplyTo
997
1001
? Long .MIN_VALUE
998
1002
: this .receiveTimeout < 0
999
- ? Long .MAX_VALUE
1000
- : System .currentTimeMillis () + this .receiveTimeout ;
1003
+ ? Long .MAX_VALUE
1004
+ : System .currentTimeMillis () + this .receiveTimeout ;
1001
1005
try {
1002
1006
do {
1003
1007
try {
1004
1008
messageConsumer = consumerSession .createConsumer (replyTo , messageSelector );
1005
1009
javax .jms .Message reply = receiveReplyMessage (messageConsumer );
1006
- if (reply == null ) {
1007
- if (replyTimeout > System .currentTimeMillis ()) {
1008
- throw new JMSException ("Consumer closed before timeout" );
1009
- }
1010
+ if (reply == null && replyTimeout > System .currentTimeMillis ()) {
1011
+ throw new JMSException ("Consumer closed before timeout" );
1010
1012
}
1011
1013
return reply ;
1012
1014
}
@@ -1027,7 +1029,7 @@ private javax.jms.Message retryableReceiveReply(Session session, Destination rep
1027
1029
logger .debug ("Could not reconnect, retrying: " + ee .getMessage ());
1028
1030
}
1029
1031
try {
1030
- Thread .sleep (1000 );
1032
+ Thread .sleep (1000 ); // NOSONAR
1031
1033
}
1032
1034
catch (@ SuppressWarnings ("unused" ) InterruptedException e1 ) {
1033
1035
Thread .currentThread ().interrupt ();
@@ -1377,10 +1379,10 @@ public Destination getReplyDestination() {
1377
1379
}
1378
1380
else {
1379
1381
int n = 0 ;
1380
- while (this .replyDestination == null && n ++ < 100 ) {
1382
+ while (this .replyDestination == null && n ++ < 100 ) { // NOSONAR
1381
1383
logger .debug ("Waiting for container to create destination" );
1382
1384
try {
1383
- Thread .sleep (100 );
1385
+ Thread .sleep (100 ); // NOSONAR
1384
1386
}
1385
1387
catch (InterruptedException e ) {
1386
1388
Thread .currentThread ().interrupt ();
@@ -1488,16 +1490,15 @@ public void run() {
1488
1490
synchronized (JmsOutboundGateway .this .lifeCycleMonitor ) {
1489
1491
if (System .currentTimeMillis () - JmsOutboundGateway .this .lastSend >
1490
1492
JmsOutboundGateway .this .idleReplyContainerTimeout
1491
- && JmsOutboundGateway .this .replies .size () == 0 ) {
1493
+ && JmsOutboundGateway .this .replies .size () == 0 &&
1494
+ JmsOutboundGateway .this .replyContainer .isRunning ()) {
1492
1495
1493
- if (JmsOutboundGateway .this .replyContainer .isRunning ()) {
1494
- if (logger .isDebugEnabled ()) {
1495
- logger .debug (getComponentName () + ": Stopping idle reply container." );
1496
- }
1497
- JmsOutboundGateway .this .replyContainer .stop ();
1498
- JmsOutboundGateway .this .idleTask .cancel (false );
1499
- JmsOutboundGateway .this .idleTask = null ;
1496
+ if (logger .isDebugEnabled ()) {
1497
+ logger .debug (getComponentName () + ": Stopping idle reply container." );
1500
1498
}
1499
+ JmsOutboundGateway .this .replyContainer .stop ();
1500
+ JmsOutboundGateway .this .idleTask .cancel (false );
1501
+ JmsOutboundGateway .this .idleTask = null ;
1501
1502
}
1502
1503
}
1503
1504
}
0 commit comments