Skip to content

Commit 62c1bd1

Browse files
storozhukBMRobWin
authored andcommitted
Issue ReactiveX#135 CompletionStage support in RateLimiter (ReactiveX#136)
* Issue ReactiveX#135 CompletionStage support in RateLimiter * Issue ReactiveX#135 codacy issue fix * Issue ReactiveX#135 codacy issue fix * Issue ReactiveX#135 Decorators interface support
1 parent ec5abeb commit 62c1bd1

File tree

5 files changed

+130
-86
lines changed

5 files changed

+130
-86
lines changed

resilience4j-all/src/main/java/io/github/resilience4j/decorators/Decorators.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,11 @@ public DecorateCompletionStage<T> withBulkhead(Bulkhead bulkhead) {
303303
return this;
304304
}
305305

306+
public DecorateCompletionStage<T> withRateLimiter(RateLimiter rateLimiter) {
307+
stageSupplier = RateLimiter.decorateCompletionStage(rateLimiter, stageSupplier);
308+
return this;
309+
}
310+
306311
public Supplier<CompletionStage<T>> decorate() {
307312
return stageSupplier;
308313
}

resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/operator/CircuitBreakerOperator.java

Lines changed: 16 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@
1919
package io.github.resilience4j.circuitbreaker.operator;
2020

2121

22+
import org.reactivestreams.Subscriber;
23+
import org.reactivestreams.Subscription;
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
2227
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
2328
import io.github.resilience4j.circuitbreaker.CircuitBreakerOpenException;
2429
import io.github.resilience4j.core.StopWatch;
@@ -28,10 +33,6 @@
2833
import io.reactivex.SingleObserver;
2934
import io.reactivex.SingleOperator;
3035
import io.reactivex.disposables.Disposable;
31-
import org.reactivestreams.Subscriber;
32-
import org.reactivestreams.Subscription;
33-
import org.slf4j.Logger;
34-
import org.slf4j.LoggerFactory;
3536

3637
import java.util.concurrent.atomic.AtomicBoolean;
3738

@@ -102,9 +103,7 @@ private final class CircuitBreakerSubscriber implements Subscriber<T>, Subscript
102103
@Override
103104
public void onSubscribe(Subscription subscription) {
104105
this.subscription = subscription;
105-
if (LOG.isDebugEnabled()) {
106-
LOG.info("onSubscribe");
107-
}
106+
LOG.debug("onSubscribe");
108107
if (circuitBreaker.isCallPermitted()) {
109108
stopWatch = StopWatch.start(circuitBreaker.getName());
110109
childSubscriber.onSubscribe(this);
@@ -121,9 +120,7 @@ public void onSubscribe(Subscription subscription) {
121120
*/
122121
@Override
123122
public void onNext(T event) {
124-
if (LOG.isDebugEnabled()) {
125-
LOG.info("onNext: {}", event);
126-
}
123+
LOG.debug("onNext: {}", event);
127124
if (!isCancelled()) {
128125
childSubscriber.onNext(event);
129126
}
@@ -134,9 +131,7 @@ public void onNext(T event) {
134131
*/
135132
@Override
136133
public void onError(Throwable e) {
137-
if (LOG.isDebugEnabled()) {
138-
LOG.info("onError", e);
139-
}
134+
LOG.debug("onError", e);
140135
if (!isCancelled()) {
141136
circuitBreaker.onError(stopWatch.stop().getProcessingDuration().toNanos(), e);
142137
childSubscriber.onError(e);
@@ -149,9 +144,7 @@ public void onError(Throwable e) {
149144
*/
150145
@Override
151146
public void onComplete() {
152-
if (LOG.isDebugEnabled()) {
153-
LOG.info("onComplete");
154-
}
147+
LOG.debug("onComplete");
155148
if (!isCancelled()) {
156149
circuitBreaker.onSuccess(stopWatch.stop().getProcessingDuration().toNanos());
157150
childSubscriber.onComplete();
@@ -199,9 +192,7 @@ private final class CircuitBreakerObserver implements Observer<T>, Disposable {
199192
@Override
200193
public void onSubscribe(Disposable disposable) {
201194
this.disposable = disposable;
202-
if (LOG.isDebugEnabled()) {
203-
LOG.info("onSubscribe");
204-
}
195+
LOG.debug("onSubscribe");
205196
if (circuitBreaker.isCallPermitted()) {
206197
stopWatch = StopWatch.start(circuitBreaker.getName());
207198
childObserver.onSubscribe(this);
@@ -218,9 +209,7 @@ public void onSubscribe(Disposable disposable) {
218209
*/
219210
@Override
220211
public void onNext(T event) {
221-
if (LOG.isDebugEnabled()) {
222-
LOG.info("onNext: {}", event);
223-
}
212+
LOG.debug("onNext: {}", event);
224213
if (!isDisposed()) {
225214
childObserver.onNext(event);
226215
}
@@ -231,9 +220,7 @@ public void onNext(T event) {
231220
*/
232221
@Override
233222
public void onError(Throwable e) {
234-
if (LOG.isDebugEnabled()) {
235-
LOG.info("onError", e);
236-
}
223+
LOG.debug("onError", e);
237224
if (!isDisposed()) {
238225
circuitBreaker.onError(stopWatch.stop().getProcessingDuration().toNanos(), e);
239226
childObserver.onError(e);
@@ -245,9 +232,7 @@ public void onError(Throwable e) {
245232
*/
246233
@Override
247234
public void onComplete() {
248-
if (LOG.isDebugEnabled()) {
249-
LOG.info("onComplete");
250-
}
235+
LOG.debug("onComplete");
251236
if (!isDisposed()) {
252237
circuitBreaker.onSuccess(stopWatch.stop().getProcessingDuration().toNanos());
253238
childObserver.onComplete();
@@ -292,9 +277,7 @@ private class CircuitBreakerSingleObserver implements SingleObserver<T>, Disposa
292277
@Override
293278
public void onSubscribe(Disposable disposable) {
294279
this.disposable = disposable;
295-
if (LOG.isDebugEnabled()) {
296-
LOG.info("onSubscribe");
297-
}
280+
LOG.debug("onSubscribe");
298281
if (circuitBreaker.isCallPermitted()) {
299282
stopWatch = StopWatch.start(circuitBreaker.getName());
300283
childObserver.onSubscribe(this);
@@ -311,9 +294,7 @@ public void onSubscribe(Disposable disposable) {
311294
*/
312295
@Override
313296
public void onError(Throwable e) {
314-
if (LOG.isDebugEnabled()) {
315-
LOG.info("onError", e);
316-
}
297+
LOG.debug("onError", e);
317298
if (!isDisposed()) {
318299
circuitBreaker.onError(stopWatch.stop().getProcessingDuration().toNanos(), e);
319300
childObserver.onError(e);
@@ -325,9 +306,7 @@ public void onError(Throwable e) {
325306
*/
326307
@Override
327308
public void onSuccess(T value) {
328-
if (LOG.isDebugEnabled()) {
329-
LOG.info("onComplete");
330-
}
309+
LOG.debug("onComplete");
331310
if (!isDisposed()) {
332311
circuitBreaker.onSuccess(stopWatch.stop().getProcessingDuration().toNanos());
333312
childObserver.onSuccess(value);

resilience4j-ratelimiter/src/main/java/io/github/resilience4j/ratelimiter/RateLimiter.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727

2828
import java.time.Duration;
2929
import java.util.concurrent.Callable;
30+
import java.util.concurrent.CompletableFuture;
31+
import java.util.concurrent.CompletionStage;
3032
import java.util.function.Consumer;
3133
import java.util.function.Function;
3234
import java.util.function.Supplier;
@@ -69,6 +71,37 @@ static RateLimiter ofDefaults(String name) {
6971
return new AtomicRateLimiter(name, RateLimiterConfig.ofDefaults());
7072
}
7173

74+
/**
75+
* Returns a supplier which is decorated by a rateLimiter.
76+
*
77+
* @param rateLimiter the rateLimiter
78+
* @param supplier the original supplier
79+
* @param <T> the type of the returned CompletionStage's result
80+
* @return a supplier which is decorated by a RateLimiter.
81+
*/
82+
static <T> Supplier<CompletionStage<T>> decorateCompletionStage(RateLimiter rateLimiter, Supplier<CompletionStage<T>> supplier) {
83+
return () -> {
84+
85+
final CompletableFuture<T> promise = new CompletableFuture<>();
86+
try {
87+
waitForPermission(rateLimiter);
88+
supplier.get()
89+
.whenComplete(
90+
(result, throwable) -> {
91+
if (throwable != null) {
92+
promise.completeExceptionally(throwable);
93+
} else {
94+
promise.complete(result);
95+
}
96+
}
97+
);
98+
} catch (Throwable throwable) {
99+
promise.completeExceptionally(throwable);
100+
}
101+
return promise;
102+
};
103+
}
104+
72105
/**
73106
* Creates a supplier which is restricted by a RateLimiter.
74107
*

resilience4j-ratelimiter/src/main/java/io/github/resilience4j/ratelimiter/operator/RateLimiterOperator.java

Lines changed: 16 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@
1616

1717
package io.github.resilience4j.ratelimiter.operator;
1818

19+
import org.reactivestreams.Subscriber;
20+
import org.reactivestreams.Subscription;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
1924
import io.github.resilience4j.ratelimiter.RateLimiter;
2025
import io.github.resilience4j.ratelimiter.RequestNotPermitted;
2126
import io.reactivex.FlowableOperator;
@@ -24,10 +29,6 @@
2429
import io.reactivex.SingleObserver;
2530
import io.reactivex.SingleOperator;
2631
import io.reactivex.disposables.Disposable;
27-
import org.reactivestreams.Subscriber;
28-
import org.reactivestreams.Subscription;
29-
import org.slf4j.Logger;
30-
import org.slf4j.LoggerFactory;
3132

3233
import java.util.concurrent.atomic.AtomicBoolean;
3334

@@ -92,9 +93,7 @@ private final class RateLimiterSubscriber implements Subscriber<T>, Subscription
9293
@Override
9394
public void onSubscribe(Subscription subscription) {
9495
this.subscription = subscription;
95-
if (LOG.isDebugEnabled()) {
96-
LOG.info("onSubscribe");
97-
}
96+
LOG.debug("onSubscribe");
9897
if (rateLimiter.getPermission(rateLimiter.getRateLimiterConfig().getTimeoutDuration())) {
9998
childSubscriber.onSubscribe(this);
10099
} else {
@@ -109,9 +108,7 @@ public void onSubscribe(Subscription subscription) {
109108
*/
110109
@Override
111110
public void onNext(T event) {
112-
if (LOG.isDebugEnabled()) {
113-
LOG.info("onNext: {}", event);
114-
}
111+
LOG.debug("onNext: {}", event);
115112
if (!isCancelled()) {
116113
if (rateLimiter.getPermission(rateLimiter.getRateLimiterConfig().getTimeoutDuration())) {
117114
childSubscriber.onNext(event);
@@ -127,9 +124,7 @@ public void onNext(T event) {
127124
*/
128125
@Override
129126
public void onError(Throwable e) {
130-
if (LOG.isDebugEnabled()) {
131-
LOG.info("onError", e);
132-
}
127+
LOG.debug("onError", e);
133128
if (!isCancelled()) {
134129
childSubscriber.onError(e);
135130

@@ -141,9 +136,7 @@ public void onError(Throwable e) {
141136
*/
142137
@Override
143138
public void onComplete() {
144-
if (LOG.isDebugEnabled()) {
145-
LOG.info("onComplete");
146-
}
139+
LOG.debug("onComplete");
147140
if (!isCancelled()) {
148141
childSubscriber.onComplete();
149142
}
@@ -189,9 +182,7 @@ private final class RateLimiterObserver implements Observer<T>, Disposable {
189182
@Override
190183
public void onSubscribe(Disposable disposable) {
191184
this.disposable = disposable;
192-
if (LOG.isDebugEnabled()) {
193-
LOG.info("onSubscribe");
194-
}
185+
LOG.debug("onSubscribe");
195186
if (rateLimiter.getPermission(rateLimiter.getRateLimiterConfig().getTimeoutDuration())) {
196187
childObserver.onSubscribe(this);
197188
} else {
@@ -206,9 +197,7 @@ public void onSubscribe(Disposable disposable) {
206197
*/
207198
@Override
208199
public void onNext(T event) {
209-
if (LOG.isDebugEnabled()) {
210-
LOG.info("onNext: {}", event);
211-
}
200+
LOG.debug("onNext: {}", event);
212201
if (!isDisposed()) {
213202
if (rateLimiter.getPermission(rateLimiter.getRateLimiterConfig().getTimeoutDuration())) {
214203
childObserver.onNext(event);
@@ -224,9 +213,7 @@ public void onNext(T event) {
224213
*/
225214
@Override
226215
public void onError(Throwable e) {
227-
if (LOG.isDebugEnabled()) {
228-
LOG.info("onError", e);
229-
}
216+
LOG.debug("onError", e);
230217
if (!isDisposed()) {
231218
childObserver.onError(e);
232219
}
@@ -237,9 +224,7 @@ public void onError(Throwable e) {
237224
*/
238225
@Override
239226
public void onComplete() {
240-
if (LOG.isDebugEnabled()) {
241-
LOG.info("onComplete");
242-
}
227+
LOG.debug("onComplete");
243228
if (!isDisposed()) {
244229
childObserver.onComplete();
245230
}
@@ -282,9 +267,7 @@ private class RateLimiterSingleObserver implements SingleObserver<T>, Disposable
282267
@Override
283268
public void onSubscribe(Disposable disposable) {
284269
this.disposable = disposable;
285-
if (LOG.isDebugEnabled()) {
286-
LOG.info("onSubscribe");
287-
}
270+
LOG.debug("onSubscribe");
288271
if (rateLimiter.getPermission(rateLimiter.getRateLimiterConfig().getTimeoutDuration())) {
289272
childObserver.onSubscribe(this);
290273
} else {
@@ -299,9 +282,7 @@ public void onSubscribe(Disposable disposable) {
299282
*/
300283
@Override
301284
public void onError(Throwable e) {
302-
if (LOG.isDebugEnabled()) {
303-
LOG.info("onError", e);
304-
}
285+
LOG.debug("onError", e);
305286
if (!isDisposed()) {
306287
childObserver.onError(e);
307288
}
@@ -312,9 +293,7 @@ public void onError(Throwable e) {
312293
*/
313294
@Override
314295
public void onSuccess(T value) {
315-
if (LOG.isDebugEnabled()) {
316-
LOG.info("onComplete");
317-
}
296+
LOG.debug("onComplete");
318297
if (!isDisposed()) {
319298
childObserver.onSuccess(value);
320299
}

0 commit comments

Comments
 (0)