Skip to content

Commit 46f074d

Browse files
committed
Add tests to observation support
Test with 2 supported codecs. Test a message without tracing info is consumed correctly. References #379
1 parent 8277816 commit 46f074d

File tree

1 file changed

+88
-71
lines changed

1 file changed

+88
-71
lines changed

src/test/java/com/rabbitmq/stream/observation/micrometer/MicrometerObservationCollectorTest.java

Lines changed: 88 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717
import static com.rabbitmq.stream.impl.TestUtils.CountDownLatchConditions.completed;
1818
import static com.rabbitmq.stream.impl.TestUtils.localhost;
1919
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
20+
import static io.micrometer.tracing.test.simple.SpanAssert.assertThat;
2021
import static org.assertj.core.api.Assertions.assertThat;
2122

2223
import com.rabbitmq.stream.*;
24+
import com.rabbitmq.stream.codec.QpidProtonCodec;
25+
import com.rabbitmq.stream.codec.SwiftMqCodec;
2326
import com.rabbitmq.stream.impl.TestUtils;
2427
import io.micrometer.tracing.test.SampleTestRunner;
25-
import io.micrometer.tracing.test.simple.SpanAssert;
28+
import io.micrometer.tracing.test.reporter.BuildingBlocks;
2629
import io.micrometer.tracing.test.simple.SpansAssert;
2730
import io.netty.channel.EventLoopGroup;
2831
import java.nio.charset.StandardCharsets;
@@ -48,26 +51,96 @@ EnvironmentBuilder environmentBuilder() {
4851
.addressResolver(add -> localhost());
4952
}
5053

54+
ObservationCollector<?> observationCollector() {
55+
return new MicrometerObservationCollectorBuilder().registry(getObservationRegistry()).build();
56+
}
57+
5158
@Override
5259
public TracingSetup[] getTracingSetup() {
5360
return new TracingSetup[] {TracingSetup.IN_MEMORY_BRAVE, TracingSetup.ZIPKIN_BRAVE};
5461
}
62+
63+
void publishConsume(Codec codec, BuildingBlocks buildingBlocks) throws Exception {
64+
try (Environment env =
65+
environmentBuilder().codec(codec).observationCollector(observationCollector()).build()) {
66+
Producer producer = env.producerBuilder().stream(stream).build();
67+
CountDownLatch publishLatch = new CountDownLatch(1);
68+
producer.send(
69+
producer.messageBuilder().addData(PAYLOAD).build(), status -> publishLatch.countDown());
70+
71+
assertThat(publishLatch).is(completed());
72+
73+
CountDownLatch consumeLatch = new CountDownLatch(1);
74+
env.consumerBuilder().stream(stream)
75+
.offset(first())
76+
.messageHandler((ctx, msg) -> consumeLatch.countDown())
77+
.build();
78+
79+
assertThat(consumeLatch).is(completed());
80+
81+
waitAtMost(() -> buildingBlocks.getFinishedSpans().size() == 2);
82+
83+
SpansAssert.assertThat(buildingBlocks.getFinishedSpans()).haveSameTraceId().hasSize(2);
84+
assertThat(buildingBlocks.getFinishedSpans().get(0))
85+
.hasNameEqualTo(stream + " publish")
86+
.hasTag("messaging.destination.name", stream)
87+
.hasTag("messaging.message.payload_size_bytes", String.valueOf(PAYLOAD.length))
88+
.hasTag("net.protocol.name", "rabbitmq-stream")
89+
.hasTag("net.protocol.version", "1.0");
90+
assertThat(buildingBlocks.getFinishedSpans().get(1))
91+
.hasNameEqualTo(stream + " process")
92+
.hasTag("messaging.destination.name", stream)
93+
.hasTag("messaging.source.name", stream)
94+
.hasTag("messaging.message.payload_size_bytes", String.valueOf(PAYLOAD.length))
95+
.hasTag("net.protocol.name", "rabbitmq-stream")
96+
.hasTag("net.protocol.version", "1.0");
97+
waitAtMost(
98+
() ->
99+
getMeterRegistry().find("rabbitmq.stream.publish").timer() != null
100+
&& getMeterRegistry().find("rabbitmq.stream.process").timer() != null);
101+
getMeterRegistry()
102+
.get("rabbitmq.stream.publish")
103+
.tag("messaging.operation", "publish")
104+
.tag("messaging.system", "rabbitmq")
105+
.timer();
106+
getMeterRegistry()
107+
.get("rabbitmq.stream.process")
108+
.tag("messaging.operation", "process")
109+
.tag("messaging.system", "rabbitmq")
110+
.timer();
111+
}
112+
}
55113
}
56114

57115
@Nested
58-
class PublishConsume extends IntegrationTest {
116+
class PublishConsumeQpidCodec extends IntegrationTest {
117+
118+
@Override
119+
public SampleTestRunnerConsumer yourCode() {
120+
return (buildingBlocks, meterRegistry) ->
121+
publishConsume(new QpidProtonCodec(), buildingBlocks);
122+
}
123+
}
124+
125+
@Nested
126+
class PublishConsumeSwiftMqCodec extends IntegrationTest {
127+
128+
@Override
129+
public SampleTestRunnerConsumer yourCode() {
130+
return (buildingBlocks, meterRegistry) -> publishConsume(new SwiftMqCodec(), buildingBlocks);
131+
}
132+
}
133+
134+
@Nested
135+
class ConsumeWithoutObservationShouldNotFail extends IntegrationTest {
59136

60137
@Override
61138
public SampleTestRunnerConsumer yourCode() {
62139
return (buildingBlocks, meterRegistry) -> {
63-
try (Environment env =
64-
environmentBuilder()
65-
.observationCollector(
66-
new MicrometerObservationCollectorBuilder()
67-
.registry(getObservationRegistry())
68-
.build())
69-
.build()) {
70-
Producer producer = env.producerBuilder().stream(stream).build();
140+
try (Environment publishEnv = environmentBuilder().build();
141+
Environment consumeEnv =
142+
environmentBuilder().observationCollector(observationCollector()).build()) {
143+
Producer producer = publishEnv.producerBuilder().stream(stream).build();
71144
CountDownLatch publishLatch = new CountDownLatch(1);
72145
producer.send(
73146
producer.messageBuilder().addData(PAYLOAD).build(),
@@ -76,86 +149,30 @@ public SampleTestRunnerConsumer yourCode() {
76149
assertThat(publishLatch).is(completed());
77150

78151
CountDownLatch consumeLatch = new CountDownLatch(1);
79-
env.consumerBuilder().stream(stream)
152+
consumeEnv.consumerBuilder().stream(stream)
80153
.offset(first())
81154
.messageHandler((ctx, msg) -> consumeLatch.countDown())
82155
.build();
83156

84157
assertThat(consumeLatch).is(completed());
85158

86-
waitAtMost(() -> buildingBlocks.getFinishedSpans().size() == 2);
159+
waitAtMost(() -> buildingBlocks.getFinishedSpans().size() == 1);
87160

88-
SpansAssert.assertThat(buildingBlocks.getFinishedSpans()).haveSameTraceId().hasSize(2);
89-
SpanAssert.assertThat(buildingBlocks.getFinishedSpans().get(0))
90-
.hasNameEqualTo(stream + " publish")
91-
.hasTag("messaging.destination.name", stream)
92-
.hasTag("messaging.message.payload_size_bytes", String.valueOf(PAYLOAD.length))
93-
.hasTag("net.protocol.name", "rabbitmq-stream")
94-
.hasTag("net.protocol.version", "1.0");
95-
SpanAssert.assertThat(buildingBlocks.getFinishedSpans().get(1))
161+
SpansAssert.assertThat(buildingBlocks.getFinishedSpans()).haveSameTraceId().hasSize(1);
162+
assertThat(buildingBlocks.getFinishedSpans().get(0))
96163
.hasNameEqualTo(stream + " process")
97164
.hasTag("messaging.destination.name", stream)
98165
.hasTag("messaging.source.name", stream)
99166
.hasTag("messaging.message.payload_size_bytes", String.valueOf(PAYLOAD.length))
100167
.hasTag("net.protocol.name", "rabbitmq-stream")
101168
.hasTag("net.protocol.version", "1.0");
102-
waitAtMost(
103-
() ->
104-
getMeterRegistry().find("rabbitmq.stream.publish").timer() != null
105-
&& getMeterRegistry().find("rabbitmq.stream.process").timer() != null);
106-
getMeterRegistry()
107-
.get("rabbitmq.stream.publish")
108-
.tag("messaging.operation", "publish")
109-
.tag("messaging.system", "rabbitmq")
110-
.timer();
169+
waitAtMost(() -> getMeterRegistry().find("rabbitmq.stream.process").timer() != null);
111170
getMeterRegistry()
112171
.get("rabbitmq.stream.process")
113172
.tag("messaging.operation", "process")
114173
.tag("messaging.system", "rabbitmq")
115174
.timer();
116175
}
117-
118-
/*
119-
assertThat(consumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
120-
waitAtMost(() -> buildingBlocks.getFinishedSpans().size() == 2);
121-
SpansAssert.assertThat(buildingBlocks.getFinishedSpans()).haveSameTraceId().hasSize(2);
122-
SpanAssert.assertThat(buildingBlocks.getFinishedSpans().get(0))
123-
.hasNameEqualTo("metrics.queue publish")
124-
.hasTag("messaging.rabbitmq.destination.routing_key", "metrics.queue")
125-
.hasTag("messaging.destination.name", "amq.default")
126-
.hasTag("messaging.message.payload_size_bytes", String.valueOf(PAYLOAD.length))
127-
.hasTagWithKey("net.sock.peer.addr")
128-
.hasTag("net.sock.peer.port", "5672")
129-
.hasTag("net.protocol.name", "amqp")
130-
.hasTag("net.protocol.version", "0.9.1");
131-
SpanAssert.assertThat(buildingBlocks.getFinishedSpans().get(1))
132-
.hasNameEqualTo("metrics.queue process")
133-
.hasTag("messaging.rabbitmq.destination.routing_key", "metrics.queue")
134-
.hasTag("messaging.destination.name", "amq.default")
135-
.hasTag("messaging.source.name", "metrics.queue")
136-
.hasTag("messaging.message.payload_size_bytes", String.valueOf(PAYLOAD.length))
137-
.hasTag("net.protocol.name", "amqp")
138-
.hasTag("net.protocol.version", "0.9.1");
139-
waitAtMost(
140-
() ->
141-
getMeterRegistry().find("rabbitmq.publish").timer() != null
142-
&& getMeterRegistry().find("rabbitmq.process").timer() != null);
143-
getMeterRegistry()
144-
.get("rabbitmq.publish")
145-
.tag("messaging.operation", "publish")
146-
.tag("messaging.system", "rabbitmq")
147-
.timer();
148-
getMeterRegistry()
149-
.get("rabbitmq.process")
150-
.tag("messaging.operation", "process")
151-
.tag("messaging.system", "rabbitmq")
152-
.timer();
153-
} finally {
154-
safeClose(publishConnection);
155-
safeClose(consumeConnection);
156-
}
157-
158-
*/
159176
};
160177
}
161178
}

0 commit comments

Comments
 (0)