Skip to content

Commit f9091fe

Browse files
committed
provides failing test
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent a8df316 commit f9091fe

File tree

1 file changed

+39
-0
lines changed

1 file changed

+39
-0
lines changed

rsocket-core/src/test/java/io/rsocket/RSocketRequesterTest.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import io.netty.buffer.ByteBuf;
2727
import io.netty.buffer.ByteBufAllocator;
28+
import io.netty.util.CharsetUtil;
2829
import io.rsocket.exceptions.ApplicationErrorException;
2930
import io.rsocket.exceptions.RejectedSetupException;
3031
import io.rsocket.frame.*;
@@ -35,6 +36,7 @@
3536
import io.rsocket.util.MultiSubscriberRSocket;
3637
import java.time.Duration;
3738
import java.util.ArrayList;
39+
import java.util.Iterator;
3840
import java.util.List;
3941
import java.util.stream.Collectors;
4042
import org.assertj.core.api.Assertions;
@@ -214,6 +216,43 @@ public void testChannelRequestServerSideCancellation() {
214216
Assertions.assertThat(request.isDisposed()).isTrue();
215217
}
216218

219+
@Test
220+
public void testCorrectFrameOrder() {
221+
MonoProcessor<Object> delayer = MonoProcessor.create();
222+
BaseSubscriber<Payload> subscriber =
223+
new BaseSubscriber<Payload>() {
224+
@Override
225+
protected void hookOnSubscribe(Subscription subscription) {}
226+
};
227+
rule.socket
228+
.requestChannel(
229+
Flux.concat(Flux.just(0).delayUntil(i -> delayer), Flux.range(1, 999))
230+
.map(i -> DefaultPayload.create(i + "")))
231+
.subscribe(subscriber);
232+
233+
subscriber.request(1);
234+
subscriber.request(Long.MAX_VALUE);
235+
delayer.onComplete();
236+
237+
Iterator<ByteBuf> iterator = rule.connection.getSent().iterator();
238+
239+
ByteBuf initialFrame = iterator.next();
240+
241+
Assertions.assertThat(FrameHeaderFlyweight.frameType(initialFrame)).isEqualTo(REQUEST_CHANNEL);
242+
Assertions.assertThat(RequestChannelFrameFlyweight.initialRequestN(initialFrame)).isEqualTo(1);
243+
Assertions.assertThat(
244+
RequestChannelFrameFlyweight.data(initialFrame).toString(CharsetUtil.UTF_8))
245+
.isEqualTo("0");
246+
247+
ByteBuf requestNFrame = iterator.next();
248+
249+
Assertions.assertThat(FrameHeaderFlyweight.frameType(requestNFrame)).isEqualTo(REQUEST_N);
250+
Assertions.assertThat(RequestNFrameFlyweight.requestN(requestNFrame))
251+
.isEqualTo(Integer.MAX_VALUE);
252+
253+
Assertions.assertThat(iterator.hasNext()).isFalse();
254+
}
255+
217256
public int sendRequestResponse(Publisher<Payload> response) {
218257
Subscriber<Payload> sub = TestSubscriber.create();
219258
response.subscribe(sub);

0 commit comments

Comments
 (0)