Skip to content

Commit e45327c

Browse files
nebhaleyschimke
authored andcommitted
Local Transport Improvements (#489)
This change makes general improvements to the Local Transport. It updates the code to make it better documented and tested. In addition it introduces the new TransportTest, a JUnit 5 way of testing uniformly across all of the transport implementations.
1 parent 492f1a5 commit e45327c

File tree

24 files changed

+948
-161
lines changed

24 files changed

+948
-161
lines changed

rsocket-core/src/main/java/io/rsocket/uri/UriHandler.java

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,37 @@
2222
import java.util.Optional;
2323
import java.util.ServiceLoader;
2424

25-
/**
26-
* URI to {@link ClientTransport} or {@link ServerTransport}. Should return a non empty value only
27-
* when the URI is unambiguously mapped to a particular transport, either by a standardised
28-
* implementation or via some flag in the URI to indicate a choice.
29-
*/
25+
/** Maps a {@link URI} to a {@link ClientTransport} or {@link ServerTransport}. */
3026
public interface UriHandler {
27+
28+
/**
29+
* Load all registered instances of {@code UriHandler}.
30+
*
31+
* @return all registered instances of {@code UriHandler}
32+
*/
3133
static ServiceLoader<UriHandler> loadServices() {
3234
return ServiceLoader.load(UriHandler.class);
3335
}
3436

35-
default Optional<ClientTransport> buildClient(URI uri) {
36-
return Optional.empty();
37-
}
37+
/**
38+
* Returns an implementation of {@link ClientTransport} unambiguously mapped to a {@link URI},
39+
* otherwise {@link Optional#EMPTY}.
40+
*
41+
* @param uri the uri to map
42+
* @return an implementation of {@link ClientTransport} unambiguously mapped to a {@link URI}, *
43+
* otherwise {@link Optional#EMPTY}
44+
* @throws NullPointerException if {@code uri} is {@code null}
45+
*/
46+
Optional<ClientTransport> buildClient(URI uri);
3847

39-
default Optional<ServerTransport> buildServer(URI uri) {
40-
return Optional.empty();
41-
}
48+
/**
49+
* Returns an implementation of {@link ServerTransport} unambiguously mapped to a {@link URI},
50+
* otherwise {@link Optional#EMPTY}.
51+
*
52+
* @param uri the uri to map
53+
* @return an implementation of {@link ServerTransport} unambiguously mapped to a {@link URI}, *
54+
* otherwise {@link Optional#EMPTY}
55+
* @throws NullPointerException if {@code uri} is {@code null}
56+
*/
57+
Optional<ServerTransport> buildServer(URI uri);
4258
}

rsocket-core/src/test/java/io/rsocket/uri/TestUriHandler.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,29 @@
1818

1919
import io.rsocket.test.util.TestDuplexConnection;
2020
import io.rsocket.transport.ClientTransport;
21+
import io.rsocket.transport.ServerTransport;
2122
import java.net.URI;
23+
import java.util.Objects;
2224
import java.util.Optional;
2325
import reactor.core.publisher.Mono;
2426

25-
public class TestUriHandler implements UriHandler {
27+
public final class TestUriHandler implements UriHandler {
28+
29+
private static final String SCHEME = "test";
30+
2631
@Override
2732
public Optional<ClientTransport> buildClient(URI uri) {
28-
if ("test".equals(uri.getScheme())) {
29-
return Optional.of(() -> Mono.just(new TestDuplexConnection()));
33+
Objects.requireNonNull(uri, "uri must not be null");
34+
35+
if (!SCHEME.equals(uri.getScheme())) {
36+
return Optional.empty();
3037
}
31-
return UriHandler.super.buildClient(uri);
38+
39+
return Optional.of(() -> Mono.just(new TestDuplexConnection()));
40+
}
41+
42+
@Override
43+
public Optional<ServerTransport> buildServer(URI uri) {
44+
return Optional.empty();
3245
}
3346
}

rsocket-test/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ dependencies {
2323

2424
implementation project(':rsocket-core')
2525
implementation 'com.google.code.findbugs:jsr305'
26+
implementation 'io.projectreactor:reactor-test'
27+
implementation 'org.assertj:assertj-core'
2628
implementation 'org.junit.jupiter:junit-jupiter-api'
2729
implementation 'org.mockito:mockito-core'
2830

rsocket-test/src/main/java/io/rsocket/test/TestFrames.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ public static ExtensionFrame createTestExtensionFrame() {
9393
/**
9494
* Returns a custom test {@link Frame}.
9595
*
96+
* @param frameType the type of frame
97+
* @param byteBuf the {@link ByteBuf} of content for this frame
9698
* @return a custom test {@link Frame}
9799
*/
98100
public static Frame createTestFrame(FrameType frameType, ByteBuf byteBuf) {
Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
/*
2+
* Copyright 2015-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.test;
18+
19+
import io.rsocket.Closeable;
20+
import io.rsocket.Payload;
21+
import io.rsocket.RSocket;
22+
import io.rsocket.RSocketFactory;
23+
import io.rsocket.transport.ClientTransport;
24+
import io.rsocket.transport.ServerTransport;
25+
import io.rsocket.util.DefaultPayload;
26+
import java.time.Duration;
27+
import org.junit.jupiter.api.AfterEach;
28+
import org.junit.jupiter.api.DisplayName;
29+
import org.junit.jupiter.api.Test;
30+
import reactor.core.Disposable;
31+
import reactor.core.publisher.Flux;
32+
import reactor.core.publisher.Mono;
33+
import reactor.test.StepVerifier;
34+
35+
public interface TransportTest {
36+
37+
@AfterEach
38+
default void close() {
39+
getTransportPair().dispose();
40+
}
41+
42+
default Payload createTestPayload(int metadataPresent) {
43+
String metadata1;
44+
45+
switch (metadataPresent % 5) {
46+
case 0:
47+
metadata1 = null;
48+
break;
49+
case 1:
50+
metadata1 = "";
51+
break;
52+
default:
53+
metadata1 = "metadata";
54+
break;
55+
}
56+
String metadata = metadata1;
57+
58+
return DefaultPayload.create("test-data", metadata);
59+
}
60+
61+
@DisplayName("makes 10 fireAndForget requests")
62+
@Test
63+
default void fireAndForget10() {
64+
Flux.range(1, 10)
65+
.flatMap(i -> getClient().fireAndForget(createTestPayload(i)))
66+
.as(StepVerifier::create)
67+
.expectNextCount(0)
68+
.expectComplete()
69+
.verify(getTimeout());
70+
}
71+
72+
default RSocket getClient() {
73+
return getTransportPair().getClient();
74+
}
75+
76+
Duration getTimeout();
77+
78+
TransportPair getTransportPair();
79+
80+
@DisplayName("makes 10 metadataPush requests")
81+
@Test
82+
default void metadataPush10() {
83+
Flux.range(1, 10)
84+
.flatMap(i -> getClient().metadataPush(DefaultPayload.create("", "test-metadata")))
85+
.as(StepVerifier::create)
86+
.expectNextCount(0)
87+
.expectComplete()
88+
.verify(getTimeout());
89+
}
90+
91+
@DisplayName("makes 1 requestChannel request with 0 payloads")
92+
@Test
93+
default void requestChannel0() {
94+
getClient()
95+
.requestChannel(Flux.empty())
96+
.as(StepVerifier::create)
97+
.expectNextCount(0)
98+
.expectComplete()
99+
.verify(getTimeout());
100+
}
101+
102+
@DisplayName("makes 1 requestChannel request with 1 payloads")
103+
@Test
104+
default void requestChannel1() {
105+
getClient()
106+
.requestChannel(Mono.just(createTestPayload(0)))
107+
.as(StepVerifier::create)
108+
.expectNextCount(1)
109+
.expectComplete()
110+
.verify(getTimeout());
111+
}
112+
113+
@DisplayName("makes 1 requestChannel request with 200,000 payloads")
114+
@Test
115+
default void requestChannel200_000() {
116+
Flux<Payload> payloads = Flux.range(0, 200_000).map(this::createTestPayload);
117+
118+
getClient()
119+
.requestChannel(payloads)
120+
.as(StepVerifier::create)
121+
.expectNextCount(200_000)
122+
.expectComplete()
123+
.verify(getTimeout());
124+
}
125+
126+
@DisplayName("makes 1 requestChannel request with 20,000 payloads")
127+
@Test
128+
default void requestChannel20_000() {
129+
Flux<Payload> payloads = Flux.range(0, 20_000).map(this::createTestPayload);
130+
131+
getClient()
132+
.requestChannel(payloads)
133+
.as(StepVerifier::create)
134+
.expectNextCount(20_000)
135+
.expectComplete()
136+
.verify(getTimeout());
137+
}
138+
139+
@DisplayName("makes 1 requestChannel request with 2,000,000 payloads")
140+
@Test
141+
default void requestChannel2_000_000() {
142+
Flux<Payload> payloads = Flux.range(0, 2_000_000).map(this::createTestPayload);
143+
144+
getClient()
145+
.requestChannel(payloads)
146+
.as(StepVerifier::create)
147+
.expectNextCount(2_000_000)
148+
.expectComplete()
149+
.verify(getTimeout());
150+
}
151+
152+
@DisplayName("makes 1 requestChannel request with 3 payloads")
153+
@Test
154+
default void requestChannel3() {
155+
Flux<Payload> payloads = Flux.range(0, 3).map(this::createTestPayload);
156+
157+
getClient()
158+
.requestChannel(payloads)
159+
.as(StepVerifier::create)
160+
.expectNextCount(3)
161+
.expectComplete()
162+
.verify(getTimeout());
163+
}
164+
165+
@DisplayName("makes 1 requestChannel request with 512 payloads")
166+
@Test
167+
default void requestChannel512() {
168+
Flux<Payload> payloads = Flux.range(0, 512).map(this::createTestPayload);
169+
170+
getClient()
171+
.requestChannel(payloads)
172+
.as(StepVerifier::create)
173+
.expectNextCount(512)
174+
.expectComplete()
175+
.verify(getTimeout());
176+
}
177+
178+
@DisplayName("makes 1 requestResponse request")
179+
@Test
180+
default void requestResponse1() {
181+
getClient()
182+
.requestResponse(createTestPayload(1))
183+
.map(Payload::getDataUtf8)
184+
.as(StepVerifier::create)
185+
.expectNextCount(1)
186+
.expectComplete()
187+
.verify(getTimeout());
188+
}
189+
190+
@DisplayName("makes 10 requestResponse requests")
191+
@Test
192+
default void requestResponse10() {
193+
Flux.range(1, 10)
194+
.flatMap(i -> getClient().requestResponse(createTestPayload(i)).map(Payload::getDataUtf8))
195+
.as(StepVerifier::create)
196+
.expectNextCount(10)
197+
.expectComplete()
198+
.verify(getTimeout());
199+
}
200+
201+
@DisplayName("makes 100 requestResponse requests")
202+
@Test
203+
default void requestResponse100() {
204+
Flux.range(1, 100)
205+
.flatMap(i -> getClient().requestResponse(createTestPayload(i)).map(Payload::getDataUtf8))
206+
.as(StepVerifier::create)
207+
.expectNextCount(100)
208+
.expectComplete()
209+
.verify(getTimeout());
210+
}
211+
212+
@DisplayName("makes 10,000 requestResponse requests")
213+
@Test
214+
default void requestResponse10_000() {
215+
Flux.range(1, 10_000)
216+
.flatMap(i -> getClient().requestResponse(createTestPayload(i)).map(Payload::getDataUtf8))
217+
.as(StepVerifier::create)
218+
.expectNextCount(10_000)
219+
.expectComplete()
220+
.verify(getTimeout());
221+
}
222+
223+
@DisplayName("makes 1 requestStream request and receives 10,000 responses")
224+
@Test
225+
default void requestStream10_000() {
226+
getClient()
227+
.requestStream(createTestPayload(3))
228+
.as(StepVerifier::create)
229+
.expectNextCount(10_000)
230+
.expectComplete()
231+
.verify(getTimeout());
232+
}
233+
234+
@DisplayName("makes 1 requestStream request and receives 5 responses")
235+
@Test
236+
default void requestStream5() {
237+
getClient()
238+
.requestStream(createTestPayload(3))
239+
.take(5)
240+
.as(StepVerifier::create)
241+
.expectNextCount(5)
242+
.expectComplete()
243+
.verify(getTimeout());
244+
}
245+
246+
@DisplayName("makes 1 requestStream request and consumes result incrementally")
247+
@Test
248+
default void requestStreamDelayedRequestN() {
249+
getClient()
250+
.requestStream(createTestPayload(3))
251+
.take(10)
252+
.as(StepVerifier::create)
253+
.thenRequest(5)
254+
.expectNextCount(5)
255+
.thenRequest(5)
256+
.expectNextCount(5)
257+
.expectComplete()
258+
.verify(getTimeout());
259+
}
260+
261+
final class TransportPair implements Disposable {
262+
263+
private final RSocket client;
264+
265+
private final Closeable server;
266+
267+
public TransportPair(ServerTransport<?> serverTransport, ClientTransport clientTransport) {
268+
this.server =
269+
RSocketFactory.receive()
270+
.acceptor((setup, sendingSocket) -> Mono.just(new TestRSocket()))
271+
.transport(serverTransport)
272+
.start()
273+
.block();
274+
275+
this.client = RSocketFactory.connect().transport(clientTransport).start().block();
276+
}
277+
278+
@Override
279+
public void dispose() {
280+
server.dispose();
281+
}
282+
283+
RSocket getClient() {
284+
return client;
285+
}
286+
}
287+
}

0 commit comments

Comments
 (0)