Skip to content

Commit 0dd9d28

Browse files
Unit Test Showing Unbounded Transport
1 parent 162c506 commit 0dd9d28

File tree

4 files changed

+358
-64
lines changed

4 files changed

+358
-64
lines changed
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.reactivesocket;
17+
18+
import java.util.concurrent.CopyOnWriteArrayList;
19+
import java.util.function.Consumer;
20+
21+
import io.reactivesocket.observable.Observer;
22+
import io.reactivex.subjects.PublishSubject;
23+
import io.reactivex.subjects.Subject;
24+
25+
/**
26+
* Multicast eventbus that serializes incoming events.
27+
*/
28+
public class SerializedEventBus {
29+
30+
private final CopyOnWriteArrayList<Observer<Frame>> os = new CopyOnWriteArrayList<>();
31+
private Subject<Frame, Frame> s;
32+
33+
public SerializedEventBus() {
34+
s = PublishSubject.<Frame>create().toSerialized();
35+
s.subscribe(f-> {
36+
for (Observer<Frame> o : os) {
37+
o.onNext(f);
38+
}
39+
});
40+
}
41+
42+
public void send(Frame f) {
43+
s.onNext(f);
44+
}
45+
46+
public void add(Observer<Frame> o) {
47+
os.add(o);
48+
}
49+
50+
public void add(Consumer<Frame> f) {
51+
add(new Observer<Frame>() {
52+
53+
@Override
54+
public void onNext(Frame t) {
55+
f.accept(t);
56+
}
57+
58+
@Override
59+
public void onError(Throwable e) {
60+
61+
}
62+
63+
@Override
64+
public void onComplete() {
65+
66+
}
67+
68+
@Override
69+
public void onSubscribe(io.reactivesocket.observable.Disposable d) {
70+
// TODO Auto-generated method stub
71+
72+
}
73+
74+
});
75+
}
76+
77+
public void remove(Observer<Frame> o) {
78+
os.remove(o);
79+
}
80+
}

src/test/java/io/reactivesocket/TestConnection.java

Lines changed: 7 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,18 @@
1818
import static io.reactivex.Observable.*;
1919

2020
import java.io.IOException;
21-
import java.util.concurrent.CopyOnWriteArrayList;
22-
import java.util.function.Consumer;
2321

2422
import org.reactivestreams.Publisher;
2523

2624
import io.reactivesocket.observable.Observer;
2725
import io.reactivex.Observable;
2826
import io.reactivex.Scheduler.Worker;
2927
import io.reactivex.schedulers.Schedulers;
30-
import io.reactivex.subjects.PublishSubject;
31-
import io.reactivex.subjects.Subject;
3228

3329
public class TestConnection implements DuplexConnection {
3430

35-
public final Channel toInput = new Channel();
36-
public final Channel write = new Channel();
31+
public final SerializedEventBus toInput = new SerializedEventBus();
32+
public final SerializedEventBus write = new SerializedEventBus();
3733

3834
@Override
3935
public void addOutput(Publisher<Frame> o, Completable callback) {
@@ -70,6 +66,9 @@ public void connectToServerConnection(TestConnection serverConnection) {
7066
connectToServerConnection(serverConnection, true);
7167
}
7268

69+
Worker clientThread = Schedulers.newThread().createWorker();
70+
Worker serverThread = Schedulers.newThread().createWorker();
71+
7372
public void connectToServerConnection(TestConnection serverConnection, boolean log) {
7473
if (log) {
7574
serverConnection.write.add(n -> System.out.println("SERVER ==> Writes from server->client: " + n + " Written from " + Thread.currentThread()));
@@ -78,9 +77,6 @@ public void connectToServerConnection(TestConnection serverConnection, boolean l
7877
toInput.add(n -> System.out.println("CLIENT <== Input from server->client: " + n + " Read on " + Thread.currentThread()));
7978
}
8079

81-
Worker clientThread = Schedulers.newThread().createWorker();
82-
Worker serverThread = Schedulers.newThread().createWorker();
83-
8480
// client to server
8581
write.add(f -> {
8682
serverThread.schedule(() -> {
@@ -97,61 +93,8 @@ public void connectToServerConnection(TestConnection serverConnection, boolean l
9793

9894
@Override
9995
public void close() throws IOException {
100-
101-
}
102-
103-
public static class Channel {
104-
105-
private final CopyOnWriteArrayList<Observer<Frame>> os = new CopyOnWriteArrayList<>();
106-
private Subject<Frame, Frame> s;
107-
108-
public Channel() {
109-
s = PublishSubject.<Frame>create().toSerialized();
110-
s.subscribe(f-> {
111-
for (Observer<Frame> o : os) {
112-
o.onNext(f);
113-
}
114-
});
115-
}
116-
117-
public void send(Frame f) {
118-
s.onNext(f);
119-
}
120-
121-
public void add(Observer<Frame> o) {
122-
os.add(o);
123-
}
124-
125-
public void add(Consumer<Frame> f) {
126-
add(new Observer<Frame>() {
127-
128-
@Override
129-
public void onNext(Frame t) {
130-
f.accept(t);
131-
}
132-
133-
@Override
134-
public void onError(Throwable e) {
135-
136-
}
137-
138-
@Override
139-
public void onComplete() {
140-
141-
}
142-
143-
@Override
144-
public void onSubscribe(io.reactivesocket.observable.Disposable d) {
145-
// TODO Auto-generated method stub
146-
147-
}
148-
149-
});
150-
}
151-
152-
public void remove(Observer<Frame> o) {
153-
os.remove(o);
154-
}
96+
clientThread.dispose();
97+
serverThread.dispose();
15598
}
15699

157100
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.reactivesocket;
17+
18+
import java.util.ArrayList;
19+
import java.util.Collections;
20+
import java.util.List;
21+
import java.util.concurrent.atomic.AtomicLong;
22+
23+
import org.reactivestreams.Publisher;
24+
import org.reactivestreams.Subscriber;
25+
import org.reactivestreams.Subscription;
26+
27+
/**
28+
* Connection that by defaults only calls request(1) on a Publisher to addOutput. Any further must be done via requestMore(n)
29+
* <p>
30+
* NOTE: This should ONLY be used for 1 test at a time as it maintains state. Call close() when done.
31+
*/
32+
public class TestConnectionWithControlledRequestN extends TestConnection {
33+
34+
public List<Subscription> subscriptions = Collections.synchronizedList(new ArrayList<Subscription>());
35+
public AtomicLong emitted = new AtomicLong();
36+
public AtomicLong requested = new AtomicLong();
37+
38+
@Override
39+
public void addOutput(Publisher<Frame> o, Completable callback) {
40+
System.out.println("TestConnectionWithControlledRequestN => addOutput");
41+
o.subscribe(new Subscriber<Frame>() {
42+
43+
volatile Subscription _s = null;
44+
public AtomicLong sEmitted = new AtomicLong();
45+
46+
@Override
47+
public void onSubscribe(Subscription s) {
48+
_s = new Subscription() {
49+
50+
@Override
51+
public void request(long n) {
52+
requested.addAndGet(n);
53+
s.request(n);
54+
}
55+
56+
@Override
57+
public void cancel() {
58+
subscriptions.remove(_s);
59+
s.cancel();
60+
}
61+
62+
};
63+
subscriptions.add(_s);
64+
_s.request(1);
65+
}
66+
67+
@Override
68+
public void onNext(Frame t) {
69+
emitted.incrementAndGet();
70+
sEmitted.incrementAndGet();
71+
write.send(t);
72+
}
73+
74+
@Override
75+
public void onError(Throwable t) {
76+
subscriptions.remove(_s);
77+
callback.error(t);
78+
}
79+
80+
@Override
81+
public void onComplete() {
82+
System.out.println("TestConnectionWithControlledRequestN => complete, emitted: " + sEmitted.get());
83+
subscriptions.remove(_s);
84+
callback.success();
85+
}
86+
87+
});
88+
89+
}
90+
91+
public boolean awaitSubscription(int timeInMillis) {
92+
long start = System.currentTimeMillis();
93+
while (subscriptions.size() == 0) {
94+
Thread.yield();
95+
if(System.currentTimeMillis() - start > timeInMillis) {
96+
return false;
97+
}
98+
}
99+
return true;
100+
}
101+
102+
/**
103+
* Request more against the first subscription. This will ONLY request against the oldest Subscription, one at a time.
104+
* <p>
105+
* When one completes, it does NOT propagate request(n) to the next. Thus, this assumes unit tests where you know what you are doing with request(n).
106+
*
107+
* @param n
108+
*/
109+
public void requestMore(int n) {
110+
if (subscriptions.size() == 0) {
111+
throw new IllegalStateException("no subscriptions to request from");
112+
}
113+
subscriptions.get(0).request(n);
114+
}
115+
116+
}

0 commit comments

Comments
 (0)