|
26 | 26 | import io.netty.buffer.ByteBuf;
|
27 | 27 | import io.netty.buffer.ByteBufAllocator;
|
28 | 28 | import io.rsocket.Payload;
|
| 29 | +import io.netty.util.CharsetUtil; |
29 | 30 | import io.rsocket.exceptions.ApplicationErrorException;
|
30 | 31 | import io.rsocket.exceptions.RejectedSetupException;
|
31 | 32 | import io.rsocket.frame.*;
|
|
36 | 37 | import io.rsocket.util.MultiSubscriberRSocket;
|
37 | 38 | import java.time.Duration;
|
38 | 39 | import java.util.ArrayList;
|
| 40 | +import java.util.Iterator; |
39 | 41 | import java.util.List;
|
40 | 42 | import java.util.stream.Collectors;
|
41 | 43 | import org.assertj.core.api.Assertions;
|
@@ -215,6 +217,43 @@ public void testChannelRequestServerSideCancellation() {
|
215 | 217 | Assertions.assertThat(request.isDisposed()).isTrue();
|
216 | 218 | }
|
217 | 219 |
|
| 220 | + @Test |
| 221 | + public void testCorrectFrameOrder() { |
| 222 | + MonoProcessor<Object> delayer = MonoProcessor.create(); |
| 223 | + BaseSubscriber<Payload> subscriber = |
| 224 | + new BaseSubscriber<Payload>() { |
| 225 | + @Override |
| 226 | + protected void hookOnSubscribe(Subscription subscription) {} |
| 227 | + }; |
| 228 | + rule.socket |
| 229 | + .requestChannel( |
| 230 | + Flux.concat(Flux.just(0).delayUntil(i -> delayer), Flux.range(1, 999)) |
| 231 | + .map(i -> DefaultPayload.create(i + ""))) |
| 232 | + .subscribe(subscriber); |
| 233 | + |
| 234 | + subscriber.request(1); |
| 235 | + subscriber.request(Long.MAX_VALUE); |
| 236 | + delayer.onComplete(); |
| 237 | + |
| 238 | + Iterator<ByteBuf> iterator = rule.connection.getSent().iterator(); |
| 239 | + |
| 240 | + ByteBuf initialFrame = iterator.next(); |
| 241 | + |
| 242 | + Assertions.assertThat(FrameHeaderFlyweight.frameType(initialFrame)).isEqualTo(REQUEST_CHANNEL); |
| 243 | + Assertions.assertThat(RequestChannelFrameFlyweight.initialRequestN(initialFrame)).isEqualTo(1); |
| 244 | + Assertions.assertThat( |
| 245 | + RequestChannelFrameFlyweight.data(initialFrame).toString(CharsetUtil.UTF_8)) |
| 246 | + .isEqualTo("0"); |
| 247 | + |
| 248 | + ByteBuf requestNFrame = iterator.next(); |
| 249 | + |
| 250 | + Assertions.assertThat(FrameHeaderFlyweight.frameType(requestNFrame)).isEqualTo(REQUEST_N); |
| 251 | + Assertions.assertThat(RequestNFrameFlyweight.requestN(requestNFrame)) |
| 252 | + .isEqualTo(Integer.MAX_VALUE); |
| 253 | + |
| 254 | + Assertions.assertThat(iterator.hasNext()).isFalse(); |
| 255 | + } |
| 256 | + |
218 | 257 | public int sendRequestResponse(Publisher<Payload> response) {
|
219 | 258 | Subscriber<Payload> sub = TestSubscriber.create();
|
220 | 259 | response.subscribe(sub);
|
|
0 commit comments