Skip to content

Commit d4d55f0

Browse files
vanniktechakarnokd
authored andcommitted
1.x: Completable add doOnEach (#4460)
* 1.x: Completable add doOnEach * Fix Generics
1 parent 30da1aa commit d4d55f0

File tree

2 files changed

+61
-2
lines changed

2 files changed

+61
-2
lines changed

src/main/java/rx/Completable.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1442,7 +1442,31 @@ public void onSubscribe(Subscription d) {
14421442
public final Completable doOnCompleted(Action0 onCompleted) {
14431443
return doOnLifecycle(Actions.empty(), Actions.empty(), onCompleted, Actions.empty(), Actions.empty());
14441444
}
1445-
1445+
1446+
/**
1447+
* Returns a Completable which calls the given onNotification callback when this Completable emits an error or completes.
1448+
* @param onNotification the notification callback
1449+
* @return the new Completable instance
1450+
* @throws NullPointerException if onNotification is null
1451+
*/
1452+
public final Completable doOnEach(final Action1<Notification<Object>> onNotification) {
1453+
if (onNotification == null) {
1454+
throw new IllegalArgumentException("onNotification is null");
1455+
}
1456+
1457+
return doOnLifecycle(Actions.empty(), new Action1<Throwable>() {
1458+
@Override
1459+
public void call(final Throwable throwable) {
1460+
onNotification.call(Notification.createOnError(throwable));
1461+
}
1462+
}, new Action0() {
1463+
@Override
1464+
public void call() {
1465+
onNotification.call(Notification.createOnCompleted());
1466+
}
1467+
}, Actions.empty(), Actions.empty());
1468+
}
1469+
14461470
/**
14471471
* Returns a Completable which calls the given onUnsubscribe callback if the child subscriber cancels
14481472
* the subscription.

src/test/java/rx/CompletableTest.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4174,4 +4174,39 @@ public Object call(Completable completable) {
41744174
assertSame(expectedResult, actualResult);
41754175
assertSame(c, completableRef.get());
41764176
}
4177-
}
4177+
4178+
@Test(expected = IllegalArgumentException.class)
4179+
public void doOnEachNullAction() {
4180+
Completable.complete().doOnEach(null);
4181+
}
4182+
4183+
@Test
4184+
public void doOnEachCompleted() {
4185+
final AtomicInteger atomicInteger = new AtomicInteger(0);
4186+
Completable.complete().doOnEach(new Action1<Notification<Object>>() {
4187+
@Override
4188+
public void call(final Notification<Object> notification) {
4189+
if (notification.isOnCompleted()) {
4190+
atomicInteger.incrementAndGet();
4191+
}
4192+
}
4193+
}).subscribe();
4194+
4195+
assertEquals(1, atomicInteger.get());
4196+
}
4197+
4198+
@Test
4199+
public void doOnEachError() {
4200+
final AtomicInteger atomicInteger = new AtomicInteger(0);
4201+
Completable.error(new RuntimeException("What?")).doOnEach(new Action1<Notification<Object>>() {
4202+
@Override
4203+
public void call(final Notification<Object> notification) {
4204+
if (notification.isOnError()) {
4205+
atomicInteger.incrementAndGet();
4206+
}
4207+
}
4208+
}).subscribe();
4209+
4210+
assertEquals(1, atomicInteger.get());
4211+
}
4212+
}

0 commit comments

Comments
 (0)