Skip to content

Commit 17f7512

Browse files
committed
removes redundant frames being sent
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 49e93c9 commit 17f7512

File tree

4 files changed

+45
-33
lines changed

4 files changed

+45
-33
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -261,9 +261,7 @@ public void doOnTerminal(
261261
@Nonnull SignalType signalType,
262262
@Nullable Payload element,
263263
@Nullable Throwable e) {
264-
if (signalType == SignalType.ON_ERROR) {
265-
sendProcessor.onNext(ErrorFrameFlyweight.encode(allocator, streamId, e));
266-
} else if (signalType == SignalType.CANCEL) {
264+
if (signalType == SignalType.CANCEL) {
267265
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
268266
}
269267
removeStreamReceiver(streamId);

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import io.rsocket.lease.ResponderLeaseHandler;
3737
import java.util.function.Consumer;
3838
import java.util.function.LongConsumer;
39+
import javax.annotation.Nullable;
3940
import org.reactivestreams.Processor;
4041
import org.reactivestreams.Publisher;
4142
import org.reactivestreams.Subscriber;
@@ -302,7 +303,7 @@ private void handleFrame(ByteBuf frame) {
302303
case REQUEST_STREAM:
303304
int streamInitialRequestN = RequestStreamFrameFlyweight.initialRequestN(frame);
304305
Payload streamPayload = payloadDecoder.apply(frame);
305-
handleStream(streamId, requestStream(streamPayload), streamInitialRequestN);
306+
handleStream(streamId, requestStream(streamPayload), streamInitialRequestN, null);
306307
break;
307308
case REQUEST_CHANNEL:
308309
int channelInitialRequestN = RequestChannelFrameFlyweight.initialRequestN(frame);
@@ -433,7 +434,11 @@ protected void hookFinally(SignalType type) {
433434
response.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER).subscribe(subscriber);
434435
}
435436

436-
private void handleStream(int streamId, Flux<Payload> response, int initialRequestN) {
437+
private void handleStream(
438+
int streamId,
439+
Flux<Payload> response,
440+
int initialRequestN,
441+
@Nullable UnicastProcessor<Payload> requestChannel) {
437442
final BaseSubscriber<Payload> subscriber =
438443
new BaseSubscriber<Payload>() {
439444

@@ -446,6 +451,9 @@ protected void hookOnSubscribe(Subscription s) {
446451
protected void hookOnNext(Payload payload) {
447452
if (!PayloadValidationUtils.isValid(mtu, payload)) {
448453
payload.release();
454+
if (requestChannel != null) {
455+
channelProcessors.remove(streamId, requestChannel);
456+
}
449457
cancel();
450458
final IllegalArgumentException t =
451459
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
@@ -495,9 +503,6 @@ private void handleChannel(int streamId, Payload payload, int initialRequestN) {
495503

496504
Flux<Payload> payloads =
497505
frames
498-
.doOnCancel(
499-
() -> sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId)))
500-
.doOnError(t -> handleError(streamId, t))
501506
.doOnRequest(
502507
new LongConsumer() {
503508
boolean first = true;
@@ -511,10 +516,19 @@ public void accept(long l) {
511516
} else {
512517
n = l;
513518
}
514-
sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
519+
if (n > 0) {
520+
sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
521+
}
522+
}
523+
})
524+
.doFinally(
525+
signalType -> {
526+
if (channelProcessors.remove(streamId, frames)) {
527+
if (signalType == SignalType.CANCEL) {
528+
sendProcessor.onNext(CancelFrameFlyweight.encode(allocator, streamId));
529+
}
515530
}
516531
})
517-
.doFinally(signalType -> channelProcessors.remove(streamId))
518532
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);
519533

520534
// not chained, as the payload should be enqueued in the Unicast processor before this method
@@ -523,9 +537,9 @@ public void accept(long l) {
523537
frames.onNext(payload);
524538

525539
if (responderRSocket != null) {
526-
handleStream(streamId, requestChannel(payload, payloads), initialRequestN);
540+
handleStream(streamId, requestChannel(payload, payloads), initialRequestN, frames);
527541
} else {
528-
handleStream(streamId, requestChannel(payloads), initialRequestN);
542+
handleStream(streamId, requestChannel(payloads), initialRequestN, frames);
529543
}
530544
}
531545

rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -174,8 +174,8 @@ public void testHandleApplicationException() {
174174
verify(responseSub).onError(any(ApplicationErrorException.class));
175175

176176
Assertions.assertThat(rule.connection.getSent())
177-
// requestResponseFrame FIXME
178-
// .hasSize(1)
177+
// requestResponseFrame
178+
.hasSize(1)
179179
.allMatch(ReferenceCounted::release);
180180

181181
rule.assertHasNoLeaks();
@@ -356,8 +356,6 @@ public void shouldThrownExceptionIfGivenPayloadIsExitsSizeAllowanceWithNoFragmen
356356
.isInstanceOf(IllegalArgumentException.class)
357357
.hasMessage(INVALID_PAYLOAD_ERROR_MESSAGE))
358358
.verify();
359-
// FIXME: should be removed
360-
Assertions.assertThat(rule.connection.getSent()).allMatch(bb -> bb.release());
361359
rule.assertHasNoLeaks();
362360
});
363361
}

rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@
6868
import org.junit.runners.model.Statement;
6969
import org.reactivestreams.Publisher;
7070
import org.reactivestreams.Subscriber;
71+
import org.reactivestreams.Subscription;
7172
import reactor.core.CoreSubscriber;
73+
import reactor.core.publisher.BaseSubscriber;
7274
import reactor.core.publisher.Flux;
7375
import reactor.core.publisher.FluxSink;
7476
import reactor.core.publisher.Hooks;
@@ -193,27 +195,27 @@ public Flux<Payload> requestStream(Payload p) {
193195
p.release();
194196
return Flux.just(payload).doOnCancel(() -> cancelled.set(true));
195197
}
196-
// FIXME
197-
// @Override
198-
// public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
199-
// Flux.from(payloads)
200-
// .doOnNext(Payload::release)
201-
// .subscribe(
202-
// new BaseSubscriber<Payload>() {
203-
// @Override
204-
// protected void hookOnSubscribe(Subscription subscription) {
205-
// subscription.request(1);
206-
// }
207-
// });
208-
// return Flux.just(payload).doOnCancel(() -> cancelled.set(true));
209-
// }
198+
199+
@Override
200+
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
201+
Flux.from(payloads)
202+
.doOnNext(Payload::release)
203+
.subscribe(
204+
new BaseSubscriber<>() {
205+
@Override
206+
protected void hookOnSubscribe(Subscription subscription) {
207+
subscription.request(1);
208+
}
209+
});
210+
return Flux.just(payload).doOnCancel(() -> cancelled.set(true));
211+
}
210212
};
211213
rule.setAcceptingSocket(acceptingSocket);
212214

213215
final Runnable[] runnables = {
214216
() -> rule.sendRequest(streamId, FrameType.REQUEST_RESPONSE),
215-
() -> rule.sendRequest(streamId, FrameType.REQUEST_STREAM) /* FIXME,
216-
() -> rule.sendRequest(streamId, FrameType.REQUEST_CHANNEL)*/
217+
() -> rule.sendRequest(streamId, FrameType.REQUEST_STREAM),
218+
() -> rule.sendRequest(streamId, FrameType.REQUEST_CHANNEL)
217219
};
218220

219221
for (Runnable runnable : runnables) {
@@ -224,9 +226,9 @@ public Flux<Payload> requestStream(Payload p) {
224226
.isInstanceOf(IllegalArgumentException.class)
225227
.hasToString("java.lang.IllegalArgumentException: " + INVALID_PAYLOAD_ERROR_MESSAGE);
226228
Assertions.assertThat(rule.connection.getSent())
227-
.filteredOn(bb -> FrameHeaderFlyweight.frameType(bb) == FrameType.ERROR)
228229
.hasSize(1)
229230
.first()
231+
.matches(bb -> FrameHeaderFlyweight.frameType(bb) == FrameType.ERROR)
230232
.matches(bb -> ErrorFrameFlyweight.dataUtf8(bb).contains(INVALID_PAYLOAD_ERROR_MESSAGE))
231233
.matches(ReferenceCounted::release);
232234

0 commit comments

Comments
 (0)