Skip to content

Commit 6878c96

Browse files
Responder RequestN Composed with Application/Transport
The request(n) of transport and application layer now compose.
1 parent 0dd9d28 commit 6878c96

File tree

2 files changed

+163
-17
lines changed

2 files changed

+163
-17
lines changed

src/main/java/io/reactivesocket/internal/Responder.java

Lines changed: 75 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ private void start(final Completable responderCompletable) {
118118
/* state of cancellation subjects during connection */
119119
final Int2ObjectHashMap<Subscription> cancellationSubscriptions = new Int2ObjectHashMap<>();
120120
/* streams in flight that can receive REQUEST_N messages */
121-
final Int2ObjectHashMap<Subscription> inFlight = new Int2ObjectHashMap<>(); // TODO not being used
121+
final Int2ObjectHashMap<SubscriptionArbiter> inFlight = new Int2ObjectHashMap<>();
122122
/* bidirectional channels */
123123
final Int2ObjectHashMap<UnicastSubject<Payload>> channels = new Int2ObjectHashMap<>(); // TODO should/can we make this optional so that it only gets allocated per connection if channels are
124124
// used?
@@ -205,14 +205,14 @@ public void onNext(Frame requestFrame) {
205205
}
206206
return;
207207
} else if (requestFrame.getType() == FrameType.REQUEST_N) {
208-
Subscription inFlightSubscription = null;
208+
SubscriptionArbiter inFlightSubscription = null;
209209
synchronized (Responder.this)
210210
{
211211
inFlightSubscription = inFlight.get(requestFrame.getStreamId());
212212
}
213213
if (inFlightSubscription != null)
214214
{
215-
inFlightSubscription.request(Frame.RequestN.requestN(requestFrame));
215+
inFlightSubscription.addApplicationRequest(Frame.RequestN.requestN(requestFrame));
216216
return;
217217
}
218218
// TODO should we do anything if we don't find the stream? emitting an error is risky as the responder could have terminated and cleaned up already
@@ -407,15 +407,15 @@ private Publisher<Frame> handleRequestStream(
407407
Frame requestFrame,
408408
final RequestHandler requestHandler,
409409
final Int2ObjectHashMap<Subscription> cancellationSubscriptions,
410-
final Int2ObjectHashMap<Subscription> inFlight) {
410+
final Int2ObjectHashMap<SubscriptionArbiter> inFlight) {
411411
return _handleRequestStream(requestStreamHandler, requestFrame, requestHandler, cancellationSubscriptions, inFlight, true);
412412
}
413413

414414
private Publisher<Frame> handleRequestSubscription(
415415
Frame requestFrame,
416416
final RequestHandler requestHandler,
417417
final Int2ObjectHashMap<Subscription> cancellationSubscriptions,
418-
final Int2ObjectHashMap<Subscription> inFlight) {
418+
final Int2ObjectHashMap<SubscriptionArbiter> inFlight) {
419419
return _handleRequestStream(requestSubscriptionHandler, requestFrame, requestHandler, cancellationSubscriptions, inFlight, false);
420420
}
421421

@@ -434,7 +434,7 @@ private Publisher<Frame> _handleRequestStream(
434434
Frame requestFrame,
435435
final RequestHandler requestHandler,
436436
final Int2ObjectHashMap<Subscription> cancellationSubscriptions,
437-
final Int2ObjectHashMap<Subscription> inFlight,
437+
final Int2ObjectHashMap<SubscriptionArbiter> inFlight,
438438
final boolean allowCompletion) {
439439

440440
return new Publisher<Frame>() {
@@ -445,19 +445,25 @@ public void subscribe(Subscriber<? super Frame> child) {
445445

446446
final AtomicBoolean started = new AtomicBoolean(false);
447447
final AtomicReference<Subscription> parent = new AtomicReference<>();
448+
final SubscriptionArbiter arbiter = new SubscriptionArbiter();
448449

449450
@Override
450451
public void request(long n) {
451-
if (n > 0 && started.compareAndSet(false, true)) {
452+
if(n <= 0) {
453+
return;
454+
}
455+
if (started.compareAndSet(false, true)) {
456+
arbiter.addTransportRequest(n);
452457
final int streamId = requestFrame.getStreamId();
453458

454459
handler.apply(requestHandler, requestFrame).subscribe(new Subscriber<Payload>() {
455460

456461
@Override
457462
public void onSubscribe(Subscription s) {
458463
if (parent.compareAndSet(null, s)) {
459-
inFlight.put(streamId, s);
460-
s.request(Frame.Request.initialRequestN(requestFrame));
464+
inFlight.put(streamId, arbiter);
465+
arbiter.addApplicationRequest(Frame.Request.initialRequestN(requestFrame));
466+
arbiter.addApplicationProducer(s);
461467
} else {
462468
s.cancel();
463469
cleanup();
@@ -493,6 +499,8 @@ public void onComplete() {
493499
}
494500

495501
});
502+
} else {
503+
arbiter.addTransportRequest(n);
496504
}
497505
}
498506

@@ -561,7 +569,7 @@ private Publisher<Frame> handleRequestChannel(Frame requestFrame,
561569
RequestHandler requestHandler,
562570
Int2ObjectHashMap<UnicastSubject<Payload>> channels,
563571
Int2ObjectHashMap<Subscription> cancellationSubscriptions,
564-
Int2ObjectHashMap<Subscription> inFlight) {
572+
Int2ObjectHashMap<SubscriptionArbiter> inFlight) {
565573

566574
UnicastSubject<Payload> channelSubject = null;
567575
synchronized(Responder.this) {
@@ -576,10 +584,15 @@ public void subscribe(Subscriber<? super Frame> child) {
576584

577585
final AtomicBoolean started = new AtomicBoolean(false);
578586
final AtomicReference<Subscription> parent = new AtomicReference<>();
587+
final SubscriptionArbiter arbiter = new SubscriptionArbiter();
579588

580589
@Override
581590
public void request(long n) {
582-
if (n > 0 && started.compareAndSet(false, true)) {
591+
if(n <= 0) {
592+
return;
593+
}
594+
if (started.compareAndSet(false, true)) {
595+
arbiter.addTransportRequest(n);
583596
final int streamId = requestFrame.getStreamId();
584597

585598
// first request on this channel
@@ -609,8 +622,9 @@ public void request(long n) {
609622
@Override
610623
public void onSubscribe(Subscription s) {
611624
if (parent.compareAndSet(null, s)) {
612-
inFlight.put(streamId, s);
613-
s.request(Frame.Request.initialRequestN(requestFrame));
625+
inFlight.put(streamId, arbiter);
626+
arbiter.addApplicationRequest(Frame.Request.initialRequestN(requestFrame));
627+
arbiter.addApplicationProducer(s);
614628
} else {
615629
s.cancel();
616630
cleanup();
@@ -638,6 +652,8 @@ public void onComplete() {
638652
}
639653

640654
});
655+
} else {
656+
arbiter.addTransportRequest(n);
641657
}
642658
}
643659

@@ -681,5 +697,51 @@ private void cleanup() {
681697
}
682698
}
683699
}
700+
701+
private static class SubscriptionArbiter {
702+
private Subscription applicationProducer;
703+
private long appRequested = 0;
704+
private long transportRequested = 0;
705+
private long requestedToProducer = 0;
706+
707+
public void addApplicationRequest(long n) {
708+
synchronized(this) {
709+
appRequested += n;
710+
}
711+
tryRequest();
712+
}
713+
714+
public void addApplicationProducer(Subscription s) {
715+
synchronized(this) {
716+
applicationProducer = s;
717+
}
718+
tryRequest();
719+
}
720+
721+
public void addTransportRequest(long n) {
722+
synchronized(this) {
723+
transportRequested += n;
724+
}
725+
tryRequest();
726+
}
727+
728+
private void tryRequest() {
729+
long toRequest = 0;
730+
Subscription s = null;
731+
synchronized(this) {
732+
if(applicationProducer == null) {
733+
return;
734+
}
735+
s = applicationProducer;
736+
long minToRequest = Math.min(appRequested, transportRequested);
737+
toRequest = minToRequest - requestedToProducer;
738+
requestedToProducer += toRequest;
739+
}
740+
if(toRequest > 0) {
741+
s.request(toRequest);
742+
}
743+
}
744+
745+
}
684746

685747
}

src/test/java/io/reactivesocket/TestTransportRequestN.java

Lines changed: 88 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.atomic.AtomicReference;
2727

2828
import org.junit.After;
29+
import org.junit.Ignore;
2930
import org.junit.Test;
3031
import org.reactivestreams.Publisher;
3132

@@ -38,9 +39,10 @@
3839
public class TestTransportRequestN {
3940

4041
@Test(timeout = 3000)
41-
public void testRequestNFromTransport() throws InterruptedException {
42+
public void testRequestStreamWithNFromTransport() throws InterruptedException {
43+
clientConnection = new TestConnectionWithControlledRequestN();
4244
serverConnection = new TestConnectionWithControlledRequestN();
43-
setup(serverConnection);
45+
setup(clientConnection, serverConnection);
4446

4547
TestSubscriber<Payload> ts = new TestSubscriber<>();
4648
fromPublisher(socketClient.requestStream(utf8EncodedPayload("", null)))
@@ -72,16 +74,97 @@ public void testRequestNFromTransport() throws InterruptedException {
7274
fail("Emitted more (" + serverConnection.emitted.get() + ") than transport requested (" + serverConnection.requested.get() + ")");
7375
}
7476
}
77+
78+
@Test(timeout = 3000)
79+
public void testRequestChannelDownstreamWithNFromTransport() throws InterruptedException {
80+
clientConnection = new TestConnectionWithControlledRequestN();
81+
serverConnection = new TestConnectionWithControlledRequestN();
82+
setup(clientConnection, serverConnection);
83+
84+
TestSubscriber<Payload> ts = new TestSubscriber<>();
85+
fromPublisher(socketClient.requestChannel(just(utf8EncodedPayload("", null))))
86+
.take(150)
87+
.subscribe(ts);
88+
89+
// wait for server to add output
90+
if (!serverConnection.awaitSubscription(1000)) {
91+
fail("Did not receive subscription");
92+
}
93+
// now request some data, but less than it is expected to output
94+
serverConnection.requestMore(10);
95+
96+
// since we are async, give time for emission to occur
97+
Thread.sleep(500);
98+
99+
// we should not have received more than 11 (10 + default 1 that is requested)
100+
101+
if (ts.valueCount() > 11) {
102+
fail("Received more (" + ts.valueCount() + ") than transport requested (11)");
103+
}
104+
105+
ts.cancel();
106+
107+
// since we are async, give time for emission to occur
108+
Thread.sleep(500);
109+
110+
if (serverConnection.emitted.get() > serverConnection.requested.get()) {
111+
fail("Emitted more (" + serverConnection.emitted.get() + ") than transport requested (" + serverConnection.requested.get() + ")");
112+
}
113+
}
114+
115+
// TODO come back after some other work (Ben)
116+
@Ignore
117+
@Test(timeout = 3000)
118+
public void testRequestChannelUpstreamWithNFromTransport() throws InterruptedException {
119+
clientConnection = new TestConnectionWithControlledRequestN();
120+
serverConnection = new TestConnectionWithControlledRequestN();
121+
setup(clientConnection, serverConnection);
122+
123+
TestSubscriber<Payload> ts = new TestSubscriber<>();
124+
fromPublisher(socketClient.requestChannel(range(0, 1000).map(i -> utf8EncodedPayload("" + i, null))))
125+
.take(10)
126+
.subscribe(ts);
127+
128+
// wait for server to add output
129+
if (!serverConnection.awaitSubscription(1000)) {
130+
fail("Did not receive subscription");
131+
}
132+
// now request some data, but less than it is expected to output
133+
serverConnection.requestMore(10);
134+
// clientConnection.requestMore(2);
135+
136+
// since we are async, give time for emission to occur
137+
Thread.sleep(500);
138+
139+
// we should not have received more than 11 (10 + default 1 that is requested)
140+
141+
if (ts.valueCount() > 11) {
142+
fail("Received more (" + ts.valueCount() + ") than transport requested (11)");
143+
}
144+
145+
ts.cancel();
146+
147+
// since we are async, give time for emission to occur
148+
Thread.sleep(500);
149+
150+
if (serverConnection.emitted.get() > serverConnection.requested.get()) {
151+
fail("Server Emitted more (" + serverConnection.emitted.get() + ") than transport requested (" + serverConnection.requested.get() + ")");
152+
}
153+
154+
if (clientConnection.emitted.get() > clientConnection.requested.get()) {
155+
fail("Client Emitted more (" + clientConnection.emitted.get() + ") than transport requested (" + clientConnection.requested.get() + ")");
156+
}
157+
}
75158

76159
private TestConnectionWithControlledRequestN serverConnection;
160+
private TestConnectionWithControlledRequestN clientConnection;
77161
private ReactiveSocket socketServer;
78162
private ReactiveSocket socketClient;
79163
private AtomicBoolean helloSubscriptionRunning = new AtomicBoolean(false);
80164
private AtomicReference<Throwable> lastServerError = new AtomicReference<Throwable>();
81165
private CountDownLatch lastServerErrorCountDown;
82166

83-
public void setup(TestConnectionWithControlledRequestN serverConnection) throws InterruptedException {
84-
TestConnection clientConnection = new TestConnection();
167+
public void setup(TestConnectionWithControlledRequestN clientConnection, TestConnectionWithControlledRequestN serverConnection) throws InterruptedException {
85168
clientConnection.connectToServerConnection(serverConnection, false);
86169
lastServerErrorCountDown = new CountDownLatch(1);
87170

@@ -146,6 +229,7 @@ public void shutdown() {
146229
socketServer.shutdown();
147230
socketClient.shutdown();
148231
try {
232+
clientConnection.close();
149233
serverConnection.close();
150234
} catch (IOException e) {
151235
e.printStackTrace();

0 commit comments

Comments
 (0)