Skip to content

Commit 6e4cb87

Browse files
authored
Issue ReactiveX#273: Added remove method to AbstractRegistry. Issue ReactiveX#273: Added replace method to AbstractRegistry. Issue ReactiveX#327: MicroMeter tagged Metric classes are automatically updated when an entry is added, removed or replaced in AbstractRegistry.
1 parent 3665c70 commit 6e4cb87

File tree

51 files changed

+1078
-485
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+1078
-485
lines changed

resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/FixedThreadPoolBulkhead.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -204,19 +204,19 @@ private class BulkheadEventProcessor extends EventProcessor<BulkheadEvent> imple
204204

205205
@Override
206206
public ThreadPoolBulkheadEventPublisher onCallPermitted(EventConsumer<BulkheadOnCallPermittedEvent> onCallPermittedEventConsumer) {
207-
registerConsumer(BulkheadOnCallPermittedEvent.class, onCallPermittedEventConsumer);
207+
registerConsumer(BulkheadOnCallPermittedEvent.class.getSimpleName(), onCallPermittedEventConsumer);
208208
return this;
209209
}
210210

211211
@Override
212212
public ThreadPoolBulkheadEventPublisher onCallRejected(EventConsumer<BulkheadOnCallRejectedEvent> onCallRejectedEventConsumer) {
213-
registerConsumer(BulkheadOnCallRejectedEvent.class, onCallRejectedEventConsumer);
213+
registerConsumer(BulkheadOnCallRejectedEvent.class.getSimpleName(), onCallRejectedEventConsumer);
214214
return this;
215215
}
216216

217217
@Override
218218
public ThreadPoolBulkheadEventPublisher onCallFinished(EventConsumer<BulkheadOnCallFinishedEvent> onCallFinishedEventConsumer) {
219-
registerConsumer(BulkheadOnCallFinishedEvent.class, onCallFinishedEventConsumer);
219+
registerConsumer(BulkheadOnCallFinishedEvent.class.getSimpleName(), onCallFinishedEventConsumer);
220220
return this;
221221
}
222222

resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/InMemoryBulkheadRegistry.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import io.github.resilience4j.bulkhead.Bulkhead;
2222
import io.github.resilience4j.bulkhead.BulkheadConfig;
2323
import io.github.resilience4j.bulkhead.BulkheadRegistry;
24-
import io.github.resilience4j.core.AbstractRegistry;
24+
import io.github.resilience4j.core.registry.AbstractRegistry;
2525
import io.github.resilience4j.core.ConfigurationNotFoundException;
2626
import io.vavr.collection.Array;
2727
import io.vavr.collection.Seq;
@@ -62,7 +62,7 @@ public InMemoryBulkheadRegistry(BulkheadConfig defaultConfig) {
6262
*/
6363
@Override
6464
public Seq<Bulkhead> getAllBulkheads() {
65-
return Array.ofAll(targetMap.values());
65+
return Array.ofAll(entryMap.values());
6666
}
6767

6868
/**

resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/InMemoryThreadPoolBulkheadRegistry.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import io.github.resilience4j.bulkhead.ThreadPoolBulkhead;
2222
import io.github.resilience4j.bulkhead.ThreadPoolBulkheadConfig;
2323
import io.github.resilience4j.bulkhead.ThreadPoolBulkheadRegistry;
24-
import io.github.resilience4j.core.AbstractRegistry;
24+
import io.github.resilience4j.core.registry.AbstractRegistry;
2525
import io.github.resilience4j.core.ConfigurationNotFoundException;
2626
import io.vavr.collection.Array;
2727
import io.vavr.collection.Seq;
@@ -62,7 +62,7 @@ public InMemoryThreadPoolBulkheadRegistry(ThreadPoolBulkheadConfig defaultConfig
6262
*/
6363
@Override
6464
public Seq<ThreadPoolBulkhead> getAllBulkheads() {
65-
return Array.ofAll(targetMap.values());
65+
return Array.ofAll(entryMap.values());
6666
}
6767

6868
/**

resilience4j-bulkhead/src/main/java/io/github/resilience4j/bulkhead/internal/SemaphoreBulkhead.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,19 +173,19 @@ private class BulkheadEventProcessor extends EventProcessor<BulkheadEvent> imple
173173

174174
@Override
175175
public EventPublisher onCallPermitted(EventConsumer<BulkheadOnCallPermittedEvent> onCallPermittedEventConsumer) {
176-
registerConsumer(BulkheadOnCallPermittedEvent.class, onCallPermittedEventConsumer);
176+
registerConsumer(BulkheadOnCallPermittedEvent.class.getSimpleName(), onCallPermittedEventConsumer);
177177
return this;
178178
}
179179

180180
@Override
181181
public EventPublisher onCallRejected(EventConsumer<BulkheadOnCallRejectedEvent> onCallRejectedEventConsumer) {
182-
registerConsumer(BulkheadOnCallRejectedEvent.class, onCallRejectedEventConsumer);
182+
registerConsumer(BulkheadOnCallRejectedEvent.class.getSimpleName(), onCallRejectedEventConsumer);
183183
return this;
184184
}
185185

186186
@Override
187187
public EventPublisher onCallFinished(EventConsumer<BulkheadOnCallFinishedEvent> onCallFinishedEventConsumer) {
188-
registerConsumer(BulkheadOnCallFinishedEvent.class, onCallFinishedEventConsumer);
188+
registerConsumer(BulkheadOnCallFinishedEvent.class.getSimpleName(), onCallFinishedEventConsumer);
189189
return this;
190190
}
191191

resilience4j-bulkhead/src/test/java/io/github/resilience4j/bulkhead/BulkheadRegistryTest.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,31 +20,26 @@
2020

2121
import org.junit.Before;
2222
import org.junit.Test;
23-
import org.mockito.BDDMockito;
2423
import org.slf4j.Logger;
2524

2625
import java.util.HashMap;
2726
import java.util.Map;
28-
import java.util.function.Consumer;
2927

3028
import static org.assertj.core.api.BDDAssertions.assertThat;
3129
import static org.mockito.Mockito.mock;
32-
import static org.mockito.Mockito.times;
3330

3431

3532
public class BulkheadRegistryTest {
3633

3734
private BulkheadConfig config;
3835
private BulkheadRegistry registry;
3936
private Logger LOGGER;
40-
private Consumer<Bulkhead> post_consumer = circuitBreaker -> LOGGER.info("invoking the post consumer1");
4137

4238
@Before
4339
public void setUp() {
4440
LOGGER = mock(Logger.class);
4541
// registry with default config
4642
registry = BulkheadRegistry.ofDefaults();
47-
registry.registerPostCreationConsumer(post_consumer);
4843
// registry with custom config
4944
config = BulkheadConfig.custom()
5045
.maxConcurrentCalls(100)
@@ -71,7 +66,6 @@ public void shouldReturnTheCorrectName() {
7166
assertThat(bulkhead.getName()).isEqualTo("test");
7267
assertThat(bulkhead.getBulkheadConfig().getMaxConcurrentCalls()).isEqualTo(25);
7368
assertThat(bulkhead.getMetrics().getAvailableConcurrentCalls()).isEqualTo(25);
74-
BDDMockito.then(LOGGER).should(times(1)).info("invoking the post consumer1");
7569
}
7670

7771
@Test
@@ -82,7 +76,6 @@ public void shouldBeTheSameInstance() {
8276

8377
assertThat(bulkhead1).isSameAs(bulkhead2);
8478
assertThat(registry.getAllBulkheads()).hasSize(1);
85-
BDDMockito.then(LOGGER).should(times(1)).info("invoking the post consumer1");
8679
}
8780

8881
@Test
@@ -93,7 +86,6 @@ public void shouldBeNotTheSameInstance() {
9386

9487
assertThat(bulkhead1).isNotSameAs(bulkhead2);
9588
assertThat(registry.getAllBulkheads()).hasSize(2);
96-
BDDMockito.then(LOGGER).should(times(2)).info("invoking the post consumer1");
9789
}
9890

9991
@Test

resilience4j-cache/src/main/java/io/github/resilience4j/cache/internal/CacheImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,19 +127,19 @@ private class CacheEventProcessor extends EventProcessor<CacheEvent> implements
127127

128128
@Override
129129
public EventPublisher onCacheHit(EventConsumer<CacheOnHitEvent> eventConsumer) {
130-
registerConsumer(CacheOnHitEvent.class, eventConsumer);
130+
registerConsumer(CacheOnHitEvent.class.getSimpleName(), eventConsumer);
131131
return this;
132132
}
133133

134134
@Override
135135
public EventPublisher onCacheMiss(EventConsumer<CacheOnMissEvent> eventConsumer) {
136-
registerConsumer(CacheOnMissEvent.class, eventConsumer);
136+
registerConsumer(CacheOnMissEvent.class.getSimpleName(), eventConsumer);
137137
return this;
138138
}
139139

140140
@Override
141141
public EventPublisher onError(EventConsumer<CacheOnErrorEvent> eventConsumer) {
142-
registerConsumer(CacheOnErrorEvent.class, eventConsumer);
142+
registerConsumer(CacheOnErrorEvent.class.getSimpleName(), eventConsumer);
143143
return this;
144144
}
145145

resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/CircuitBreakerStateMachine.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -333,37 +333,37 @@ Clock getClock() {
333333
private class CircuitBreakerEventProcessor extends EventProcessor<CircuitBreakerEvent> implements EventConsumer<CircuitBreakerEvent>, EventPublisher {
334334
@Override
335335
public EventPublisher onSuccess(EventConsumer<CircuitBreakerOnSuccessEvent> onSuccessEventConsumer) {
336-
registerConsumer(CircuitBreakerOnSuccessEvent.class, onSuccessEventConsumer);
336+
registerConsumer(CircuitBreakerOnSuccessEvent.class.getSimpleName(), onSuccessEventConsumer);
337337
return this;
338338
}
339339

340340
@Override
341341
public EventPublisher onError(EventConsumer<CircuitBreakerOnErrorEvent> onErrorEventConsumer) {
342-
registerConsumer(CircuitBreakerOnErrorEvent.class, onErrorEventConsumer);
342+
registerConsumer(CircuitBreakerOnErrorEvent.class.getSimpleName(), onErrorEventConsumer);
343343
return this;
344344
}
345345

346346
@Override
347347
public EventPublisher onStateTransition(EventConsumer<CircuitBreakerOnStateTransitionEvent> onStateTransitionEventConsumer) {
348-
registerConsumer(CircuitBreakerOnStateTransitionEvent.class, onStateTransitionEventConsumer);
348+
registerConsumer(CircuitBreakerOnStateTransitionEvent.class.getSimpleName(), onStateTransitionEventConsumer);
349349
return this;
350350
}
351351

352352
@Override
353353
public EventPublisher onReset(EventConsumer<CircuitBreakerOnResetEvent> onResetEventConsumer) {
354-
registerConsumer(CircuitBreakerOnResetEvent.class, onResetEventConsumer);
354+
registerConsumer(CircuitBreakerOnResetEvent.class.getSimpleName(), onResetEventConsumer);
355355
return this;
356356
}
357357

358358
@Override
359359
public EventPublisher onIgnoredError(EventConsumer<CircuitBreakerOnIgnoredErrorEvent> onIgnoredErrorEventConsumer) {
360-
registerConsumer(CircuitBreakerOnIgnoredErrorEvent.class, onIgnoredErrorEventConsumer);
360+
registerConsumer(CircuitBreakerOnIgnoredErrorEvent.class.getSimpleName(), onIgnoredErrorEventConsumer);
361361
return this;
362362
}
363363

364364
@Override
365365
public EventPublisher onCallNotPermitted(EventConsumer<CircuitBreakerOnCallNotPermittedEvent> onCallNotPermittedEventConsumer) {
366-
registerConsumer(CircuitBreakerOnCallNotPermittedEvent.class, onCallNotPermittedEventConsumer);
366+
registerConsumer(CircuitBreakerOnCallNotPermittedEvent.class.getSimpleName(), onCallNotPermittedEventConsumer);
367367
return this;
368368
}
369369

resilience4j-circuitbreaker/src/main/java/io/github/resilience4j/circuitbreaker/internal/InMemoryCircuitBreakerRegistry.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
2222
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
2323
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
24-
import io.github.resilience4j.core.AbstractRegistry;
24+
import io.github.resilience4j.core.registry.AbstractRegistry;
2525
import io.github.resilience4j.core.ConfigurationNotFoundException;
2626
import io.vavr.collection.Array;
2727
import io.vavr.collection.Seq;
@@ -62,7 +62,7 @@ public InMemoryCircuitBreakerRegistry(CircuitBreakerConfig defaultConfig) {
6262
*/
6363
@Override
6464
public Seq<CircuitBreaker> getAllCircuitBreakers() {
65-
return Array.ofAll(targetMap.values());
65+
return Array.ofAll(entryMap.values());
6666
}
6767

6868
/**

resilience4j-circuitbreaker/src/test/java/io/github/resilience4j/circuitbreaker/internal/InMemoryCircuitBreakerRegistryTest.java

Lines changed: 9 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,19 @@
11
package io.github.resilience4j.circuitbreaker.internal;
22

3-
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
4-
import static org.mockito.BDDMockito.then;
5-
import static org.mockito.Mockito.mock;
6-
import static org.mockito.Mockito.times;
7-
8-
import java.util.HashMap;
9-
import java.util.Map;
10-
import java.util.function.Consumer;
11-
3+
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
4+
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
5+
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
6+
import io.github.resilience4j.core.ConfigurationNotFoundException;
127
import org.assertj.core.api.Assertions;
138
import org.junit.Before;
149
import org.junit.Test;
1510
import org.slf4j.Logger;
1611

17-
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
18-
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
19-
import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry;
20-
import io.github.resilience4j.core.ConfigurationNotFoundException;
12+
import java.util.HashMap;
13+
import java.util.Map;
14+
15+
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
16+
import static org.mockito.Mockito.mock;
2117

2218

2319
public class InMemoryCircuitBreakerRegistryTest {
@@ -29,26 +25,6 @@ public void setUp() {
2925
LOGGER = mock(Logger.class);
3026
}
3127

32-
@Test
33-
public void testPostConsumerBeingCalled() {
34-
CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.ofDefaults();
35-
Consumer<CircuitBreaker> consumer1 = circuitBreaker -> LOGGER.info("invoking the post consumer1");
36-
Consumer<CircuitBreaker> consumer2 = circuitBreaker -> LOGGER.info("invoking the post consumer2");
37-
38-
circuitBreakerRegistry.registerPostCreationConsumer(consumer1);
39-
40-
circuitBreakerRegistry.circuitBreaker("testCircuitBreaker");
41-
circuitBreakerRegistry.circuitBreaker("testCircuitBreaker2", CircuitBreakerConfig.ofDefaults());
42-
circuitBreakerRegistry.circuitBreaker("testCircuitBreaker3", CircuitBreakerConfig::ofDefaults);
43-
44-
then(LOGGER).should(times(3)).info("invoking the post consumer1");
45-
46-
circuitBreakerRegistry.registerPostCreationConsumer(consumer2);
47-
circuitBreakerRegistry.unregisterPostCreationConsumer(consumer1);
48-
circuitBreakerRegistry.circuitBreaker("testCircuitBreaker4");
49-
then(LOGGER).should(times(1)).info("invoking the post consumer2");
50-
}
51-
5228
@Test
5329
public void testAddCircuitBreakerRegistry() {
5430
CircuitBreakerRegistry circuitBreakerRegistry = CircuitBreakerRegistry.ofDefaults();

resilience4j-core/src/main/java/io/github/resilience4j/core/EventProcessor.java

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,37 +20,47 @@
2020

2121
import io.github.resilience4j.core.lang.Nullable;
2222

23+
import java.util.ArrayList;
24+
import java.util.List;
2325
import java.util.concurrent.ConcurrentHashMap;
2426
import java.util.concurrent.ConcurrentMap;
27+
import java.util.concurrent.CopyOnWriteArrayList;
2528

2629
public class EventProcessor<T> implements EventPublisher<T> {
2730

2831
private boolean consumerRegistered;
29-
@Nullable private EventConsumer<T> onEventConsumer;
30-
private ConcurrentMap<Class<? extends T>, EventConsumer<Object>> eventConsumers = new ConcurrentHashMap<>();
32+
List<EventConsumer<T>> onEventConsumers = new CopyOnWriteArrayList<>();
33+
ConcurrentMap<String, List<EventConsumer<T>>> eventConsumerMap = new ConcurrentHashMap<>();
3134

3235
public boolean hasConsumers(){
3336
return consumerRegistered;
3437
}
3538

3639
@SuppressWarnings("unchecked")
37-
public synchronized <E extends T> void registerConsumer(Class<? extends E> eventType, EventConsumer<E> eventConsumer){
38-
consumerRegistered = true;
39-
eventConsumers.put(eventType, (EventConsumer<Object>) eventConsumer);
40+
public synchronized void registerConsumer(String className, EventConsumer<? extends T> eventConsumer){
41+
this.consumerRegistered = true;
42+
this.eventConsumerMap.compute(className, (k, consumers) -> {
43+
if(consumers == null){
44+
consumers = new ArrayList<>();
45+
consumers.add((EventConsumer<T>) eventConsumer);
46+
return consumers;
47+
}else{
48+
consumers.add((EventConsumer<T>) eventConsumer);
49+
return consumers;
50+
}
51+
});
4052
}
4153

42-
@SuppressWarnings("unchecked")
4354
public <E extends T> boolean processEvent(E event) {
4455
boolean consumed = false;
45-
EventConsumer<T> onEventConsumer = this.onEventConsumer;
46-
if(onEventConsumer != null){
47-
onEventConsumer.consumeEvent(event);
56+
if(!onEventConsumers.isEmpty()){
57+
onEventConsumers.forEach(onEventConsumer -> onEventConsumer.consumeEvent(event));
4858
consumed = true;
4959
}
50-
if(!eventConsumers.isEmpty()){
51-
EventConsumer<T> eventConsumer = (EventConsumer<T>) eventConsumers.get(event.getClass());
52-
if(eventConsumer != null){
53-
eventConsumer.consumeEvent(event);
60+
if(!eventConsumerMap.isEmpty()){
61+
List<EventConsumer<T>> eventConsumers = this.eventConsumerMap.get(event.getClass().getSimpleName());
62+
if(eventConsumers != null && !eventConsumers.isEmpty()){
63+
eventConsumers.forEach(consumer -> consumer.consumeEvent(event));
5464
consumed = true;
5565
}
5666
}
@@ -59,7 +69,7 @@ public <E extends T> boolean processEvent(E event) {
5969

6070
@Override
6171
public synchronized void onEvent(@Nullable EventConsumer<T> onEventConsumer) {
62-
consumerRegistered = true;
63-
this.onEventConsumer = onEventConsumer;
72+
this.consumerRegistered = true;
73+
this.onEventConsumers.add(onEventConsumer);
6474
}
6575
}

0 commit comments

Comments
 (0)