Skip to content

Commit b8fd58a

Browse files
authored
Empty Payload Response treated as Complete frame (#206)
#### Problem `FrameHeaderFlyweight` resolves the frame type as `NEXT_COMPLETE` only if the payload is not empty. Otherwise, the frame is treated as `COMPLETE`. This is a problem for request-response as this means the response publisher will complete without emitting a response, if the response does not have any data. Such a condition is a failure as request-response should have exaclty one response. #### Modification Eliminated the check for data length and always emit `NEXT_COMPLETE` if the complete flag is set. According to the [protocol](https://github.com/ReactiveSocket/reactivesocket/blob/master/Protocol.md#response-frame): ``` A Response is generally referred to as a NEXT. ``` So, this behavior would not be a violation of the protocol but will produce empty `onNext` before `onComplete` #### Result Better behavior with empty responses.
1 parent 04b8cd7 commit b8fd58a

File tree

5 files changed

+20
-19
lines changed

5 files changed

+20
-19
lines changed

reactivesocket-core/src/main/java/io/reactivesocket/frame/FrameHeaderFlyweight.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,8 @@ public static int encode(
148148
final int frameLength = computeFrameHeaderLength(frameType, metadata.remaining(), data.remaining());
149149

150150
final FrameType outFrameType;
151-
152151
switch (frameType) {
152+
case NEXT_COMPLETE:
153153
case COMPLETE:
154154
outFrameType = FrameType.RESPONSE;
155155
flags |= FLAGS_RESPONSE_C;
@@ -162,10 +162,10 @@ public static int encode(
162162
break;
163163
}
164164

165-
int length = FrameHeaderFlyweight.encodeFrameHeader(mutableDirectBuffer, offset, frameLength, flags, outFrameType, streamId);
165+
int length = encodeFrameHeader(mutableDirectBuffer, offset, frameLength, flags, outFrameType, streamId);
166166

167-
length += FrameHeaderFlyweight.encodeMetadata(mutableDirectBuffer, offset, offset + length, metadata);
168-
length += FrameHeaderFlyweight.encodeData(mutableDirectBuffer, offset + length, data);
167+
length += encodeMetadata(mutableDirectBuffer, offset, offset + length, metadata);
168+
length += encodeData(mutableDirectBuffer, offset + length, data);
169169

170170
return length;
171171
}
@@ -179,13 +179,10 @@ public static FrameType frameType(final DirectBuffer directBuffer, final int off
179179

180180
if (FrameType.RESPONSE == result) {
181181
final int flags = flags(directBuffer, offset);
182-
final int dataLength = dataLength(directBuffer, offset, 0);
183182

184183
boolean complete = FLAGS_RESPONSE_C == (flags & FLAGS_RESPONSE_C);
185-
if (complete && 0 < dataLength) {
184+
if (complete) {
186185
result = FrameType.NEXT_COMPLETE;
187-
} else if (complete) {
188-
result = FrameType.COMPLETE;
189186
} else {
190187
result = FrameType.NEXT;
191188
}
@@ -233,7 +230,7 @@ private static int frameLength(final DirectBuffer directBuffer, final int offset
233230
}
234231

235232
private static int computeMetadataLength(final int metadataPayloadLength) {
236-
return metadataPayloadLength + ((0 == metadataPayloadLength) ? 0 : BitUtil.SIZE_OF_INT);
233+
return metadataPayloadLength + (0 == metadataPayloadLength? 0 : BitUtil.SIZE_OF_INT);
237234
}
238235

239236
private static int metadataFieldLength(final DirectBuffer directBuffer, final int offset) {

reactivesocket-core/src/main/java/io/reactivesocket/internal/RemoteSender.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,8 @@ public void onSubscribe(Subscription s) {
142142
@Override
143143
public void onNext(Frame frame) {
144144
// No flow-control check
145-
assert frame.getType() != FrameType.ERROR && frame.getType() != FrameType.COMPLETE;
145+
FrameType frameType = frame.getType();
146+
assert frameType != FrameType.ERROR && !isCompleteFrame(frameType);
146147
synchronized (this) {
147148
outstanding--;
148149
}
@@ -229,10 +230,14 @@ private boolean trySendTerminalFrame(Frame frame, Throwable optionalError) {
229230

230231
private void unsafeSendTerminalFrameToTransport(Frame terminalFrame, Throwable optionalError) {
231232
transportSubscription.safeOnNext(terminalFrame);
232-
if (terminalFrame.getType() == FrameType.COMPLETE) {
233+
if (terminalFrame.getType() == FrameType.COMPLETE || terminalFrame.getType() == FrameType.NEXT_COMPLETE) {
233234
transportSubscription.safeOnComplete();
234235
} else {
235236
transportSubscription.safeOnError(optionalError);
236237
}
237238
}
239+
240+
private static boolean isCompleteFrame(FrameType frameType) {
241+
return frameType == FrameType.COMPLETE || frameType == FrameType.NEXT_COMPLETE;
242+
}
238243
}

reactivesocket-core/src/test/java/io/reactivesocket/ServerReactiveSocketTest.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@
2929
import java.util.concurrent.atomic.AtomicBoolean;
3030

3131
import static org.hamcrest.MatcherAssert.assertThat;
32-
import static org.hamcrest.Matchers.empty;
33-
import static org.hamcrest.Matchers.hasSize;
34-
import static org.hamcrest.Matchers.is;
32+
import static org.hamcrest.Matchers.*;
3533

3634
public class ServerReactiveSocketTest {
3735

@@ -60,7 +58,8 @@ public void testHandleResponseFrameNoError() throws Exception {
6058
assertThat("Unexpected error.", rule.errors, is(empty()));
6159
TestSubscriber<Frame> sendSub = sendSubscribers.iterator().next();
6260
sendSub.request(2);
63-
assertThat("Unexpected frame sent.", rule.connection.awaitSend().getType(), is(FrameType.COMPLETE));
61+
assertThat("Unexpected frame sent.", rule.connection.awaitSend().getType(),
62+
anyOf(is(FrameType.COMPLETE), is(FrameType.NEXT_COMPLETE)));
6463
}
6564

6665
@Test(timeout = 2000)

reactivesocket-core/src/test/java/io/reactivesocket/internal/RemoteSenderTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public void testOnComplete() throws Exception {
7777
receiverSub.assertValue(new Predicate<Frame>() {
7878
@Override
7979
public boolean test(Frame frame) throws Exception {
80-
return frame.getType() == FrameType.COMPLETE;
80+
return frame.getType() == FrameType.COMPLETE || frame.getType() == FrameType.NEXT_COMPLETE;
8181
}
8282
});
8383

@@ -138,7 +138,7 @@ public void testOnCompleteWithBuffer() throws Exception {
138138
receiverSub.assertValue(new Predicate<Frame>() {
139139
@Override
140140
public boolean test(Frame frame) throws Exception {
141-
return frame.getType() == FrameType.COMPLETE;
141+
return frame.getType() == FrameType.COMPLETE || frame.getType() == FrameType.NEXT_COMPLETE;
142142
}
143143
});
144144
receiverSub.assertNoErrors();

reactivesocket-examples/src/main/java/io/reactivesocket/examples/transport/tcp/requestresponse/HelloWorldClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,10 @@ public Publisher<Payload> requestResponse(Payload p) {
5555
.connect())
5656
.blockingFirst();
5757

58-
Flowable.fromPublisher(socket.requestResponse(new PayloadImpl("Hello")))
58+
Flowable.fromPublisher(socket.requestResponse(PayloadImpl.EMPTY))
5959
.map(payload -> payload.getData())
6060
.map(ByteBufferUtil::toUtf8String)
61-
.doOnNext(System.out::println)
61+
.doOnNext(x -> System.out.println("===>>>> " + x))
6262
.concatWith(Flowable.fromPublisher(socket.close()).cast(String.class))
6363
.blockingFirst();
6464
}

0 commit comments

Comments
 (0)