Skip to content

Commit 02c02f6

Browse files
Merge pull request #40 from ReactiveSocket/TestConnection
new TestConnection
2 parents b8699a6 + 49a4b0d commit 02c02f6

File tree

4 files changed

+94
-80
lines changed

4 files changed

+94
-80
lines changed

src/test/java/io/reactivesocket/RequesterResponderInteractionTest.java

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -46,22 +46,9 @@ public class RequesterResponderInteractionTest
4646
public void setup() throws InterruptedException {
4747
System.out.println("-----------------------------------------------------------------------");
4848
TestConnection serverConnection = new TestConnection();
49-
serverConnection.writes.forEach(n -> System.out.println("Writes from server->client: " + n));
50-
serverConnection.toInput.forEach(n -> System.out.println("Input from client->server: " + n));
5149
TestConnection clientConnection = new TestConnection();
52-
clientConnection.writes.forEach(n -> System.out.println("Writes from client->server: " + n));
53-
clientConnection.toInput.forEach(n -> System.out.println("Input from server->client: " + n));
54-
55-
// connect the connections (with a Scheduler to simulate async IO)
56-
clientConnection.writes
57-
.subscribeOn(Schedulers.computation())
58-
.observeOn(Schedulers.computation())
59-
.subscribe(serverConnection.toInput);
60-
serverConnection.writes.observeOn(Schedulers.computation())
61-
.subscribeOn(Schedulers.computation())
62-
.observeOn(Schedulers.computation())
63-
.subscribe(clientConnection.toInput);
64-
50+
clientConnection.connectToServerConnection(serverConnection);
51+
6552
LatchedCompletable lc = new LatchedCompletable(2);
6653

6754
responder = Responder.create(serverConnection, setup -> new RequestHandler.Builder()

src/test/java/io/reactivesocket/TestConnection.java

Lines changed: 69 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -15,32 +15,30 @@
1515
*/
1616
package io.reactivesocket;
1717

18+
import static io.reactivex.Observable.*;
19+
1820
import java.io.IOException;
21+
import java.util.concurrent.CopyOnWriteArrayList;
1922
import java.util.concurrent.CountDownLatch;
23+
import java.util.function.Consumer;
2024

2125
import org.reactivestreams.Publisher;
2226

23-
import static io.reactivex.Observable.*;
24-
2527
import io.reactivesocket.observable.Observer;
2628
import io.reactivex.Observable;
2729
import io.reactivex.Scheduler;
28-
import io.reactivex.disposables.Disposable;
2930
import io.reactivex.schedulers.Schedulers;
30-
import io.reactivex.subjects.PublishSubject;
31-
import io.reactivex.subjects.Subject;
3231

3332
public class TestConnection implements DuplexConnection {
3433

35-
public final PublishSubject<Frame> toInput = PublishSubject.create();
36-
private Subject<Frame, Frame> writeSubject = PublishSubject.<Frame>create().toSerialized();
37-
public final Observable<Frame> writes = writeSubject;
34+
public final Channel toInput = new Channel();
35+
public final Channel write = new Channel();
3836

3937
@Override
4038
public void addOutput(Publisher<Frame> o, Completable callback) {
4139
fromPublisher(o).flatMap(m -> {
4240
// no backpressure on a Subject so just firehosing for this test
43-
writeSubject.onNext(m);
41+
write.send(m);
4442
return Observable.<Void> empty();
4543
}).subscribe(v -> {}, callback::error, callback::success);
4644
}
@@ -51,61 +49,90 @@ public io.reactivesocket.observable.Observable<Frame> getInput() {
5149

5250
@Override
5351
public void subscribe(Observer<Frame> o) {
54-
final Disposable d = toInput.subscribe(f -> o.onNext(f));
52+
toInput.add(o);
5553
// we are okay with the race of sending data and cancelling ... since this is "hot" by definition and unsubscribing is a race.
5654
o.onSubscribe(new io.reactivesocket.observable.Disposable() {
5755

5856
@Override
5957
public void dispose() {
60-
d.dispose();
58+
toInput.remove(o);
6159
}
6260

6361
});
6462
}
65-
63+
6664
};
6765
}
6866

6967
public void connectToServerConnection(TestConnection serverConnection) {
7068
connectToServerConnection(serverConnection, true);
7169
}
72-
70+
7371
public void connectToServerConnection(TestConnection serverConnection, boolean log) {
7472
if (log) {
75-
serverConnection.writes.forEach(n -> System.out.println("SERVER ==> Writes from server->client: " + n + " Written from " + Thread.currentThread()));
76-
serverConnection.toInput.forEach(n -> System.out.println("SERVER <== Input from client->server: " + n + " Read on " + Thread.currentThread()));
77-
writes.forEach(n -> System.out.println("CLIENT ==> Writes from client->server: " + n + " Written from " + Thread.currentThread()));
78-
toInput.forEach(n -> System.out.println("CLIENT <== Input from server->client: " + n + " Read on " + Thread.currentThread()));
73+
serverConnection.write.add(n -> System.out.println("SERVER ==> Writes from server->client: " + n + " Written from " + Thread.currentThread()));
74+
serverConnection.toInput.add(n -> System.out.println("SERVER <== Input from client->server: " + n + " Read on " + Thread.currentThread()));
75+
write.add(n -> System.out.println("CLIENT ==> Writes from client->server: " + n + " Written from " + Thread.currentThread()));
76+
toInput.add(n -> System.out.println("CLIENT <== Input from server->client: " + n + " Read on " + Thread.currentThread()));
7977
}
80-
81-
Scheduler clientThread = Schedulers.newThread();
82-
Scheduler serverThread = Schedulers.newThread();
8378

84-
// connect the connections (with a Scheduler to simulate async IO)
85-
CountDownLatch c = new CountDownLatch(2);
86-
87-
writes
88-
.doOnSubscribe(t -> c.countDown())
89-
.subscribeOn(clientThread)
90-
.onBackpressureBuffer() // simulate unbounded network buffer
91-
.observeOn(serverThread)
92-
.subscribe(serverConnection.toInput);
93-
serverConnection.writes
94-
.doOnSubscribe(t -> c.countDown())
95-
.subscribeOn(serverThread)
96-
.onBackpressureBuffer() // simulate unbounded network buffer
97-
.observeOn(clientThread)
98-
.subscribe(toInput);
99-
100-
try {
101-
c.await();
102-
} catch (InterruptedException e) {
103-
e.printStackTrace();
104-
}
79+
// client to server
80+
write.add(f -> serverConnection.toInput.send(f));
81+
// server to client
82+
serverConnection.write.add(f -> toInput.send(f));
10583
}
10684

10785
@Override
10886
public void close() throws IOException {
109-
87+
88+
}
89+
90+
91+
public static class Channel {
92+
93+
private final CopyOnWriteArrayList<Observer<Frame>> os = new CopyOnWriteArrayList<>();
94+
95+
public void send(Frame f) {
96+
for (Observer<Frame> o : os) {
97+
o.onNext(f);
98+
}
99+
}
100+
101+
public void add(Observer<Frame> o) {
102+
os.add(o);
103+
}
104+
105+
public void add(Consumer<Frame> f) {
106+
add(new Observer<Frame>() {
107+
108+
@Override
109+
public void onNext(Frame t) {
110+
f.accept(t);
111+
}
112+
113+
@Override
114+
public void onError(Throwable e) {
115+
116+
}
117+
118+
@Override
119+
public void onComplete() {
120+
121+
}
122+
123+
@Override
124+
public void onSubscribe(io.reactivesocket.observable.Disposable d) {
125+
// TODO Auto-generated method stub
126+
127+
}
128+
129+
});
130+
}
131+
132+
public void remove(Observer<Frame> o) {
133+
os.remove(o);
134+
}
110135
}
136+
137+
111138
}

src/test/java/io/reactivesocket/internal/RequesterTest.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public void testRequestResponseSuccess() throws InterruptedException {
6969
assertEquals(FrameType.REQUEST_RESPONSE, two.getType());
7070

7171
// now emit a response to ensure the Publisher receives and completes
72-
conn.toInput.onNext(utf8EncodedResponseFrame(2, FrameType.NEXT_COMPLETE, "world"));
72+
conn.toInput.send(utf8EncodedResponseFrame(2, FrameType.NEXT_COMPLETE, "world"));
7373

7474
ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
7575
ts.assertValue(utf8EncodedPayload("world", null));
@@ -100,7 +100,7 @@ public void testRequestResponseError() throws InterruptedException {
100100
assertEquals("hello", byteToString(two.getData()));
101101
assertEquals(FrameType.REQUEST_RESPONSE, two.getType());
102102

103-
conn.toInput.onNext(Frame.Error.from(2, new RuntimeException("Failed")));
103+
conn.toInput.send(Frame.Error.from(2, new RuntimeException("Failed")));
104104
ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
105105
ts.assertError(Exception.class);
106106
assertEquals("Failed", ts.errors().get(0).getMessage());
@@ -167,9 +167,9 @@ public void testRequestStreamSuccess() throws InterruptedException {
167167
// TODO assert initial requestN
168168

169169
// emit data
170-
conn.toInput.onNext(utf8EncodedResponseFrame(2, FrameType.NEXT, "hello"));
171-
conn.toInput.onNext(utf8EncodedResponseFrame(2, FrameType.NEXT, "world"));
172-
conn.toInput.onNext(utf8EncodedResponseFrame(2, FrameType.COMPLETE, ""));
170+
conn.toInput.send(utf8EncodedResponseFrame(2, FrameType.NEXT, "hello"));
171+
conn.toInput.send(utf8EncodedResponseFrame(2, FrameType.NEXT, "world"));
172+
conn.toInput.send(utf8EncodedResponseFrame(2, FrameType.COMPLETE, ""));
173173

174174
ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
175175
ts.assertComplete();
@@ -203,8 +203,8 @@ public void testRequestStreamSuccessTake2AndCancel() throws InterruptedException
203203
// TODO assert initial requestN
204204

205205
// emit data
206-
conn.toInput.onNext(utf8EncodedResponseFrame(2, FrameType.NEXT, "hello"));
207-
conn.toInput.onNext(utf8EncodedResponseFrame(2, FrameType.NEXT, "world"));
206+
conn.toInput.send(utf8EncodedResponseFrame(2, FrameType.NEXT, "hello"));
207+
conn.toInput.send(utf8EncodedResponseFrame(2, FrameType.NEXT, "world"));
208208

209209
ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
210210
ts.assertComplete();
@@ -246,8 +246,8 @@ public void testRequestStreamError() throws InterruptedException {
246246
// TODO assert initial requestN
247247

248248
// emit data
249-
conn.toInput.onNext(utf8EncodedResponseFrame(2, FrameType.NEXT, "hello"));
250-
conn.toInput.onNext(utf8EncodedErrorFrame(2, "Failure"));
249+
conn.toInput.send(utf8EncodedResponseFrame(2, FrameType.NEXT, "hello"));
250+
conn.toInput.send(utf8EncodedErrorFrame(2, "Failure"));
251251

252252
ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
253253
ts.assertError(Exception.class);
@@ -269,7 +269,7 @@ private TestConnection establishConnection() {
269269
private ReplaySubject<Frame> captureRequests(TestConnection conn) {
270270
ReplaySubject<Frame> rs = ReplaySubject.create();
271271
rs.forEach(i -> System.out.println("capturedRequest => " + i));
272-
conn.writes.subscribe(rs);
272+
conn.write.add(rs::onNext);
273273
return rs;
274274
}
275275
}

src/test/java/io/reactivesocket/internal/ResponderTest.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public void testRequestResponseSuccess() throws InterruptedException {
5555
sendSetupFrame(conn);
5656

5757
// perform a request/response
58-
conn.toInput.onNext(utf8EncodedRequestFrame(1, FrameType.REQUEST_RESPONSE, "hello", 128));
58+
conn.toInput.send(utf8EncodedRequestFrame(1, FrameType.REQUEST_RESPONSE, "hello", 128));
5959

6060
assertEquals(1, cachedResponses.getValues().length);// 1 onNext + 1 onCompleted
6161
List<Frame> frames = cachedResponses.take(1).toList().toBlocking().first();
@@ -80,7 +80,7 @@ public void testRequestResponseError() throws InterruptedException {
8080
sendSetupFrame(conn);
8181

8282
// perform a request/response
83-
conn.toInput.onNext(utf8EncodedRequestFrame(1, FrameType.REQUEST_RESPONSE, "hello", 128));
83+
conn.toInput.send(utf8EncodedRequestFrame(1, FrameType.REQUEST_RESPONSE, "hello", 128));
8484

8585
// assert
8686
Frame first = cachedResponses.toBlocking().first();
@@ -107,12 +107,12 @@ public void testRequestResponseCancel() throws InterruptedException {
107107
sendSetupFrame(conn);
108108

109109
// perform a request/response
110-
conn.toInput.onNext(utf8EncodedRequestFrame(1, FrameType.REQUEST_RESPONSE, "hello", 128));
110+
conn.toInput.send(utf8EncodedRequestFrame(1, FrameType.REQUEST_RESPONSE, "hello", 128));
111111
// assert no response
112112
assertFalse(cachedResponses.hasValue());
113113
// unsubscribe
114114
assertFalse(unsubscribed.get());
115-
conn.toInput.onNext(Frame.Cancel.from(1));
115+
conn.toInput.send(Frame.Cancel.from(1));
116116
assertTrue(unsubscribed.get());
117117
}
118118

@@ -130,7 +130,7 @@ public void testRequestStreamSuccess() throws InterruptedException {
130130
sendSetupFrame(conn);
131131

132132
// perform a request/response
133-
conn.toInput.onNext(utf8EncodedRequestFrame(1, FrameType.REQUEST_STREAM, "10", 128));
133+
conn.toInput.send(utf8EncodedRequestFrame(1, FrameType.REQUEST_STREAM, "10", 128));
134134

135135
// assert
136136
assertEquals(11, cachedResponses.getValues().length);// 10 onNext + 1 onCompleted
@@ -164,7 +164,7 @@ public void testRequestStreamError() throws InterruptedException {
164164
sendSetupFrame(conn);
165165

166166
// perform a request/response
167-
conn.toInput.onNext(utf8EncodedRequestFrame(1, FrameType.REQUEST_STREAM, "0", 128));
167+
conn.toInput.send(utf8EncodedRequestFrame(1, FrameType.REQUEST_STREAM, "0", 128));
168168

169169
// assert
170170
assertEquals(4, cachedResponses.getValues().length);// 3 onNext + 1 onError
@@ -197,7 +197,7 @@ public void testRequestStreamCancel() throws InterruptedException {
197197
sendSetupFrame(conn);
198198

199199
// perform a request/response
200-
conn.toInput.onNext(utf8EncodedRequestFrame(1, FrameType.REQUEST_STREAM, "/aRequest", 128));
200+
conn.toInput.send(utf8EncodedRequestFrame(1, FrameType.REQUEST_STREAM, "/aRequest", 128));
201201

202202
// no time has passed, so no values
203203
assertEquals(0, cachedResponses.getValues().length);
@@ -206,7 +206,7 @@ public void testRequestStreamCancel() throws InterruptedException {
206206
ts.advanceTimeBy(2000, TimeUnit.MILLISECONDS);
207207
assertEquals(3, cachedResponses.getValues().length);
208208
// dispose
209-
conn.toInput.onNext(Frame.Cancel.from(1));
209+
conn.toInput.send(Frame.Cancel.from(1));
210210
// still only 1 message
211211
assertEquals(3, cachedResponses.getValues().length);
212212
// advance again, nothing should happen
@@ -238,21 +238,21 @@ public void testMultiplexedStreams() throws InterruptedException {
238238
sendSetupFrame(conn);
239239

240240
// perform a request/response
241-
conn.toInput.onNext(utf8EncodedRequestFrame(1, FrameType.REQUEST_STREAM, "requestA", 128));
241+
conn.toInput.send(utf8EncodedRequestFrame(1, FrameType.REQUEST_STREAM, "requestA", 128));
242242

243243
// no time has passed, so no values
244244
assertEquals(0, cachedResponses.getValues().length);
245245
ts.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
246246
// we should have 1 message from A
247247
assertEquals(1, cachedResponses.getValues().length);
248248
// now request another stream
249-
conn.toInput.onNext(utf8EncodedRequestFrame(2, FrameType.REQUEST_STREAM, "requestB", 128));
249+
conn.toInput.send(utf8EncodedRequestFrame(2, FrameType.REQUEST_STREAM, "requestB", 128));
250250
// advance some more
251251
ts.advanceTimeBy(2000, TimeUnit.MILLISECONDS);
252252
// should have 3 from A and 2 from B
253253
assertEquals(5, cachedResponses.getValues().length);
254254
// dispose A, but leave B
255-
conn.toInput.onNext(Frame.Cancel.from(1));
255+
conn.toInput.send(Frame.Cancel.from(1));
256256
// still same 5 frames
257257
assertEquals(5, cachedResponses.getValues().length);
258258
// advance again, should get 2 from B
@@ -285,7 +285,7 @@ public void testMultiplexedStreams() throws InterruptedException {
285285
private ReplaySubject<Frame> captureResponses(TestConnection conn) {
286286
// capture all responses to client
287287
ReplaySubject<Frame> rs = ReplaySubject.create();
288-
conn.writes.subscribe(rs);
288+
conn.write.add(rs::onNext);
289289
return rs;
290290
}
291291

@@ -320,6 +320,6 @@ public void onComplete() {
320320

321321
private void sendSetupFrame(TestConnection conn) {
322322
// setup
323-
conn.toInput.onNext(Frame.Setup.from(0, 0, 0, "UTF-8", "UTF-8", utf8EncodedPayload("", "")));
323+
conn.toInput.send(Frame.Setup.from(0, 0, 0, "UTF-8", "UTF-8", utf8EncodedPayload("", "")));
324324
}
325325
}

0 commit comments

Comments
 (0)