Skip to content

Adds specific requestChannel(Payload, Publisher<Payload>) #572

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import io.rsocket.framing.FrameType;

/**
* Exposed to server for determination of RequestHandler based on mime types and SETUP metadata/data
* Exposed to server for determination of ResponderRSocket based on mime types and SETUP metadata/data
*/
public abstract class ConnectionSetupPayload extends AbstractReferenceCounted implements Payload {

Expand Down
105 changes: 68 additions & 37 deletions rsocket-core/src/main/java/io/rsocket/RSocketServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.publisher.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.UnicastProcessor;

import java.util.Collections;
import java.util.Map;
Expand All @@ -39,15 +42,16 @@
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_M;

/** Server side RSocket. Receives {@link Frame}s from a {@link RSocketClient} */
class RSocketServer implements RSocket {
class RSocketServer implements ResponderRSocket {

private final DuplexConnection connection;
private final RSocket requestHandler;
private final ResponderRSocket responderRSocket;
private final Function<Frame, ? extends Payload> frameDecoder;
private final Consumer<Throwable> errorConsumer;

private final Map<Integer, Subscription> sendingSubscriptions;
private final Map<Integer, Processor<Payload,Payload>> channelProcessors;
private final Map<Integer, Processor<Payload, Payload>> channelProcessors;

private final UnboundedProcessor<Frame> sendProcessor;
private KeepAliveHandler keepAliveHandler;
Expand All @@ -69,12 +73,16 @@ class RSocketServer implements RSocket {
Consumer<Throwable> errorConsumer,
long tickPeriod,
long ackTimeout) {
this.connection = connection;

this.requestHandler = requestHandler;
this.responderRSocket =
(requestHandler instanceof ResponderRSocket) ? (ResponderRSocket) requestHandler : null;

this.connection = connection;
this.frameDecoder = frameDecoder;
this.errorConsumer = errorConsumer;
this.sendingSubscriptions = Collections.synchronizedMap(new IntObjectHashMap<>());
this.channelProcessors = Collections.synchronizedMap(new IntObjectHashMap<>());
this.channelProcessors = Collections.synchronizedMap(new IntObjectHashMap<>());

// DO NOT Change the order here. The Send processor must be subscribed to before receiving
// connections
Expand Down Expand Up @@ -116,43 +124,55 @@ class RSocketServer implements RSocket {
}

private void handleSendProcessorError(Throwable t) {
sendingSubscriptions.values().forEach(subscription -> {
try {
subscription.cancel();
} catch (Throwable e) {
errorConsumer.accept(e);
}
});
sendingSubscriptions
.values()
.forEach(
subscription -> {
try {
subscription.cancel();
} catch (Throwable e) {
errorConsumer.accept(e);
}
});

channelProcessors.values().forEach(subscription -> {
try {
subscription.onError(t);
} catch (Throwable e) {
errorConsumer.accept(e);
}
});
channelProcessors
.values()
.forEach(
subscription -> {
try {
subscription.onError(t);
} catch (Throwable e) {
errorConsumer.accept(e);
}
});
}

private void handleSendProcessorCancel(SignalType t) {
if (SignalType.ON_ERROR == t) {
return;
}

sendingSubscriptions.values().forEach(subscription -> {
try {
subscription.cancel();
} catch (Throwable e) {
errorConsumer.accept(e);
}
});
sendingSubscriptions
.values()
.forEach(
subscription -> {
try {
subscription.cancel();
} catch (Throwable e) {
errorConsumer.accept(e);
}
});

channelProcessors.values().forEach(subscription -> {
try {
subscription.onComplete();
} catch (Throwable e) {
errorConsumer.accept(e);
}
});
channelProcessors
.values()
.forEach(
subscription -> {
try {
subscription.onComplete();
} catch (Throwable e) {
errorConsumer.accept(e);
}
});
}

@Override
Expand Down Expand Up @@ -191,6 +211,15 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
}
}

@Override
public Flux<Payload> requestChannel(Payload payload, Publisher<Payload> payloads) {
try {
return responderRSocket.requestChannel(payload, payloads);
} catch (Throwable t) {
return Flux.error(t);
}
}

@Override
public Mono<Void> metadataPush(Payload payload) {
try {
Expand Down Expand Up @@ -232,9 +261,7 @@ private synchronized void cleanUpSendingSubscriptions() {
}

private synchronized void cleanUpChannelProcessors() {
channelProcessors
.values()
.forEach(Processor::onComplete);
channelProcessors.values().forEach(Processor::onComplete);
channelProcessors.clear();
}

Expand Down Expand Up @@ -381,7 +408,11 @@ private void handleChannel(int streamId, Payload payload, int initialRequestN) {
// and any later payload can be processed
frames.onNext(payload);

handleStream(streamId, requestChannel(payloads), initialRequestN);
if (responderRSocket != null) {
handleStream(streamId, requestChannel(payload, payloads), initialRequestN);
} else {
handleStream(streamId, requestChannel(payloads), initialRequestN);
}
}

private void handleKeepAliveFrame(Frame frame) {
Expand Down
23 changes: 23 additions & 0 deletions rsocket-core/src/main/java/io/rsocket/ResponderRSocket.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.rsocket;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

/**
* Extends the {@link RSocket} that allows an implementer to peek at the first request payload of a
* channel.
*/
public interface ResponderRSocket extends RSocket {
/**
* Implement this method to peak at the first payload of the incoming request stream without
* having to subscribe to Publish&lt;Payload&gt; payloads
*
* @param payload First payload in the stream - this is the same payload as the first payload in
* Publisher&lt;Payload&gt; payloads
* @param payloads Stream of request payloads.
* @return Stream of response payloads.
*/
default Flux<Payload> requestChannel(Payload payload, Publisher<Payload> payloads) {
return requestChannel(payloads);
}
}