Skip to content

Commit 45677c7

Browse files
author
Tomas Kolda
committed
Fix performance degradation when fragmentation is used (#994)
1 parent 7a4602c commit 45677c7

File tree

1 file changed

+17
-2
lines changed

1 file changed

+17
-2
lines changed

rsocket-core/src/main/java/io/rsocket/fragmentation/FragmentationDuplexConnection.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,16 @@ public static int assertMtu(int mtu) {
8484

8585
@Override
8686
public Mono<Void> send(Publisher<ByteBuf> frames) {
87-
return Flux.from(frames).concatMap(this::sendOne).then();
87+
return delegate.send(Flux.from(frames)
88+
.concatMap(frame -> {
89+
FrameType frameType = FrameHeaderCodec.frameType(frame);
90+
int readableBytes = frame.readableBytes();
91+
if (!shouldFragment(frameType, readableBytes)) {
92+
return Flux.just(frame);
93+
}
94+
95+
return logFragments(Flux.from(fragmentFrame(alloc(), mtu, frame, frameType)));
96+
}));
8897
}
8998

9099
@Override
@@ -95,6 +104,11 @@ public Mono<Void> sendOne(ByteBuf frame) {
95104
return delegate.sendOne(frame);
96105
}
97106
Flux<ByteBuf> fragments = Flux.from(fragmentFrame(alloc(), mtu, frame, frameType));
107+
fragments = logFragments(fragments);
108+
return delegate.send(fragments);
109+
}
110+
111+
protected Flux<ByteBuf> logFragments(Flux<ByteBuf> fragments) {
98112
if (logger.isDebugEnabled()) {
99113
fragments =
100114
fragments.doOnNext(
@@ -107,6 +121,7 @@ public Mono<Void> sendOne(ByteBuf frame) {
107121
ByteBufUtil.prettyHexDump(byteBuf));
108122
});
109123
}
110-
return delegate.send(fragments);
124+
return fragments;
111125
}
126+
112127
}

0 commit comments

Comments
 (0)