Skip to content

Commit db47113

Browse files
Ryland Degnanrobertroeser
Ryland Degnan
authored andcommitted
Use ReferenceCountUtil.safeRelease instead of release (#505)
1 parent 0c7f15d commit db47113

File tree

2 files changed

+9
-8
lines changed

2 files changed

+9
-8
lines changed

rsocket-core/src/main/java/io/rsocket/internal/SwitchTransform.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import reactor.core.CoreSubscriber;
2626
import reactor.core.publisher.Flux;
2727
import reactor.core.publisher.Operators;
28+
import reactor.core.publisher.UnicastProcessor;
2829

2930
public final class SwitchTransform<T, R> extends Flux<R> {
3031

@@ -39,7 +40,7 @@ public SwitchTransform(
3940

4041
@Override
4142
public void subscribe(CoreSubscriber<? super R> actual) {
42-
Flux.from(source).subscribe(new SwitchTransformSubscriber<>(actual, transformer));
43+
source.subscribe(new SwitchTransformSubscriber<>(actual, transformer));
4344
}
4445

4546
static final class SwitchTransformSubscriber<T, R>
@@ -50,7 +51,7 @@ static final class SwitchTransformSubscriber<T, R>
5051

5152
final CoreSubscriber<? super R> actual;
5253
final BiFunction<T, Flux<T>, Publisher<? extends R>> transformer;
53-
final UnboundedProcessor<T> processor = new UnboundedProcessor<>();
54+
final UnicastProcessor<T> processor = UnicastProcessor.create();
5455
Subscription s;
5556
volatile int once;
5657

@@ -76,10 +77,10 @@ public void onNext(T t) {
7677
Publisher<? extends R> result =
7778
Objects.requireNonNull(
7879
transformer.apply(t, processor), "The transformer returned a null value");
79-
Flux.from(result).subscribe(actual);
80+
result.subscribe(actual);
8081
} catch (Throwable e) {
8182
onError(Operators.onOperatorError(s, e, t, actual.currentContext()));
82-
ReferenceCountUtil.release(t);
83+
ReferenceCountUtil.safeRelease(t);
8384
return;
8485
}
8586
}

rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ boolean checkTerminated(boolean d, boolean empty, Subscriber<? super T> a, Queue
158158
while (!q.isEmpty()) {
159159
T t = q.poll();
160160
if (t != null) {
161-
ReferenceCountUtil.release(t);
161+
ReferenceCountUtil.safeRelease(t);
162162
}
163163
}
164164
actual = null;
@@ -202,15 +202,15 @@ public Context currentContext() {
202202
public void onNext(T t) {
203203
if (done || cancelled) {
204204
Operators.onNextDropped(t, currentContext());
205-
ReferenceCountUtil.release(t);
205+
ReferenceCountUtil.safeRelease(t);
206206
return;
207207
}
208208

209209
if (!queue.offer(t)) {
210210
Throwable ex =
211211
Operators.onOperatorError(null, Exceptions.failWithOverflow(), t, currentContext());
212212
onError(Operators.onOperatorError(null, ex, t, currentContext()));
213-
ReferenceCountUtil.release(t);
213+
ReferenceCountUtil.safeRelease(t);
214214
return;
215215
}
216216
drain();
@@ -301,7 +301,7 @@ public void clear() {
301301
while (!queue.isEmpty()) {
302302
T t = queue.poll();
303303
if (t != null) {
304-
ReferenceCountUtil.release(t);
304+
ReferenceCountUtil.safeRelease(t);
305305
}
306306
}
307307
}

0 commit comments

Comments
 (0)