Skip to content

Commit 16f2d6e

Browse files
committed
Use ProtonJ2 modified API for credit state
1 parent 311be61 commit 16f2d6e

File tree

5 files changed

+45
-53
lines changed

5 files changed

+45
-53
lines changed

src/main/java/com/rabbitmq/model/amqp/AmqpConsumer.java

Lines changed: 19 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -22,22 +22,16 @@
2222
import com.rabbitmq.model.Consumer;
2323
import com.rabbitmq.model.ModelException;
2424
import com.rabbitmq.model.metrics.MetricsCollector;
25-
import java.lang.reflect.Field;
26-
import java.lang.reflect.InvocationTargetException;
27-
import java.lang.reflect.Method;
28-
import java.util.Arrays;
2925
import java.util.concurrent.*;
3026
import java.util.concurrent.atomic.AtomicBoolean;
3127
import java.util.concurrent.atomic.AtomicLong;
3228
import java.util.concurrent.atomic.AtomicReference;
3329
import org.apache.qpid.protonj2.client.*;
3430
import org.apache.qpid.protonj2.client.exceptions.*;
35-
import org.apache.qpid.protonj2.client.impl.ClientLinkType;
36-
import org.apache.qpid.protonj2.client.impl.ClientReceiverLinkType;
31+
import org.apache.qpid.protonj2.client.impl.ClientReceiver;
3732
import org.apache.qpid.protonj2.client.util.DeliveryQueue;
3833
import org.apache.qpid.protonj2.engine.EventHandler;
3934
import org.apache.qpid.protonj2.engine.Scheduler;
40-
import org.apache.qpid.protonj2.engine.impl.ProtonLink;
4135
import org.apache.qpid.protonj2.engine.impl.ProtonLinkCreditState;
4236
import org.apache.qpid.protonj2.engine.impl.ProtonReceiver;
4337
import org.apache.qpid.protonj2.engine.impl.ProtonSessionIncomingWindow;
@@ -50,7 +44,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
5044

5145
private static final Logger LOGGER = LoggerFactory.getLogger(AmqpConsumer.class);
5246

53-
private volatile Receiver nativeReceiver;
47+
private volatile ClientReceiver nativeReceiver;
5448
private final AtomicBoolean closed = new AtomicBoolean(false);
5549
private volatile Future<?> receiveLoop;
5650
private final int initialCredits;
@@ -94,15 +88,16 @@ public void close() {
9488

9589
// internal API
9690

97-
private Receiver createNativeReceiver(Session nativeSession, String address) {
91+
private ClientReceiver createNativeReceiver(Session nativeSession, String address) {
9892
try {
99-
return nativeSession.openReceiver(
100-
address,
101-
new ReceiverOptions()
102-
.deliveryMode(DeliveryMode.AT_LEAST_ONCE)
103-
.autoAccept(false)
104-
.autoSettle(false)
105-
.creditWindow(0));
93+
return (ClientReceiver)
94+
nativeSession.openReceiver(
95+
address,
96+
new ReceiverOptions()
97+
.deliveryMode(DeliveryMode.AT_LEAST_ONCE)
98+
.autoAccept(false)
99+
.autoSettle(false)
100+
.creditWindow(0));
106101
} catch (ClientException e) {
107102
throw ExceptionUtils.convert(e, "Error while creating receiver from '%s'", address);
108103
}
@@ -229,46 +224,19 @@ String address() {
229224
return this.address;
230225
}
231226

232-
static <T> T field(String name, Object obj) {
233-
return field(obj.getClass(), name, obj);
234-
}
235-
236-
@SuppressWarnings("unchecked")
237-
static <T> T field(Class<?> lookupClass, String name, Object obj) {
238-
try {
239-
Field field = lookupClass.getDeclaredField(name);
240-
field.setAccessible(true);
241-
return (T) field.get(obj);
242-
} catch (NoSuchFieldException | IllegalAccessException e) {
243-
throw new ModelException("Error during Java reflection operation", e);
244-
}
245-
}
246-
247-
@SuppressWarnings("unchecked")
248-
static <T> T invoke(Class<?> lookupClass, String name, Object obj, Object... args) {
249-
try {
250-
Class<?>[] argTypes = Arrays.stream(args).map(Object::getClass).toArray(Class[]::new);
251-
Method method = lookupClass.getDeclaredMethod(name, argTypes);
252-
method.setAccessible(true);
253-
return (T) method.invoke(obj, args);
254-
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
255-
throw new ModelException("Error during Java reflection operation", e);
256-
}
257-
}
258-
259-
private void initStateFromNativeReceiver(Receiver receiver) {
227+
private void initStateFromNativeReceiver(ClientReceiver receiver) {
260228
try {
261-
Scheduler protonExecutor = field(ClientLinkType.class, "executor", receiver);
229+
Scheduler protonExecutor = receiver.executor();
262230
CountDownLatch fieldsSetLatch = new CountDownLatch(1);
263231
protonExecutor.execute(
264232
() -> {
265-
this.protonReceiver = field(ClientReceiverLinkType.class, "protonReceiver", receiver);
266-
this.creditState = invoke(ProtonLink.class, "getCreditState", this.protonReceiver);
267-
this.sessionWindow = field("sessionWindow", this.protonReceiver);
268-
this.protonDeliveryQueue = field("deliveryQueue", receiver);
233+
this.protonReceiver = (ProtonReceiver) receiver.protonReceiver();
234+
this.creditState = this.protonReceiver.getCreditState();
235+
this.sessionWindow = this.protonReceiver.sessionWindow();
236+
this.protonDeliveryQueue = receiver.deliveryQueue();
269237

270238
EventHandler<org.apache.qpid.protonj2.engine.Receiver> eventHandler =
271-
field("linkCreditUpdatedHandler", this.protonReceiver);
239+
this.protonReceiver.linkCreditUpdatedHandler();
272240
EventHandler<org.apache.qpid.protonj2.engine.Receiver> decorator =
273241
target -> {
274242
eventHandler.handle(target);
@@ -326,7 +294,7 @@ void pause() {
326294
private void doPause() {
327295
this.creditState.updateCredit(0);
328296
this.creditState.updateEcho(true);
329-
invoke(this.sessionWindow.getClass(), "writeFlow", this.sessionWindow, this.protonReceiver);
297+
this.sessionWindow.writeFlow(this.protonReceiver);
330298
}
331299

332300
void unpause() {

src/main/qpid/org/apache/qpid/protonj2/client/impl/ClientReceiver.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,11 @@
2727
import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
2828
import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutException;
2929
import org.apache.qpid.protonj2.client.futures.ClientFuture;
30+
import org.apache.qpid.protonj2.client.util.DeliveryQueue;
3031
import org.apache.qpid.protonj2.client.util.FifoDeliveryQueue;
3132
import org.apache.qpid.protonj2.engine.IncomingDelivery;
33+
import org.apache.qpid.protonj2.engine.Scheduler;
34+
import org.apache.qpid.protonj2.engine.impl.ProtonReceiver;
3235
import org.apache.qpid.protonj2.types.messaging.Accepted;
3336
import org.apache.qpid.protonj2.types.messaging.Released;
3437
import org.slf4j.Logger;
@@ -240,4 +243,17 @@ protected void recreateLinkForReconnect() {
240243
protonReceiver.setLinkedResource(this);
241244
protonReceiver.addCredit(previousCredit);
242245
}
246+
247+
public Scheduler executor() {
248+
return this.executor;
249+
}
250+
251+
public org.apache.qpid.protonj2.engine.Receiver protonReceiver() {
252+
return this.protonReceiver;
253+
}
254+
255+
public DeliveryQueue deliveryQueue() {
256+
return this.deliveryQueue;
257+
}
258+
243259
}

src/main/qpid/org/apache/qpid/protonj2/engine/impl/ProtonLink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -723,7 +723,7 @@ final L remoteDisposition(Disposition disposition, ProtonIncomingDelivery delive
723723

724724
//----- Internal methods
725725

726-
ProtonLinkCreditState getCreditState() {
726+
public ProtonLinkCreditState getCreditState() {
727727
return creditState;
728728
}
729729

src/main/qpid/org/apache/qpid/protonj2/engine/impl/ProtonReceiver.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,4 +467,12 @@ private void verifyNewDeliveryIdSequence(Transfer transfer, DeliveryIdTracker cu
467467
currentDeliveryId + " and " + transfer.getDeliveryId()));
468468
}
469469
}
470+
471+
public ProtonSessionIncomingWindow sessionWindow() {
472+
return this.sessionWindow;
473+
}
474+
475+
public EventHandler<Receiver> linkCreditUpdatedHandler() {
476+
return this.linkCreditUpdatedHandler;
477+
}
470478
}

src/main/qpid/org/apache/qpid/protonj2/engine/impl/ProtonSessionIncomingWindow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ long updateIncomingWindow() {
195195
return incomingWindow;
196196
}
197197

198-
void writeFlow(ProtonReceiver link) {
198+
public void writeFlow(ProtonReceiver link) {
199199
updateIncomingWindow();
200200
session.writeFlow(link);
201201
}

0 commit comments

Comments
 (0)