Skip to content

Commit 7891645

Browse files
committed
Use a CompletableFuture for publishing settlement callback
Instead of a Future. There is no need to poll the settlement future, the callback is called asynchronously from the executor task. This improved the throughput by about 75%.
1 parent 615a5ba commit 7891645

File tree

6 files changed

+33
-42
lines changed

6 files changed

+33
-42
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -107,20 +107,22 @@ public void publish(Message message, Callback callback) {
107107
message,
108108
this.connectionInfo,
109109
publishCall);
110-
this.executorService.submit(
111-
() -> {
112-
Status status;
113-
try {
114-
// FIXME set a timeout for publishing settlement
115-
tracker.settlementFuture().get();
116-
status = mapDeliveryState(tracker.remoteState());
117-
} catch (InterruptedException | ExecutionException e) {
118-
status = Status.REJECTED;
119-
}
120-
DefaultContext defaultContext = new DefaultContext(message, status);
121-
this.metricsCollector.publishDisposition(mapToPublishDisposition(status));
122-
callback.handle(defaultContext);
123-
});
110+
tracker
111+
.settlementFuture()
112+
.handleAsync(
113+
(t, ex) -> {
114+
Status status;
115+
if (ex == null) {
116+
status = mapDeliveryState(t.remoteState());
117+
} else {
118+
status = Status.REJECTED;
119+
}
120+
DefaultContext defaultContext = new DefaultContext(message, status);
121+
this.metricsCollector.publishDisposition(mapToPublishDisposition(status));
122+
callback.handle(defaultContext);
123+
return null;
124+
},
125+
this.executorService);
124126
this.metricsCollector.publish();
125127
}
126128

src/main/qpid/org/apache/qpid/protonj2/client/Tracker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.qpid.protonj2.client;
1818

19+
import java.util.concurrent.CompletableFuture;
1920
import java.util.concurrent.Future;
2021
import java.util.concurrent.TimeUnit;
2122

@@ -90,7 +91,7 @@ public interface Tracker {
9091
*
9192
* @return a {@link Future} that can be used to wait on remote settlement.
9293
*/
93-
Future<Tracker> settlementFuture();
94+
CompletableFuture<Tracker> settlementFuture();
9495

9596
/**
9697
* Waits if necessary for the remote to settle the sent delivery unless it has

src/main/qpid/org/apache/qpid/protonj2/client/impl/ClientNoOpTracker.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.qpid.protonj2.client.impl;
1818

19+
import java.util.concurrent.CompletableFuture;
1920
import java.util.concurrent.Future;
2021
import java.util.concurrent.TimeUnit;
2122

@@ -80,8 +81,10 @@ public boolean remoteSettled() {
8081
}
8182

8283
@Override
83-
public Future<Tracker> settlementFuture() {
84-
return ClientFutureFactory.completedFuture(this);
84+
public CompletableFuture<Tracker> settlementFuture() {
85+
CompletableFuture<Tracker> result = new CompletableFuture<>();
86+
result.complete(this);
87+
return result;
8588
}
8689

8790
@Override

src/main/qpid/org/apache/qpid/protonj2/client/impl/ClientSender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ private void failPendingUnsettledAndBlockedSends(ClientException cause) {
214214
// Cancel all settlement futures for in-flight sends passing an appropriate error to the future
215215
protonSender.unsettled().forEach((delivery) -> {
216216
try {
217-
delivery.getLinkedResource(ClientTrackable.class).settlementFuture().failed(cause);
217+
delivery.getLinkedResource(ClientTrackable.class).settlementFuture().completeExceptionally(cause);
218218
} catch (Exception e) {
219219
}
220220
});

src/main/qpid/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,7 @@ private void failPendingUnsettledAndBlockedSends(ClientException cause) {
358358
// Cancel all settlement futures for in-flight sends passing an appropriate error to the future
359359
protonSender.unsettled().forEach((delivery) -> {
360360
try {
361-
delivery.getLinkedResource(ClientTrackable.class).settlementFuture().failed(cause);
361+
delivery.getLinkedResource(ClientTrackable.class).settlementFuture().completeExceptionally(cause);
362362
} catch (Exception e) {
363363
}
364364
});

src/main/qpid/org/apache/qpid/protonj2/client/impl/ClientTrackable.java

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.qpid.protonj2.client.impl;
1919

2020
import java.util.Objects;
21+
import java.util.concurrent.CompletableFuture;
2122
import java.util.concurrent.ExecutionException;
2223
import java.util.concurrent.TimeUnit;
2324
import java.util.concurrent.TimeoutException;
@@ -49,7 +50,7 @@ public abstract class ClientTrackable<SenderType extends ClientSenderLinkType<?>
4950
protected static final AtomicReferenceFieldUpdater<ClientTrackable, DeliveryState> REMOTEL_DELIVERY_STATE_UPDATER =
5051
AtomicReferenceFieldUpdater.newUpdater(ClientTrackable.class, DeliveryState.class, "remoteDeliveryState");
5152

52-
private ClientFuture<TrackerType> remoteSettlementFuture;
53+
private final CompletableFuture<TrackerType> remoteSettlementFuture;
5354
private volatile int remotelySettled;
5455
private volatile DeliveryState remoteDeliveryState;
5556

@@ -64,6 +65,7 @@ public abstract class ClientTrackable<SenderType extends ClientSenderLinkType<?>
6465
ClientTrackable(SenderType sender, OutgoingDelivery delivery) {
6566
Objects.requireNonNull(sender, "Sender cannot be null for a Tracker");
6667

68+
this.remoteSettlementFuture = new CompletableFuture<>();
6769
this.sender = sender;
6870
this.delivery = delivery;
6971
this.delivery.deliveryStateUpdatedHandler(this::processDeliveryUpdated);
@@ -92,12 +94,7 @@ public TrackerType disposition(DeliveryState state, boolean settle) throws Clien
9294
sender.disposition(delivery, ClientDeliveryState.asProtonType(state), settle);
9395
} finally {
9496
if (settle) {
95-
synchronized (this) {
96-
if (remoteSettlementFuture == null) {
97-
remoteSettlementFuture = sender.session.connection().getFutureFactory().createFuture();
98-
}
99-
remoteSettlementFuture.complete(self());
100-
}
97+
remoteSettlementFuture.complete(self());
10198
}
10299
}
103100

@@ -108,12 +105,7 @@ public TrackerType settle() throws ClientException {
108105
try {
109106
sender.disposition(delivery, null, true);
110107
} finally {
111-
synchronized (this) {
112-
if (remoteSettlementFuture == null) {
113-
remoteSettlementFuture = sender.session.connection().getFutureFactory().createFuture();
114-
}
115-
remoteSettlementFuture.complete(self());
116-
}
108+
remoteSettlementFuture.complete(self());
117109
}
118110

119111
return self();
@@ -123,17 +115,10 @@ public synchronized boolean settled() {
123115
return delivery.isSettled();
124116
}
125117

126-
public ClientFuture<TrackerType> settlementFuture() {
127-
synchronized (this) {
128-
if (remoteSettlementFuture == null) {
129-
remoteSettlementFuture = sender.session.connection().getFutureFactory().createFuture();
130-
}
131-
132-
if (delivery.isSettled() || remoteSettled()) {
133-
remoteSettlementFuture.complete(self());
134-
}
118+
public CompletableFuture<TrackerType> settlementFuture() {
119+
if (delivery.isSettled() || remoteSettled()) {
120+
remoteSettlementFuture.complete(self());
135121
}
136-
137122
return remoteSettlementFuture;
138123
}
139124

0 commit comments

Comments
 (0)