Skip to content

Commit 6bef4ce

Browse files
authored
1.x: distinctUntilChanged with direct value comparator - alternative (#4034)
* 1.x: distinctUntilChanged with direct value comparator - alternative * Use primitive boolean * Add experimental annotation
1 parent c5a0c36 commit 6bef4ce

File tree

3 files changed

+97
-17
lines changed

3 files changed

+97
-17
lines changed

src/main/java/rx/Observable.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4804,6 +4804,29 @@ public final <U> Observable<T> distinctUntilChanged(Func1<? super T, ? extends U
48044804
return lift(new OperatorDistinctUntilChanged<T, U>(keySelector));
48054805
}
48064806

4807+
/**
4808+
* Returns an Observable that emits all items emitted by the source Observable that are distinct from their
4809+
* immediate predecessors when compared with each other via the provided comparator function.
4810+
* <p>
4811+
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/distinctUntilChanged.png" alt="">
4812+
* <dl>
4813+
* <dt><b>Scheduler:</b></dt>
4814+
* <dd>{@code distinctUntilChanged} does not operate by default on a particular {@link Scheduler}.</dd>
4815+
* </dl>
4816+
*
4817+
* @param comparator the function that receives the previous item and the current item and is
4818+
* expected to return true if the two are equal, thus skipping the current value.
4819+
* @return an Observable that emits those items from the source Observable that are distinct from their
4820+
* immediate predecessors
4821+
* @see <a href="http://reactivex.io/documentation/operators/distinct.html">ReactiveX operators documentation: Distinct</a>
4822+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical
4823+
* with the release number)
4824+
*/
4825+
@Experimental
4826+
public final Observable<T> distinctUntilChanged(Func2<? super T, ? super T, Boolean> comparator) {
4827+
return lift(new OperatorDistinctUntilChanged<T, T>(comparator));
4828+
}
4829+
48074830
/**
48084831
* Modifies the source Observable so that it invokes an action when it calls {@code onCompleted}.
48094832
* <p>

src/main/java/rx/internal/operators/OperatorDistinctUntilChanged.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,19 @@
1818
import rx.Observable.Operator;
1919
import rx.Subscriber;
2020
import rx.exceptions.Exceptions;
21-
import rx.functions.Func1;
21+
import rx.functions.*;
2222
import rx.internal.util.UtilityFunctions;
2323

2424
/**
2525
* Returns an Observable that emits all sequentially distinct items emitted by the source.
2626
* @param <T> the value type
2727
* @param <U> the key type
2828
*/
29-
public final class OperatorDistinctUntilChanged<T, U> implements Operator<T, T> {
29+
public final class OperatorDistinctUntilChanged<T, U> implements Operator<T, T>, Func2<U, U, Boolean> {
3030
final Func1<? super T, ? extends U> keySelector;
3131

32+
final Func2<? super U, ? super U, Boolean> comparator;
33+
3234
private static class Holder {
3335
static final OperatorDistinctUntilChanged<?,?> INSTANCE = new OperatorDistinctUntilChanged<Object,Object>(UtilityFunctions.identity());
3436
}
@@ -48,6 +50,19 @@ public static <T> OperatorDistinctUntilChanged<T, T> instance() {
4850

4951
public OperatorDistinctUntilChanged(Func1<? super T, ? extends U> keySelector) {
5052
this.keySelector = keySelector;
53+
this.comparator = this;
54+
55+
}
56+
57+
@SuppressWarnings({ "unchecked", "rawtypes" })
58+
public OperatorDistinctUntilChanged(Func2<? super U, ? super U, Boolean> comparator) {
59+
this.keySelector = (Func1)UtilityFunctions.identity();
60+
this.comparator = comparator;
61+
}
62+
63+
@Override
64+
public Boolean call(U t1, U t2) {
65+
return (t1 == t2 || (t1 != null && t1.equals(t2)));
5166
}
5267

5368
@Override
@@ -68,7 +83,16 @@ public void onNext(T t) {
6883
previousKey = key;
6984

7085
if (hasPrevious) {
71-
if (!(currentKey == key || (key != null && key.equals(currentKey)))) {
86+
boolean comparison;
87+
88+
try {
89+
comparison = comparator.call(currentKey, key);
90+
} catch (Throwable e) {
91+
Exceptions.throwOrReport(e, child, key);
92+
return;
93+
}
94+
95+
if (!comparison) {
7296
child.onNext(t);
7397
} else {
7498
request(1);

src/test/java/rx/internal/operators/OperatorDistinctUntilChangedTest.java

Lines changed: 47 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,19 @@
1616
package rx.internal.operators;
1717

1818
import static org.junit.Assert.assertFalse;
19-
import static org.mockito.Matchers.any;
20-
import static org.mockito.Matchers.anyString;
21-
import static org.mockito.Mockito.inOrder;
22-
import static org.mockito.Mockito.never;
23-
import static org.mockito.Mockito.times;
24-
import static org.mockito.Mockito.verify;
19+
import static org.mockito.Matchers.*;
20+
import static org.mockito.Mockito.*;
2521
import static org.mockito.MockitoAnnotations.initMocks;
2622

2723
import java.util.concurrent.atomic.AtomicBoolean;
2824

29-
import org.junit.Before;
30-
import org.junit.Test;
31-
import org.mockito.InOrder;
32-
import org.mockito.Mock;
25+
import org.junit.*;
26+
import org.mockito.*;
3327

34-
import rx.Observable;
35-
import rx.Observer;
36-
import rx.functions.Action1;
37-
import rx.functions.Func1;
28+
import rx.*;
29+
import rx.exceptions.TestException;
30+
import rx.functions.*;
31+
import rx.observers.TestSubscriber;
3832

3933
public class OperatorDistinctUntilChangedTest {
4034

@@ -164,4 +158,43 @@ public void call(Throwable t) {
164158
.subscribe(w);
165159
assertFalse(errorOccurred.get());
166160
}
161+
162+
@Test
163+
public void customComparator() {
164+
Observable<String> source = Observable.just("a", "b", "B", "A","a", "C");
165+
166+
TestSubscriber<String> ts = TestSubscriber.create();
167+
168+
source.distinctUntilChanged(new Func2<String, String, Boolean>() {
169+
@Override
170+
public Boolean call(String a, String b) {
171+
return a.compareToIgnoreCase(b) == 0;
172+
}
173+
})
174+
.subscribe(ts);
175+
176+
ts.assertValues("a", "b", "A", "C");
177+
ts.assertNoErrors();
178+
ts.assertCompleted();
179+
}
180+
181+
@Test
182+
public void customComparatorThrows() {
183+
Observable<String> source = Observable.just("a", "b", "B", "A","a", "C");
184+
185+
TestSubscriber<String> ts = TestSubscriber.create();
186+
187+
source.distinctUntilChanged(new Func2<String, String, Boolean>() {
188+
@Override
189+
public Boolean call(String a, String b) {
190+
throw new TestException();
191+
}
192+
})
193+
.subscribe(ts);
194+
195+
ts.assertValue("a");
196+
ts.assertNotCompleted();
197+
ts.assertError(TestException.class);
198+
}
199+
167200
}

0 commit comments

Comments
 (0)