Skip to content

Commit fecccea

Browse files
Added support for Micrometer Observations
Signed-off-by: Marcin Grzejszczak <[email protected]>
1 parent 000f6da commit fecccea

20 files changed

+1664
-4
lines changed

build.gradle

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ subprojects {
4242
ext['slf4j.version'] = '1.7.36'
4343
ext['jmh.version'] = '1.35'
4444
ext['junit.version'] = '5.8.1'
45-
ext['micrometer.version'] = '1.8.4'
45+
ext['micrometer.version'] = '1.10.0-RC1'
46+
ext['micrometer-tracing.version'] = '1.0.0-RC1'
4647
ext['assertj.version'] = '3.22.0'
4748
ext['netflix.limits.version'] = '0.3.6'
4849
ext['bouncycastle-bcpkix.version'] = '1.70'
@@ -69,14 +70,15 @@ subprojects {
6970
mavenBom "io.projectreactor:reactor-bom:${ext['reactor-bom.version']}"
7071
mavenBom "io.netty:netty-bom:${ext['netty-bom.version']}"
7172
mavenBom "org.junit:junit-bom:${ext['junit.version']}"
73+
mavenBom "io.micrometer:micrometer-bom:${ext['micrometer.version']}"
74+
mavenBom "io.micrometer:micrometer-tracing-bom:${ext['micrometer-tracing.version']}"
7275
}
7376

7477
dependencies {
7578
dependency "com.netflix.concurrency-limits:concurrency-limits-core:${ext['netflix.limits.version']}"
7679
dependency "ch.qos.logback:logback-classic:${ext['logback.version']}"
7780
dependency "io.netty:netty-tcnative-boringssl-static:${ext['netty-boringssl.version']}"
7881
dependency "org.bouncycastle:bcpkix-jdk15on:${ext['bouncycastle-bcpkix.version']}"
79-
dependency "io.micrometer:micrometer-core:${ext['micrometer.version']}"
8082
dependency "org.assertj:assertj-core:${ext['assertj.version']}"
8183
dependency "org.hdrhistogram:HdrHistogram:${ext['hdrhistogram.version']}"
8284
dependency "org.slf4j:slf4j-api:${ext['slf4j.version']}"
@@ -103,6 +105,7 @@ subprojects {
103105
content {
104106
includeGroup "io.projectreactor"
105107
includeGroup "io.projectreactor.netty"
108+
includeGroup "io.micrometer"
106109
}
107110
}
108111

rsocket-examples/build.gradle

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ dependencies {
2525
implementation project(':rsocket-transport-netty')
2626

2727
implementation 'com.netflix.concurrency-limits:concurrency-limits-core'
28+
implementation "io.micrometer:micrometer-core"
29+
implementation "io.micrometer:micrometer-tracing"
30+
implementation project(":rsocket-micrometer")
31+
testImplementation 'org.awaitility:awaitility'
2832

2933
runtimeOnly 'ch.qos.logback:logback-classic'
3034

@@ -33,6 +37,8 @@ dependencies {
3337
testImplementation 'org.mockito:mockito-core'
3438
testImplementation 'org.assertj:assertj-core'
3539
testImplementation 'io.projectreactor:reactor-test'
40+
testImplementation "io.micrometer:micrometer-test"
41+
testImplementation "io.micrometer:micrometer-tracing-integration-test"
3642

3743
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
3844
}
Lines changed: 246 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,246 @@
1+
/*
2+
* Copyright 2015-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.integration.observation;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import io.micrometer.core.instrument.MeterRegistry;
22+
import io.micrometer.core.instrument.Tag;
23+
import io.micrometer.core.instrument.Tags;
24+
import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler;
25+
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
26+
import io.micrometer.core.tck.MeterRegistryAssert;
27+
import io.micrometer.observation.Observation;
28+
import io.micrometer.observation.ObservationHandler;
29+
import io.micrometer.observation.ObservationRegistry;
30+
import io.micrometer.tracing.test.SampleTestRunner;
31+
import io.micrometer.tracing.test.reporter.BuildingBlocks;
32+
import io.micrometer.tracing.test.simple.SpansAssert;
33+
import io.rsocket.Payload;
34+
import io.rsocket.RSocket;
35+
import io.rsocket.core.RSocketConnector;
36+
import io.rsocket.core.RSocketServer;
37+
import io.rsocket.micrometer.observation.ByteBufGetter;
38+
import io.rsocket.micrometer.observation.ByteBufSetter;
39+
import io.rsocket.micrometer.observation.ObservationRequesterRSocketProxy;
40+
import io.rsocket.micrometer.observation.ObservationResponderRSocketProxy;
41+
import io.rsocket.micrometer.observation.RSocketRequesterTracingObservationHandler;
42+
import io.rsocket.micrometer.observation.RSocketResponderTracingObservationHandler;
43+
import io.rsocket.plugins.RSocketInterceptor;
44+
import io.rsocket.transport.netty.client.TcpClientTransport;
45+
import io.rsocket.transport.netty.server.CloseableChannel;
46+
import io.rsocket.transport.netty.server.TcpServerTransport;
47+
import io.rsocket.util.DefaultPayload;
48+
import java.time.Duration;
49+
import java.util.Deque;
50+
import java.util.concurrent.atomic.AtomicInteger;
51+
import java.util.function.BiConsumer;
52+
import org.awaitility.Awaitility;
53+
import org.junit.jupiter.api.AfterEach;
54+
import org.reactivestreams.Publisher;
55+
import reactor.core.publisher.Flux;
56+
import reactor.core.publisher.Mono;
57+
58+
public class ObservationIntegrationTest extends SampleTestRunner {
59+
private static final MeterRegistry registry = new SimpleMeterRegistry();
60+
private static final ObservationRegistry observationRegistry = ObservationRegistry.create();
61+
62+
static {
63+
observationRegistry
64+
.observationConfig()
65+
.observationHandler(new DefaultMeterObservationHandler(registry));
66+
}
67+
68+
private final RSocketInterceptor requesterInterceptor;
69+
private final RSocketInterceptor responderInterceptor;
70+
71+
ObservationIntegrationTest() {
72+
super(SampleRunnerConfig.builder().build());
73+
requesterInterceptor =
74+
reactiveSocket -> new ObservationRequesterRSocketProxy(reactiveSocket, observationRegistry);
75+
76+
responderInterceptor =
77+
reactiveSocket -> new ObservationResponderRSocketProxy(reactiveSocket, observationRegistry);
78+
}
79+
80+
private CloseableChannel server;
81+
private RSocket client;
82+
private AtomicInteger counter;
83+
84+
@Override
85+
public BiConsumer<BuildingBlocks, Deque<ObservationHandler<? extends Observation.Context>>>
86+
customizeObservationHandlers() {
87+
return (buildingBlocks, observationHandlers) -> {
88+
observationHandlers.addFirst(
89+
new RSocketRequesterTracingObservationHandler(
90+
buildingBlocks.getTracer(),
91+
buildingBlocks.getPropagator(),
92+
new ByteBufSetter(),
93+
false));
94+
observationHandlers.addFirst(
95+
new RSocketResponderTracingObservationHandler(
96+
buildingBlocks.getTracer(),
97+
buildingBlocks.getPropagator(),
98+
new ByteBufGetter(),
99+
false));
100+
};
101+
}
102+
103+
@AfterEach
104+
public void teardown() {
105+
if (server != null) {
106+
server.dispose();
107+
}
108+
}
109+
110+
private void testRequest() {
111+
counter.set(0);
112+
client.requestResponse(DefaultPayload.create("REQUEST", "META")).block();
113+
assertThat(counter).as("Server did not see the request.").hasValue(1);
114+
}
115+
116+
private void testStream() {
117+
counter.set(0);
118+
client.requestStream(DefaultPayload.create("start")).blockLast();
119+
120+
assertThat(counter).as("Server did not see the request.").hasValue(1);
121+
}
122+
123+
private void testRequestChannel() {
124+
counter.set(0);
125+
client.requestChannel(Mono.just(DefaultPayload.create("start"))).blockFirst();
126+
assertThat(counter).as("Server did not see the request.").hasValue(1);
127+
}
128+
129+
private void testFireAndForget() {
130+
counter.set(0);
131+
client.fireAndForget(DefaultPayload.create("start")).subscribe();
132+
Awaitility.await().atMost(Duration.ofSeconds(50)).until(() -> counter.get() == 1);
133+
assertThat(counter).as("Server did not see the request.").hasValue(1);
134+
}
135+
136+
@Override
137+
public SampleTestRunnerConsumer yourCode() {
138+
return (bb, meterRegistry) -> {
139+
counter = new AtomicInteger();
140+
server =
141+
RSocketServer.create(
142+
(setup, sendingSocket) -> {
143+
sendingSocket.onClose().subscribe();
144+
145+
return Mono.just(
146+
new RSocket() {
147+
@Override
148+
public Mono<Payload> requestResponse(Payload payload) {
149+
payload.release();
150+
counter.incrementAndGet();
151+
return Mono.just(DefaultPayload.create("RESPONSE", "METADATA"));
152+
}
153+
154+
@Override
155+
public Flux<Payload> requestStream(Payload payload) {
156+
payload.release();
157+
counter.incrementAndGet();
158+
return Flux.range(1, 10_000)
159+
.map(i -> DefaultPayload.create("data -> " + i));
160+
}
161+
162+
@Override
163+
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
164+
counter.incrementAndGet();
165+
return Flux.from(payloads);
166+
}
167+
168+
@Override
169+
public Mono<Void> fireAndForget(Payload payload) {
170+
payload.release();
171+
counter.incrementAndGet();
172+
return Mono.empty();
173+
}
174+
});
175+
})
176+
.interceptors(registry -> registry.forResponder(responderInterceptor))
177+
.bind(TcpServerTransport.create("localhost", 0))
178+
.block();
179+
180+
client =
181+
RSocketConnector.create()
182+
.interceptors(registry -> registry.forRequester(requesterInterceptor))
183+
.connect(TcpClientTransport.create(server.address()))
184+
.block();
185+
186+
testRequest();
187+
188+
testStream();
189+
190+
testRequestChannel();
191+
192+
testFireAndForget();
193+
194+
// @formatter:off
195+
SpansAssert.assertThat(bb.getFinishedSpans())
196+
.haveSameTraceId()
197+
// "request_*" + "handle" x 4
198+
.hasNumberOfSpansEqualTo(8)
199+
.hasNumberOfSpansWithNameEqualTo("handle", 4)
200+
.forAllSpansWithNameEqualTo("handle", span -> span.hasTagWithKey("rsocket.request-type"))
201+
.hasASpanWithNameIgnoreCase("request_stream")
202+
.thenASpanWithNameEqualToIgnoreCase("request_stream")
203+
.hasTag("rsocket.request-type", "REQUEST_STREAM")
204+
.backToSpans()
205+
.hasASpanWithNameIgnoreCase("request_channel")
206+
.thenASpanWithNameEqualToIgnoreCase("request_channel")
207+
.hasTag("rsocket.request-type", "REQUEST_CHANNEL")
208+
.backToSpans()
209+
.hasASpanWithNameIgnoreCase("request_fnf")
210+
.thenASpanWithNameEqualToIgnoreCase("request_fnf")
211+
.hasTag("rsocket.request-type", "REQUEST_FNF")
212+
.backToSpans()
213+
.hasASpanWithNameIgnoreCase("request_response")
214+
.thenASpanWithNameEqualToIgnoreCase("request_response")
215+
.hasTag("rsocket.request-type", "REQUEST_RESPONSE");
216+
217+
MeterRegistryAssert.assertThat(registry)
218+
.hasTimerWithNameAndTags(
219+
"rsocket.response",
220+
Tags.of(Tag.of("error", "none"), Tag.of("rsocket.request-type", "REQUEST_RESPONSE")))
221+
.hasTimerWithNameAndTags(
222+
"rsocket.fnf",
223+
Tags.of(Tag.of("error", "none"), Tag.of("rsocket.request-type", "REQUEST_FNF")))
224+
.hasTimerWithNameAndTags(
225+
"rsocket.request",
226+
Tags.of(Tag.of("error", "none"), Tag.of("rsocket.request-type", "REQUEST_RESPONSE")))
227+
.hasTimerWithNameAndTags(
228+
"rsocket.channel",
229+
Tags.of(Tag.of("error", "none"), Tag.of("rsocket.request-type", "REQUEST_CHANNEL")))
230+
.hasTimerWithNameAndTags(
231+
"rsocket.stream",
232+
Tags.of(Tag.of("error", "none"), Tag.of("rsocket.request-type", "REQUEST_STREAM")));
233+
// @formatter:on
234+
};
235+
}
236+
237+
@Override
238+
protected MeterRegistry getMeterRegistry() {
239+
return registry;
240+
}
241+
242+
@Override
243+
protected ObservationRegistry getObservationRegistry() {
244+
return observationRegistry;
245+
}
246+
}

rsocket-micrometer/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ plugins {
2323
dependencies {
2424
api project(':rsocket-core')
2525
api 'io.micrometer:micrometer-core'
26+
api 'io.micrometer:micrometer-tracing'
2627

2728
implementation 'org.slf4j:slf4j-api'
2829

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2013-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.micrometer.observation;
18+
19+
import io.micrometer.tracing.propagation.Propagator;
20+
import io.netty.buffer.ByteBuf;
21+
import io.netty.util.CharsetUtil;
22+
import io.rsocket.metadata.CompositeMetadata;
23+
24+
public class ByteBufGetter implements Propagator.Getter<ByteBuf> {
25+
26+
@Override
27+
public String get(ByteBuf carrier, String key) {
28+
final CompositeMetadata compositeMetadata = new CompositeMetadata(carrier, false);
29+
for (CompositeMetadata.Entry entry : compositeMetadata) {
30+
if (key.equals(entry.getMimeType())) {
31+
return entry.getContent().toString(CharsetUtil.UTF_8);
32+
}
33+
}
34+
return null;
35+
}
36+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2013-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.micrometer.observation;
18+
19+
import io.micrometer.tracing.propagation.Propagator;
20+
import io.netty.buffer.ByteBufAllocator;
21+
import io.netty.buffer.ByteBufUtil;
22+
import io.netty.buffer.CompositeByteBuf;
23+
import io.rsocket.metadata.CompositeMetadataCodec;
24+
25+
public class ByteBufSetter implements Propagator.Setter<CompositeByteBuf> {
26+
27+
@Override
28+
public void set(CompositeByteBuf carrier, String key, String value) {
29+
final ByteBufAllocator alloc = carrier.alloc();
30+
CompositeMetadataCodec.encodeAndAddMetadataWithCompression(
31+
carrier, alloc, key, ByteBufUtil.writeUtf8(alloc, value));
32+
}
33+
}

0 commit comments

Comments
 (0)