Skip to content

Avoid sending of redundant ERROR and CANCEL frames #792

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,7 @@ public void doOnTerminal(
@Nonnull SignalType signalType,
@Nullable Payload element,
@Nullable Throwable e) {
if (signalType == SignalType.ON_ERROR) {
sendProcessor.onNext(ErrorFrameFlyweight.encode(allocator, streamId, e));
} else if (signalType == SignalType.CANCEL) {
if (signalType == SignalType.CANCEL) {
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
}
removeStreamReceiver(streamId);
Expand Down
40 changes: 31 additions & 9 deletions rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.rsocket.lease.ResponderLeaseHandler;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import javax.annotation.Nullable;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
Expand Down Expand Up @@ -302,7 +303,7 @@ private void handleFrame(ByteBuf frame) {
case REQUEST_STREAM:
int streamInitialRequestN = RequestStreamFrameFlyweight.initialRequestN(frame);
Payload streamPayload = payloadDecoder.apply(frame);
handleStream(streamId, requestStream(streamPayload), streamInitialRequestN);
handleStream(streamId, requestStream(streamPayload), streamInitialRequestN, null);
break;
case REQUEST_CHANNEL:
int channelInitialRequestN = RequestChannelFrameFlyweight.initialRequestN(frame);
Expand Down Expand Up @@ -433,7 +434,11 @@ protected void hookFinally(SignalType type) {
response.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER).subscribe(subscriber);
}

private void handleStream(int streamId, Flux<Payload> response, int initialRequestN) {
private void handleStream(
int streamId,
Flux<Payload> response,
int initialRequestN,
@Nullable UnicastProcessor<Payload> requestChannel) {
final BaseSubscriber<Payload> subscriber =
new BaseSubscriber<Payload>() {

Expand All @@ -446,6 +451,17 @@ protected void hookOnSubscribe(Subscription s) {
protected void hookOnNext(Payload payload) {
if (!PayloadValidationUtils.isValid(mtu, payload)) {
payload.release();
// specifically for requestChannel case so when Payload is invalid we will not be
// sending CancelFrame and ErrorFrame
// Note: CancelFrame is redundant and due to spec
// (https://github.com/rsocket/rsocket/blob/master/Protocol.md#request-channel)
// Upon receiving an ERROR[APPLICATION_ERROR|REJECTED|CANCELED|INVALID], the stream is
// terminated on both Requester and Responder.
// Upon sending an ERROR[APPLICATION_ERROR|REJECTED|CANCELED|INVALID], the stream is
// terminated on both the Requester and Responder.
if (requestChannel != null) {
channelProcessors.remove(streamId, requestChannel);
}
cancel();
final IllegalArgumentException t =
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
Expand Down Expand Up @@ -495,9 +511,6 @@ private void handleChannel(int streamId, Payload payload, int initialRequestN) {

Flux<Payload> payloads =
frames
.doOnCancel(
() -> sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId)))
.doOnError(t -> handleError(streamId, t))
.doOnRequest(
new LongConsumer() {
boolean first = true;
Expand All @@ -511,10 +524,19 @@ public void accept(long l) {
} else {
n = l;
}
sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
if (n > 0) {
sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
}
}
})
.doFinally(
signalType -> {
if (channelProcessors.remove(streamId, frames)) {
if (signalType == SignalType.CANCEL) {
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
}
}
})
.doFinally(signalType -> channelProcessors.remove(streamId))
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);

// not chained, as the payload should be enqueued in the Unicast processor before this method
Expand All @@ -523,9 +545,9 @@ public void accept(long l) {
frames.onNext(payload);

if (responderRSocket != null) {
handleStream(streamId, requestChannel(payload, payloads), initialRequestN);
handleStream(streamId, requestChannel(payload, payloads), initialRequestN, frames);
} else {
handleStream(streamId, requestChannel(payloads), initialRequestN);
handleStream(streamId, requestChannel(payloads), initialRequestN, frames);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ public void testHandleApplicationException() {
verify(responseSub).onError(any(ApplicationErrorException.class));

Assertions.assertThat(rule.connection.getSent())
// requestResponseFrame FIXME
// .hasSize(1)
// requestResponseFrame
.hasSize(1)
.allMatch(ReferenceCounted::release);

rule.assertHasNoLeaks();
Expand Down Expand Up @@ -356,8 +356,6 @@ public void shouldThrownExceptionIfGivenPayloadIsExitsSizeAllowanceWithNoFragmen
.isInstanceOf(IllegalArgumentException.class)
.hasMessage(INVALID_PAYLOAD_ERROR_MESSAGE))
.verify();
// FIXME: should be removed
Assertions.assertThat(rule.connection.getSent()).allMatch(bb -> bb.release());
rule.assertHasNoLeaks();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@
import org.junit.runners.model.Statement;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Hooks;
Expand Down Expand Up @@ -193,27 +195,27 @@ public Flux<Payload> requestStream(Payload p) {
p.release();
return Flux.just(payload).doOnCancel(() -> cancelled.set(true));
}
// FIXME
// @Override
// public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
// Flux.from(payloads)
// .doOnNext(Payload::release)
// .subscribe(
// new BaseSubscriber<Payload>() {
// @Override
// protected void hookOnSubscribe(Subscription subscription) {
// subscription.request(1);
// }
// });
// return Flux.just(payload).doOnCancel(() -> cancelled.set(true));
// }

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
Flux.from(payloads)
.doOnNext(Payload::release)
.subscribe(
new BaseSubscriber<Payload>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(1);
}
});
return Flux.just(payload).doOnCancel(() -> cancelled.set(true));
}
};
rule.setAcceptingSocket(acceptingSocket);

final Runnable[] runnables = {
() -> rule.sendRequest(streamId, FrameType.REQUEST_RESPONSE),
() -> rule.sendRequest(streamId, FrameType.REQUEST_STREAM) /* FIXME,
() -> rule.sendRequest(streamId, FrameType.REQUEST_CHANNEL)*/
() -> rule.sendRequest(streamId, FrameType.REQUEST_STREAM),
() -> rule.sendRequest(streamId, FrameType.REQUEST_CHANNEL)
};

for (Runnable runnable : runnables) {
Expand All @@ -224,9 +226,9 @@ public Flux<Payload> requestStream(Payload p) {
.isInstanceOf(IllegalArgumentException.class)
.hasToString("java.lang.IllegalArgumentException: " + INVALID_PAYLOAD_ERROR_MESSAGE);
Assertions.assertThat(rule.connection.getSent())
.filteredOn(bb -> FrameHeaderFlyweight.frameType(bb) == FrameType.ERROR)
.hasSize(1)
.first()
.matches(bb -> FrameHeaderFlyweight.frameType(bb) == FrameType.ERROR)
.matches(bb -> ErrorFrameFlyweight.dataUtf8(bb).contains(INVALID_PAYLOAD_ERROR_MESSAGE))
.matches(ReferenceCounted::release);

Expand Down