Skip to content

Commit ef6cb56

Browse files
Merge pull request #539 from rsocket/reactor-netty-bugs-workaround
Workaround reactor-netty issues in setup rejection verification code
2 parents dcc9f8d + 02b9045 commit ef6cb56

File tree

13 files changed

+162
-170
lines changed

13 files changed

+162
-170
lines changed

rsocket-core/src/main/java/io/rsocket/internal/SwitchTransformFlux.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,11 @@
1616

1717
package io.rsocket.internal;
1818

19+
import io.netty.util.ReferenceCountUtil;
1920
import java.util.Objects;
2021
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2122
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
2223
import java.util.function.BiFunction;
23-
24-
import io.netty.util.ReferenceCountUtil;
2524
import org.reactivestreams.Publisher;
2625
import org.reactivestreams.Subscription;
2726
import reactor.core.CoreSubscriber;
@@ -36,7 +35,7 @@ public final class SwitchTransformFlux<T, R> extends Flux<R> {
3635
final BiFunction<T, Flux<T>, Publisher<? extends R>> transformer;
3736

3837
public SwitchTransformFlux(
39-
Publisher<? extends T> source, BiFunction<T, Flux<T>, Publisher<? extends R>> transformer) {
38+
Publisher<? extends T> source, BiFunction<T, Flux<T>, Publisher<? extends R>> transformer) {
4039
this.source = Objects.requireNonNull(source, "source");
4140
this.transformer = Objects.requireNonNull(transformer, "transformer");
4241
}
@@ -60,14 +59,14 @@ static final class SwitchTransformMain<T, R> implements CoreSubscriber<T>, Scann
6059
Subscription s;
6160

6261
volatile int once;
62+
6363
@SuppressWarnings("rawtypes")
6464
static final AtomicIntegerFieldUpdater<SwitchTransformMain> ONCE =
65-
AtomicIntegerFieldUpdater.newUpdater(SwitchTransformMain.class, "once");
65+
AtomicIntegerFieldUpdater.newUpdater(SwitchTransformMain.class, "once");
6666

6767
SwitchTransformMain(
68-
CoreSubscriber<? super R> actual,
69-
BiFunction<T, Flux<T>, Publisher<? extends R>> transformer
70-
) {
68+
CoreSubscriber<? super R> actual,
69+
BiFunction<T, Flux<T>, Publisher<? extends R>> transformer) {
7170
this.actual = actual;
7271
this.transformer = transformer;
7372
this.inner = new SwitchTransformInner<>(this);
@@ -100,7 +99,8 @@ public void onNext(T t) {
10099
try {
101100
inner.first = t;
102101
Publisher<? extends R> result =
103-
Objects.requireNonNull(transformer.apply(t, inner), "The transformer returned a null value");
102+
Objects.requireNonNull(
103+
transformer.apply(t, inner), "The transformer returned a null value");
104104
result.subscribe(actual);
105105
return;
106106
} catch (Throwable e) {
@@ -151,25 +151,28 @@ void cancel() {
151151
}
152152
}
153153

154-
static final class SwitchTransformInner<V> extends Flux<V>
155-
implements Scannable, Subscription {
154+
static final class SwitchTransformInner<V> extends Flux<V> implements Scannable, Subscription {
156155

157156
final SwitchTransformMain<V, ?> parent;
158157

159158
volatile CoreSubscriber<? super V> actual;
159+
160160
@SuppressWarnings("rawtypes")
161161
static final AtomicReferenceFieldUpdater<SwitchTransformInner, CoreSubscriber> ACTUAL =
162-
AtomicReferenceFieldUpdater.newUpdater(SwitchTransformInner.class, CoreSubscriber.class, "actual");
162+
AtomicReferenceFieldUpdater.newUpdater(
163+
SwitchTransformInner.class, CoreSubscriber.class, "actual");
163164

164165
volatile V first;
166+
165167
@SuppressWarnings("rawtypes")
166168
static final AtomicReferenceFieldUpdater<SwitchTransformInner, Object> FIRST =
167-
AtomicReferenceFieldUpdater.newUpdater(SwitchTransformInner.class, Object.class, "first");
169+
AtomicReferenceFieldUpdater.newUpdater(SwitchTransformInner.class, Object.class, "first");
168170

169171
volatile int once;
172+
170173
@SuppressWarnings("rawtypes")
171174
static final AtomicIntegerFieldUpdater<SwitchTransformInner> ONCE =
172-
AtomicIntegerFieldUpdater.newUpdater(SwitchTransformInner.class, "once");
175+
AtomicIntegerFieldUpdater.newUpdater(SwitchTransformInner.class, "once");
173176

174177
SwitchTransformInner(SwitchTransformMain<V, ?> parent) {
175178
this.parent = parent;
@@ -204,8 +207,7 @@ public void subscribe(CoreSubscriber<? super V> actual) {
204207
if (once == 0 && ONCE.compareAndSet(this, 0, 1)) {
205208
ACTUAL.lazySet(this, actual);
206209
actual.onSubscribe(this);
207-
}
208-
else {
210+
} else {
209211
actual.onError(new IllegalStateException("SwitchTransform allows only one Subscriber"));
210212
}
211213
}
@@ -246,4 +248,4 @@ public CoreSubscriber<? super V> actual() {
246248
return actual;
247249
}
248250
}
249-
}
251+
}
Lines changed: 92 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -1,109 +1,108 @@
11
package io.rsocket.internal;
22

33
import java.time.Duration;
4-
54
import org.junit.Test;
65
import reactor.core.publisher.Flux;
76
import reactor.test.StepVerifier;
87
import reactor.test.publisher.TestPublisher;
98

109
public class SwitchTransformFluxTest {
1110

12-
@Test
13-
public void backpressureTest() {
14-
TestPublisher<Long> publisher = TestPublisher.createCold();
11+
@Test
12+
public void backpressureTest() {
13+
TestPublisher<Long> publisher = TestPublisher.createCold();
1514

16-
Flux<String> switchTransformed = publisher
15+
Flux<String> switchTransformed =
16+
publisher
1717
.flux()
18-
.transform(flux -> new SwitchTransformFlux<>(
19-
flux,
20-
(first, innerFlux) -> innerFlux.map(String::valueOf)
21-
));
22-
23-
publisher.next(1L);
24-
25-
StepVerifier.create(switchTransformed, 0)
26-
.thenRequest(1)
27-
.expectNext("1")
28-
.thenRequest(1)
29-
.then(() -> publisher.next(2L))
30-
.expectNext("2")
31-
.then(publisher::complete)
32-
.expectComplete()
33-
.verify(Duration.ofSeconds(10));
34-
35-
publisher.assertWasRequested();
36-
publisher.assertNoRequestOverflow();
37-
}
38-
39-
@Test
40-
public void shouldErrorOnOverflowTest() {
41-
TestPublisher<Long> publisher = TestPublisher.createCold();
42-
43-
Flux<String> switchTransformed = publisher
44-
.flux()
45-
.transform(flux -> new SwitchTransformFlux<>(
46-
flux,
47-
(first, innerFlux) -> innerFlux.map(String::valueOf)
48-
));
49-
50-
publisher.next(1L);
51-
52-
StepVerifier.create(switchTransformed, 0)
53-
.thenRequest(1)
54-
.expectNext("1")
55-
.then(() -> publisher.next(2L))
56-
.expectError()
57-
.verify(Duration.ofSeconds(10));
58-
59-
publisher.assertWasRequested();
60-
publisher.assertNoRequestOverflow();
61-
}
62-
63-
@Test
64-
public void shouldPropagateonCompleteCorrectly() {
65-
Flux<String> switchTransformed = Flux.empty()
66-
.transform(flux -> new SwitchTransformFlux<>(
67-
flux,
68-
(first, innerFlux) -> innerFlux.map(String::valueOf)
69-
));
70-
71-
StepVerifier.create(switchTransformed)
72-
.expectComplete()
73-
.verify(Duration.ofSeconds(10));
74-
}
75-
76-
@Test
77-
public void shouldPropagateErrorCorrectly() {
78-
Flux<String> switchTransformed = Flux.error(new RuntimeException("hello"))
79-
.transform(flux -> new SwitchTransformFlux<>(
80-
flux,
81-
(first, innerFlux) -> innerFlux.map(String::valueOf)
82-
));
83-
84-
StepVerifier.create(switchTransformed)
85-
.expectErrorMessage("hello")
86-
.verify(Duration.ofSeconds(10));
87-
}
88-
89-
@Test
90-
public void shouldBeAbleToBeCancelledProperly() {
91-
TestPublisher<Integer> publisher = TestPublisher.createCold();
92-
Flux<String> switchTransformed = publisher
18+
.transform(
19+
flux ->
20+
new SwitchTransformFlux<>(
21+
flux, (first, innerFlux) -> innerFlux.map(String::valueOf)));
22+
23+
publisher.next(1L);
24+
25+
StepVerifier.create(switchTransformed, 0)
26+
.thenRequest(1)
27+
.expectNext("1")
28+
.thenRequest(1)
29+
.then(() -> publisher.next(2L))
30+
.expectNext("2")
31+
.then(publisher::complete)
32+
.expectComplete()
33+
.verify(Duration.ofSeconds(10));
34+
35+
publisher.assertWasRequested();
36+
publisher.assertNoRequestOverflow();
37+
}
38+
39+
@Test
40+
public void shouldErrorOnOverflowTest() {
41+
TestPublisher<Long> publisher = TestPublisher.createCold();
42+
43+
Flux<String> switchTransformed =
44+
publisher
9345
.flux()
94-
.transform(flux -> new SwitchTransformFlux<>(
95-
flux,
96-
(first, innerFlux) -> innerFlux.map(String::valueOf)
97-
));
98-
99-
publisher.emit(1, 2, 3, 4, 5);
46+
.transform(
47+
flux ->
48+
new SwitchTransformFlux<>(
49+
flux, (first, innerFlux) -> innerFlux.map(String::valueOf)));
50+
51+
publisher.next(1L);
52+
53+
StepVerifier.create(switchTransformed, 0)
54+
.thenRequest(1)
55+
.expectNext("1")
56+
.then(() -> publisher.next(2L))
57+
.expectError()
58+
.verify(Duration.ofSeconds(10));
59+
60+
publisher.assertWasRequested();
61+
publisher.assertNoRequestOverflow();
62+
}
63+
64+
@Test
65+
public void shouldPropagateonCompleteCorrectly() {
66+
Flux<String> switchTransformed =
67+
Flux.empty()
68+
.transform(
69+
flux ->
70+
new SwitchTransformFlux<>(
71+
flux, (first, innerFlux) -> innerFlux.map(String::valueOf)));
72+
73+
StepVerifier.create(switchTransformed).expectComplete().verify(Duration.ofSeconds(10));
74+
}
75+
76+
@Test
77+
public void shouldPropagateErrorCorrectly() {
78+
Flux<String> switchTransformed =
79+
Flux.error(new RuntimeException("hello"))
80+
.transform(
81+
flux ->
82+
new SwitchTransformFlux<>(
83+
flux, (first, innerFlux) -> innerFlux.map(String::valueOf)));
84+
85+
StepVerifier.create(switchTransformed)
86+
.expectErrorMessage("hello")
87+
.verify(Duration.ofSeconds(10));
88+
}
89+
90+
@Test
91+
public void shouldBeAbleToBeCancelledProperly() {
92+
TestPublisher<Integer> publisher = TestPublisher.createCold();
93+
Flux<String> switchTransformed =
94+
publisher
95+
.flux()
96+
.transform(
97+
flux ->
98+
new SwitchTransformFlux<>(
99+
flux, (first, innerFlux) -> innerFlux.map(String::valueOf)));
100100

101-
StepVerifier.create(switchTransformed, 0)
102-
.thenCancel()
103-
.verify(Duration.ofSeconds(10));
101+
publisher.emit(1, 2, 3, 4, 5);
104102

105-
publisher.assertCancelled();
106-
publisher.assertWasRequested();
103+
StepVerifier.create(switchTransformed, 0).thenCancel().verify(Duration.ofSeconds(10));
107104

108-
}
109-
}
105+
publisher.assertCancelled();
106+
publisher.assertWasRequested();
107+
}
108+
}

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,12 @@
1818

1919
import io.rsocket.DuplexConnection;
2020
import io.rsocket.Frame;
21+
import java.util.Objects;
2122
import org.reactivestreams.Publisher;
2223
import reactor.core.publisher.Flux;
2324
import reactor.core.publisher.Mono;
2425
import reactor.netty.Connection;
2526

26-
import java.util.Objects;
27-
2827
/** An implementation of {@link DuplexConnection} that connects via TCP. */
2928
public final class TcpDuplexConnection implements DuplexConnection {
3029

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpUriHandler.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,7 @@ public Optional<ServerTransport> buildServer(URI uri) {
5353
return Optional.empty();
5454
}
5555

56-
return Optional.of(TcpServerTransport.create(
57-
TcpServer.create()
58-
.host(uri.getHost())
59-
.port(uri.getPort())));
56+
return Optional.of(
57+
TcpServerTransport.create(TcpServer.create().host(uri.getHost()).port(uri.getPort())));
6058
}
6159
}

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,9 @@ public Mono<Void> onClose() {
6767

6868
@Override
6969
public Flux<Frame> receive() {
70-
return connection.inbound().receive()
70+
return connection
71+
.inbound()
72+
.receive()
7173
.map(
7274
buf -> {
7375
CompositeByteBuf composite = connection.channel().alloc().compositeBuffer();
@@ -85,7 +87,9 @@ public Mono<Void> send(Publisher<Frame> frames) {
8587

8688
@Override
8789
public Mono<Void> sendOne(Frame frame) {
88-
return connection.outbound().sendObject(new BinaryWebSocketFrame(frame.content().skipBytes(FRAME_LENGTH_SIZE)))
90+
return connection
91+
.outbound()
92+
.sendObject(new BinaryWebSocketFrame(frame.content().skipBytes(FRAME_LENGTH_SIZE)))
8993
.then();
9094
}
9195
}

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/client/TcpClientTransport.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,8 @@ public static TcpClientTransport create(TcpClient client) {
9393
@Override
9494
public Mono<DuplexConnection> connect() {
9595
return client
96-
.doOnConnected(c -> c.addHandlerLast(new RSocketLengthCodec()))
97-
.connect()
98-
.map(TcpDuplexConnection::new);
96+
.doOnConnected(c -> c.addHandlerLast(new RSocketLengthCodec()))
97+
.connect()
98+
.map(TcpDuplexConnection::new);
9999
}
100100
}

0 commit comments

Comments
 (0)