Skip to content

Commit f2f2a9f

Browse files
author
Robert Winkler
committed
Issue ReactiveX#134: Added Bulkhead to Decorators class and added missing tests.
1 parent ad6116a commit f2f2a9f

File tree

4 files changed

+103
-2
lines changed

4 files changed

+103
-2
lines changed

README.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ Core modules:
2323
* resilience4j-circuitbreaker: Circuit breaking
2424
* resilience4j-ratelimiter: Rate limiting
2525
* resilience4j-bulkhead: Bulkheading
26-
* resilience4j-retry: Automatic retrying
26+
* resilience4j-retry: Automatic retrying (sync and async)
2727
* resilience4j-cache: Response caching
2828

2929
Add-on modules

resilience4j-all/src/main/java/io/github/resilience4j/decorators/Decorators.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.github.resilience4j.decorators;
22

3+
import io.github.resilience4j.bulkhead.Bulkhead;
34
import io.github.resilience4j.cache.Cache;
45
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
56
import io.github.resilience4j.ratelimiter.RateLimiter;
@@ -80,6 +81,11 @@ public DecorateSupplier<T> withRateLimiter(RateLimiter rateLimiter) {
8081
return this;
8182
}
8283

84+
public DecorateSupplier<T> withBulkhead(Bulkhead bulkhead) {
85+
supplier = Bulkhead.decorateSupplier(bulkhead, supplier);
86+
return this;
87+
}
88+
8389
public Supplier<T> decorate() {
8490
return supplier;
8591
}
@@ -111,6 +117,11 @@ public DecorateFunction<T, R> withRateLimiter(RateLimiter rateLimiter) {
111117
return this;
112118
}
113119

120+
public DecorateFunction<T, R> withBulkhead(Bulkhead bulkhead) {
121+
function = Bulkhead.decorateFunction(bulkhead, function);
122+
return this;
123+
}
124+
114125
public Function<T, R> decorate() {
115126
return function;
116127
}
@@ -142,6 +153,11 @@ public DecorateRunnable withRateLimiter(RateLimiter rateLimiter) {
142153
return this;
143154
}
144155

156+
public DecorateRunnable withBulkhead(Bulkhead bulkhead) {
157+
runnable = Bulkhead.decorateRunnable(bulkhead, runnable);
158+
return this;
159+
}
160+
145161
public Runnable decorate() {
146162
return runnable;
147163
}
@@ -178,6 +194,11 @@ public <K> DecorateCheckedFunction<K, T> withCache(Cache<K, T> cache) {
178194
return Decorators.ofCheckedFunction(Cache.decorateCheckedSupplier(cache, supplier));
179195
}
180196

197+
public DecorateCheckedSupplier<T> withBulkhead(Bulkhead bulkhead) {
198+
supplier = Bulkhead.decorateCheckedSupplier(bulkhead, supplier);
199+
return this;
200+
}
201+
181202
public CheckedFunction0<T> decorate() {
182203
return supplier;
183204
}
@@ -209,6 +230,11 @@ public DecorateCheckedFunction<T, R> withRateLimiter(RateLimiter rateLimiter) {
209230
return this;
210231
}
211232

233+
public DecorateCheckedFunction<T, R> withBulkhead(Bulkhead bulkhead) {
234+
function = Bulkhead.decorateCheckedFunction(bulkhead, function);
235+
return this;
236+
}
237+
212238
public CheckedFunction1<T, R> decorate() {
213239
return function;
214240
}
@@ -240,6 +266,11 @@ public DecorateCheckedRunnable withRateLimiter(RateLimiter rateLimiter) {
240266
return this;
241267
}
242268

269+
public DecorateCheckedRunnable withBulkhead(Bulkhead bulkhead) {
270+
runnable = Bulkhead.decorateCheckedRunnable(bulkhead, runnable);
271+
return this;
272+
}
273+
243274
public CheckedRunnable decorate() {
244275
return runnable;
245276
}
@@ -267,6 +298,11 @@ public DecorateCompletionStage<T> withRetry(AsyncRetry retryContext, ScheduledEx
267298
return this;
268299
}
269300

301+
public DecorateCompletionStage<T> withBulkhead(Bulkhead bulkhead) {
302+
stageSupplier = Bulkhead.decorateCompletionStage(bulkhead, stageSupplier);
303+
return this;
304+
}
305+
270306
public Supplier<CompletionStage<T>> decorate() {
271307
return stageSupplier;
272308
}
@@ -294,6 +330,11 @@ public DecorateConsumer<T> withRateLimiter(RateLimiter rateLimiter) {
294330
return this;
295331
}
296332

333+
public DecorateConsumer<T> withBulkhead(Bulkhead bulkhead) {
334+
consumer = Bulkhead.decorateConsumer(bulkhead, consumer);
335+
return this;
336+
}
337+
297338
public Consumer<T> decorate() {
298339
return consumer;
299340
}

resilience4j-all/src/test/java/io/github/resilience4j/decorators/DecoratorsTest.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
*/
1919
package io.github.resilience4j.decorators;
2020

21+
import io.github.resilience4j.bulkhead.Bulkhead;
2122
import io.github.resilience4j.cache.Cache;
2223
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
2324
import io.github.resilience4j.ratelimiter.RateLimiter;
2425
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
2526
import io.github.resilience4j.ratelimiter.RequestNotPermitted;
27+
import io.github.resilience4j.retry.AsyncRetry;
2628
import io.github.resilience4j.retry.Retry;
2729
import io.github.resilience4j.test.HelloWorldService;
2830
import io.vavr.CheckedFunction0;
@@ -35,6 +37,10 @@
3537

3638
import java.io.IOException;
3739
import java.time.Duration;
40+
import java.util.concurrent.CompletableFuture;
41+
import java.util.concurrent.CompletionStage;
42+
import java.util.concurrent.ExecutionException;
43+
import java.util.concurrent.Executors;
3844
import java.util.function.Function;
3945
import java.util.function.Supplier;
4046

@@ -62,6 +68,7 @@ public void testDecorateSupplier() {
6268
.withCircuitBreaker(circuitBreaker)
6369
.withRetry(Retry.ofDefaults("id"))
6470
.withRateLimiter(RateLimiter.ofDefaults("testName"))
71+
.withBulkhead(Bulkhead.ofDefaults("testName"))
6572
.decorate();
6673

6774
String result = decoratedSupplier.get();
@@ -84,6 +91,7 @@ public void testDecorateCheckedSupplier() throws IOException {
8491
.withCircuitBreaker(circuitBreaker)
8592
.withRetry(Retry.ofDefaults("id"))
8693
.withRateLimiter(RateLimiter.ofDefaults("testName"))
94+
.withBulkhead(Bulkhead.ofDefaults("testName"))
8795
.decorate();
8896

8997
String result = Try.of(decoratedSupplier).get();
@@ -104,6 +112,7 @@ public void testDecorateRunnable() {
104112
.withCircuitBreaker(circuitBreaker)
105113
.withRetry(Retry.ofDefaults("id"))
106114
.withRateLimiter(RateLimiter.ofDefaults("testName"))
115+
.withBulkhead(Bulkhead.ofDefaults("testName"))
107116
.decorate();
108117

109118
decoratedRunnable.run();
@@ -124,6 +133,7 @@ public void testDecorateCheckedRunnable() throws IOException {
124133
.withCircuitBreaker(circuitBreaker)
125134
.withRetry(Retry.ofDefaults("id"))
126135
.withRateLimiter(RateLimiter.ofDefaults("testName"))
136+
.withBulkhead(Bulkhead.ofDefaults("testName"))
127137
.decorate();
128138

129139
Try.run(decoratedRunnable);
@@ -135,6 +145,53 @@ public void testDecorateCheckedRunnable() throws IOException {
135145
BDDMockito.then(helloWorldService).should(times(1)).sayHelloWorldWithException();
136146
}
137147

148+
149+
@Test
150+
public void testDecorateCompletionStage() throws ExecutionException, InterruptedException {
151+
// Given the HelloWorldService returns Hello world
152+
given(helloWorldService.returnHelloWorld()).willReturn("Hello world");
153+
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("helloBackend");
154+
155+
Supplier<CompletionStage<String>> completionStageSupplier =
156+
() -> CompletableFuture.supplyAsync(helloWorldService::returnHelloWorld);
157+
158+
CompletionStage<String> completionStage = Decorators.ofCompletionStage(completionStageSupplier)
159+
.withCircuitBreaker(circuitBreaker)
160+
.withRetry(AsyncRetry.ofDefaults("id"), Executors.newSingleThreadScheduledExecutor())
161+
.withBulkhead(Bulkhead.ofDefaults("testName"))
162+
.get();
163+
164+
String value = completionStage.toCompletableFuture().get();
165+
assertThat(value).isEqualTo("Hello world");
166+
167+
CircuitBreaker.Metrics metrics = circuitBreaker.getMetrics();
168+
assertThat(metrics.getNumberOfBufferedCalls()).isEqualTo(1);
169+
assertThat(metrics.getNumberOfSuccessfulCalls()).isEqualTo(1);
170+
171+
// Then the helloWorldService should be invoked 1 time
172+
BDDMockito.then(helloWorldService).should(times(1)).returnHelloWorld();
173+
}
174+
175+
@Test
176+
public void testExecuteConsumer() throws ExecutionException, InterruptedException {
177+
// Given the HelloWorldService returns Hello world
178+
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("helloBackend");
179+
180+
181+
Decorators.ofConsumer((String input) -> helloWorldService.sayHelloWorldWithName(input))
182+
.withCircuitBreaker(circuitBreaker)
183+
.withBulkhead(Bulkhead.ofDefaults("testName"))
184+
.withRateLimiter(RateLimiter.ofDefaults("testName"))
185+
.accept("test");
186+
187+
CircuitBreaker.Metrics metrics = circuitBreaker.getMetrics();
188+
assertThat(metrics.getNumberOfBufferedCalls()).isEqualTo(1);
189+
assertThat(metrics.getNumberOfSuccessfulCalls()).isEqualTo(1);
190+
191+
// Then the helloWorldService should be invoked 1 time
192+
BDDMockito.then(helloWorldService).should(times(1)).sayHelloWorldWithName("test");
193+
}
194+
138195
@Test
139196
public void testDecorateFunction() {
140197
// Given the HelloWorldService returns Hello world
@@ -145,6 +202,7 @@ public void testDecorateFunction() {
145202
.withCircuitBreaker(circuitBreaker)
146203
.withRetry(Retry.ofDefaults("id"))
147204
.withRateLimiter(RateLimiter.ofDefaults("testName"))
205+
.withBulkhead(Bulkhead.ofDefaults("testName"))
148206
.decorate();
149207

150208
String result = decoratedFunction.apply("Name");
@@ -165,6 +223,7 @@ public void testDecorateCheckedFunction() throws IOException {
165223
.withCircuitBreaker(circuitBreaker)
166224
.withRetry(Retry.ofDefaults("id"))
167225
.withRateLimiter(RateLimiter.ofDefaults("testName"))
226+
.withBulkhead(Bulkhead.ofDefaults("testName"))
168227
.decorate();
169228

170229
String result = Try.of(() -> decoratedFunction.apply("Name")).get();
@@ -185,6 +244,7 @@ public void testDecoratorBuilderWithRetry() {
185244
Supplier<String> decoratedSupplier = Decorators.ofSupplier(() -> helloWorldService.returnHelloWorld())
186245
.withCircuitBreaker(circuitBreaker)
187246
.withRetry(Retry.ofDefaults("id"))
247+
.withBulkhead(Bulkhead.ofDefaults("testName"))
188248
.decorate();
189249

190250
Try.of(decoratedSupplier::get);

resilience4j-documentation/src/docs/asciidoc/introduction.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ Core modules:
1010
* resilience4j-circuitbreaker: Circuit breaking
1111
* resilience4j-ratelimiter: Rate limiting
1212
* resilience4j-bulkhead: Bulkheading
13-
* resilience4j-retry: Automatic retrying
13+
* resilience4j-retry: Automatic retrying (sync and async)
1414
* resilience4j-cache: Response caching
1515
1616
Add-on modules

0 commit comments

Comments
 (0)