Skip to content

Commit 0b6ad3e

Browse files
committed
Replace use of deprecated Reactor Processor types
Signed-off-by: Rossen Stoyanchev <[email protected]>
1 parent 3fdc78f commit 0b6ad3e

File tree

26 files changed

+510
-335
lines changed

26 files changed

+510
-335
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
import org.slf4j.LoggerFactory;
4444
import reactor.core.publisher.Flux;
4545
import reactor.core.publisher.Mono;
46-
import reactor.core.publisher.MonoProcessor;
46+
import reactor.core.publisher.Sinks;
4747
import reactor.util.annotation.Nullable;
4848

4949
/**
@@ -65,7 +65,7 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket {
6565

6666
@Nullable private final RequesterLeaseTracker requesterLeaseTracker;
6767
private final KeepAliveFramesAcceptor keepAliveFramesAcceptor;
68-
private final MonoProcessor<Void> onClose;
68+
private final Sinks.Empty<Void> onClose;
6969

7070
RSocketRequester(
7171
DuplexConnection connection,
@@ -89,7 +89,7 @@ class RSocketRequester extends RequesterResponderSupport implements RSocket {
8989
requestInterceptorFunction);
9090

9191
this.requesterLeaseTracker = requesterLeaseTracker;
92-
this.onClose = MonoProcessor.create();
92+
this.onClose = Sinks.empty();
9393

9494
// DO NOT Change the order here. The Send processor must be subscribed to before receiving
9595
connection.onClose().subscribe(null, this::tryTerminateOnConnectionError, this::tryShutdown);
@@ -196,7 +196,7 @@ public boolean isDisposed() {
196196

197197
@Override
198198
public Mono<Void> onClose() {
199-
return onClose;
199+
return onClose.asMono();
200200
}
201201

202202
private void handleIncomingFrames(ByteBuf frame) {
@@ -356,9 +356,9 @@ private void terminate(Throwable e) {
356356
}
357357

358358
if (e == CLOSED_CHANNEL_EXCEPTION) {
359-
onClose.onComplete();
359+
onClose.tryEmitEmpty();
360360
} else {
361-
onClose.onError(e);
361+
onClose.tryEmitError(e);
362362
}
363363
}
364364
}

rsocket-core/src/main/java/io/rsocket/internal/BaseDuplexConnection.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,33 @@
1+
/*
2+
* Copyright 2015-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package io.rsocket.internal;
217

318
import io.netty.buffer.ByteBuf;
419
import io.rsocket.DuplexConnection;
20+
import reactor.core.Scannable;
521
import reactor.core.publisher.Mono;
6-
import reactor.core.publisher.MonoProcessor;
22+
import reactor.core.publisher.Sinks;
723

824
public abstract class BaseDuplexConnection implements DuplexConnection {
9-
protected MonoProcessor<Void> onClose = MonoProcessor.create();
25+
protected Sinks.Empty<Void> onClose = Sinks.empty();
1026

1127
protected UnboundedProcessor sender = new UnboundedProcessor();
1228

1329
public BaseDuplexConnection() {
14-
onClose.doFinally(s -> doOnClose()).subscribe();
30+
onClose().doFinally(s -> doOnClose()).subscribe();
1531
}
1632

1733
@Override
@@ -27,16 +43,17 @@ public void sendFrame(int streamId, ByteBuf frame) {
2743

2844
@Override
2945
public final Mono<Void> onClose() {
30-
return onClose;
46+
return onClose.asMono();
3147
}
3248

3349
@Override
3450
public final void dispose() {
35-
onClose.onComplete();
51+
onClose.tryEmitEmpty();
3652
}
3753

3854
@Override
55+
@SuppressWarnings("ConstantConditions")
3956
public final boolean isDisposed() {
40-
return onClose.isDisposed();
57+
return onClose.scan(Scannable.Attr.TERMINATED) || onClose.scan(Scannable.Attr.CANCELLED);
4158
}
4259
}

rsocket-core/src/main/java/io/rsocket/resume/InMemoryResumableFramesStore.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2019 the original author or authors.
2+
* Copyright 2015-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,8 +28,8 @@
2828
import reactor.core.CoreSubscriber;
2929
import reactor.core.publisher.Flux;
3030
import reactor.core.publisher.Mono;
31-
import reactor.core.publisher.MonoProcessor;
3231
import reactor.core.publisher.Operators;
32+
import reactor.core.publisher.Sinks;
3333

3434
/**
3535
* writes - n (where n is frequent, primary operation) reads - m (where m == KeepAliveFrequency)
@@ -40,7 +40,7 @@ public class InMemoryResumableFramesStore extends Flux<ByteBuf>
4040

4141
private static final Logger logger = LoggerFactory.getLogger(InMemoryResumableFramesStore.class);
4242

43-
final MonoProcessor<Void> disposed = MonoProcessor.create();
43+
final Sinks.Empty<Void> disposed = Sinks.empty();
4444
final ArrayList<ByteBuf> cachedFrames;
4545
final String tag;
4646
final int cacheLimit;
@@ -189,7 +189,7 @@ void resumeImplied() {
189189

190190
@Override
191191
public Mono<Void> onClose() {
192-
return disposed;
192+
return disposed.asMono();
193193
}
194194

195195
@Override
@@ -205,7 +205,7 @@ public void dispose() {
205205
}
206206
cachedFrames.clear();
207207
}
208-
disposed.onComplete();
208+
disposed.tryEmitEmpty();
209209
}
210210
}
211211

rsocket-core/src/main/java/io/rsocket/resume/ResumableDuplexConnection.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2020 the original author or authors.
2+
* Copyright 2015-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -30,9 +30,9 @@
3030
import org.slf4j.LoggerFactory;
3131
import reactor.core.CoreSubscriber;
3232
import reactor.core.Disposable;
33+
import reactor.core.Scannable;
3334
import reactor.core.publisher.Flux;
3435
import reactor.core.publisher.Mono;
35-
import reactor.core.publisher.MonoProcessor;
3636
import reactor.core.publisher.Operators;
3737
import reactor.core.publisher.Sinks;
3838

@@ -46,7 +46,7 @@ public class ResumableDuplexConnection extends Flux<ByteBuf>
4646

4747
final UnboundedProcessor savableFramesSender;
4848
final Disposable framesSaverDisposable;
49-
final MonoProcessor<Void> onClose;
49+
final Sinks.Empty<Void> onClose;
5050
final SocketAddress remoteAddress;
5151
final Sinks.Many<Integer> onConnectionClosedSink;
5252

@@ -72,7 +72,7 @@ public ResumableDuplexConnection(
7272
this.resumableFramesStore = resumableFramesStore;
7373
this.savableFramesSender = new UnboundedProcessor();
7474
this.framesSaverDisposable = resumableFramesStore.saveFrames(savableFramesSender).subscribe();
75-
this.onClose = MonoProcessor.create();
75+
this.onClose = Sinks.empty();
7676
this.remoteAddress = initialConnection.remoteAddress();
7777

7878
ACTIVE_CONNECTION.lazySet(this, initialConnection);
@@ -164,17 +164,17 @@ public void sendErrorAndClose(RSocketErrorException rSocketErrorException) {
164164
framesSaverDisposable.dispose();
165165
savableFramesSender.dispose();
166166
onConnectionClosedSink.tryEmitComplete();
167-
onClose.onError(t);
167+
onClose.tryEmitError(t);
168168
},
169169
() -> {
170170
framesSaverDisposable.dispose();
171171
savableFramesSender.dispose();
172172
onConnectionClosedSink.tryEmitComplete();
173173
final Throwable cause = rSocketErrorException.getCause();
174174
if (cause == null) {
175-
onClose.onComplete();
175+
onClose.tryEmitEmpty();
176176
} else {
177-
onClose.onError(cause);
177+
onClose.tryEmitError(cause);
178178
}
179179
});
180180
}
@@ -191,7 +191,7 @@ public ByteBufAllocator alloc() {
191191

192192
@Override
193193
public Mono<Void> onClose() {
194-
return onClose;
194+
return onClose.asMono();
195195
}
196196

197197
@Override
@@ -210,12 +210,13 @@ public void dispose() {
210210
activeReceivingSubscriber.dispose();
211211
savableFramesSender.dispose();
212212
onConnectionClosedSink.tryEmitComplete();
213-
onClose.onComplete();
213+
onClose.tryEmitEmpty();
214214
}
215215

216216
@Override
217+
@SuppressWarnings("ConstantConditions")
217218
public boolean isDisposed() {
218-
return onClose.isDisposed();
219+
return onClose.scan(Scannable.Attr.TERMINATED) || onClose.scan(Scannable.Attr.CANCELLED);
219220
}
220221

221222
@Override

rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package io.rsocket.core;
22
/*
3-
* Copyright 2015-2020 the original author or authors.
3+
* Copyright 2015-2021 the original author or authors.
44
*
55
* Licensed under the Apache License, Version 2.0 (the "License");
66
* you may not use this file except in compliance with the License.
@@ -57,8 +57,8 @@
5757
import reactor.core.publisher.Flux;
5858
import reactor.core.publisher.Hooks;
5959
import reactor.core.publisher.Mono;
60-
import reactor.core.publisher.MonoProcessor;
6160
import reactor.core.publisher.SignalType;
61+
import reactor.core.publisher.Sinks;
6262
import reactor.test.StepVerifier;
6363
import reactor.test.publisher.TestPublisher;
6464
import reactor.test.util.RaceTestUtils;
@@ -516,16 +516,17 @@ public static class ClientSocketRule extends AbstractSocketRule<RSocketRequester
516516

517517
protected RSocketClient client;
518518
protected Runnable delayer;
519-
protected MonoProcessor<RSocket> producer;
519+
protected Sinks.One<RSocket> producer;
520520

521521
@Override
522522
protected void init() {
523523
super.init();
524-
delayer = () -> producer.onNext(socket);
525-
producer = MonoProcessor.create();
524+
delayer = () -> producer.tryEmitValue(socket);
525+
producer = Sinks.one();
526526
client =
527527
new DefaultRSocketClient(
528528
producer
529+
.asMono()
529530
.doOnCancel(() -> socket.dispose())
530531
.doOnDiscard(Disposable.class, Disposable::dispose));
531532
}

0 commit comments

Comments
 (0)