Skip to content

Commit d3feccb

Browse files
vanniktechakarnokd
authored andcommitted
2.x: Add Disposable Observer for Maybe, Completable & Single (#4504)
1 parent 1414ddb commit d3feccb

File tree

4 files changed

+155
-1
lines changed

4 files changed

+155
-1
lines changed
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.observers;
15+
16+
import java.util.concurrent.atomic.AtomicReference;
17+
18+
import io.reactivex.CompletableObserver;
19+
import io.reactivex.disposables.Disposable;
20+
import io.reactivex.internal.disposables.DisposableHelper;
21+
22+
/**
23+
* An abstract {@link CompletableObserver} that allows asynchronous cancellation by implementing Disposable.
24+
*/
25+
public abstract class DisposableCompletableObserver implements CompletableObserver, Disposable {
26+
final AtomicReference<Disposable> s = new AtomicReference<Disposable>();
27+
28+
@Override
29+
public final void onSubscribe(Disposable s) {
30+
if (DisposableHelper.setOnce(this.s, s)) {
31+
onStart();
32+
}
33+
}
34+
35+
/**
36+
* Called once the single upstream Disposable is set via onSubscribe.
37+
*/
38+
protected void onStart() {
39+
}
40+
41+
@Override
42+
public final boolean isDisposed() {
43+
return s.get() == DisposableHelper.DISPOSED;
44+
}
45+
46+
@Override
47+
public final void dispose() {
48+
DisposableHelper.dispose(s);
49+
}
50+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.observers;
15+
16+
import java.util.concurrent.atomic.AtomicReference;
17+
18+
import io.reactivex.MaybeObserver;
19+
import io.reactivex.disposables.Disposable;
20+
import io.reactivex.internal.disposables.DisposableHelper;
21+
22+
/**
23+
* An abstract {@link MaybeObserver} that allows asynchronous cancellation by implementing Disposable.
24+
*
25+
* @param <T> the received value type
26+
*/
27+
public abstract class DisposableMaybeObserver<T> implements MaybeObserver<T>, Disposable {
28+
final AtomicReference<Disposable> s = new AtomicReference<Disposable>();
29+
30+
@Override
31+
public final void onSubscribe(Disposable s) {
32+
if (DisposableHelper.setOnce(this.s, s)) {
33+
onStart();
34+
}
35+
}
36+
37+
/**
38+
* Called once the single upstream Disposable is set via onSubscribe.
39+
*/
40+
protected void onStart() {
41+
}
42+
43+
@Override
44+
public final boolean isDisposed() {
45+
return s.get() == DisposableHelper.DISPOSED;
46+
}
47+
48+
@Override
49+
public final void dispose() {
50+
DisposableHelper.dispose(s);
51+
}
52+
}

src/main/java/io/reactivex/observers/DisposableObserver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import io.reactivex.internal.disposables.*;
2121

2222
/**
23-
* An abstract Observer that allows asynchronous cancellation by implementing Disposable.
23+
* An abstract {@link Observer} that allows asynchronous cancellation by implementing Disposable.
2424
*
2525
* @param <T> the received value type
2626
*/
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.observers;
15+
16+
import java.util.concurrent.atomic.AtomicReference;
17+
18+
import io.reactivex.SingleObserver;
19+
import io.reactivex.disposables.Disposable;
20+
import io.reactivex.internal.disposables.DisposableHelper;
21+
22+
/**
23+
* An abstract {@link SingleObserver} that allows asynchronous cancellation by implementing Disposable.
24+
*
25+
* @param <T> the received value type
26+
*/
27+
public abstract class DisposableSingleObserver<T> implements SingleObserver<T>, Disposable {
28+
final AtomicReference<Disposable> s = new AtomicReference<Disposable>();
29+
30+
@Override
31+
public final void onSubscribe(Disposable s) {
32+
if (DisposableHelper.setOnce(this.s, s)) {
33+
onStart();
34+
}
35+
}
36+
37+
/**
38+
* Called once the single upstream Disposable is set via onSubscribe.
39+
*/
40+
protected void onStart() {
41+
}
42+
43+
@Override
44+
public final boolean isDisposed() {
45+
return s.get() == DisposableHelper.DISPOSED;
46+
}
47+
48+
@Override
49+
public final void dispose() {
50+
DisposableHelper.dispose(s);
51+
}
52+
}

0 commit comments

Comments
 (0)