Skip to content

2.x: disposable unit tests + fix to RefCountDisposable behavior #3334

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 9, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,149 changes: 973 additions & 176 deletions src/main/java/io/reactivex/Observable.java

Large diffs are not rendered by default.

47 changes: 47 additions & 0 deletions src/main/java/io/reactivex/annotations/BackpressureKind.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.annotations;

/**
* Enumeration for various kinds of backpressure support.
*/
public enum BackpressureKind {
/**
* The backpressure-related requests pass through this operator without change
*/
PASS_THROUGH,
/**
* The operator fully supports backpressure and may coordinate downstream requests
* with upstream requests through batching, arbitration or by other means.
*/
FULL,
/**
* The operator performs special backpressure management; see the associated javadoc.
*/
SPECIAL,
/**
* The operator requests Long.MAX_VALUE from upstream but respects the backpressure
* of the downstream.
*/
UNBOUNDED_IN,
/**
* The operator will emit a MissingBackpressureException if the downstream didn't request
* enough or in time.
*/
ERROR,
/**
* The operator ignores all kinds of backpressure and may overflow the downstream.
*/
NONE
}
30 changes: 30 additions & 0 deletions src/main/java/io/reactivex/annotations/BackpressureSupport.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.annotations;

import java.lang.annotation.*;

/**
* Indicates the backpressure support kind of the associated operator or class.
*/
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface BackpressureSupport {
/**
* The backpressure supported by this method or class.
* @return backpressure supported by this method or class.
*/
BackpressureKind value();
}
22 changes: 22 additions & 0 deletions src/main/java/io/reactivex/annotations/Beta.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.annotations;

/**
* Indicates the feature is in beta state: it will be most likely stay but
* the signature may change between versions without warning.
*/
public @interface Beta {

}
22 changes: 22 additions & 0 deletions src/main/java/io/reactivex/annotations/Experimental.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.annotations;

/**
* Indicates the feature is in experimental state: its existence, signature or behavior
* might change without warning from one release to the next.
*/
public @interface Experimental {

}
48 changes: 48 additions & 0 deletions src/main/java/io/reactivex/annotations/SchedulerKind.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.annotations;

/**
* Indicates what scheduler the method or class uses by default
*/
public enum SchedulerKind {
/**
* The operator/class doesn't use schedulers.
*/
NONE,
/**
* The operator/class runs on the computation scheduler or takes timing information from it.
*/
COMPUTATION,
/**
* The operator/class runs on the io scheduler or takes timing information from it.
*/
IO,
/**
* The operator/class runs on the new thread scheduler or takes timing information from it.
*/
NEW_THREAD,
/**
* The operator/class runs on the trampoline scheduler or takes timing information from it.
*/
TRAMPOLINE,
/**
* The operator/class runs on the single scheduler or takes timing information from it.
*/
SINGLE,
/**
* The operator/class requires a scheduler to be manually specified.
*/
CUSTOM
}
30 changes: 30 additions & 0 deletions src/main/java/io/reactivex/annotations/SchedulerSupport.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.annotations;

import java.lang.annotation.*;

/**
* Indicates what kind of scheduler the class or method uses.
*/
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface SchedulerSupport {
/**
* The kind of scheduler the class or method uses.
* @return the kind of scheduler the class or method uses
*/
SchedulerKind value();
}
17 changes: 15 additions & 2 deletions src/main/java/io/reactivex/disposables/RefCountDisposable.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,27 @@ public final class RefCountDisposable implements Disposable {
volatile int count;
static final AtomicIntegerFieldUpdater<RefCountDisposable> COUNT =
AtomicIntegerFieldUpdater.newUpdater(RefCountDisposable.class, "count");


volatile int once;
static final AtomicIntegerFieldUpdater<RefCountDisposable> ONCE =
AtomicIntegerFieldUpdater.newUpdater(RefCountDisposable.class, "once");

public RefCountDisposable(Disposable resource) {
Objects.requireNonNull(resource);
RESOURCE.lazySet(this, resource);
COUNT.lazySet(this, 1);
}

@Override
public void dispose() {
if (ONCE.compareAndSet(this, 0, 1)) {
if (COUNT.decrementAndGet(this) == 0) {
disposeActual();
}
}
}

void disposeActual() {
Disposable d = resource;
if (d != DISPOSED) {
d = RESOURCE.getAndSet(this, DISPOSED);
Expand All @@ -51,7 +64,7 @@ public Disposable get() {

void release() {
if (COUNT.decrementAndGet(this) == 0) {
dispose();
disposeActual();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import java.util.function.Consumer;

import io.reactivex.disposables.Disposable;
import io.reactivex.internal.util.OpenHashSet;
import io.reactivex.internal.util.*;

/**
* A set-based composite resource with custom disposer callback.
Expand Down Expand Up @@ -143,7 +143,7 @@ public void dispose() {
set = null;
}
if (s != null) {
s.forEach(disposer);
disposeAll(s);
}
}
}
Expand All @@ -163,8 +163,14 @@ public void clear() {
set = null;
}
if (s != null) {
s.forEach(disposer);
disposeAll(s);
}
}
}
void disposeAll(OpenHashSet<T> s) {
Throwable ex = s.forEachSuppress(disposer);
if (ex != null) {
Exceptions.propagate(ex);
}
}
}
30 changes: 30 additions & 0 deletions src/main/java/io/reactivex/internal/util/OpenHashSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.Arrays;
import java.util.function.Consumer;

import io.reactivex.exceptions.CompositeException;

/**
* A simple open hash set with add, remove and clear capabilities only.
* <p>Doesn't support nor checks for {@code null}s.
Expand Down Expand Up @@ -191,6 +193,34 @@ public void forEach(Consumer<? super T> consumer) {
}
}
}

/**
* Loops through all values in the set and collects any exceptions from the consumer
* into a Throwable.
* @param consumer the consumer to call
* @return if not null, contains a CompositeException with all the suppressed exceptions
*/
public Throwable forEachSuppress(Consumer<? super T> consumer) {
CompositeException ex = null;
int count = 0;
for (T k : keys) {
if (k != null) {
try {
consumer.accept(k);
} catch (Throwable e) {
if (ex == null) {
ex = new CompositeException();
}
count++;
ex.addSuppressed(e);
}
}
}
if (count == 1) {
return ex.getSuppressed()[0];
}
return ex;
}

public boolean isEmpty() {
return size == 0;
Expand Down
Loading