Skip to content

Commit 0dbcdaa

Browse files
Merge pull request #56 from ReactiveSocket/transport-requestn
Compose Request(n) – Transport/Application
2 parents b7c2c7d + 6878c96 commit 0dbcdaa

File tree

5 files changed

+517
-77
lines changed

5 files changed

+517
-77
lines changed

src/main/java/io/reactivesocket/internal/Responder.java

Lines changed: 75 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ private void start(final Completable responderCompletable) {
118118
/* state of cancellation subjects during connection */
119119
final Int2ObjectHashMap<Subscription> cancellationSubscriptions = new Int2ObjectHashMap<>();
120120
/* streams in flight that can receive REQUEST_N messages */
121-
final Int2ObjectHashMap<Subscription> inFlight = new Int2ObjectHashMap<>(); // TODO not being used
121+
final Int2ObjectHashMap<SubscriptionArbiter> inFlight = new Int2ObjectHashMap<>();
122122
/* bidirectional channels */
123123
final Int2ObjectHashMap<UnicastSubject<Payload>> channels = new Int2ObjectHashMap<>(); // TODO should/can we make this optional so that it only gets allocated per connection if channels are
124124
// used?
@@ -205,14 +205,14 @@ public void onNext(Frame requestFrame) {
205205
}
206206
return;
207207
} else if (requestFrame.getType() == FrameType.REQUEST_N) {
208-
Subscription inFlightSubscription = null;
208+
SubscriptionArbiter inFlightSubscription = null;
209209
synchronized (Responder.this)
210210
{
211211
inFlightSubscription = inFlight.get(requestFrame.getStreamId());
212212
}
213213
if (inFlightSubscription != null)
214214
{
215-
inFlightSubscription.request(Frame.RequestN.requestN(requestFrame));
215+
inFlightSubscription.addApplicationRequest(Frame.RequestN.requestN(requestFrame));
216216
return;
217217
}
218218
// TODO should we do anything if we don't find the stream? emitting an error is risky as the responder could have terminated and cleaned up already
@@ -407,15 +407,15 @@ private Publisher<Frame> handleRequestStream(
407407
Frame requestFrame,
408408
final RequestHandler requestHandler,
409409
final Int2ObjectHashMap<Subscription> cancellationSubscriptions,
410-
final Int2ObjectHashMap<Subscription> inFlight) {
410+
final Int2ObjectHashMap<SubscriptionArbiter> inFlight) {
411411
return _handleRequestStream(requestStreamHandler, requestFrame, requestHandler, cancellationSubscriptions, inFlight, true);
412412
}
413413

414414
private Publisher<Frame> handleRequestSubscription(
415415
Frame requestFrame,
416416
final RequestHandler requestHandler,
417417
final Int2ObjectHashMap<Subscription> cancellationSubscriptions,
418-
final Int2ObjectHashMap<Subscription> inFlight) {
418+
final Int2ObjectHashMap<SubscriptionArbiter> inFlight) {
419419
return _handleRequestStream(requestSubscriptionHandler, requestFrame, requestHandler, cancellationSubscriptions, inFlight, false);
420420
}
421421

@@ -434,7 +434,7 @@ private Publisher<Frame> _handleRequestStream(
434434
Frame requestFrame,
435435
final RequestHandler requestHandler,
436436
final Int2ObjectHashMap<Subscription> cancellationSubscriptions,
437-
final Int2ObjectHashMap<Subscription> inFlight,
437+
final Int2ObjectHashMap<SubscriptionArbiter> inFlight,
438438
final boolean allowCompletion) {
439439

440440
return new Publisher<Frame>() {
@@ -445,19 +445,25 @@ public void subscribe(Subscriber<? super Frame> child) {
445445

446446
final AtomicBoolean started = new AtomicBoolean(false);
447447
final AtomicReference<Subscription> parent = new AtomicReference<>();
448+
final SubscriptionArbiter arbiter = new SubscriptionArbiter();
448449

449450
@Override
450451
public void request(long n) {
451-
if (n > 0 && started.compareAndSet(false, true)) {
452+
if(n <= 0) {
453+
return;
454+
}
455+
if (started.compareAndSet(false, true)) {
456+
arbiter.addTransportRequest(n);
452457
final int streamId = requestFrame.getStreamId();
453458

454459
handler.apply(requestHandler, requestFrame).subscribe(new Subscriber<Payload>() {
455460

456461
@Override
457462
public void onSubscribe(Subscription s) {
458463
if (parent.compareAndSet(null, s)) {
459-
inFlight.put(streamId, s);
460-
s.request(Frame.Request.initialRequestN(requestFrame));
464+
inFlight.put(streamId, arbiter);
465+
arbiter.addApplicationRequest(Frame.Request.initialRequestN(requestFrame));
466+
arbiter.addApplicationProducer(s);
461467
} else {
462468
s.cancel();
463469
cleanup();
@@ -493,6 +499,8 @@ public void onComplete() {
493499
}
494500

495501
});
502+
} else {
503+
arbiter.addTransportRequest(n);
496504
}
497505
}
498506

@@ -561,7 +569,7 @@ private Publisher<Frame> handleRequestChannel(Frame requestFrame,
561569
RequestHandler requestHandler,
562570
Int2ObjectHashMap<UnicastSubject<Payload>> channels,
563571
Int2ObjectHashMap<Subscription> cancellationSubscriptions,
564-
Int2ObjectHashMap<Subscription> inFlight) {
572+
Int2ObjectHashMap<SubscriptionArbiter> inFlight) {
565573

566574
UnicastSubject<Payload> channelSubject = null;
567575
synchronized(Responder.this) {
@@ -576,10 +584,15 @@ public void subscribe(Subscriber<? super Frame> child) {
576584

577585
final AtomicBoolean started = new AtomicBoolean(false);
578586
final AtomicReference<Subscription> parent = new AtomicReference<>();
587+
final SubscriptionArbiter arbiter = new SubscriptionArbiter();
579588

580589
@Override
581590
public void request(long n) {
582-
if (n > 0 && started.compareAndSet(false, true)) {
591+
if(n <= 0) {
592+
return;
593+
}
594+
if (started.compareAndSet(false, true)) {
595+
arbiter.addTransportRequest(n);
583596
final int streamId = requestFrame.getStreamId();
584597

585598
// first request on this channel
@@ -609,8 +622,9 @@ public void request(long n) {
609622
@Override
610623
public void onSubscribe(Subscription s) {
611624
if (parent.compareAndSet(null, s)) {
612-
inFlight.put(streamId, s);
613-
s.request(Frame.Request.initialRequestN(requestFrame));
625+
inFlight.put(streamId, arbiter);
626+
arbiter.addApplicationRequest(Frame.Request.initialRequestN(requestFrame));
627+
arbiter.addApplicationProducer(s);
614628
} else {
615629
s.cancel();
616630
cleanup();
@@ -638,6 +652,8 @@ public void onComplete() {
638652
}
639653

640654
});
655+
} else {
656+
arbiter.addTransportRequest(n);
641657
}
642658
}
643659

@@ -681,5 +697,51 @@ private void cleanup() {
681697
}
682698
}
683699
}
700+
701+
private static class SubscriptionArbiter {
702+
private Subscription applicationProducer;
703+
private long appRequested = 0;
704+
private long transportRequested = 0;
705+
private long requestedToProducer = 0;
706+
707+
public void addApplicationRequest(long n) {
708+
synchronized(this) {
709+
appRequested += n;
710+
}
711+
tryRequest();
712+
}
713+
714+
public void addApplicationProducer(Subscription s) {
715+
synchronized(this) {
716+
applicationProducer = s;
717+
}
718+
tryRequest();
719+
}
720+
721+
public void addTransportRequest(long n) {
722+
synchronized(this) {
723+
transportRequested += n;
724+
}
725+
tryRequest();
726+
}
727+
728+
private void tryRequest() {
729+
long toRequest = 0;
730+
Subscription s = null;
731+
synchronized(this) {
732+
if(applicationProducer == null) {
733+
return;
734+
}
735+
s = applicationProducer;
736+
long minToRequest = Math.min(appRequested, transportRequested);
737+
toRequest = minToRequest - requestedToProducer;
738+
requestedToProducer += toRequest;
739+
}
740+
if(toRequest > 0) {
741+
s.request(toRequest);
742+
}
743+
}
744+
745+
}
684746

685747
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.reactivesocket;
17+
18+
import java.util.concurrent.CopyOnWriteArrayList;
19+
import java.util.function.Consumer;
20+
21+
import io.reactivesocket.observable.Observer;
22+
import io.reactivex.subjects.PublishSubject;
23+
import io.reactivex.subjects.Subject;
24+
25+
/**
26+
* Multicast eventbus that serializes incoming events.
27+
*/
28+
public class SerializedEventBus {
29+
30+
private final CopyOnWriteArrayList<Observer<Frame>> os = new CopyOnWriteArrayList<>();
31+
private Subject<Frame, Frame> s;
32+
33+
public SerializedEventBus() {
34+
s = PublishSubject.<Frame>create().toSerialized();
35+
s.subscribe(f-> {
36+
for (Observer<Frame> o : os) {
37+
o.onNext(f);
38+
}
39+
});
40+
}
41+
42+
public void send(Frame f) {
43+
s.onNext(f);
44+
}
45+
46+
public void add(Observer<Frame> o) {
47+
os.add(o);
48+
}
49+
50+
public void add(Consumer<Frame> f) {
51+
add(new Observer<Frame>() {
52+
53+
@Override
54+
public void onNext(Frame t) {
55+
f.accept(t);
56+
}
57+
58+
@Override
59+
public void onError(Throwable e) {
60+
61+
}
62+
63+
@Override
64+
public void onComplete() {
65+
66+
}
67+
68+
@Override
69+
public void onSubscribe(io.reactivesocket.observable.Disposable d) {
70+
// TODO Auto-generated method stub
71+
72+
}
73+
74+
});
75+
}
76+
77+
public void remove(Observer<Frame> o) {
78+
os.remove(o);
79+
}
80+
}

src/test/java/io/reactivesocket/TestConnection.java

Lines changed: 7 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,18 @@
1818
import static io.reactivex.Observable.*;
1919

2020
import java.io.IOException;
21-
import java.util.concurrent.CopyOnWriteArrayList;
22-
import java.util.function.Consumer;
2321

2422
import org.reactivestreams.Publisher;
2523

2624
import io.reactivesocket.observable.Observer;
2725
import io.reactivex.Observable;
2826
import io.reactivex.Scheduler.Worker;
2927
import io.reactivex.schedulers.Schedulers;
30-
import io.reactivex.subjects.PublishSubject;
31-
import io.reactivex.subjects.Subject;
3228

3329
public class TestConnection implements DuplexConnection {
3430

35-
public final Channel toInput = new Channel();
36-
public final Channel write = new Channel();
31+
public final SerializedEventBus toInput = new SerializedEventBus();
32+
public final SerializedEventBus write = new SerializedEventBus();
3733

3834
@Override
3935
public void addOutput(Publisher<Frame> o, Completable callback) {
@@ -70,6 +66,9 @@ public void connectToServerConnection(TestConnection serverConnection) {
7066
connectToServerConnection(serverConnection, true);
7167
}
7268

69+
Worker clientThread = Schedulers.newThread().createWorker();
70+
Worker serverThread = Schedulers.newThread().createWorker();
71+
7372
public void connectToServerConnection(TestConnection serverConnection, boolean log) {
7473
if (log) {
7574
serverConnection.write.add(n -> System.out.println("SERVER ==> Writes from server->client: " + n + " Written from " + Thread.currentThread()));
@@ -78,9 +77,6 @@ public void connectToServerConnection(TestConnection serverConnection, boolean l
7877
toInput.add(n -> System.out.println("CLIENT <== Input from server->client: " + n + " Read on " + Thread.currentThread()));
7978
}
8079

81-
Worker clientThread = Schedulers.newThread().createWorker();
82-
Worker serverThread = Schedulers.newThread().createWorker();
83-
8480
// client to server
8581
write.add(f -> {
8682
serverThread.schedule(() -> {
@@ -97,61 +93,8 @@ public void connectToServerConnection(TestConnection serverConnection, boolean l
9793

9894
@Override
9995
public void close() throws IOException {
100-
101-
}
102-
103-
public static class Channel {
104-
105-
private final CopyOnWriteArrayList<Observer<Frame>> os = new CopyOnWriteArrayList<>();
106-
private Subject<Frame, Frame> s;
107-
108-
public Channel() {
109-
s = PublishSubject.<Frame>create().toSerialized();
110-
s.subscribe(f-> {
111-
for (Observer<Frame> o : os) {
112-
o.onNext(f);
113-
}
114-
});
115-
}
116-
117-
public void send(Frame f) {
118-
s.onNext(f);
119-
}
120-
121-
public void add(Observer<Frame> o) {
122-
os.add(o);
123-
}
124-
125-
public void add(Consumer<Frame> f) {
126-
add(new Observer<Frame>() {
127-
128-
@Override
129-
public void onNext(Frame t) {
130-
f.accept(t);
131-
}
132-
133-
@Override
134-
public void onError(Throwable e) {
135-
136-
}
137-
138-
@Override
139-
public void onComplete() {
140-
141-
}
142-
143-
@Override
144-
public void onSubscribe(io.reactivesocket.observable.Disposable d) {
145-
// TODO Auto-generated method stub
146-
147-
}
148-
149-
});
150-
}
151-
152-
public void remove(Observer<Frame> o) {
153-
os.remove(o);
154-
}
96+
clientThread.dispose();
97+
serverThread.dispose();
15598
}
15699

157100
}

0 commit comments

Comments
 (0)