Skip to content

Commit 2e5d0c3

Browse files
committed
provides tests and some minor fixes to UnicastMonoProcessor
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 809db47 commit 2e5d0c3

File tree

2 files changed

+652
-8
lines changed

2 files changed

+652
-8
lines changed

rsocket-core/src/main/java/io/rsocket/internal/UnicastMonoProcessor.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@ public static <T> UnicastMonoProcessor<T> create() {
4545
static final AtomicIntegerFieldUpdater<UnicastMonoProcessor> ONCE =
4646
AtomicIntegerFieldUpdater.newUpdater(UnicastMonoProcessor.class, "once");
4747

48-
Publisher<? extends O> source;
49-
5048
Throwable error;
5149
volatile boolean terminated;
5250
O value;
@@ -67,7 +65,6 @@ public final void cancel() {
6765
return;
6866
}
6967

70-
source = null;
7168
if (s != null) {
7269
s.cancel();
7370
}
@@ -84,7 +81,6 @@ public void dispose() {
8481
final CancellationException e = new CancellationException("Disposed");
8582
error = e;
8683
value = null;
87-
source = null;
8884
terminated = true;
8985
if (s != null) {
9086
s.cancel();
@@ -159,7 +155,6 @@ public final void onError(Throwable cause) {
159155

160156
error = cause;
161157
value = null;
162-
source = null;
163158
terminated = true;
164159

165160
final CoreSubscriber<? super O> a = actual;
@@ -182,8 +177,6 @@ public final void onNext(@Nullable O value) {
182177
}
183178

184179
this.value = value;
185-
final Publisher<? extends O> parent = source;
186-
source = null;
187180
terminated = true;
188181

189182
final CoreSubscriber<? super O> a = actual;
@@ -193,7 +186,7 @@ public final void onNext(@Nullable O value) {
193186
a.onComplete();
194187
}
195188
} else {
196-
if (s != null && !(parent instanceof Mono)) {
189+
if (s != null) {
197190
s.cancel();
198191
}
199192

0 commit comments

Comments
 (0)