Skip to content

Commit 40c1dbd

Browse files
adds Micrometer Observation API integration (#1056)
Co-authored-by: Marcin Grzejszczak <[email protected]>
1 parent 571af15 commit 40c1dbd

20 files changed

+1675
-2
lines changed

build.gradle

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ subprojects {
4242
ext['slf4j.version'] = '1.7.36'
4343
ext['jmh.version'] = '1.33'
4444
ext['junit.version'] = '5.8.1'
45-
ext['micrometer.version'] = '1.8.4'
45+
ext['micrometer.version'] = '1.10.0-SNAPSHOT'
46+
ext['micrometer-tracing.version'] = '1.0.0-SNAPSHOT'
4647
ext['assertj.version'] = '3.22.0'
4748
ext['netflix.limits.version'] = '0.3.6'
4849
ext['bouncycastle-bcpkix.version'] = '1.70'
@@ -77,6 +78,10 @@ subprojects {
7778
dependency "io.netty:netty-tcnative-boringssl-static:${ext['netty-boringssl.version']}"
7879
dependency "org.bouncycastle:bcpkix-jdk15on:${ext['bouncycastle-bcpkix.version']}"
7980
dependency "io.micrometer:micrometer-core:${ext['micrometer.version']}"
81+
dependency "io.micrometer:micrometer-observation:${ext['micrometer.version']}"
82+
dependency "io.micrometer:micrometer-test:${ext['micrometer.version']}"
83+
dependency "io.micrometer:micrometer-tracing:${ext['micrometer-tracing.version']}"
84+
dependency "io.micrometer:micrometer-tracing-integration-test:${ext['micrometer-tracing.version']}"
8085
dependency "org.assertj:assertj-core:${ext['assertj.version']}"
8186
dependency "org.hdrhistogram:HdrHistogram:${ext['hdrhistogram.version']}"
8287
dependency "org.slf4j:slf4j-api:${ext['slf4j.version']}"
@@ -117,6 +122,7 @@ subprojects {
117122
if (version.endsWith('SNAPSHOT') || project.hasProperty('versionSuffix')) {
118123
maven { url 'https://repo.spring.io/libs-snapshot' }
119124
maven { url 'https://oss.jfrog.org/artifactory/oss-snapshot-local' }
125+
mavenLocal()
120126
}
121127
}
122128

@@ -256,4 +262,31 @@ description = 'RSocket: Stream Oriented Messaging Passing with Reactive Stream S
256262

257263
repositories {
258264
mavenCentral()
265+
266+
maven { url 'https://repo.spring.io/snapshot' }
267+
mavenLocal()
268+
}
269+
270+
configurations {
271+
adoc
272+
}
273+
274+
dependencies {
275+
adoc "io.micrometer:micrometer-docs-generator-spans:1.0.0-SNAPSHOT"
276+
adoc "io.micrometer:micrometer-docs-generator-metrics:1.0.0-SNAPSHOT"
277+
}
278+
279+
task generateObservabilityDocs(dependsOn: ["generateObservabilityMetricsDocs", "generateObservabilitySpansDocs"]) {
280+
}
281+
282+
task generateObservabilityMetricsDocs(type: JavaExec) {
283+
mainClass = "io.micrometer.docs.metrics.DocsFromSources"
284+
classpath configurations.adoc
285+
args project.rootDir.getAbsolutePath(), ".*", project.rootProject.buildDir.getAbsolutePath()
286+
}
287+
288+
task generateObservabilitySpansDocs(type: JavaExec) {
289+
mainClass = "io.micrometer.docs.spans.DocsFromSources"
290+
classpath configurations.adoc
291+
args project.rootDir.getAbsolutePath(), ".*", project.rootProject.buildDir.getAbsolutePath()
259292
}

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@
1111
# See the License for the specific language governing permissions and
1212
# limitations under the License.
1313
#
14-
version=1.1.2
14+
version=1.2.0-SNAPSHOT
1515
perfBaselineVersion=1.1.1

rsocket-examples/build.gradle

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ dependencies {
2424
implementation project(':rsocket-transport-local')
2525
implementation project(':rsocket-transport-netty')
2626

27+
implementation "io.micrometer:micrometer-core"
28+
implementation "io.micrometer:micrometer-tracing"
29+
implementation project(":rsocket-micrometer")
30+
testImplementation 'org.awaitility:awaitility'
31+
2732
implementation 'com.netflix.concurrency-limits:concurrency-limits-core'
2833

2934
runtimeOnly 'ch.qos.logback:logback-classic'
@@ -33,6 +38,8 @@ dependencies {
3338
testImplementation 'org.mockito:mockito-core'
3439
testImplementation 'org.assertj:assertj-core'
3540
testImplementation 'io.projectreactor:reactor-test'
41+
testImplementation "io.micrometer:micrometer-test"
42+
testImplementation "io.micrometer:micrometer-tracing-integration-test"
3643

3744
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
3845
}
Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
/*
2+
* Copyright 2015-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.integration.observation;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import io.micrometer.core.instrument.MeterRegistry;
22+
import io.micrometer.core.instrument.Tag;
23+
import io.micrometer.core.instrument.Tags;
24+
import io.micrometer.core.instrument.observation.DefaultMeterObservationHandler;
25+
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
26+
import io.micrometer.core.tck.MeterRegistryAssert;
27+
import io.micrometer.observation.Observation;
28+
import io.micrometer.observation.ObservationHandler;
29+
import io.micrometer.observation.ObservationRegistry;
30+
import io.micrometer.tracing.test.SampleTestRunner;
31+
import io.micrometer.tracing.test.reporter.BuildingBlocks;
32+
import io.micrometer.tracing.test.simple.SpansAssert;
33+
import io.rsocket.Payload;
34+
import io.rsocket.RSocket;
35+
import io.rsocket.core.RSocketConnector;
36+
import io.rsocket.core.RSocketServer;
37+
import io.rsocket.micrometer.observation.ByteBufGetter;
38+
import io.rsocket.micrometer.observation.ByteBufSetter;
39+
import io.rsocket.micrometer.observation.ObservationRequesterRSocketProxy;
40+
import io.rsocket.micrometer.observation.ObservationResponderRSocketProxy;
41+
import io.rsocket.micrometer.observation.RSocketRequesterTracingObservationHandler;
42+
import io.rsocket.micrometer.observation.RSocketResponderTracingObservationHandler;
43+
import io.rsocket.plugins.RSocketInterceptor;
44+
import io.rsocket.transport.netty.client.TcpClientTransport;
45+
import io.rsocket.transport.netty.server.CloseableChannel;
46+
import io.rsocket.transport.netty.server.TcpServerTransport;
47+
import io.rsocket.util.DefaultPayload;
48+
import java.time.Duration;
49+
import java.util.Deque;
50+
import java.util.concurrent.atomic.AtomicInteger;
51+
import java.util.function.BiConsumer;
52+
import org.awaitility.Awaitility;
53+
import org.junit.jupiter.api.AfterEach;
54+
import org.reactivestreams.Publisher;
55+
import reactor.core.publisher.Flux;
56+
import reactor.core.publisher.Mono;
57+
58+
public class ObservationIntegrationTest extends SampleTestRunner {
59+
private static final MeterRegistry registry = new SimpleMeterRegistry();
60+
private static final ObservationRegistry observationRegistry = ObservationRegistry.create();
61+
62+
static {
63+
observationRegistry
64+
.observationConfig()
65+
.observationHandler(new DefaultMeterObservationHandler(registry));
66+
}
67+
68+
private final RSocketInterceptor requesterInterceptor;
69+
private final RSocketInterceptor responderInterceptor;
70+
71+
ObservationIntegrationTest() {
72+
super(SampleRunnerConfig.builder().build(), observationRegistry, registry);
73+
requesterInterceptor =
74+
reactiveSocket -> new ObservationRequesterRSocketProxy(reactiveSocket, observationRegistry);
75+
76+
responderInterceptor =
77+
reactiveSocket -> new ObservationResponderRSocketProxy(reactiveSocket, observationRegistry);
78+
}
79+
80+
private CloseableChannel server;
81+
private RSocket client;
82+
private AtomicInteger counter;
83+
84+
@Override
85+
public BiConsumer<BuildingBlocks, Deque<ObservationHandler<? extends Observation.Context>>>
86+
customizeObservationHandlers() {
87+
return (buildingBlocks, observationHandlers) -> {
88+
observationHandlers.addFirst(
89+
new RSocketRequesterTracingObservationHandler(
90+
buildingBlocks.getTracer(),
91+
buildingBlocks.getPropagator(),
92+
new ByteBufSetter(),
93+
false));
94+
observationHandlers.addFirst(
95+
new RSocketResponderTracingObservationHandler(
96+
buildingBlocks.getTracer(),
97+
buildingBlocks.getPropagator(),
98+
new ByteBufGetter(),
99+
false));
100+
};
101+
}
102+
103+
@AfterEach
104+
public void teardown() {
105+
if (server != null) {
106+
server.dispose();
107+
}
108+
}
109+
110+
private void testRequest() {
111+
counter.set(0);
112+
client.requestResponse(DefaultPayload.create("REQUEST", "META")).block();
113+
assertThat(counter).as("Server did not see the request.").hasValue(1);
114+
}
115+
116+
private void testStream() {
117+
counter.set(0);
118+
client.requestStream(DefaultPayload.create("start")).blockLast();
119+
120+
assertThat(counter).as("Server did not see the request.").hasValue(1);
121+
}
122+
123+
private void testRequestChannel() {
124+
counter.set(0);
125+
client.requestChannel(Mono.just(DefaultPayload.create("start"))).blockFirst();
126+
assertThat(counter).as("Server did not see the request.").hasValue(1);
127+
}
128+
129+
private void testFireAndForget() {
130+
counter.set(0);
131+
client.fireAndForget(DefaultPayload.create("start")).subscribe();
132+
Awaitility.await().atMost(Duration.ofSeconds(50)).until(() -> counter.get() == 1);
133+
assertThat(counter).as("Server did not see the request.").hasValue(1);
134+
}
135+
136+
@Override
137+
public SampleTestRunnerConsumer yourCode() {
138+
return (bb, meterRegistry) -> {
139+
counter = new AtomicInteger();
140+
server =
141+
RSocketServer.create(
142+
(setup, sendingSocket) -> {
143+
sendingSocket.onClose().subscribe();
144+
145+
return Mono.just(
146+
new RSocket() {
147+
@Override
148+
public Mono<Payload> requestResponse(Payload payload) {
149+
payload.release();
150+
counter.incrementAndGet();
151+
return Mono.just(DefaultPayload.create("RESPONSE", "METADATA"));
152+
}
153+
154+
@Override
155+
public Flux<Payload> requestStream(Payload payload) {
156+
payload.release();
157+
counter.incrementAndGet();
158+
return Flux.range(1, 10_000)
159+
.map(i -> DefaultPayload.create("data -> " + i));
160+
}
161+
162+
@Override
163+
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
164+
counter.incrementAndGet();
165+
return Flux.from(payloads);
166+
}
167+
168+
@Override
169+
public Mono<Void> fireAndForget(Payload payload) {
170+
payload.release();
171+
counter.incrementAndGet();
172+
return Mono.empty();
173+
}
174+
});
175+
})
176+
.interceptors(registry -> registry.forResponder(responderInterceptor))
177+
.bind(TcpServerTransport.create("localhost", 0))
178+
.block();
179+
180+
client =
181+
RSocketConnector.create()
182+
.interceptors(registry -> registry.forRequester(requesterInterceptor))
183+
.connect(TcpClientTransport.create(server.address()))
184+
.block();
185+
186+
testRequest();
187+
188+
testStream();
189+
190+
testRequestChannel();
191+
192+
testFireAndForget();
193+
194+
// @formatter:off
195+
SpansAssert.assertThat(bb.getFinishedSpans())
196+
.haveSameTraceId()
197+
// "request_*" + "handle" x 4
198+
.hasNumberOfSpansEqualTo(8)
199+
.hasNumberOfSpansWithNameEqualTo("handle", 4)
200+
.forAllSpansWithNameEqualTo("handle", span -> span.hasTagWithKey("rsocket.request-type"))
201+
.hasASpanWithNameIgnoreCase("request_stream")
202+
.thenASpanWithNameEqualToIgnoreCase("request_stream")
203+
.hasTag("rsocket.request-type", "REQUEST_STREAM")
204+
.backToSpans()
205+
.hasASpanWithNameIgnoreCase("request_channel")
206+
.thenASpanWithNameEqualToIgnoreCase("request_channel")
207+
.hasTag("rsocket.request-type", "REQUEST_CHANNEL")
208+
.backToSpans()
209+
.hasASpanWithNameIgnoreCase("request_fnf")
210+
.thenASpanWithNameEqualToIgnoreCase("request_fnf")
211+
.hasTag("rsocket.request-type", "REQUEST_FNF")
212+
.backToSpans()
213+
.hasASpanWithNameIgnoreCase("request_response")
214+
.thenASpanWithNameEqualToIgnoreCase("request_response")
215+
.hasTag("rsocket.request-type", "REQUEST_RESPONSE");
216+
217+
MeterRegistryAssert.assertThat(registry)
218+
.hasTimerWithNameAndTags(
219+
"rsocket.response",
220+
Tags.of(Tag.of("error", "none"), Tag.of("rsocket.request-type", "REQUEST_RESPONSE")))
221+
.hasTimerWithNameAndTags(
222+
"rsocket.fnf",
223+
Tags.of(Tag.of("error", "none"), Tag.of("rsocket.request-type", "REQUEST_FNF")))
224+
.hasTimerWithNameAndTags(
225+
"rsocket.request",
226+
Tags.of(Tag.of("error", "none"), Tag.of("rsocket.request-type", "REQUEST_RESPONSE")))
227+
.hasTimerWithNameAndTags(
228+
"rsocket.channel",
229+
Tags.of(Tag.of("error", "none"), Tag.of("rsocket.request-type", "REQUEST_CHANNEL")))
230+
.hasTimerWithNameAndTags(
231+
"rsocket.stream",
232+
Tags.of(Tag.of("error", "none"), Tag.of("rsocket.request-type", "REQUEST_STREAM")));
233+
// @formatter:on
234+
};
235+
}
236+
}

rsocket-micrometer/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ plugins {
2222

2323
dependencies {
2424
api project(':rsocket-core')
25+
api 'io.micrometer:micrometer-observation'
2526
api 'io.micrometer:micrometer-core'
27+
compileOnly 'io.micrometer:micrometer-tracing'
2628

2729
implementation 'org.slf4j:slf4j-api'
2830

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2013-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.rsocket.micrometer.observation;
18+
19+
import io.micrometer.tracing.propagation.Propagator;
20+
import io.netty.buffer.ByteBuf;
21+
import io.netty.util.CharsetUtil;
22+
import io.rsocket.metadata.CompositeMetadata;
23+
24+
public class ByteBufGetter implements Propagator.Getter<ByteBuf> {
25+
26+
@Override
27+
public String get(ByteBuf carrier, String key) {
28+
final CompositeMetadata compositeMetadata = new CompositeMetadata(carrier, false);
29+
for (CompositeMetadata.Entry entry : compositeMetadata) {
30+
if (key.equals(entry.getMimeType())) {
31+
return entry.getContent().toString(CharsetUtil.UTF_8);
32+
}
33+
}
34+
return null;
35+
}
36+
}

0 commit comments

Comments
 (0)