Fragmentation #481
This is gonna take some time to review given the number of changes!
 *
 * @param delegate the {@link DuplexConnection} to decorate
 * @param maxFragmentSize the maximum fragment size
 * @throws NullPointerException if {@code byteBufAllocator} or {@code delegate} are {@code null}
nit: byteBufAllocator not provided
No problems with nits. I’m super-meticulous about my code, and especially my documentation, so keep up with this level of review.
LGTM - consider this a low signal review, but looks really good. I'd prefer others more familiar with the fragmentation to Approve. Happy to unblock later if it's waiting around.
 * @return the {@link String} as a {@code UTF-8} encoded {@link ByteBuf} or {@code null} if {@code
 * s} is {@code null}.
 */
static @Nullable ByteBuf getUff8AsByteBuf(@Nullable String s) {
typo: Uff
import io.netty.buffer.ByteBufUtil;
import org.assertj.core.presentation.StandardRepresentation;

public final class ByteBufRepresentation extends StandardRepresentation {
TIL: nice!
Single most important change I made for testing. Amazing how much of a difference it makes when debugging framing bugs.
The Frames all have solid toString()s now, so I don’t think they need representations, but if you can think of other types that could benefit from this treatment, we should add them.
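The idea behind a byte-level representation can be sketched without AssertJ or Netty. The helper below is a hypothetical, JDK-only illustration (`HexRendering.render` is an invented name, not part of this PR): it renders a byte array as hex so that two buffers can be compared visually in a failure message.

```java
final class HexRendering {
  private HexRendering() {}

  /** Renders bytes as space-separated, two-digit lowercase hex values. */
  static String render(byte[] bytes) {
    StringBuilder sb = new StringBuilder(bytes.length * 3);
    for (int i = 0; i < bytes.length; i++) {
      if (i > 0) {
        sb.append(' ');
      }
      // Mask to 0..255 so negative bytes do not print as sign-extended ints.
      sb.append(String.format("%02x", bytes[i] & 0xff));
    }
    return sb.toString();
  }
}
```

A test failure that prints `00 ff 10` next to `00 fe 10` points at the mismatching byte directly, which is the benefit described above for debugging framing bugs.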
 * @return the data mapped to a different type
 * @throws NullPointerException if {@code function} is {@code null}
 */
public <T> T mapDataMimeType(Function<ByteBuf, T> function) {
Can you explain the thinking behind map functions for something as trivial as the MIME type? It's generally a US-ASCII string.
Generally is the problem:
MIME Type for encoding of Data and Metadata. This SHOULD be a US-ASCII string that includes...
The use of “SHOULD” leaves the possibility of a non-US-ASCII string for this value. I do believe that everyone will use the String variants but wanted to be pedantically accurate to the spec.
I think maybe making the easy-to-find default return a UTF-8 string, and possibly keeping this mapDataMimeType as a backup. But TBH I think the SHOULD argument was previously over US-ASCII vs UTF-8, not over arbitrary binary encodings.
What would you do with the method name getDataMimeTypeAsUtf8()? I'm not sure what would make one method easier to find than the other.
Or rather, is the bug here that it should be toString(US_ASCII)?
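The two access styles discussed in this thread can be sketched side by side. This is a minimal illustration under stated assumptions, not the PR's implementation: `MimeTypeSketch` is a hypothetical class, and `java.nio.ByteBuffer` stands in for Netty's `ByteBuf` so the example needs only the JDK. Only the method names `mapDataMimeType` and `getDataMimeTypeAsUtf8` come from the discussion.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.function.Function;

final class MimeTypeSketch {
  private final byte[] rawMimeType;

  MimeTypeSketch(byte[] rawMimeType) {
    this.rawMimeType = Objects.requireNonNull(rawMimeType, "rawMimeType must not be null").clone();
  }

  /** Convenience accessor for the common case; a US-ASCII value decodes unchanged as UTF-8. */
  String getDataMimeTypeAsUtf8() {
    return mapDataMimeType(buffer -> StandardCharsets.UTF_8.decode(buffer).toString());
  }

  /** Escape hatch: hands a read-only view of the raw bytes to the caller's function. */
  <T> T mapDataMimeType(Function<ByteBuffer, T> function) {
    Objects.requireNonNull(function, "function must not be null");
    return function.apply(ByteBuffer.wrap(rawMimeType).asReadOnlyBuffer());
  }
}
```

Because every US-ASCII byte sequence is also valid UTF-8 with the same characters, the String accessor behaves identically for spec-conforming values, while the mapping function remains available for a peer that sends something else.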
 * @see <a href="https://github.com/rsocket/rsocket/blob/master/Protocol.md#setup-frame-0x01">Setup
 * Frame</a>
 */
public final class SetupFrame extends AbstractRecyclableMetadataAndDataFrame<SetupFrame> {
I love this part of the change.
Good job on the frame changes - nice use of the ByteBuf API
@@ -40,6 +40,7 @@ subprojects {
}

dependencies {
  dependency 'ch.qos.logback:logback-classic:1.2.3'
Do we need to include a log implementation in the dependencies?
This is the dependencyManagement plugin declaration. Adding it here creates a logical dependency with a version number, but doesn't actually attach it to any artifact. It makes it possible to add it as a testRuntimeOnly dependency to multiple projects without having to keep the version number in sync across all of them.
/** An RSocket frame that only contains metadata. */
public interface MetadataFrame extends Frame {
I'd prefer we have a flag that indicates whether there is metadata, rather than returning Optionals. Right now we don't have to allocate anything to look at the metadata, and with Optionals we would have to create an extra object.
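The flag-based alternative suggested here could look roughly like the following. This is a hypothetical sketch, not code from the PR: `MetadataFlagSketch`, `hasMetadata()`, and `getMetadata()` are illustrative names, and `java.nio.ByteBuffer` stands in for Netty's `ByteBuf`.

```java
import java.nio.ByteBuffer;

final class MetadataFlagSketch {
  // null means the frame carries no metadata
  private final ByteBuffer metadata;

  MetadataFlagSketch(ByteBuffer metadata) {
    this.metadata = metadata;
  }

  /** Presence check that creates no wrapper object. */
  boolean hasMetadata() {
    return metadata != null;
  }

  /** Returns the backing buffer itself; callers are expected to check hasMetadata() first. */
  ByteBuffer getMetadata() {
    if (metadata == null) {
      throw new IllegalStateException("frame has no metadata");
    }
    return metadata;
  }
}
```

The trade-off is the usual one: the flag avoids a per-call `Optional` allocation on a hot path, at the cost of relying on callers to perform the check themselves.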
/** An RSocket frame that only contains data. */
public interface DataFrame extends Frame {
I get what you're doing, but I'd still really like the ability to get the ByteBuf without having to do something like mapData(Function.identity()) or any Optionals.checkNotNull in the call path, etc.
We can call it something like getUnsafeData() and getUnsafeMetadata()
return Mono.just(frame);
}
});
public Mono<Void> onClose() {
I'm pretty sure this will lead to problems when we start chaining duplex connections together - returning Mono.empty() is going to cause onClose to fire right away. This should probably return delegate.onClose()
Yep, this is just a massive oversight on my part. This should absolutely have been delegated.
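The chaining problem can be illustrated with a JDK-only analogue. In this hypothetical sketch, `CompletableFuture<Void>` plays the role of `Mono<Void>`, and `ClosableConnectionSketch` and `DelegatingConnectionSketch` are invented names rather than RSocket types.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

/** Minimal stand-in for a connection that can report when it has closed. */
interface ClosableConnectionSketch {
  CompletableFuture<Void> onClose();
}

/** Decorator that forwards the close signal instead of reporting completion immediately. */
final class DelegatingConnectionSketch implements ClosableConnectionSketch {
  private final ClosableConnectionSketch delegate;

  DelegatingConnectionSketch(ClosableConnectionSketch delegate) {
    this.delegate = Objects.requireNonNull(delegate, "delegate must not be null");
  }

  @Override
  public CompletableFuture<Void> onClose() {
    // An already-completed future here (the analogue of Mono.empty()) would tell
    // observers that the connection had closed before it actually did.
    return delegate.onClose();
  }
}
```

With delegation, wrapping a connection in any number of decorators still yields a close signal that completes only when the innermost connection closes.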
 * @return a {@link GroupedFlux} with a key of the stream id and values of the frames
 */
@SuppressWarnings("UnassignedFluxMonoInstance")
public static Flux<GroupedFlux<Integer, io.rsocket.framing.Frame>> fromAbstractionLeakingFlux(
Every frame is going to come in order, and you're either going to be rebuilding a frame, or emitting it if nothing has to happen, so I don't think grouping is necessary.
I think this would probably lead to problems too, as every stream gets a new stream id.
It also adds a flatMap to the duplex connection.
This is exactly the bit that I see as an "intermediate" step as the stream id is pushed into the transport. When this happens, it means that Flux<Frame> receive() no longer has sufficient information for consumers to partition into individual streams. I'm not sure exactly what the API will look like, but it's going to need to evolve to something like Flux<Flux<Frame>> where each outer Flux is an isolated stream of a given id. The transport is responsible for maintaining the mapping and closing those Fluxes where appropriate (I would not expect an implementation to use GroupedFlux, more likely a lookup table), at which point the implementations would look like:
private Flux<Frame> toFragmentedFrames(Flux<Frame> stream) {
  return stream
      .flatMap(frameFragmenter::fragment);
}

private Flux<Frame> toReassembledFrames(Flux<Frame> stream) {
  FrameReassembler frameReassembler = createFrameReassembler(byteBufAllocator);
  return stream
      .handle(frameReassembler::reassemble)
      .doOnTerminate(() -> disposeQuietly(frameReassembler));
}
It's at this point that the entire AbstractionLeakingFrameUtils disappears, as the unwrapping and partitioning of streams happens in the transport (via stream id or transport multiplexing) directly, without the ceremony of unwrapping and rewrapping anywhere but the transport itself.
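The fragment and reassemble steps can be sketched without Reactor or Netty. The version below is a simplified, hypothetical stand-in (`FragmentationSketch`, `Fragment`, and the `follows` field are illustrative names, not types from this PR). It copies byte arrays rather than slicing `ByteBuf`s, and it ignores frame headers and the split between metadata and data, so it shows only the size-bounded split and the roundtrip property.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FragmentationSketch {
  private FragmentationSketch() {}

  /** One fragment: a slice of the payload plus a flag saying whether more fragments follow. */
  static final class Fragment {
    final byte[] bytes;
    final boolean follows;

    Fragment(byte[] bytes, boolean follows) {
      this.bytes = bytes;
      this.follows = follows;
    }
  }

  /** Splits the payload into fragments of at most maxFragmentSize bytes each. */
  static List<Fragment> fragment(byte[] payload, int maxFragmentSize) {
    if (maxFragmentSize <= 0) {
      throw new IllegalArgumentException("maxFragmentSize must be positive");
    }
    List<Fragment> fragments = new ArrayList<>();
    int offset = 0;
    do {
      int end = Math.min(offset + maxFragmentSize, payload.length);
      // follows is true for every fragment except the one that reaches the end of the payload
      fragments.add(new Fragment(Arrays.copyOfRange(payload, offset, end), end < payload.length));
      offset = end;
    } while (offset < payload.length);
    return fragments;
  }

  /** Concatenates fragments in arrival order; only the last one may have follows == false. */
  static byte[] reassemble(List<Fragment> fragments) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (int i = 0; i < fragments.size(); i++) {
      Fragment fragment = fragments.get(i);
      boolean last = i == fragments.size() - 1;
      if (fragment.follows == last) {
        throw new IllegalStateException("follows flag inconsistent at index " + i);
      }
      out.write(fragment.bytes, 0, fragment.bytes.length);
    }
    return out.toByteArray();
  }
}
```

Fragmenting a 10-byte payload with a maximum fragment size of 4 yields fragments of 4, 4, and 2 bytes, and reassembling them returns the original bytes.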
This change introduces a framing package that contains strongly typed representations of all of the RSocket frame types. These representations are introduced as a separate abstraction because they specifically remove the frame length and stream id, which are an optional, transport-layer concern. This will allow the main body of the code to be gradually moved over to an implementation that only deals with the logical frame types, and pushes the decoration of the frames with frame length and/or stream id down to the transport layer. Note that no implementation has been converted over; this is just the prework to have the frame types ready for migration. These frame type representations are all implemented using the Flyweight pattern (i.e. they use the Netty Recycler) and preserve the zero-copy nature of the existing frame representation. These implementations have consistent and symmetric management of the ByteBuf retain and release lifecycle, in addition to well-documented APIs with properly enforced nullability, specific to each frame type. Each implementation is also tested against raw binary representations to ensure the strictest specification compliance. Finally, improvements were made to testing with regard to configurable test logging, an AssertJ ByteBuf Representation for visual comparison on test failure, and other conveniences.
This change improves the implementation and test coverage of the fragmentation and reassembly functionality. The fragmentation functionality now supports all frame types (previously, only the PAYLOAD frame could be fragmented) and ensures that a fragmentation and reassembly roundtrip results in an identical frame. The fragmentation and reassembly functionality has been implemented on top of the new framing abstraction for testability and to start moving off of the abstraction that leaks frame lengths and stream ids. Note that the implementation of DuplexConnection *does not* move to the new framing abstraction. As this interface is not yet migrated, the FragmentationDuplexConnection is responsible for translating back and forth between the old abstraction and the new one. This implies that overhead will increase temporarily during the migration to the new abstraction (although minimally, because of zero-copy data and the use of Flyweights). Once that abstraction has been adopted everywhere, the translation will be removed, and the overhead will go back to its current levels.
@robertroeser and @yschimke Changes made as requested.
This was definitely a hard one for me. I started working on fragmentation test improvements 10 days ago and kept running into issues. I couldn't get all frame types to fragment, and the ones I could fragment would end up with a flag that didn't match here and there. In my attempts to improve the visibility of the fragments and frames, I ended up doing a bunch of work, all of which led me to one of the goals identified in our meeting: the pushing down of frame length and stream id into the transports.
Well, it turns out that the issue was that the implementation wasn't quite complete, and all of the work I did for visibility just brought that into view for me. So as I implemented the rest of the fragmentation algorithm, I did so on top of a new framing abstraction that does not have frame length and stream id. This is introduced in an isolated commit, and then built on for the fragmentation. Since this would be an incremental migration, the FragmentingDuplexConnection acts as a translation boundary (there's a nice conversion utility) between the old abstraction and the new. For completeness, the fragmentation is tested both at the boundary of the new abstraction and the boundary of the old.
The net result is a completely implemented, heavily tested fragmentation implementation and, as a byproduct, a new and tested-to-the-bit framing abstraction. Because a couple of types (FrameType and ErrorType) were moved in order to start untangling some packages, changes were made to a number of files outside the scope of this work. I'd highly recommend reviewing the .framing and .fragmentation packages directly as source rather than navigating over the 117 diffs. I'm hoping the test code, and especially the fragmenter and reassembler implementations, give you an idea of what I think the future with the new framing abstraction will look like.