Skip to content

Commit f0d1d34

Browse files
JakeWhartonakarnokd
authored andcommitted
Implement ObservableHide for also masking the Disposable. (#4372)
1 parent 23461ee commit f0d1d34

File tree

2 files changed

+81
-2
lines changed

2 files changed

+81
-2
lines changed

src/main/java/io/reactivex/Observable.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7797,8 +7797,7 @@ public final <TRight, TLeftEnd, TRightEnd, R> Observable<R> groupJoin(
77977797
*/
77987798
@SchedulerSupport(SchedulerSupport.NONE)
77997799
public final Observable<T> hide() {
7800-
// TODO hide the Disposable as well
7801-
return new ObservableFromUnsafeSource<T>(this);
7800+
return new ObservableHide<T>(this);
78027801
}
78037802

78047803
/**
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.observable;
15+
16+
import io.reactivex.*;
17+
import io.reactivex.disposables.Disposable;
18+
import io.reactivex.internal.disposables.DisposableHelper;
19+
20+
/**
21+
* Hides the identity of the wrapped ObservableSource and its Disposable.
22+
* @param <T> the value type
23+
*
24+
* @since 2.0
25+
*/
26+
public class ObservableHide<T> extends AbstractObservableWithUpstream<T, T> {
27+
28+
public ObservableHide(ObservableSource<T> source) {
29+
super(source);
30+
}
31+
32+
@Override
33+
protected void subscribeActual(Observer<? super T> o) {
34+
source.subscribe(new HideDisposable<T>(o));
35+
}
36+
37+
static final class HideDisposable<T> implements Observer<T>, Disposable {
38+
39+
final Observer<? super T> actual;
40+
41+
Disposable d;
42+
43+
public HideDisposable(Observer<? super T> actual) {
44+
this.actual = actual;
45+
}
46+
47+
@Override
48+
public void dispose() {
49+
d.dispose();
50+
}
51+
52+
@Override
53+
public boolean isDisposed() {
54+
return d.isDisposed();
55+
}
56+
57+
@Override
58+
public void onSubscribe(Disposable d) {
59+
if (DisposableHelper.validate(this.d, d)) {
60+
this.d = d;
61+
actual.onSubscribe(this);
62+
}
63+
}
64+
65+
@Override
66+
public void onNext(T t) {
67+
actual.onNext(t);
68+
}
69+
70+
@Override
71+
public void onError(Throwable t) {
72+
actual.onError(t);
73+
}
74+
75+
@Override
76+
public void onComplete() {
77+
actual.onComplete();
78+
}
79+
}
80+
}

0 commit comments

Comments
 (0)