Skip to content

1.x: distinctUntilChanged with direct value comparator - alternative #4034

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4804,6 +4804,29 @@ public final <U> Observable<T> distinctUntilChanged(Func1<? super T, ? extends U
return lift(new OperatorDistinctUntilChanged<T, U>(keySelector));
}

/**
* Returns an Observable that emits all items emitted by the source Observable that are distinct from their
* immediate predecessors when compared with each other via the provided comparator function.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/distinctUntilChanged.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code distinctUntilChanged} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param comparator the function that receives the previous item and the current item and is
* expected to return true if the two are equal, thus skipping the current value.
* @return an Observable that emits those items from the source Observable that are distinct from their
* immediate predecessors
* @see <a href="http://reactivex.io/documentation/operators/distinct.html">ReactiveX operators documentation: Distinct</a>
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical
* with the release number)
*/
@Experimental
public final Observable<T> distinctUntilChanged(Func2<? super T, ? super T, Boolean> comparator) {
return lift(new OperatorDistinctUntilChanged<T, T>(comparator));
}

/**
* Modifies the source Observable so that it invokes an action when it calls {@code onCompleted}.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,19 @@
import rx.Observable.Operator;
import rx.Subscriber;
import rx.exceptions.Exceptions;
import rx.functions.Func1;
import rx.functions.*;
import rx.internal.util.UtilityFunctions;

/**
* Returns an Observable that emits all sequentially distinct items emitted by the source.
* @param <T> the value type
* @param <U> the key type
*/
public final class OperatorDistinctUntilChanged<T, U> implements Operator<T, T> {
public final class OperatorDistinctUntilChanged<T, U> implements Operator<T, T>, Func2<U, U, Boolean> {
final Func1<? super T, ? extends U> keySelector;

final Func2<? super U, ? super U, Boolean> comparator;

private static class Holder {
static final OperatorDistinctUntilChanged<?,?> INSTANCE = new OperatorDistinctUntilChanged<Object,Object>(UtilityFunctions.identity());
}
Expand All @@ -48,6 +50,19 @@ public static <T> OperatorDistinctUntilChanged<T, T> instance() {

public OperatorDistinctUntilChanged(Func1<? super T, ? extends U> keySelector) {
this.keySelector = keySelector;
this.comparator = this;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cute trick


}

@SuppressWarnings({ "unchecked", "rawtypes" })
public OperatorDistinctUntilChanged(Func2<? super U, ? super U, Boolean> comparator) {
this.keySelector = (Func1)UtilityFunctions.identity();
this.comparator = comparator;
}

@Override
public Boolean call(U t1, U t2) {
return (t1 == t2 || (t1 != null && t1.equals(t2)));
}

@Override
Expand All @@ -68,7 +83,16 @@ public void onNext(T t) {
previousKey = key;

if (hasPrevious) {
if (!(currentKey == key || (key != null && key.equals(currentKey)))) {
boolean comparison;

try {
comparison = comparator.call(currentKey, key);
} catch (Throwable e) {
Exceptions.throwOrReport(e, child, key);
return;
}

if (!comparison) {
Copy link
Contributor

@JakeWharton JakeWharton Jun 19, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a subtle NPE potential here. Not sure if you want to explicitly guard against it from a misbehaving comparator implementation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! I'll change the comparison to be primitive boolean to trigger NPE inside the try-block if necessary.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

child.onNext(t);
} else {
request(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,19 @@
package rx.internal.operators;

import static org.junit.Assert.assertFalse;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static org.mockito.MockitoAnnotations.initMocks;

import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.junit.*;
import org.mockito.*;

import rx.Observable;
import rx.Observer;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.*;
import rx.exceptions.TestException;
import rx.functions.*;
import rx.observers.TestSubscriber;

public class OperatorDistinctUntilChangedTest {

Expand Down Expand Up @@ -164,4 +158,43 @@ public void call(Throwable t) {
.subscribe(w);
assertFalse(errorOccurred.get());
}

@Test
public void customComparator() {
Observable<String> source = Observable.just("a", "b", "B", "A","a", "C");

TestSubscriber<String> ts = TestSubscriber.create();

source.distinctUntilChanged(new Func2<String, String, Boolean>() {
@Override
public Boolean call(String a, String b) {
return a.compareToIgnoreCase(b) == 0;
}
})
.subscribe(ts);

ts.assertValues("a", "b", "A", "C");
ts.assertNoErrors();
ts.assertCompleted();
}

@Test
public void customComparatorThrows() {
Observable<String> source = Observable.just("a", "b", "B", "A","a", "C");

TestSubscriber<String> ts = TestSubscriber.create();

source.distinctUntilChanged(new Func2<String, String, Boolean>() {
@Override
public Boolean call(String a, String b) {
throw new TestException();
}
})
.subscribe(ts);

ts.assertValue("a");
ts.assertNotCompleted();
ts.assertError(TestException.class);
}

}