Skip to content

Commit 3a537dc

Browse files
committed
Generify ObservationCollector
Type parameter is the context, e.g. Observation for Micrometer. References #379
1 parent 8bc3f77 commit 3a537dc

File tree

7 files changed

+25
-27
lines changed

7 files changed

+25
-27
lines changed

src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ public interface EnvironmentBuilder {
222222
*/
223223
EnvironmentBuilder metricsCollector(MetricsCollector metricsCollector);
224224

225-
EnvironmentBuilder observationCollector(ObservationCollector observationCollector);
225+
EnvironmentBuilder observationCollector(ObservationCollector<?> observationCollector);
226226

227227
/**
228228
* The maximum number of producers allocated to a single connection.

src/main/java/com/rabbitmq/stream/ObservationCollector.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,27 +13,27 @@
1313
1414
package com.rabbitmq.stream;
1515

16-
public interface ObservationCollector {
16+
public interface ObservationCollector<T> {
1717

18-
ObservationCollector NO_OP =
19-
new ObservationCollector() {
18+
ObservationCollector<Void> NO_OP =
19+
new ObservationCollector<Void>() {
2020
@Override
21-
public Object prePublish(String stream, Message message) {
21+
public Void prePublish(String stream, Message message) {
2222
return null;
2323
}
2424

2525
@Override
26-
public void published(Object context, Message message) {}
26+
public void published(Void context, Message message) {}
2727

2828
@Override
2929
public MessageHandler subscribe(MessageHandler handler) {
3030
return handler;
3131
}
3232
};
3333

34-
void published(Object context, Message message);
34+
void published(T context, Message message);
3535

36-
Object prePublish(String stream, Message message);
36+
T prePublish(String stream, Message message);
3737

3838
MessageHandler subscribe(MessageHandler handler);
3939
}

src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,9 @@ class SimpleMessageAccumulator implements MessageAccumulator {
3232
private final ToLongFunction<Message> publishSequenceFunction;
3333
private final Function<Message, String> filterValueExtractor;
3434
final String stream;
35-
final ObservationCollector observationCollector;
35+
final ObservationCollector<Object> observationCollector;
3636

37+
@SuppressWarnings("unchecked")
3738
SimpleMessageAccumulator(
3839
int capacity,
3940
Codec codec,
@@ -42,7 +43,7 @@ class SimpleMessageAccumulator implements MessageAccumulator {
4243
Function<Message, String> filterValueExtractor,
4344
Clock clock,
4445
String stream,
45-
ObservationCollector observationCollector) {
46+
ObservationCollector<?> observationCollector) {
4647
this.capacity = capacity;
4748
this.messages = new LinkedBlockingQueue<>(capacity);
4849
this.codec = codec;
@@ -52,7 +53,7 @@ class SimpleMessageAccumulator implements MessageAccumulator {
5253
filterValueExtractor == null ? NULL_FILTER_VALUE_EXTRACTOR : filterValueExtractor;
5354
this.clock = clock;
5455
this.stream = stream;
55-
this.observationCollector = observationCollector;
56+
this.observationCollector = (ObservationCollector<Object>) observationCollector;
5657
}
5758

5859
public boolean add(Message message, ConfirmationHandler confirmationHandler) {

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ class StreamEnvironment implements Environment {
8585
private final Runnable locatorInitializationSequence;
8686
private final List<Locator> locators = new CopyOnWriteArrayList<>();
8787
private final ExecutorServiceFactory executorServiceFactory;
88-
private final ObservationCollector observationCollector;
88+
private final ObservationCollector<?> observationCollector;
8989

9090
StreamEnvironment(
9191
ScheduledExecutorService scheduledExecutorService,
@@ -102,7 +102,7 @@ class StreamEnvironment implements Environment {
102102
boolean lazyInit,
103103
Function<ClientConnectionType, String> connectionNamingStrategy,
104104
Function<Client.ClientParameters, Client> clientFactory,
105-
ObservationCollector observationCollector) {
105+
ObservationCollector<?> observationCollector) {
106106
this.recoveryBackOffDelayPolicy = recoveryBackOffDelayPolicy;
107107
this.topologyUpdateBackOffDelayPolicy = topologyBackOffDelayPolicy;
108108
this.byteBufAllocator = byteBufAllocator;
@@ -646,7 +646,7 @@ CompressionCodecFactory compressionCodecFactory() {
646646
return this.clientParametersPrototype.compressionCodecFactory;
647647
}
648648

649-
ObservationCollector observationCollector() {
649+
ObservationCollector<?> observationCollector() {
650650
return this.observationCollector;
651651
}
652652

src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public class StreamEnvironmentBuilder implements EnvironmentBuilder {
6363
private CompressionCodecFactory compressionCodecFactory;
6464
private boolean lazyInit = false;
6565
private Function<Client.ClientParameters, Client> clientFactory = Client::new;
66-
private ObservationCollector observationCollector = ObservationCollector.NO_OP;
66+
private ObservationCollector<?> observationCollector = ObservationCollector.NO_OP;
6767

6868
public StreamEnvironmentBuilder() {}
6969

@@ -282,7 +282,7 @@ StreamEnvironmentBuilder clientFactory(Function<Client.ClientParameters, Client>
282282
}
283283

284284
@Override
285-
public EnvironmentBuilder observationCollector(ObservationCollector observationCollector) {
285+
public EnvironmentBuilder observationCollector(ObservationCollector<?> observationCollector) {
286286
this.observationCollector = observationCollector;
287287
return this;
288288
}

src/main/java/com/rabbitmq/stream/observation/micrometer/MicrometerObservationCollector.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import io.micrometer.observation.Observation;
1818
import io.micrometer.observation.ObservationRegistry;
1919

20-
class MicrometerObservationCollector implements ObservationCollector {
20+
class MicrometerObservationCollector implements ObservationCollector<Observation> {
2121

2222
private final ObservationRegistry registry;
2323
private final PublishObservationConvention customPublishConvention, defaultPublishConvention;
@@ -37,19 +37,16 @@ public MicrometerObservationCollector(
3737
}
3838

3939
@Override
40-
public void published(Object context, Message message) {
41-
if (context instanceof Observation) {
42-
Observation observation = (Observation) context;
43-
try {
44-
observation.stop();
45-
} catch (Exception e) {
46-
// TODO log error
47-
}
40+
public void published(Observation observation, Message message) {
41+
try {
42+
observation.stop();
43+
} catch (Exception e) {
44+
// TODO log error
4845
}
4946
}
5047

5148
@Override
52-
public Object prePublish(String stream, Message message) {
49+
public Observation prePublish(String stream, Message message) {
5350
PublishContext context = new PublishContext(stream, message);
5451
Observation observation =
5552
StreamObservationDocumentation.PUBLISH_OBSERVATION.observation(

src/test/java/com/rabbitmq/stream/impl/StreamProducerUnitTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ void init() {
113113
when(env.locatorOperation(any())).thenCallRealMethod();
114114
when(env.clock()).thenReturn(clock);
115115
when(env.codec()).thenReturn(new SimpleCodec());
116-
when(env.observationCollector()).thenReturn(ObservationCollector.NO_OP);
116+
when(env.observationCollector()).thenAnswer(invocation -> ObservationCollector.NO_OP);
117117
doAnswer(
118118
(Answer<Runnable>)
119119
invocationOnMock -> {

0 commit comments

Comments
 (0)