Skip to content

Local Transport Improvements #489

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 27 additions & 11 deletions rsocket-core/src/main/java/io/rsocket/uri/UriHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,37 @@
import java.util.Optional;
import java.util.ServiceLoader;

/**
* URI to {@link ClientTransport} or {@link ServerTransport}. Should return a non empty value only
* when the URI is unambiguously mapped to a particular transport, either by a standardised
* implementation or via some flag in the URI to indicate a choice.
*/
/** Maps a {@link URI} to a {@link ClientTransport} or {@link ServerTransport}. */
public interface UriHandler {

/**
* Load all registered instances of {@code UriHandler}.
*
* @return all registered instances of {@code UriHandler}
*/
static ServiceLoader<UriHandler> loadServices() {
return ServiceLoader.load(UriHandler.class);
}

default Optional<ClientTransport> buildClient(URI uri) {
return Optional.empty();
}
/**
* Returns an implementation of {@link ClientTransport} unambiguously mapped to a {@link URI},
* otherwise {@link Optional#EMPTY}.
*
* @param uri the uri to map
* @return an implementation of {@link ClientTransport} unambiguously mapped to a {@link URI}, *
* otherwise {@link Optional#EMPTY}
* @throws NullPointerException if {@code uri} is {@code null}
*/
Optional<ClientTransport> buildClient(URI uri);

default Optional<ServerTransport> buildServer(URI uri) {
return Optional.empty();
}
/**
* Returns an implementation of {@link ServerTransport} unambiguously mapped to a {@link URI},
* otherwise {@link Optional#EMPTY}.
*
* @param uri the uri to map
* @return an implementation of {@link ServerTransport} unambiguously mapped to a {@link URI}, *
* otherwise {@link Optional#EMPTY}
* @throws NullPointerException if {@code uri} is {@code null}
*/
Optional<ServerTransport> buildServer(URI uri);
}
21 changes: 17 additions & 4 deletions rsocket-core/src/test/java/io/rsocket/uri/TestUriHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,29 @@

import io.rsocket.test.util.TestDuplexConnection;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.ServerTransport;
import java.net.URI;
import java.util.Objects;
import java.util.Optional;
import reactor.core.publisher.Mono;

public class TestUriHandler implements UriHandler {
public final class TestUriHandler implements UriHandler {

private static final String SCHEME = "test";

@Override
public Optional<ClientTransport> buildClient(URI uri) {
if ("test".equals(uri.getScheme())) {
return Optional.of(() -> Mono.just(new TestDuplexConnection()));
Objects.requireNonNull(uri, "uri must not be null");

if (!SCHEME.equals(uri.getScheme())) {
return Optional.empty();
}
return UriHandler.super.buildClient(uri);

return Optional.of(() -> Mono.just(new TestDuplexConnection()));
}

@Override
public Optional<ServerTransport> buildServer(URI uri) {
return Optional.empty();
}
}
2 changes: 2 additions & 0 deletions rsocket-test/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ dependencies {

implementation project(':rsocket-core')
implementation 'com.google.code.findbugs:jsr305'
implementation 'io.projectreactor:reactor-test'
implementation 'org.assertj:assertj-core'
implementation 'org.junit.jupiter:junit-jupiter-api'
implementation 'org.mockito:mockito-core'

Expand Down
2 changes: 2 additions & 0 deletions rsocket-test/src/main/java/io/rsocket/test/TestFrames.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ public static ExtensionFrame createTestExtensionFrame() {
/**
* Returns a custom test {@link Frame}.
*
* @param frameType the type of frame
* @param byteBuf the {@link ByteBuf} of content for this frame
* @return a custom test {@link Frame}
*/
public static Frame createTestFrame(FrameType frameType, ByteBuf byteBuf) {
Expand Down
287 changes: 287 additions & 0 deletions rsocket-test/src/main/java/io/rsocket/test/TransportTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
/*
* Copyright 2015-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.test;

import io.rsocket.Closeable;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.ServerTransport;
import io.rsocket.util.DefaultPayload;
import java.time.Duration;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

public interface TransportTest {

@AfterEach
default void close() {
getTransportPair().dispose();
}

default Payload createTestPayload(int metadataPresent) {
String metadata1;

switch (metadataPresent % 5) {
case 0:
metadata1 = null;
break;
case 1:
metadata1 = "";
break;
default:
metadata1 = "metadata";
break;
}
String metadata = metadata1;

return DefaultPayload.create("test-data", metadata);
}

@DisplayName("makes 10 fireAndForget requests")
@Test
default void fireAndForget10() {
Flux.range(1, 10)
.flatMap(i -> getClient().fireAndForget(createTestPayload(i)))
.as(StepVerifier::create)
.expectNextCount(0)
.expectComplete()
.verify(getTimeout());
}

default RSocket getClient() {
return getTransportPair().getClient();
}

Duration getTimeout();

TransportPair getTransportPair();

@DisplayName("makes 10 metadataPush requests")
@Test
default void metadataPush10() {
Flux.range(1, 10)
.flatMap(i -> getClient().metadataPush(DefaultPayload.create("", "test-metadata")))
.as(StepVerifier::create)
.expectNextCount(0)
.expectComplete()
.verify(getTimeout());
}

@DisplayName("makes 1 requestChannel request with 0 payloads")
@Test
default void requestChannel0() {
getClient()
.requestChannel(Flux.empty())
.as(StepVerifier::create)
.expectNextCount(0)
.expectComplete()
.verify(getTimeout());
}

@DisplayName("makes 1 requestChannel request with 1 payloads")
@Test
default void requestChannel1() {
getClient()
.requestChannel(Mono.just(createTestPayload(0)))
.as(StepVerifier::create)
.expectNextCount(1)
.expectComplete()
.verify(getTimeout());
}

@DisplayName("makes 1 requestChannel request with 200,000 payloads")
@Test
default void requestChannel200_000() {
Flux<Payload> payloads = Flux.range(0, 200_000).map(this::createTestPayload);

getClient()
.requestChannel(payloads)
.as(StepVerifier::create)
.expectNextCount(200_000)
.expectComplete()
.verify(getTimeout());
}

@DisplayName("makes 1 requestChannel request with 20,000 payloads")
@Test
default void requestChannel20_000() {
Flux<Payload> payloads = Flux.range(0, 20_000).map(this::createTestPayload);

getClient()
.requestChannel(payloads)
.as(StepVerifier::create)
.expectNextCount(20_000)
.expectComplete()
.verify(getTimeout());
}

@DisplayName("makes 1 requestChannel request with 2,000,000 payloads")
@Test
default void requestChannel2_000_000() {
Flux<Payload> payloads = Flux.range(0, 2_000_000).map(this::createTestPayload);

getClient()
.requestChannel(payloads)
.as(StepVerifier::create)
.expectNextCount(2_000_000)
.expectComplete()
.verify(getTimeout());
}

@DisplayName("makes 1 requestChannel request with 3 payloads")
@Test
default void requestChannel3() {
Flux<Payload> payloads = Flux.range(0, 3).map(this::createTestPayload);

getClient()
.requestChannel(payloads)
.as(StepVerifier::create)
.expectNextCount(3)
.expectComplete()
.verify(getTimeout());
}

@DisplayName("makes 1 requestChannel request with 512 payloads")
@Test
default void requestChannel512() {
Flux<Payload> payloads = Flux.range(0, 512).map(this::createTestPayload);

getClient()
.requestChannel(payloads)
.as(StepVerifier::create)
.expectNextCount(512)
.expectComplete()
.verify(getTimeout());
}

@DisplayName("makes 1 requestResponse request")
@Test
default void requestResponse1() {
getClient()
.requestResponse(createTestPayload(1))
.map(Payload::getDataUtf8)
.as(StepVerifier::create)
.expectNextCount(1)
.expectComplete()
.verify(getTimeout());
}

@DisplayName("makes 10 requestResponse requests")
@Test
default void requestResponse10() {
Flux.range(1, 10)
.flatMap(i -> getClient().requestResponse(createTestPayload(i)).map(Payload::getDataUtf8))
.as(StepVerifier::create)
.expectNextCount(10)
.expectComplete()
.verify(getTimeout());
}

@DisplayName("makes 100 requestResponse requests")
@Test
default void requestResponse100() {
Flux.range(1, 100)
.flatMap(i -> getClient().requestResponse(createTestPayload(i)).map(Payload::getDataUtf8))
.as(StepVerifier::create)
.expectNextCount(100)
.expectComplete()
.verify(getTimeout());
}

@DisplayName("makes 10,000 requestResponse requests")
@Test
default void requestResponse10_000() {
Flux.range(1, 10_000)
.flatMap(i -> getClient().requestResponse(createTestPayload(i)).map(Payload::getDataUtf8))
.as(StepVerifier::create)
.expectNextCount(10_000)
.expectComplete()
.verify(getTimeout());
}

@DisplayName("makes 1 requestStream request and receives 10,000 responses")
@Test
default void requestStream10_000() {
getClient()
.requestStream(createTestPayload(3))
.as(StepVerifier::create)
.expectNextCount(10_000)
.expectComplete()
.verify(getTimeout());
}

@DisplayName("makes 1 requestStream request and receives 5 responses")
@Test
default void requestStream5() {
getClient()
.requestStream(createTestPayload(3))
.take(5)
.as(StepVerifier::create)
.expectNextCount(5)
.expectComplete()
.verify(getTimeout());
}

@DisplayName("makes 1 requestStream request and consumes result incrementally")
@Test
default void requestStreamDelayedRequestN() {
getClient()
.requestStream(createTestPayload(3))
.take(10)
.as(StepVerifier::create)
.thenRequest(5)
.expectNextCount(5)
.thenRequest(5)
.expectNextCount(5)
.expectComplete()
.verify(getTimeout());
}

final class TransportPair implements Disposable {

private final RSocket client;

private final Closeable server;

public TransportPair(ServerTransport<?> serverTransport, ClientTransport clientTransport) {
this.server =
RSocketFactory.receive()
.acceptor((setup, sendingSocket) -> Mono.just(new TestRSocket()))
.transport(serverTransport)
.start()
.block();

this.client = RSocketFactory.connect().transport(clientTransport).start().block();
}

@Override
public void dispose() {
server.dispose();
}

RSocket getClient() {
return client;
}
}
}
Loading