Skip to content

Commit 9a24f90

Browse files
committed
iterating on fragmenter
1 parent 3205d85 commit 9a24f90

File tree

1 file changed

+30
-2
lines changed

1 file changed

+30
-2
lines changed

src/main/java/io/reactivesocket/internal/PayloadFragmenter.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@
2727
*
2828
* Not thread-safe
2929
*/
30-
public class PayloadFragmenter implements Iterator<Frame>
30+
public class PayloadFragmenter implements Iterable<Frame>, Iterator<Frame>
3131
{
3232
private enum Type
3333
{
34-
RESPONSE, REQUEST_CHANNEL
34+
RESPONSE, RESPONSE_COMPLETE, REQUEST_CHANNEL
3535
}
3636

3737
private final int metadataMtu;
@@ -56,13 +56,32 @@ public void resetForResponse(final int streamId, final Payload payload)
5656
type = Type.RESPONSE;
5757
}
5858

59+
public void resetForResponseComplete(final int streamId, final Payload payload)
60+
{
61+
reset(streamId, payload);
62+
type = Type.RESPONSE_COMPLETE;
63+
}
64+
5965
public void resetForRequestChannel(final int streamId, final Payload payload, final int initialRequestN)
6066
{
6167
reset(streamId, payload);
6268
type = Type.REQUEST_CHANNEL;
6369
this.initialRequestN = initialRequestN;
6470
}
6571

72+
public static boolean requiresFragmenting(final int metadataMtu, final int dataMtu, final Payload payload)
73+
{
74+
final ByteBuffer metadata = payload.getMetadata();
75+
final ByteBuffer data = payload.getData();
76+
77+
return (metadata.remaining() > metadataMtu || data.remaining() > dataMtu);
78+
}
79+
80+
public Iterator<Frame> iterator()
81+
{
82+
return this;
83+
}
84+
6685
public boolean hasNext()
6786
{
6887
return (dataOffset < data.capacity() || metadataOffset < metadata.capacity());
@@ -96,6 +115,15 @@ public Frame next()
96115

97116
result = Frame.Response.from(streamId, FrameType.NEXT, metadataBuffer, dataBuffer, flags);
98117
}
118+
if (Type.RESPONSE_COMPLETE == type)
119+
{
120+
if (isMoreFollowing)
121+
{
122+
flags |= FrameHeaderFlyweight.FLAGS_RESPONSE_F;
123+
}
124+
125+
result = Frame.Response.from(streamId, FrameType.NEXT_COMPLETE, metadataBuffer, dataBuffer, flags);
126+
}
99127
else if (Type.REQUEST_CHANNEL == type)
100128
{
101129
if (isMoreFollowing)

0 commit comments

Comments
 (0)