23
23
import com .rabbitmq .model .ObservationCollector ;
24
24
import com .rabbitmq .model .Publisher ;
25
25
import com .rabbitmq .model .metrics .MetricsCollector ;
26
+
27
+ import java .time .Duration ;
26
28
import java .util .concurrent .*;
27
29
import java .util .concurrent .atomic .AtomicBoolean ;
28
30
import java .util .concurrent .atomic .AtomicLong ;
@@ -49,6 +51,7 @@ final class AmqpPublisher extends ResourceBase implements Publisher {
49
51
private final ObservationCollector observationCollector ;
50
52
private final Function <Message , Tracker > publishCall ;
51
53
private final DefaultAddressBuilder .DestinationSpec destinationSpec ;
54
+ private final Duration publishTimeout ;
52
55
private volatile ObservationCollector .ConnectionInfo connectionInfo ;
53
56
54
57
AmqpPublisher (AmqpPublisherBuilder builder ) {
@@ -58,7 +61,9 @@ final class AmqpPublisher extends ResourceBase implements Publisher {
58
61
this .address = builder .address ();
59
62
this .destinationSpec = builder .destination ();
60
63
this .connection = builder .connection ();
61
- this .sender = this .createSender (builder .connection ().nativeSession (), this .address );
64
+ this .publishTimeout = builder .publishTimeout ();
65
+ this .sender = this .createSender (builder .connection ().nativeSession (), this .address ,
66
+ this .publishTimeout );
62
67
this .metricsCollector = this .connection .metricsCollector ();
63
68
this .observationCollector = this .connection .observationCollector ();
64
69
this .state (OPEN );
@@ -122,7 +127,8 @@ public void publish(Message message, Callback callback) {
122
127
123
128
void recoverAfterConnectionFailure () {
124
129
this .connectionInfo = new Utils .ObservationConnectionInfo (this .connection .connectionAddress ());
125
- this .sender = this .createSender (this .connection .nativeSession (false ), this .address );
130
+ this .sender = this .createSender (this .connection .nativeSession (false ), this .address ,
131
+ this .publishTimeout );
126
132
}
127
133
128
134
@ Override
@@ -132,8 +138,10 @@ public void close() {
132
138
133
139
// internal API
134
140
135
- private Sender createSender (Session session , String address ) {
136
- SenderOptions senderOptions = new SenderOptions ().deliveryMode (DeliveryMode .AT_LEAST_ONCE );
141
+ private Sender createSender (Session session , String address , Duration publishTimeout ) {
142
+ SenderOptions senderOptions = new SenderOptions ().deliveryMode (DeliveryMode .AT_LEAST_ONCE )
143
+ .sendTimeout (publishTimeout .isNegative () ? ConnectionOptions .INFINITE :
144
+ publishTimeout .toMillis ());
137
145
try {
138
146
if (address == null ) {
139
147
return session .openAnonymousSender (senderOptions );
0 commit comments