Skip to content

Commit e778b3e

Browse files
committed
Merge branch '1.0.x'
Signed-off-by: Rossen Stoyanchev <[email protected]>
2 parents b87eed4 + d1e0507 commit e778b3e

File tree

3 files changed

+122
-25
lines changed

3 files changed

+122
-25
lines changed

rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2018 the original author or authors.
2+
* Copyright 2015-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -55,6 +55,8 @@ public class ClientServerInputMultiplexer implements Closeable {
5555
private final DuplexConnection source;
5656
private final DuplexConnection clientServerConnection;
5757

58+
private boolean setupReceived;
59+
5860
public ClientServerInputMultiplexer(DuplexConnection source) {
5961
this(source, emptyInterceptorRegistry, false);
6062
}
@@ -87,6 +89,7 @@ public ClientServerInputMultiplexer(
8789
case RESUME:
8890
case RESUME_OK:
8991
type = Type.SETUP;
92+
setupReceived = true;
9093
break;
9194
case LEASE:
9295
case KEEPALIVE:
@@ -101,6 +104,11 @@ public ClientServerInputMultiplexer(
101104
} else {
102105
type = Type.CLIENT;
103106
}
107+
if (!isClient && type != Type.SETUP && !setupReceived) {
108+
frame.release();
109+
throw new IllegalStateException(
110+
"SETUP or LEASE frame must be received before any others.");
111+
}
104112
return type;
105113
})
106114
.subscribe(
@@ -119,7 +127,11 @@ public ClientServerInputMultiplexer(
119127
break;
120128
}
121129
},
122-
t -> {});
130+
ex -> {
131+
setup.onError(ex);
132+
server.onError(ex);
133+
client.onError(ex);
134+
});
123135
}
124136

125137
public DuplexConnection asClientServerConnection() {

rsocket-core/src/test/java/io/rsocket/core/RSocketServerTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,19 @@
11
package io.rsocket.core;
22

33
import static io.rsocket.frame.FrameLengthCodec.FRAME_LENGTH_MASK;
4+
import static org.assertj.core.api.Assertions.assertThat;
45

6+
import io.netty.buffer.ByteBufAllocator;
7+
import io.netty.buffer.Unpooled;
8+
import io.rsocket.RSocket;
9+
import io.rsocket.frame.RequestResponseFrameCodec;
10+
import io.rsocket.test.util.TestDuplexConnection;
511
import io.rsocket.test.util.TestServerTransport;
12+
import java.time.Duration;
13+
import java.util.Random;
614
import org.junit.jupiter.api.Test;
15+
import reactor.core.publisher.Mono;
16+
import reactor.core.publisher.MonoProcessor;
717
import reactor.test.StepVerifier;
818

919
public class RSocketServerTest {
@@ -42,4 +52,34 @@ public void ensuresMaxFrameLengthCanNotBeGreaterThenMaxPossibleFrameLength() {
4252
+ FRAME_LENGTH_MASK)
4353
.verify();
4454
}
55+
56+
@Test
57+
public void unexpectedFramesBeforeSetup() {
58+
MonoProcessor<Void> connectedMono = MonoProcessor.create();
59+
60+
TestServerTransport transport = new TestServerTransport();
61+
RSocketServer.create()
62+
.acceptor(
63+
(setup, sendingSocket) -> {
64+
connectedMono.onComplete();
65+
return Mono.just(new RSocket() {});
66+
})
67+
.bind(transport)
68+
.block();
69+
70+
byte[] bytes = new byte[16_000_000];
71+
new Random().nextBytes(bytes);
72+
73+
TestDuplexConnection connection = transport.connect();
74+
connection.addToReceivedBuffer(
75+
RequestResponseFrameCodec.encode(
76+
ByteBufAllocator.DEFAULT,
77+
1,
78+
false,
79+
Unpooled.EMPTY_BUFFER,
80+
ByteBufAllocator.DEFAULT.buffer(bytes.length).writeBytes(bytes)));
81+
82+
StepVerifier.create(connection.onClose()).expectComplete().verify(Duration.ofSeconds(30));
83+
assertThat(connectedMono.isTerminated()).as("Connection should not succeed").isFalse();
84+
}
4585
}

rsocket-core/src/test/java/io/rsocket/internal/ClientServerInputMultiplexerTest.java

Lines changed: 68 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,25 @@
1616

1717
package io.rsocket.internal;
1818

19+
import static org.assertj.core.api.Assertions.assertThat;
1920
import static org.junit.Assert.assertEquals;
2021

2122
import io.netty.buffer.ByteBuf;
2223
import io.netty.buffer.ByteBufAllocator;
2324
import io.netty.buffer.Unpooled;
2425
import io.rsocket.buffer.LeaksTrackingByteBufAllocator;
25-
import io.rsocket.frame.*;
26+
import io.rsocket.frame.ErrorFrameCodec;
27+
import io.rsocket.frame.KeepAliveFrameCodec;
28+
import io.rsocket.frame.LeaseFrameCodec;
29+
import io.rsocket.frame.MetadataPushFrameCodec;
30+
import io.rsocket.frame.ResumeFrameCodec;
31+
import io.rsocket.frame.ResumeOkFrameCodec;
32+
import io.rsocket.frame.SetupFrameCodec;
2633
import io.rsocket.plugins.InitializingInterceptorRegistry;
2734
import io.rsocket.test.util.TestDuplexConnection;
2835
import io.rsocket.util.DefaultPayload;
2936
import java.util.concurrent.atomic.AtomicInteger;
37+
import java.util.concurrent.atomic.AtomicReference;
3038
import org.junit.Before;
3139
import org.junit.Test;
3240

@@ -68,44 +76,44 @@ public void clientSplits() {
6876
.doOnNext(f -> setupFrames.incrementAndGet())
6977
.subscribe();
7078

79+
source.addToReceivedBuffer(setupFrame().retain());
80+
assertEquals(0, clientFrames.get());
81+
assertEquals(0, serverFrames.get());
82+
assertEquals(1, setupFrames.get());
83+
7184
source.addToReceivedBuffer(errorFrame(1).retain());
7285
assertEquals(1, clientFrames.get());
7386
assertEquals(0, serverFrames.get());
74-
assertEquals(0, setupFrames.get());
87+
assertEquals(1, setupFrames.get());
7588

7689
source.addToReceivedBuffer(errorFrame(1).retain());
7790
assertEquals(2, clientFrames.get());
7891
assertEquals(0, serverFrames.get());
79-
assertEquals(0, setupFrames.get());
92+
assertEquals(1, setupFrames.get());
8093

8194
source.addToReceivedBuffer(leaseFrame().retain());
8295
assertEquals(3, clientFrames.get());
8396
assertEquals(0, serverFrames.get());
84-
assertEquals(0, setupFrames.get());
97+
assertEquals(1, setupFrames.get());
8598

8699
source.addToReceivedBuffer(keepAliveFrame().retain());
87100
assertEquals(4, clientFrames.get());
88101
assertEquals(0, serverFrames.get());
89-
assertEquals(0, setupFrames.get());
102+
assertEquals(1, setupFrames.get());
90103

91104
source.addToReceivedBuffer(errorFrame(2).retain());
92105
assertEquals(4, clientFrames.get());
93106
assertEquals(1, serverFrames.get());
94-
assertEquals(0, setupFrames.get());
107+
assertEquals(1, setupFrames.get());
95108

96109
source.addToReceivedBuffer(errorFrame(0).retain());
97110
assertEquals(5, clientFrames.get());
98111
assertEquals(1, serverFrames.get());
99-
assertEquals(0, setupFrames.get());
112+
assertEquals(1, setupFrames.get());
100113

101114
source.addToReceivedBuffer(metadataPushFrame().retain());
102115
assertEquals(5, clientFrames.get());
103116
assertEquals(2, serverFrames.get());
104-
assertEquals(0, setupFrames.get());
105-
106-
source.addToReceivedBuffer(setupFrame().retain());
107-
assertEquals(5, clientFrames.get());
108-
assertEquals(2, serverFrames.get());
109117
assertEquals(1, setupFrames.get());
110118

111119
source.addToReceivedBuffer(resumeFrame().retain());
@@ -141,44 +149,44 @@ public void serverSplits() {
141149
.doOnNext(f -> setupFrames.incrementAndGet())
142150
.subscribe();
143151

152+
source.addToReceivedBuffer(setupFrame().retain());
153+
assertEquals(0, clientFrames.get());
154+
assertEquals(0, serverFrames.get());
155+
assertEquals(1, setupFrames.get());
156+
144157
source.addToReceivedBuffer(errorFrame(1).retain());
145158
assertEquals(1, clientFrames.get());
146159
assertEquals(0, serverFrames.get());
147-
assertEquals(0, setupFrames.get());
160+
assertEquals(1, setupFrames.get());
148161

149162
source.addToReceivedBuffer(errorFrame(1).retain());
150163
assertEquals(2, clientFrames.get());
151164
assertEquals(0, serverFrames.get());
152-
assertEquals(0, setupFrames.get());
165+
assertEquals(1, setupFrames.get());
153166

154167
source.addToReceivedBuffer(leaseFrame().retain());
155168
assertEquals(2, clientFrames.get());
156169
assertEquals(1, serverFrames.get());
157-
assertEquals(0, setupFrames.get());
170+
assertEquals(1, setupFrames.get());
158171

159172
source.addToReceivedBuffer(keepAliveFrame().retain());
160173
assertEquals(2, clientFrames.get());
161174
assertEquals(2, serverFrames.get());
162-
assertEquals(0, setupFrames.get());
175+
assertEquals(1, setupFrames.get());
163176

164177
source.addToReceivedBuffer(errorFrame(2).retain());
165178
assertEquals(2, clientFrames.get());
166179
assertEquals(3, serverFrames.get());
167-
assertEquals(0, setupFrames.get());
180+
assertEquals(1, setupFrames.get());
168181

169182
source.addToReceivedBuffer(errorFrame(0).retain());
170183
assertEquals(2, clientFrames.get());
171184
assertEquals(4, serverFrames.get());
172-
assertEquals(0, setupFrames.get());
185+
assertEquals(1, setupFrames.get());
173186

174187
source.addToReceivedBuffer(metadataPushFrame().retain());
175188
assertEquals(3, clientFrames.get());
176189
assertEquals(4, serverFrames.get());
177-
assertEquals(0, setupFrames.get());
178-
179-
source.addToReceivedBuffer(setupFrame().retain());
180-
assertEquals(3, clientFrames.get());
181-
assertEquals(4, serverFrames.get());
182190
assertEquals(1, setupFrames.get());
183191

184192
source.addToReceivedBuffer(resumeFrame().retain());
@@ -192,6 +200,43 @@ public void serverSplits() {
192200
assertEquals(3, setupFrames.get());
193201
}
194202

203+
@Test
204+
public void unexpectedFramesBeforeSetupFrame() {
205+
AtomicInteger clientFrames = new AtomicInteger();
206+
AtomicInteger serverFrames = new AtomicInteger();
207+
AtomicInteger setupFrames = new AtomicInteger();
208+
209+
AtomicReference<Throwable> clientError = new AtomicReference<>();
210+
AtomicReference<Throwable> serverError = new AtomicReference<>();
211+
AtomicReference<Throwable> setupError = new AtomicReference<>();
212+
213+
serverMultiplexer
214+
.asClientConnection()
215+
.receive()
216+
.subscribe(bb -> clientFrames.incrementAndGet(), clientError::set);
217+
serverMultiplexer
218+
.asServerConnection()
219+
.receive()
220+
.subscribe(bb -> serverFrames.incrementAndGet(), serverError::set);
221+
serverMultiplexer
222+
.asSetupConnection()
223+
.receive()
224+
.subscribe(bb -> setupFrames.incrementAndGet(), setupError::set);
225+
226+
source.addToReceivedBuffer(keepAliveFrame().retain());
227+
228+
assertThat(clientError.get().getMessage())
229+
.isEqualTo("SETUP or LEASE frame must be received before any others.");
230+
assertThat(serverError.get().getMessage())
231+
.isEqualTo("SETUP or LEASE frame must be received before any others.");
232+
assertThat(setupError.get().getMessage())
233+
.isEqualTo("SETUP or LEASE frame must be received before any others.");
234+
235+
assertEquals(0, clientFrames.get());
236+
assertEquals(0, serverFrames.get());
237+
assertEquals(0, setupFrames.get());
238+
}
239+
195240
private ByteBuf resumeFrame() {
196241
return ResumeFrameCodec.encode(allocator, Unpooled.EMPTY_BUFFER, 0, 0);
197242
}

0 commit comments

Comments
 (0)