Skip to content

Commit 92b8009

Browse files
OlegDokukarobertroeser
authored andcommitted
backports fixes bug with incorrect WS framesize setup. (#610) (#644)
* backports fixes bug with incorrect WS framesize setup (#614) * rollback mockito version * changed timeout * changed timeout * small fix * replace with concurrent maps Signed-off-by: Oleh Dokuka <[email protected]>
1 parent c78bab2 commit 92b8009

File tree

14 files changed

+243
-62
lines changed

14 files changed

+243
-62
lines changed

build.gradle

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,12 @@ subprojects {
5555
dependency "io.micrometer:micrometer-core:${ext['micrometer.version']}"
5656
dependency "org.assertj:assertj-core:${ext['assertj.version']}"
5757
dependency "org.hdrhistogram:HdrHistogram:${ext['hdrhistogram.version']}"
58-
dependency "org.mockito:mockito-core:${ ext['mockito.version']}"
5958
dependency "org.slf4j:slf4j-api:${ext['slf4j.version']}"
59+
60+
dependencySet(group: 'org.mockito', version: ext['mockito.version']) {
61+
entry 'mockito-junit-jupiter'
62+
entry 'mockito-core'
63+
}
6064

6165
dependencySet(group: 'org.junit.jupiter', version: ext['junit.version']) {
6266
entry 'junit-jupiter-api'

rsocket-core/src/main/java/io/rsocket/RSocketClient.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,6 @@
2323
import io.rsocket.internal.LimitableRequestPublisher;
2424
import io.rsocket.internal.UnboundedProcessor;
2525
import io.rsocket.internal.UnicastMonoProcessor;
26-
import org.reactivestreams.Processor;
27-
import org.reactivestreams.Publisher;
28-
import org.reactivestreams.Subscriber;
29-
import reactor.core.publisher.*;
30-
3126
import java.nio.channels.ClosedChannelException;
3227
import java.time.Duration;
3328
import java.util.Collections;
@@ -489,10 +484,10 @@ private void handleFrame(int streamId, FrameType type, Frame frame) {
489484
{
490485
LimitableRequestPublisher sender = senders.remove(streamId);
491486
if (sender != null) {
492-
sender.cancel();
487+
sender.cancel();
488+
}
489+
break;
493490
}
494-
break;
495-
}
496491
case NEXT:
497492
receiver.onNext(frameDecoder.apply(frame));
498493
break;

rsocket-core/src/main/java/io/rsocket/RSocketFactory.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -305,9 +305,7 @@ private class ServerStart<T extends Closeable> implements Start<T>, ServerTransp
305305

306306
@Override
307307
public Mono<T> start() {
308-
return transportServer
309-
.get()
310-
.start(this::acceptor);
308+
return transportServer.get().start(this::acceptor);
311309
}
312310

313311
@Override
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+

rsocket-core/src/test/java/io/rsocket/SetupRejectionTest.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
import java.util.concurrent.Executors;
1616
import java.util.concurrent.ScheduledExecutorService;
1717
import java.util.concurrent.TimeUnit;
18-
1918
import org.junit.jupiter.api.Test;
2019
import reactor.core.publisher.Mono;
20+
import reactor.core.publisher.MonoProcessor;
2121
import reactor.core.publisher.UnicastProcessor;
2222
import reactor.test.StepVerifier;
2323

@@ -55,15 +55,21 @@ void requesterStreamsTerminatedOnZeroErrorFrame() {
5555

5656
String errorMsg = "error";
5757

58+
MonoProcessor processor = MonoProcessor.create();
5859
scheduler.schedule(
59-
() -> conn.addToReceivedBuffer(Frame.Error.from(0, new RejectedSetupException(errorMsg))),
60+
() -> {
61+
conn.addToReceivedBuffer(Frame.Error.from(0,
62+
new RejectedSetupException(errorMsg)));
63+
processor.onComplete();
64+
},
6065
100,
6166
TimeUnit.MILLISECONDS);
6267

63-
StepVerifier.create(rSocket.requestResponse(DefaultPayload.create("test")))
64-
.expectErrorMatches(
65-
err -> err instanceof RejectedSetupException && errorMsg.equals(err.getMessage()))
66-
.verify(Duration.ofSeconds(5));
68+
StepVerifier.create(rSocket.requestResponse(DefaultPayload.create("test"))
69+
.delaySubscription(processor))
70+
.expectErrorMatches(
71+
err -> err instanceof RejectedSetupException && errorMsg.equals(err.getMessage()))
72+
.verify(Duration.ofSeconds(5));
6773

6874
assertThat(errors).hasSize(1);
6975
assertThat(rSocket.isDisposed()).isTrue();

rsocket-transport-netty/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ dependencies {
3636
testImplementation project(':rsocket-test')
3737
testImplementation 'io.projectreactor:reactor-test'
3838
testImplementation 'org.assertj:assertj-core'
39+
testImplementation 'org.mockito:mockito-core'
40+
testImplementation 'org.mockito:mockito-junit-jupiter'
3941
testImplementation 'org.junit.jupiter:junit-jupiter-api'
4042
testImplementation 'org.junit.jupiter:junit-jupiter-params'
4143

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/WebsocketClientTransport.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.rsocket.transport.netty.client;
1818

19+
import static io.rsocket.frame.FrameHeaderFlyweight.FRAME_LENGTH_MASK;
1920
import static io.rsocket.transport.netty.UriUtils.getPort;
2021
import static io.rsocket.transport.netty.UriUtils.isSecure;
2122

@@ -137,7 +138,7 @@ public static WebsocketClientTransport create(HttpClient client, String path) {
137138
public Mono<DuplexConnection> connect() {
138139
return client
139140
.headers(headers -> transportHeaders.get().forEach(headers::set))
140-
.websocket()
141+
.websocket(FRAME_LENGTH_MASK)
141142
.uri(path)
142143
.connect()
143144
.map(WebsocketDuplexConnection::new);

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketRouteTransport.java

Lines changed: 95 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,21 @@
1616

1717
package io.rsocket.transport.netty.server;
1818

19+
import static io.rsocket.frame.FrameHeaderFlyweight.FRAME_LENGTH_MASK;
20+
21+
import io.netty.handler.codec.http.HttpMethod;
1922
import io.rsocket.Closeable;
2023
import io.rsocket.transport.ServerTransport;
2124
import io.rsocket.transport.netty.WebsocketDuplexConnection;
25+
import java.util.ArrayList;
26+
import java.util.List;
27+
import java.util.Map;
2228
import java.util.Objects;
29+
import java.util.concurrent.ConcurrentHashMap;
2330
import java.util.function.BiFunction;
2431
import java.util.function.Consumer;
32+
import java.util.regex.Matcher;
33+
import java.util.regex.Pattern;
2534
import org.reactivestreams.Publisher;
2635
import reactor.core.publisher.Mono;
2736
import reactor.netty.Connection;
@@ -36,7 +45,7 @@
3645
*/
3746
public final class WebsocketRouteTransport implements ServerTransport<Closeable> {
3847

39-
private final String path;
48+
private final UriPathTemplate template;
4049

4150
private final Consumer<? super HttpServerRoutes> routesBuilder;
4251

@@ -54,7 +63,7 @@ public WebsocketRouteTransport(
5463

5564
this.server = Objects.requireNonNull(server, "server must not be null");
5665
this.routesBuilder = Objects.requireNonNull(routesBuilder, "routesBuilder must not be null");
57-
this.path = Objects.requireNonNull(path, "path must not be null");
66+
this.template = new UriPathTemplate(Objects.requireNonNull(path, "path must not be null"));
5867
}
5968

6069
@Override
@@ -65,7 +74,11 @@ public Mono<Closeable> start(ConnectionAcceptor acceptor) {
6574
.route(
6675
routes -> {
6776
routesBuilder.accept(routes);
68-
routes.ws(path, newHandler(acceptor));
77+
routes.ws(
78+
hsr -> hsr.method().equals(HttpMethod.GET) && template.matches(hsr.uri()),
79+
newHandler(acceptor),
80+
null,
81+
FRAME_LENGTH_MASK);
6982
})
7083
.bind()
7184
.map(CloseableChannel::new);
@@ -88,4 +101,83 @@ public static BiFunction<WebsocketInbound, WebsocketOutbound, Publisher<Void>> n
88101
return acceptor.apply(connection).then(out.neverComplete());
89102
};
90103
}
104+
105+
static final class UriPathTemplate {
106+
107+
private static final Pattern FULL_SPLAT_PATTERN = Pattern.compile("[\\*][\\*]");
108+
private static final String FULL_SPLAT_REPLACEMENT = ".*";
109+
110+
private static final Pattern NAME_SPLAT_PATTERN = Pattern.compile("\\{([^/]+?)\\}[\\*][\\*]");
111+
private static final String NAME_SPLAT_REPLACEMENT = "(?<%NAME%>.*)";
112+
113+
private static final Pattern NAME_PATTERN = Pattern.compile("\\{([^/]+?)\\}");
114+
private static final String NAME_REPLACEMENT = "(?<%NAME%>[^\\/]*)";
115+
116+
private final List<String> pathVariables = new ArrayList<>();
117+
private final Map<String, Matcher> matchers = new ConcurrentHashMap<>();
118+
119+
private final Pattern uriPattern;
120+
121+
static String filterQueryParams(String uri) {
122+
int hasQuery = uri.lastIndexOf("?");
123+
if (hasQuery != -1) {
124+
return uri.substring(0, hasQuery);
125+
} else {
126+
return uri;
127+
}
128+
}
129+
130+
/**
131+
* Creates a new {@code UriPathTemplate} from the given {@code uriPattern}.
132+
*
133+
* @param uriPattern The pattern to be used by the template
134+
*/
135+
UriPathTemplate(String uriPattern) {
136+
String s = "^" + filterQueryParams(uriPattern);
137+
138+
Matcher m = NAME_SPLAT_PATTERN.matcher(s);
139+
while (m.find()) {
140+
for (int i = 1; i <= m.groupCount(); i++) {
141+
String name = m.group(i);
142+
pathVariables.add(name);
143+
s = m.replaceFirst(NAME_SPLAT_REPLACEMENT.replaceAll("%NAME%", name));
144+
m.reset(s);
145+
}
146+
}
147+
148+
m = NAME_PATTERN.matcher(s);
149+
while (m.find()) {
150+
for (int i = 1; i <= m.groupCount(); i++) {
151+
String name = m.group(i);
152+
pathVariables.add(name);
153+
s = m.replaceFirst(NAME_REPLACEMENT.replaceAll("%NAME%", name));
154+
m.reset(s);
155+
}
156+
}
157+
158+
m = FULL_SPLAT_PATTERN.matcher(s);
159+
while (m.find()) {
160+
s = m.replaceAll(FULL_SPLAT_REPLACEMENT);
161+
m.reset(s);
162+
}
163+
164+
this.uriPattern = Pattern.compile(s + "$");
165+
}
166+
167+
/**
168+
* Tests the given {@code uri} against this template, returning {@code true} if the uri matches
169+
* the template, {@code false} otherwise.
170+
*
171+
* @param uri The uri to match
172+
* @return {@code true} if there's a match, {@code false} otherwise
173+
*/
174+
public boolean matches(String uri) {
175+
return matcher(uri).matches();
176+
}
177+
178+
private Matcher matcher(String uri) {
179+
final String foundUri = filterQueryParams(uri);
180+
return matchers.computeIfAbsent(uri, __ -> uriPattern.matcher(foundUri));
181+
}
182+
}
91183
}

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package io.rsocket.transport.netty.server;
1818

19+
import static io.rsocket.frame.FrameHeaderFlyweight.FRAME_LENGTH_MASK;
20+
1921
import io.rsocket.transport.ClientTransport;
2022
import io.rsocket.transport.ServerTransport;
2123
import io.rsocket.transport.TransportHeaderAware;
@@ -108,7 +110,8 @@ public Mono<CloseableChannel> start(ConnectionAcceptor acceptor) {
108110
.handle(
109111
(request, response) -> {
110112
transportHeaders.get().forEach(response::addHeader);
111-
return response.sendWebsocket(WebsocketRouteTransport.newHandler(acceptor));
113+
return response.sendWebsocket(
114+
null, FRAME_LENGTH_MASK, WebsocketRouteTransport.newHandler(acceptor));
112115
})
113116
.bind()
114117
.map(CloseableChannel::new);
Lines changed: 39 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
package io.rsocket.transport.netty;
22

3-
import java.net.URI;
4-
import java.time.Duration;
5-
63
import io.rsocket.AbstractRSocket;
74
import io.rsocket.Payload;
85
import io.rsocket.RSocket;
@@ -12,6 +9,8 @@
129
import io.rsocket.transport.netty.server.WebsocketRouteTransport;
1310
import io.rsocket.util.DefaultPayload;
1411
import io.rsocket.util.EmptyPayload;
12+
import java.net.URI;
13+
import java.time.Duration;
1514
import org.junit.jupiter.api.Test;
1615
import reactor.core.publisher.Flux;
1716
import reactor.core.publisher.Mono;
@@ -21,40 +20,41 @@
2120

2221
public class WebSocketTransportIntegrationTest {
2322

24-
25-
@Test
26-
public void sendStreamOfDataWithExternalHttpServerTest() {
27-
ServerTransport.ConnectionAcceptor acceptor =
28-
RSocketFactory.receive()
29-
.acceptor((setupPayload, sendingRSocket) -> {
30-
return Mono.just(new AbstractRSocket() {
31-
@Override
32-
public Flux<Payload> requestStream(Payload payload) {
33-
return Flux.range(0, 10)
34-
.map(i -> DefaultPayload.create(String.valueOf(i)));
35-
}
36-
});
37-
})
38-
.toConnectionAcceptor();
39-
40-
DisposableServer server = HttpServer.create()
41-
.host("localhost")
42-
.route(router -> router.ws("/test",
43-
WebsocketRouteTransport.newHandler(
44-
acceptor)))
45-
.bindNow();
46-
47-
RSocket rsocket = RSocketFactory.connect()
48-
.transport(WebsocketClientTransport.create(
49-
URI.create("ws://" + server.host() + ":" + server.port() + "/test")
50-
))
51-
.start()
52-
.block();
53-
54-
StepVerifier.create(rsocket.requestStream(EmptyPayload.INSTANCE))
55-
.expectSubscription()
56-
.expectNextCount(10)
57-
.expectComplete()
58-
.verify(Duration.ofMillis(1000));
59-
}
23+
@Test
24+
public void sendStreamOfDataWithExternalHttpServerTest() {
25+
ServerTransport.ConnectionAcceptor acceptor =
26+
RSocketFactory.receive()
27+
.acceptor(
28+
(setupPayload, sendingRSocket) -> {
29+
return Mono.just(
30+
new AbstractRSocket() {
31+
@Override
32+
public Flux<Payload> requestStream(Payload payload) {
33+
return Flux.range(0, 10)
34+
.map(i -> DefaultPayload.create(String.valueOf(i)));
35+
}
36+
});
37+
})
38+
.toConnectionAcceptor();
39+
40+
DisposableServer server =
41+
HttpServer.create()
42+
.host("localhost")
43+
.route(router -> router.ws("/test", WebsocketRouteTransport.newHandler(acceptor)))
44+
.bindNow();
45+
46+
RSocket rsocket =
47+
RSocketFactory.connect()
48+
.transport(
49+
WebsocketClientTransport.create(
50+
URI.create("ws://" + server.host() + ":" + server.port() + "/test")))
51+
.start()
52+
.block();
53+
54+
StepVerifier.create(rsocket.requestStream(EmptyPayload.INSTANCE))
55+
.expectSubscription()
56+
.expectNextCount(10)
57+
.expectComplete()
58+
.verify(Duration.ofMillis(1000));
59+
}
6060
}

0 commit comments

Comments
 (0)