Skip to content

Commit 0389530

Browse files
authored
Issue ReactiveX#384: Added higher-order functions to chain resultHandler and e… (ReactiveX#415)
1 parent 945d6d1 commit 0389530

File tree

8 files changed

+383
-12
lines changed

8 files changed

+383
-12
lines changed

resilience4j-circuitbreaker/src/test/java/io/github/resilience4j/circuitbreaker/CircuitBreakerTest.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
package io.github.resilience4j.circuitbreaker;
2020

2121
import io.github.resilience4j.test.HelloWorldService;
22-
import io.vavr.*;
22+
import io.vavr.CheckedConsumer;
23+
import io.vavr.CheckedFunction0;
24+
import io.vavr.CheckedFunction1;
25+
import io.vavr.CheckedRunnable;
2326
import io.vavr.control.Try;
2427
import org.junit.Before;
2528
import org.junit.Test;
@@ -30,14 +33,16 @@
3033
import java.io.IOException;
3134
import java.net.SocketTimeoutException;
3235
import java.time.Duration;
33-
import java.util.concurrent.*;
36+
import java.util.concurrent.Callable;
37+
import java.util.concurrent.CompletableFuture;
38+
import java.util.concurrent.CompletionStage;
39+
import java.util.concurrent.ExecutionException;
3440
import java.util.function.Consumer;
3541
import java.util.function.Function;
3642
import java.util.function.Supplier;
3743

3844
import static io.vavr.API.*;
39-
import static io.vavr.API.$;
40-
import static io.vavr.Predicates.*;
45+
import static io.vavr.Predicates.instanceOf;
4146
import static org.assertj.core.api.Assertions.assertThat;
4247
import static org.assertj.core.api.Assertions.assertThatThrownBy;
4348

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package io.github.resilience4j.core;
2+
3+
import java.util.concurrent.Callable;
4+
import java.util.function.BiFunction;
5+
import java.util.function.Function;
6+
7+
public class CallableUtils {
8+
9+
private CallableUtils(){}
10+
11+
/**
12+
* Returns a composed function that first applies the Callable and then applies
13+
* the resultHandler.
14+
*
15+
* @param <T> return type of callable
16+
* @param <R> return type of handler
17+
* @param resultHandler the function applied after callable
18+
* @return a function composed of supplier and resultHandler
19+
*/
20+
public static <T, R> Callable<R> andThen(Callable<T> callable, Function<T, R> resultHandler){
21+
return () -> resultHandler.apply(callable.call());
22+
}
23+
24+
/**
25+
* Returns a composed function that first applies the Callable and then applies
26+
* {@linkplain BiFunction} {@code after} to the result.
27+
*
28+
* @param <T> return type of callable
29+
* @param <R> return type of handler
30+
* @param handler the function applied after callable
31+
* @return a function composed of supplier and handler
32+
*/
33+
public static <T, R> Callable<R> andThen(Callable<T> callable, BiFunction<T, Exception, R> handler){
34+
return () -> {
35+
try{
36+
T result = callable.call();
37+
return handler.apply(result, null);
38+
}catch (Exception exception){
39+
return handler.apply(null, exception);
40+
}
41+
};
42+
}
43+
44+
/**
45+
* Returns a composed function that first applies the Callable and then applies
46+
* either the resultHandler or exceptionHandler.
47+
*
48+
* @param <T> return type of callable
49+
* @param <R> return type of resultHandler and exceptionHandler
50+
* @param resultHandler the function applied after callable was successful
51+
* @param exceptionHandler the function applied after callable has failed
52+
* @return a function composed of supplier and handler
53+
*/
54+
public static <T, R> Callable<R> andThen(Callable<T> callable, Function<T, R> resultHandler, Function<Exception, R> exceptionHandler){
55+
return () -> {
56+
try{
57+
T result = callable.call();
58+
return resultHandler.apply(result);
59+
}catch (Exception exception){
60+
return exceptionHandler.apply(exception);
61+
}
62+
};
63+
}
64+
65+
/**
66+
* Returns a composed function that first executes the Callable and optionally recovers from an exception.
67+
*
68+
* @param <T> return type of after
69+
* @param exceptionHandler the exception handler
70+
* @return a function composed of callable and exceptionHandler
71+
*/
72+
public static <T> Callable<T> recover(Callable<T> callable, Function<Exception, T> exceptionHandler){
73+
return () -> {
74+
try{
75+
return callable.call();
76+
}catch (Exception exception){
77+
return exceptionHandler.apply(exception);
78+
}
79+
};
80+
}
81+
}

resilience4j-core/src/main/java/io/github/resilience4j/core/EventProcessor.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,16 @@
2525

2626
public class EventProcessor<T> implements EventPublisher<T> {
2727

28-
protected volatile boolean consumerRegistered;
29-
@Nullable private volatile EventConsumer<T> onEventConsumer;
28+
private boolean consumerRegistered;
29+
@Nullable private EventConsumer<T> onEventConsumer;
3030
private ConcurrentMap<Class<? extends T>, EventConsumer<Object>> eventConsumers = new ConcurrentHashMap<>();
3131

3232
public boolean hasConsumers(){
3333
return consumerRegistered;
3434
}
3535

3636
@SuppressWarnings("unchecked")
37-
public <E extends T> void registerConsumer(Class<? extends E> eventType, EventConsumer<E> eventConsumer){
37+
public synchronized <E extends T> void registerConsumer(Class<? extends E> eventType, EventConsumer<E> eventConsumer){
3838
consumerRegistered = true;
3939
eventConsumers.put(eventType, (EventConsumer<Object>) eventConsumer);
4040
}
@@ -58,7 +58,7 @@ public <E extends T> boolean processEvent(E event) {
5858
}
5959

6060
@Override
61-
public void onEvent(@Nullable EventConsumer<T> onEventConsumer) {
61+
public synchronized void onEvent(@Nullable EventConsumer<T> onEventConsumer) {
6262
consumerRegistered = true;
6363
this.onEventConsumer = onEventConsumer;
6464
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package io.github.resilience4j.core;
2+
3+
import java.util.function.BiFunction;
4+
import java.util.function.Function;
5+
import java.util.function.Supplier;
6+
7+
public class SupplierUtils {
8+
9+
private SupplierUtils(){}
10+
11+
/**
12+
* Returns a composed function that first applies the Supplier and then applies
13+
* the resultHandler.
14+
*
15+
* @param <T> return type of callable
16+
* @param <R> return type of handler
17+
* @param resultHandler the function applied after supplier
18+
* @return a function composed of supplier and resultHandler
19+
*/
20+
public static <T, R> Supplier<R> andThen(Supplier<T> supplier, Function<T, R> resultHandler){
21+
return () -> resultHandler.apply(supplier.get());
22+
}
23+
24+
/**
25+
* Returns a composed function that first applies the Supplier and then applies
26+
* {@linkplain BiFunction} {@code after} to the result.
27+
*
28+
* @param <T> return type of after
29+
* @param handler the function applied after supplier
30+
* @return a function composed of supplier and handler
31+
*/
32+
public static <T, R> Supplier<R> andThen(Supplier<T> supplier, BiFunction<T, Exception, R> handler){
33+
return () -> {
34+
try{
35+
T result = supplier.get();
36+
return handler.apply(result, null);
37+
}catch (Exception exception){
38+
return handler.apply(null, exception);
39+
}
40+
};
41+
}
42+
43+
/**
44+
* Returns a composed function that first executes the Supplier and optionally recovers from an exception.
45+
*
46+
* @param <T> return type of after
47+
* @param exceptionHandler the exception handler
48+
* @return a function composed of supplier and exceptionHandler
49+
*/
50+
public static <T> Supplier<T> recover(Supplier<T> supplier, Function<Exception, T> exceptionHandler){
51+
return () -> {
52+
try{
53+
return supplier.get();
54+
}catch (Exception exception){
55+
return exceptionHandler.apply(exception);
56+
}
57+
};
58+
}
59+
60+
/**
61+
* Returns a composed function that first applies the Supplier and then applies
62+
* either the resultHandler or exceptionHandler.
63+
*
64+
* @param <T> return type of after
65+
* @param resultHandler the function applied after Supplier was successful
66+
* @param exceptionHandler the function applied after Supplier has failed
67+
* @return a function composed of supplier and handler
68+
*/
69+
public static <T, R> Supplier<R> andThen(Supplier<T> supplier, Function<T, R> resultHandler, Function<Exception, R> exceptionHandler){
70+
return () -> {
71+
try{
72+
T result = supplier.get();
73+
return resultHandler.apply(result);
74+
}catch (Exception exception){
75+
return exceptionHandler.apply(exception);
76+
}
77+
};
78+
}
79+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package io.github.resilience4j.core;
2+
3+
import org.junit.Test;
4+
5+
import javax.xml.ws.WebServiceException;
6+
import java.io.IOException;
7+
import java.util.concurrent.Callable;
8+
9+
import static org.assertj.core.api.Assertions.assertThat;
10+
11+
public class CallableUtilsTest {
12+
13+
@Test
14+
public void shouldChainCallableAndResultHandler() throws Exception {
15+
16+
Callable<String> supplier = () -> "BLA";
17+
//When
18+
Callable<String> callableWithRecovery = CallableUtils.andThen(supplier, result -> "Bla");
19+
20+
String result = callableWithRecovery.call();
21+
22+
//Then
23+
assertThat(result).isEqualTo("Bla");
24+
}
25+
26+
27+
@Test
28+
public void shouldChainCallableAndRecoverFromException() throws Exception {
29+
30+
Callable<String> callable = () -> {
31+
throw new IOException("BAM!");
32+
};
33+
//When
34+
Callable<String> callableWithRecovery = CallableUtils.andThen(callable, (result, ex) -> "Bla");
35+
36+
String result = callableWithRecovery.call();
37+
38+
//Then
39+
assertThat(result).isEqualTo("Bla");
40+
}
41+
42+
@Test
43+
public void shouldChainCallableAndRecoverWithErrorHandler() throws Exception {
44+
45+
Callable<String> callable = () -> {
46+
throw new IOException("BAM!");
47+
};
48+
//When
49+
Callable<String> callableWithRecovery = CallableUtils.andThen(callable, (result) -> result, ex -> "Bla");
50+
51+
String result = callableWithRecovery.call();
52+
53+
//Then
54+
assertThat(result).isEqualTo("Bla");
55+
}
56+
57+
@Test
58+
public void shouldRecoverCallableFromException() throws Exception {
59+
60+
Callable<String> callable = () -> {
61+
throw new IOException("BAM!");
62+
};
63+
//When
64+
Callable<String> callableWithRecovery = CallableUtils.recover(callable, (ex) -> "Bla");
65+
66+
String result = callableWithRecovery.call();
67+
68+
//Then
69+
assertThat(result).isEqualTo("Bla");
70+
}
71+
72+
@Test(expected = WebServiceException.class)
73+
public void shouldRethrowException() throws Exception {
74+
75+
Callable<String> callable = () -> {
76+
throw new IOException("BAM!");
77+
};
78+
//When
79+
Callable<String> callableWithRecovery = CallableUtils.recover(callable, (ex) -> {
80+
throw new WebServiceException();
81+
});
82+
83+
callableWithRecovery.call();
84+
}
85+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package io.github.resilience4j.core;
2+
3+
import org.junit.Test;
4+
5+
import javax.xml.ws.WebServiceException;
6+
import java.util.function.Supplier;
7+
8+
import static org.assertj.core.api.Assertions.assertThat;
9+
10+
public class SupplierUtilsTest {
11+
12+
@Test
13+
public void shouldChainSupplierAndResultHandler() {
14+
15+
Supplier<String> supplier = () -> "BLA";
16+
//When
17+
Supplier<String> supplierWithRecovery = SupplierUtils.andThen(supplier, result -> "Bla");
18+
19+
String result = supplierWithRecovery.get();
20+
21+
//Then
22+
assertThat(result).isEqualTo("Bla");
23+
}
24+
25+
@Test
26+
public void shouldChainSupplierAndRecoverWithHandler() {
27+
28+
Supplier<String> supplier = () -> {
29+
throw new RuntimeException("BAM!");
30+
};
31+
//When
32+
Supplier<String> supplierWithRecovery = SupplierUtils.andThen(supplier, (result, ex) -> "Bla");
33+
34+
String result = supplierWithRecovery.get();
35+
36+
//Then
37+
assertThat(result).isEqualTo("Bla");
38+
}
39+
40+
@Test
41+
public void shouldChainSupplierAndRecoverWithErrorHandler() {
42+
43+
Supplier<String> supplier = () -> {
44+
throw new RuntimeException("BAM!");
45+
};
46+
//When
47+
Supplier<String> supplierWithRecovery = SupplierUtils.andThen(supplier, (result) -> result, ex -> "Bla");
48+
49+
String result = supplierWithRecovery.get();
50+
51+
//Then
52+
assertThat(result).isEqualTo("Bla");
53+
}
54+
55+
56+
@Test
57+
public void shouldRecoverSupplierFromException() {
58+
59+
Supplier<String> supplier = () -> {
60+
throw new RuntimeException("BAM!");
61+
};
62+
//When
63+
Supplier<String> supplierWithRecovery = SupplierUtils.recover(supplier, (ex) -> "Bla");
64+
65+
String result = supplierWithRecovery.get();
66+
67+
//Then
68+
assertThat(result).isEqualTo("Bla");
69+
}
70+
71+
@Test(expected = WebServiceException.class)
72+
public void shouldRethrowException() {
73+
74+
Supplier<String> supplier = () -> {
75+
throw new RuntimeException("BAM!");
76+
};
77+
//When
78+
Supplier<String> supplierWithRecovery = SupplierUtils.recover(supplier, (ex) -> {
79+
throw new WebServiceException();
80+
});
81+
82+
supplierWithRecovery.get();
83+
}
84+
}

0 commit comments

Comments
 (0)