Skip to content

Commit d6470ba

Browse files
Dan MaasRobWin
authored andcommitted
Fixed ratpackk method interceptor and simplifying other method interceptors (ReactiveX#522)
1 parent c147b33 commit d6470ba

File tree

5 files changed

+67
-73
lines changed

5 files changed

+67
-73
lines changed

resilience4j-ratpack/src/main/java/io/github/resilience4j/ratpack/bulkhead/BulkheadMethodInterceptor.java

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -84,21 +84,21 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
8484
io.github.resilience4j.bulkhead.Bulkhead bulkhead = registry.bulkhead(annotation.name());
8585
Class<?> returnType = invocation.getMethod().getReturnType();
8686
if (Promise.class.isAssignableFrom(returnType)) {
87-
Promise<?> result = (Promise<?>) invocation.proceed();
87+
Promise<?> result = (Promise<?>) proceed(invocation);
8888
if (result != null) {
8989
BulkheadTransformer transformer = BulkheadTransformer.of(bulkhead).recover(fallbackMethod);
9090
result = result.transform(transformer);
9191
}
9292
return result;
9393
} else if (Flux.class.isAssignableFrom(returnType)) {
94-
Flux<?> result = (Flux<?>) invocation.proceed();
94+
Flux<?> result = (Flux<?>) proceed(invocation);
9595
if (result != null) {
9696
BulkheadOperator operator = BulkheadOperator.of(bulkhead);
9797
result = fallbackMethod.onErrorResume(result.transform(operator));
9898
}
9999
return result;
100100
} else if (Mono.class.isAssignableFrom(returnType)) {
101-
Mono<?> result = (Mono<?>) invocation.proceed();
101+
Mono<?> result = (Mono<?>) proceed(invocation);
102102
if (result != null) {
103103
BulkheadOperator operator = BulkheadOperator.of(bulkhead);
104104
result = fallbackMethod.onErrorResume(result.transform(operator));
@@ -107,7 +107,7 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
107107
} else if (CompletionStage.class.isAssignableFrom(returnType)) {
108108
final CompletableFuture promise = new CompletableFuture<>();
109109
if (bulkhead.tryAcquirePermission()) {
110-
CompletionStage<?> result = (CompletionStage<?>) invocation.proceed();
110+
CompletionStage<?> result = (CompletionStage<?>) proceed(invocation);
111111
if (result != null) {
112112
result.whenComplete((value, throwable) -> {
113113
bulkhead.onComplete();
@@ -124,21 +124,17 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
124124
}
125125
return promise;
126126
} else {
127-
boolean permission = bulkhead.tryAcquirePermission();
128-
if (!permission) {
129-
Throwable t = new BulkheadFullException(bulkhead);
130-
return fallbackMethod.apply(t);
131-
}
132-
try {
133-
if (Thread.interrupted()) {
134-
throw new IllegalStateException("Thread was interrupted during permission wait");
135-
}
136-
return invocation.proceed();
137-
} catch (Exception e) {
138-
return fallbackMethod.apply(e);
139-
} finally {
140-
bulkhead.onComplete();
141-
}
127+
return handleProceedWithException(invocation, bulkhead, fallbackMethod);
128+
}
129+
}
130+
131+
132+
@Nullable
133+
private Object handleProceedWithException(MethodInvocation invocation, io.github.resilience4j.bulkhead.Bulkhead bulkhead, RecoveryFunction<?> recoveryFunction) throws Throwable {
134+
try {
135+
return io.github.resilience4j.bulkhead.Bulkhead.decorateCheckedSupplier(bulkhead, invocation::proceed).apply();
136+
} catch (Throwable throwable) {
137+
return recoveryFunction.apply(throwable);
142138
}
143139
}
144140

resilience4j-ratpack/src/main/java/io/github/resilience4j/ratpack/circuitbreaker/CircuitBreakerMethodInterceptor.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -128,11 +128,7 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
128128
}
129129
return promise;
130130
} else {
131-
try {
132-
return proceed(invocation, breaker);
133-
} catch (Throwable throwable) {
134-
return fallbackMethod.apply(throwable);
135-
}
131+
return handleProceedWithException(invocation, breaker, fallbackMethod);
136132
}
137133
}
138134

@@ -163,4 +159,12 @@ private Object proceed(MethodInvocation invocation, io.github.resilience4j.circu
163159
return result;
164160
}
165161

162+
@Nullable
163+
private Object handleProceedWithException(MethodInvocation invocation, io.github.resilience4j.circuitbreaker.CircuitBreaker breaker, RecoveryFunction<?> recoveryFunction) throws Throwable {
164+
try {
165+
return io.github.resilience4j.circuitbreaker.CircuitBreaker.decorateCheckedSupplier(breaker, invocation::proceed).apply();
166+
} catch (Throwable throwable) {
167+
return recoveryFunction.apply(throwable);
168+
}
169+
}
166170
}

resilience4j-ratpack/src/main/java/io/github/resilience4j/ratpack/internal/AbstractMethodInterceptor.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
import io.github.resilience4j.ratpack.recovery.RecoveryFunction;
77
import org.aopalliance.intercept.MethodInterceptor;
88
import org.aopalliance.intercept.MethodInvocation;
9+
import ratpack.exec.Promise;
10+
import reactor.core.publisher.Flux;
11+
import reactor.core.publisher.Mono;
912

1013
import java.lang.reflect.Method;
1114
import java.util.Arrays;
@@ -15,6 +18,30 @@
1518

1619
public abstract class AbstractMethodInterceptor implements MethodInterceptor {
1720

21+
@Nullable
22+
protected Object proceed(MethodInvocation invocation) throws Throwable {
23+
Class<?> returnType = invocation.getMethod().getReturnType();
24+
Object result;
25+
try {
26+
result = invocation.proceed();
27+
} catch (Exception e) {
28+
if (Promise.class.isAssignableFrom(returnType)) {
29+
return Promise.error(e);
30+
} else if (Flux.class.isAssignableFrom(returnType)) {
31+
return Flux.error(e);
32+
} else if (Mono.class.isAssignableFrom(returnType)) {
33+
return Mono.error(e);
34+
} else if (CompletionStage.class.isAssignableFrom(returnType)) {
35+
CompletableFuture<?> future = new CompletableFuture<>();
36+
future.completeExceptionally(e);
37+
return future;
38+
} else {
39+
throw e;
40+
}
41+
}
42+
return result;
43+
}
44+
1845
@SuppressWarnings("unchecked")
1946
protected void completeFailedFuture(Throwable throwable, RecoveryFunction<?> fallbackMethod, CompletableFuture promise) {
2047
try {

resilience4j-ratpack/src/main/java/io/github/resilience4j/ratpack/ratelimiter/RateLimiterMethodInterceptor.java

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -84,29 +84,29 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
8484
io.github.resilience4j.ratelimiter.RateLimiter rateLimiter = registry.rateLimiter(annotation.name());
8585
Class<?> returnType = invocation.getMethod().getReturnType();
8686
if (Promise.class.isAssignableFrom(returnType)) {
87-
Promise<?> result = (Promise<?>) proceed(invocation, rateLimiter, fallbackMethod);
87+
Promise<?> result = (Promise<?>) proceed(invocation);
8888
if (result != null) {
8989
RateLimiterTransformer transformer = RateLimiterTransformer.of(rateLimiter).recover(fallbackMethod);
9090
result = result.transform(transformer);
9191
}
9292
return result;
9393
} else if (Flux.class.isAssignableFrom(returnType)) {
94-
Flux<?> result = (Flux<?>) proceed(invocation, rateLimiter, fallbackMethod);
94+
Flux<?> result = (Flux<?>) proceed(invocation);
9595
if (result != null) {
9696
RateLimiterOperator operator = RateLimiterOperator.of(rateLimiter);
9797
result = fallbackMethod.onErrorResume(result.transform(operator));
9898
}
9999
return result;
100100
} else if (Mono.class.isAssignableFrom(returnType)) {
101-
Mono<?> result = (Mono<?>) proceed(invocation, rateLimiter, fallbackMethod);
101+
Mono<?> result = (Mono<?>) proceed(invocation);
102102
if (result != null) {
103103
RateLimiterOperator operator = RateLimiterOperator.of(rateLimiter);
104104
result = fallbackMethod.onErrorResume(result.transform(operator));
105105
}
106106
return result;
107107
} else if (CompletionStage.class.isAssignableFrom(returnType)) {
108108
if (rateLimiter.acquirePermission()) {
109-
return proceed(invocation, rateLimiter, fallbackMethod);
109+
return proceed(invocation);
110110
} else {
111111
final CompletableFuture promise = new CompletableFuture<>();
112112
Throwable t = new RequestNotPermitted(rateLimiter);
@@ -118,28 +118,13 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
118118
}
119119
}
120120

121-
@Nullable
122-
private Object proceed(MethodInvocation invocation, io.github.resilience4j.ratelimiter.RateLimiter rateLimiter, RecoveryFunction<?> recoveryFunction) throws Throwable {
123-
Object result;
124-
try {
125-
result = invocation.proceed();
126-
} catch (Exception e) {
127-
result = handleProceedWithException(invocation, rateLimiter, recoveryFunction);
128-
}
129-
return result;
130-
}
131-
132121
@Nullable
133122
private Object handleProceedWithException(MethodInvocation invocation, io.github.resilience4j.ratelimiter.RateLimiter rateLimiter, RecoveryFunction<?> recoveryFunction) throws Throwable {
134-
boolean permission = rateLimiter.acquirePermission();
135-
if (Thread.interrupted()) {
136-
throw new IllegalStateException("Thread was interrupted during permission wait");
137-
}
138-
if (!permission) {
139-
Throwable t = new RequestNotPermitted(rateLimiter);
123+
try {
124+
return io.github.resilience4j.ratelimiter.RateLimiter.decorateCheckedSupplier(rateLimiter, invocation::proceed).apply();
125+
} catch (Throwable t) {
140126
return recoveryFunction.apply(t);
141127
}
142-
return invocation.proceed();
143128
}
144129

145130
}

resilience4j-ratpack/src/main/java/io/github/resilience4j/ratpack/retry/RetryMethodInterceptor.java

Lines changed: 9 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -81,14 +81,14 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
8181
.orElse(new DefaultRecoveryFunction<>());
8282
Class<?> returnType = invocation.getMethod().getReturnType();
8383
if (Promise.class.isAssignableFrom(returnType)) {
84-
Promise<?> result = (Promise<?>) proceed(invocation, retry, fallbackMethod);
84+
Promise<?> result = (Promise<?>) proceed(invocation);
8585
if (result != null) {
8686
RetryTransformer transformer = RetryTransformer.of(retry).recover(fallbackMethod);
8787
result = result.transform(transformer);
8888
}
8989
return result;
9090
} else if (Flux.class.isAssignableFrom(returnType)) {
91-
Flux<?> result = (Flux<?>) proceed(invocation, retry, fallbackMethod);
91+
Flux<?> result = (Flux<?>) proceed(invocation);
9292
if (result != null) {
9393
RetryTransformer transformer = RetryTransformer.of(retry).recover(fallbackMethod);
9494
final Flux<?> temp = result;
@@ -103,7 +103,7 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
103103
}
104104
return result;
105105
} else if (Mono.class.isAssignableFrom(returnType)) {
106-
Mono<?> result = (Mono<?>) proceed(invocation, retry, fallbackMethod);
106+
Mono<?> result = (Mono<?>) proceed(invocation);
107107
if (result != null) {
108108
RetryTransformer transformer = RetryTransformer.of(retry).recover(fallbackMethod);
109109
final Mono<?> temp = result;
@@ -116,10 +116,10 @@ public Object invoke(MethodInvocation invocation) throws Throwable {
116116
return result;
117117
}
118118
else if (CompletionStage.class.isAssignableFrom(returnType)) {
119-
CompletionStage stage = (CompletionStage) proceed(invocation, retry, fallbackMethod);
119+
CompletionStage stage = (CompletionStage) proceed(invocation);
120120
return executeCompletionStage(invocation, stage, retry.context(), fallbackMethod);
121121
} else {
122-
return proceed(invocation, retry, fallbackMethod);
122+
return handleProceedWithException(invocation, retry, fallbackMethod);
123123
}
124124
}
125125

@@ -145,29 +145,11 @@ private CompletionStage<?> executeCompletionStage(MethodInvocation invocation, C
145145
}
146146

147147
@Nullable
148-
private Object proceed(MethodInvocation invocation, io.github.resilience4j.retry.Retry retry, RecoveryFunction<?> recoveryFunction) throws Throwable {
149-
io.github.resilience4j.retry.Retry.Context context = retry.context();
148+
private Object handleProceedWithException(MethodInvocation invocation, io.github.resilience4j.retry.Retry retry, RecoveryFunction<?> recoveryFunction) throws Throwable {
150149
try {
151-
Object result = invocation.proceed();
152-
context.onSuccess();
153-
return result;
154-
} catch (Exception e) {
155-
// exception thrown, we know a direct value was attempted to be returned
156-
Object result;
157-
context.onError(e);
158-
while (true) {
159-
try {
160-
result = invocation.proceed();
161-
context.onSuccess();
162-
return result;
163-
} catch (Exception e1) {
164-
try {
165-
context.onError(e1);
166-
} catch (Exception e2) {
167-
return recoveryFunction.apply(e2);
168-
}
169-
}
170-
}
150+
return io.github.resilience4j.retry.Retry.decorateCheckedSupplier(retry, invocation::proceed).apply();
151+
} catch (Throwable t) {
152+
return recoveryFunction.apply(t);
171153
}
172154
}
173155

0 commit comments

Comments
 (0)