Skip to content

Fragmentation #481

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 5, 2018
Merged

Fragmentation #481

merged 2 commits into from
Apr 5, 2018

Conversation

nebhale
Copy link
Member

@nebhale nebhale commented Apr 1, 2018

This was definitely a hard one for me. I started working on fragmentation test improvements 10 days ago and kept running into issues. I couldn't get all frame types to fragment, the ones I could would end up with a flag that didn't match here and there. In my attempts to improve the visibility of the fragments and frames, I ended up doing a bunch of work, all of which lead me to one of the goals identified in our meeting: the pushing down of frame length and stream id into the transports.

Well, it turns out that the issue was that the implementation wasn't quite complete, and all of the work I did for visibility just brought that into view for me. So as I implemented the rest of the fragmentation algorithm, I did so on top of a new framing abstraction that does not have frame length and stream id. This is introduced in an isolated commit, and then built on for the fragmentation. Since this would be an incremental migration, the FragmentingDuplexConnection acts as a translation boundary (there's a nice conversion utility) between the old abstraction and the new. For completeness, the fragmentation is tested both at the boundary of the new abstraction and the boundary of the old.

The net result is a completely implemented, heavily tested fragmentation implementation and as a byproduct a new and tested-to-the-bit framing abstraction. Because of the movement of a couple of types (FrameType and ErrorType) in order to start untangling some packages, a number of files outside the scope of these changes was made. I'd highly recommend reviewing the .framing and .fragmentation packages directly as source rather than navigating over the 117 diffs. I'm hoping the test code and especially the fragmenter and reassembler implementations give you an idea of what I see the future with the new framing abstraction would look like.

@smaldini
Copy link
Member

smaldini commented Apr 1, 2018

This gonna take some time to review given the number of changes !

*
* @param delegate the {@link DuplexConnection} to decorate
* @param maxFragmentSize the maximum fragment size
* @throws NullPointerException if {@code byteBufAllocator} or {@code delegate} are {@code null}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: byteBufAllocator not provided

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problems with nits. I’m super-meticulous about my code, and especially my documentation, so keep up with this level of review.

Copy link
Member

@yschimke yschimke left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM - consider this a low signal review, but looks really good. I'd prefer others more familiar with the fragmentation to Approve. Happy to unblock later if its waiting around.

* @return the {@link String} as a {@code UTF-8} encoded {@link ByteBuf} or {@code null} if {@code
* s} is {@code null}.
*/
static @Nullable ByteBuf getUff8AsByteBuf(@Nullable String s) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: Uff

import io.netty.buffer.ByteBufUtil;
import org.assertj.core.presentation.StandardRepresentation;

public final class ByteBufRepresentation extends StandardRepresentation {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL: nice!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Single most important change I made for testing. Amazing how much of a difference it makes when debugging framing bugs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Frames all have solid toString()s now so I don’t think they need representations, but if you can think of other types that could benefit from this treatment, we should add them.

* @return the data mapped to a different type
* @throws NullPointerException if {@code function} is {@code null}
*/
public <T> T mapDataMimeType(Function<ByteBuf, T> function) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain the thinking over map functions for something as trivial as mime type. it's generally a US-ASCII string.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally is the problem:

MIME Type for encoding of Data and Metadata. This SHOULD be a US-ASCII string that includes...

The use of “SHOULD” leaves the possibility of a non-US-ASCII string for this value. I do believe that everyone will use the String variants but wanted to be pedantically accurate to the spec.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think maybe making the default easy to find default return a UTF-8 string, and possibly having this mapDataMimeType as a backup. But TBH I think the SHOULD argument was over US-ASCII vs UTF-8 previously, not over arbitrary binary encodings here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would you do the method name getDataMimeTypeAsUtf8()? I'm not sure what would make one method easier to find than the other.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or rather, is the bug here that it should be toString(US_ASCII)?

* @see <a href="https://github.com/rsocket/rsocket/blob/master/Protocol.md#setup-frame-0x01">Setup
* Frame</a>
*/
public final class SetupFrame extends AbstractRecyclableMetadataAndDataFrame<SetupFrame> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I love this part of the change.

Copy link
Member

@robertroeser robertroeser left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good job on the frame changes - nice use of the byebuf api

@@ -40,6 +40,7 @@ subprojects {
}

dependencies {
dependency 'ch.qos.logback:logback-classic:1.2.3'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to include a log implementation in the dependencies?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the dependencyManagment plugin declaration. Adding it here creates a logical dependency with a version number, but doesn't actually attach it to any artifact. It makes it possible to add it as a testRuntimeOnly dependency to multiple projects without having to keep the version number in sync across all of them.


/** An RSocket frame that only contains metadata. */
public interface MetadataFrame extends Frame {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer we have a flag that indicates if there is metadata or not, and not return optionals. We don't have allocate anything now to look at the metadata and now we would have to create an extra object.


/** An RSocket frame that only contains data. */
public interface DataFrame extends Frame {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get what you're doing, but I'd still really like the ability to get the ByteBuf without having to do something like mapData(Function.Identity()) or any Optionals.checkNotNull in the call path, etc.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can call it something like getUnsafeData() ad getUnsafeMetadata()

return Mono.just(frame);
}
});
public Mono<Void> onClose() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure this will lead to problems when we start chaining duplex connections together - returning Mono.empty() is going to cause onClose to fire right away. This should probably return delegate.onClose()

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, this is just a massive oversight on my part. This should absolutely have been delegated.

* @return a {@link GroupedFlux} with a key of the stream id and values of the frames
*/
@SuppressWarnings("UnassignedFluxMonoInstance")
public static Flux<GroupedFlux<Integer, io.rsocket.framing.Frame>> fromAbstractionLeakingFlux(
Copy link
Member

@robertroeser robertroeser Apr 1, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every frame is going to come in order, and you're either going to be rebuilding a frame, or emitting it if nothing has to happen, so I don't think grouping is necessary.

I think this would probably lead problems too as every stream gets a new stream id.

It also adds a flatMap to the duplex connection.

Copy link
Member Author

@nebhale nebhale Apr 2, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is exactly the bit that I see as a "intermediate" step as the stream id is pushed into the transport. When this happens, it means that Flux<Frame> receive() no longer has sufficient information for consumers to partition into individual streams. I'm not sure exactly what the API will look like, but it's going to need to evolve to something like Flux<Flux<Frame>> where each outer Flux is an isolated stream of a given id. The transport is responsible for maintaining the mapping and closing those Fluxes where appropriate (I would not expect an implementation to use GroupedFlux, more likely a lookup table) at which point the implementations would look like:

private Flux<Frame> toFragmentedFrames(Flux<Frame> stream) {
  return stream
      .flatMap(frameFragmenter::fragment);
}

private Flux<Frame> toReassembledFrames(Flux<Frame> stream) {
  FrameReassembler frameReassembler = createFrameReassembler(byteBufAllocator);

  return stream
      .handle(frameReassembler::reassemble)
      .doOnTerminate(() -> disposeQuietly(frameReassembler));
}

It's at this point, that the entire AbstractionLeakingFrameUtils disappears as the unwrapping and partitioning of streams happens in the transport (via stream id or transport multiplexing) directly without the ceremony of unwrapping and rewrapping anywhere but the transport itself.

nebhale added 2 commits April 3, 2018 12:41
This change introduces a framing package that contains strongly typed
representations of all of the RSocket frame types.  These representations are
introduced as a separate abstraction because they specifically remove the
frame length and stream id that are an optional, transport-layer concern. This
allows will allow the main body of the code to be gradually moved over to an
implementation that only deals with the logical frame types, and pushes the
decoration of the frames with frame length and/or stream id down to the
transport layer.  Note that no implementation has been converted over; this is
just the prework to have an the frame types ready for migration.

These frame type representations are all implemented using the Flyweight
pattern (i.e. use the Netty Recycler) and preserve the zero-copy nature of the
existing frame representation.  These implementations have consistent and
symmetric management of ByteBuf retain and release lifecycle, in addition to
well documented and proper nullability enforced APIs, specific to each frame
type. Each implementation is also tested against raw binary representations to
ensure the strictest specification compliance.

Finally, improvements were made to testing with regards to configurable test
logging, an AssertJ ByteBuf Representation for visual comparison on test
failure, and other conveniences.
This change improves the implementation and test coverage of the fragmentation
and reassembly functionality.  The fragmentation functionality now support all
frame types (previously, only the PAYLOAD frame could be fragmented) and
ensures that a fragmentation and reassembly roundtrip results in an identical
frame.

The implementations of the fragmentation and reassembly functionality has been
implemented on top of the new framing abstraction for testability and to start
moving off of the abstraction that leaks frame lengths and stream ids.  Note
that the implementation of DuplexConnection *does not* move to the new framing
abstraction.  As this interface is not yet migrated, the
FragmentationDuplexConnection is responsible for translating back and forth
between the old abstraction and the new one.  This has an implication that
overhead will increase (although minimally because of zero-copy data and the
use of Flyweights) temporarily during the migration to the new abstraction.
Once that abstraction has been adopted everywhere, the translation will be
removed, and the overheard will go back to it's current levels.
@nebhale
Copy link
Member Author

nebhale commented Apr 3, 2018

@robertroeser and @yschimke Changes made as requested. SETUP frame MIME-types are handled as Strings everywhere, ByteBufs are exposed directly via getUnsafe*(), the fragmentation is updated to use these direct methods, and reassembly uses a lookup map rather than a GroupedFlux.

@yschimke yschimke merged commit 1aecebe into rsocket:1.0.x Apr 5, 2018
@nebhale nebhale deleted the fragmentation branch April 5, 2018 14:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants