Skip to content

Commit 91e894a

Browse files
authored
removes redundant frames being sent (#792)
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 49e93c9 commit 91e894a

File tree

4 files changed

+53
-33
lines changed

4 files changed

+53
-33
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -261,9 +261,7 @@ public void doOnTerminal(
261261
@Nonnull SignalType signalType,
262262
@Nullable Payload element,
263263
@Nullable Throwable e) {
264-
if (signalType == SignalType.ON_ERROR) {
265-
sendProcessor.onNext(ErrorFrameFlyweight.encode(allocator, streamId, e));
266-
} else if (signalType == SignalType.CANCEL) {
264+
if (signalType == SignalType.CANCEL) {
267265
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
268266
}
269267
removeStreamReceiver(streamId);

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import io.rsocket.lease.ResponderLeaseHandler;
3737
import java.util.function.Consumer;
3838
import java.util.function.LongConsumer;
39+
import javax.annotation.Nullable;
3940
import org.reactivestreams.Processor;
4041
import org.reactivestreams.Publisher;
4142
import org.reactivestreams.Subscriber;
@@ -302,7 +303,7 @@ private void handleFrame(ByteBuf frame) {
302303
case REQUEST_STREAM:
303304
int streamInitialRequestN = RequestStreamFrameFlyweight.initialRequestN(frame);
304305
Payload streamPayload = payloadDecoder.apply(frame);
305-
handleStream(streamId, requestStream(streamPayload), streamInitialRequestN);
306+
handleStream(streamId, requestStream(streamPayload), streamInitialRequestN, null);
306307
break;
307308
case REQUEST_CHANNEL:
308309
int channelInitialRequestN = RequestChannelFrameFlyweight.initialRequestN(frame);
@@ -433,7 +434,11 @@ protected void hookFinally(SignalType type) {
433434
response.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER).subscribe(subscriber);
434435
}
435436

436-
private void handleStream(int streamId, Flux<Payload> response, int initialRequestN) {
437+
private void handleStream(
438+
int streamId,
439+
Flux<Payload> response,
440+
int initialRequestN,
441+
@Nullable UnicastProcessor<Payload> requestChannel) {
437442
final BaseSubscriber<Payload> subscriber =
438443
new BaseSubscriber<Payload>() {
439444

@@ -446,6 +451,17 @@ protected void hookOnSubscribe(Subscription s) {
446451
protected void hookOnNext(Payload payload) {
447452
if (!PayloadValidationUtils.isValid(mtu, payload)) {
448453
payload.release();
454+
// specifically for requestChannel case so when Payload is invalid we will not be
455+
// sending CancelFrame and ErrorFrame
456+
// Note: CancelFrame is redundant and due to spec
457+
// (https://github.com/rsocket/rsocket/blob/master/Protocol.md#request-channel)
458+
// Upon receiving an ERROR[APPLICATION_ERROR|REJECTED|CANCELED|INVALID], the stream is
459+
// terminated on both Requester and Responder.
460+
// Upon sending an ERROR[APPLICATION_ERROR|REJECTED|CANCELED|INVALID], the stream is
461+
// terminated on both the Requester and Responder.
462+
if (requestChannel != null) {
463+
channelProcessors.remove(streamId, requestChannel);
464+
}
449465
cancel();
450466
final IllegalArgumentException t =
451467
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
@@ -495,9 +511,6 @@ private void handleChannel(int streamId, Payload payload, int initialRequestN) {
495511

496512
Flux<Payload> payloads =
497513
frames
498-
.doOnCancel(
499-
() -> sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId)))
500-
.doOnError(t -> handleError(streamId, t))
501514
.doOnRequest(
502515
new LongConsumer() {
503516
boolean first = true;
@@ -511,10 +524,19 @@ public void accept(long l) {
511524
} else {
512525
n = l;
513526
}
514-
sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
527+
if (n > 0) {
528+
sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
529+
}
530+
}
531+
})
532+
.doFinally(
533+
signalType -> {
534+
if (channelProcessors.remove(streamId, frames)) {
535+
if (signalType == SignalType.CANCEL) {
536+
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
537+
}
515538
}
516539
})
517-
.doFinally(signalType -> channelProcessors.remove(streamId))
518540
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);
519541

520542
// not chained, as the payload should be enqueued in the Unicast processor before this method
@@ -523,9 +545,9 @@ public void accept(long l) {
523545
frames.onNext(payload);
524546

525547
if (responderRSocket != null) {
526-
handleStream(streamId, requestChannel(payload, payloads), initialRequestN);
548+
handleStream(streamId, requestChannel(payload, payloads), initialRequestN, frames);
527549
} else {
528-
handleStream(streamId, requestChannel(payloads), initialRequestN);
550+
handleStream(streamId, requestChannel(payloads), initialRequestN, frames);
529551
}
530552
}
531553

rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -174,8 +174,8 @@ public void testHandleApplicationException() {
174174
verify(responseSub).onError(any(ApplicationErrorException.class));
175175

176176
Assertions.assertThat(rule.connection.getSent())
177-
// requestResponseFrame FIXME
178-
// .hasSize(1)
177+
// requestResponseFrame
178+
.hasSize(1)
179179
.allMatch(ReferenceCounted::release);
180180

181181
rule.assertHasNoLeaks();
@@ -356,8 +356,6 @@ public void shouldThrownExceptionIfGivenPayloadIsExitsSizeAllowanceWithNoFragmen
356356
.isInstanceOf(IllegalArgumentException.class)
357357
.hasMessage(INVALID_PAYLOAD_ERROR_MESSAGE))
358358
.verify();
359-
// FIXME: should be removed
360-
Assertions.assertThat(rule.connection.getSent()).allMatch(bb -> bb.release());
361359
rule.assertHasNoLeaks();
362360
});
363361
}

rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@
6868
import org.junit.runners.model.Statement;
6969
import org.reactivestreams.Publisher;
7070
import org.reactivestreams.Subscriber;
71+
import org.reactivestreams.Subscription;
7172
import reactor.core.CoreSubscriber;
73+
import reactor.core.publisher.BaseSubscriber;
7274
import reactor.core.publisher.Flux;
7375
import reactor.core.publisher.FluxSink;
7476
import reactor.core.publisher.Hooks;
@@ -193,27 +195,27 @@ public Flux<Payload> requestStream(Payload p) {
193195
p.release();
194196
return Flux.just(payload).doOnCancel(() -> cancelled.set(true));
195197
}
196-
// FIXME
197-
// @Override
198-
// public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
199-
// Flux.from(payloads)
200-
// .doOnNext(Payload::release)
201-
// .subscribe(
202-
// new BaseSubscriber<Payload>() {
203-
// @Override
204-
// protected void hookOnSubscribe(Subscription subscription) {
205-
// subscription.request(1);
206-
// }
207-
// });
208-
// return Flux.just(payload).doOnCancel(() -> cancelled.set(true));
209-
// }
198+
199+
@Override
200+
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
201+
Flux.from(payloads)
202+
.doOnNext(Payload::release)
203+
.subscribe(
204+
new BaseSubscriber<Payload>() {
205+
@Override
206+
protected void hookOnSubscribe(Subscription subscription) {
207+
subscription.request(1);
208+
}
209+
});
210+
return Flux.just(payload).doOnCancel(() -> cancelled.set(true));
211+
}
210212
};
211213
rule.setAcceptingSocket(acceptingSocket);
212214

213215
final Runnable[] runnables = {
214216
() -> rule.sendRequest(streamId, FrameType.REQUEST_RESPONSE),
215-
() -> rule.sendRequest(streamId, FrameType.REQUEST_STREAM) /* FIXME,
216-
() -> rule.sendRequest(streamId, FrameType.REQUEST_CHANNEL)*/
217+
() -> rule.sendRequest(streamId, FrameType.REQUEST_STREAM),
218+
() -> rule.sendRequest(streamId, FrameType.REQUEST_CHANNEL)
217219
};
218220

219221
for (Runnable runnable : runnables) {
@@ -224,9 +226,9 @@ public Flux<Payload> requestStream(Payload p) {
224226
.isInstanceOf(IllegalArgumentException.class)
225227
.hasToString("java.lang.IllegalArgumentException: " + INVALID_PAYLOAD_ERROR_MESSAGE);
226228
Assertions.assertThat(rule.connection.getSent())
227-
.filteredOn(bb -> FrameHeaderFlyweight.frameType(bb) == FrameType.ERROR)
228229
.hasSize(1)
229230
.first()
231+
.matches(bb -> FrameHeaderFlyweight.frameType(bb) == FrameType.ERROR)
230232
.matches(bb -> ErrorFrameFlyweight.dataUtf8(bb).contains(INVALID_PAYLOAD_ERROR_MESSAGE))
231233
.matches(ReferenceCounted::release);
232234

0 commit comments

Comments
 (0)