Skip to content

Commit 11ec03e

Browse files
authored
Merge pull request #384 from rabbitmq/micrometer-observation
Add support for observability Fixes #379
2 parents feeaf81 + 1e7c1c2 commit 11ec03e

36 files changed

+1382
-130
lines changed

ci/publish-documentation-to-github-pages.sh

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,13 @@
22

33
. $(pwd)/release-versions.txt
44

5+
./mvnw clean test-compile exec:java \
6+
-Dexec.mainClass=io.micrometer.docs.DocsGeneratorCommand \
7+
-Dexec.classpathScope="test" \
8+
-Dexec.args='src/main/java/com/rabbitmq/stream/observation/micrometer .* target/micrometer-observation-docs'
9+
510
MESSAGE=$(git log -1 --pretty=%B)
6-
./mvnw clean buildnumber:create pre-site --no-transfer-progress
11+
./mvnw buildnumber:create pre-site --no-transfer-progress
712

813
./mvnw javadoc:javadoc -Dmaven.javadoc.skip=false --no-transfer-progress
914

pom.xml

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@
7474
<gson.version>2.10.1</gson.version>
7575
<vavr.version>0.10.4</vavr.version>
7676
<paho.version>1.2.5</paho.version>
77+
<micrometer-tracing-test.version>1.1.3</micrometer-tracing-test.version>
78+
<micrometer-docs-generator.version>1.0.2</micrometer-docs-generator.version>
7779
<maven.compiler.plugin.version>3.11.0</maven.compiler.plugin.version>
7880
<maven-surefire-plugin.version>3.1.2</maven-surefire-plugin.version>
7981
<spring-boot-maven-plugin.version>2.7.6</spring-boot-maven-plugin.version>
@@ -281,7 +283,7 @@
281283
<groupId>org.eclipse.paho</groupId>
282284
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
283285
<version>${paho.version}</version>
284-
<scope>test</scope>
286+
<scope>test</scope>
285287
</dependency>
286288

287289
<dependency>
@@ -312,6 +314,20 @@
312314
<scope>test</scope>
313315
</dependency>
314316

317+
<dependency>
318+
<groupId>io.micrometer</groupId>
319+
<artifactId>micrometer-tracing-integration-test</artifactId>
320+
<version>${micrometer-tracing-test.version}</version>
321+
<scope>test</scope>
322+
</dependency>
323+
324+
<dependency>
325+
<groupId>io.micrometer</groupId>
326+
<artifactId>micrometer-docs-generator</artifactId>
327+
<version>${micrometer-docs-generator.version}</version>
328+
<scope>test</scope>
329+
</dependency>
330+
315331
<dependency>
316332
<groupId>org.openjdk.jmh</groupId>
317333
<artifactId>jmh-core</artifactId>
@@ -393,6 +409,9 @@
393409
<groupId>org.apache.maven.plugins</groupId>
394410
<artifactId>maven-resources-plugin</artifactId>
395411
<version>${maven-resources-plugin.version}</version>
412+
<configuration>
413+
<propertiesEncoding>${project.build.sourceEncoding}</propertiesEncoding>
414+
</configuration>
396415
</plugin>
397416

398417
<plugin>
@@ -457,6 +476,7 @@
457476
<idseparator>-</idseparator>
458477
<source-highlighter>coderay</source-highlighter>
459478
<test-examples>../../test/java/com/rabbitmq/stream/docs</test-examples>
479+
<build-directory>${project.build.directory}</build-directory>
460480
</attributes>
461481

462482
</configuration>

src/docs/asciidoc/appendixes.adoc

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
ifndef::build-directory[:build-directory: ../../../target]
2+
:test-examples: ../../test/java/com/rabbitmq/stream/docs
3+
4+
[appendix]
5+
== Micrometer Observation
6+
7+
It is possible to use https://micrometer.io/docs/observation[Micrometer Observation] to instrument publishing and consuming in the stream Java client.
8+
Micrometer Observation provides https://spring.io/blog/2022/10/12/observability-with-spring-boot-3[metrics, tracing, and log correlation with one single API].
9+
10+
The stream Java client provides an `ObservationCollector` abstraction and an implementation for Micrometer Observation.
11+
The following snippet shows how to create and set up the Micrometer `ObservationCollector` implementation with an existing `ObservationRegistry`:
12+
13+
.Configuring Micrometer Observation
14+
[source,java,indent=0]
15+
--------
16+
include::{test-examples}/EnvironmentUsage.java[tag=micrometer-observation]
17+
--------
18+
<1> Configure Micrometer `ObservationCollector` with builder
19+
<2> Set Micrometer `ObservationRegistry`
20+
21+
The next sections document the conventions, spans, and metrics made available by the instrumentation.
22+
They are automatically generated from the source code with the https://github.com/micrometer-metrics/micrometer-docs-generator[Micrometer documentation generator].
23+
24+
include::{build-directory}/micrometer-observation-docs/_conventions.adoc[]
25+
include::{build-directory}/micrometer-observation-docs/_spans.adoc[]
26+
include::{build-directory}/micrometer-observation-docs/_metrics.adoc[]

src/docs/asciidoc/index.adoc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
= RabbitMQ Stream Java Client
22
:revnumber: {project-version}
33
:revremark: ({build-number})
4+
:appendix-caption: Appendix
45
ifndef::imagesdir[:imagesdir: images]
56
ifndef::sourcedir[:sourcedir: ../../main/java]
67
:source-highlighter: prettify
@@ -28,4 +29,6 @@ include::advanced-topics.adoc[]
2829

2930
include::building.adoc[]
3031

31-
include::performance-tool.adoc[]
32+
include::performance-tool.adoc[]
33+
34+
include::appendixes.adoc[]

src/main/java/com/rabbitmq/stream/Codec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").

src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,8 @@ public interface EnvironmentBuilder {
222222
*/
223223
EnvironmentBuilder metricsCollector(MetricsCollector metricsCollector);
224224

225+
EnvironmentBuilder observationCollector(ObservationCollector<?> observationCollector);
226+
225227
/**
226228
* The maximum number of producers allocated to a single connection.
227229
*

src/main/java/com/rabbitmq/stream/Message.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -85,4 +85,31 @@ public interface Message {
8585
* @return the message annotations
8686
*/
8787
Map<String, Object> getMessageAnnotations();
88+
89+
/**
90+
* Add a message annotation to the message.
91+
*
92+
* @param key the message annotation key
93+
* @param value the message annotation value
94+
* @return the modified message
95+
* @since 0.12.0
96+
*/
97+
default Message annotate(String key, Object value) {
98+
this.getMessageAnnotations().put(key, value);
99+
return this;
100+
}
101+
102+
/**
103+
* Create a copy of the message.
104+
*
105+
* <p>The message copy contains the exact same instances of the original bare message (body,
106+
* properties, application properties), only the message annotations are actually copied and can
107+
* be modified independently.
108+
*
109+
* @return the message copy
110+
* @since 0.12.0
111+
*/
112+
default Message copy() {
113+
return this;
114+
}
88115
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream;
15+
16+
/**
17+
* API to instrument operations in the stream client. The supported operations are publishing, and
18+
* asynchronous delivery.
19+
*
20+
* <p>Implementations can gather information and send it to tracing backends. This allows e.g.
21+
* following the processing steps of a given message through different systems.
22+
*
23+
* <p>This is considered an SPI and is susceptible to change at any time.
24+
*
25+
* @since 0.12.0
26+
* @see EnvironmentBuilder#observationCollector(ObservationCollector)
27+
* @see com.rabbitmq.stream.observation.micrometer.MicrometerObservationCollectorBuilder
28+
*/
29+
public interface ObservationCollector<T> {
30+
31+
ObservationCollector<Void> NO_OP =
32+
new ObservationCollector<Void>() {
33+
@Override
34+
public Void prePublish(String stream, Message message) {
35+
return null;
36+
}
37+
38+
@Override
39+
public void published(Void context, Message message) {}
40+
41+
@Override
42+
public MessageHandler subscribe(MessageHandler handler) {
43+
return handler;
44+
}
45+
};
46+
47+
/**
48+
* Start observation.
49+
*
50+
* <p>Implementations are expecting to return an observation context that will be passed in to the
51+
* {@link #published(Object, Message)} callback.
52+
*
53+
* @param stream the stream to publish to
54+
* @param message the message to publish
55+
* @return observation context
56+
*/
57+
T prePublish(String stream, Message message);
58+
59+
/**
60+
* Callback when the message is about to be published.
61+
*
62+
* @param context the observation context
63+
* @param message the message to publish
64+
*/
65+
void published(T context, Message message);
66+
67+
/**
68+
* Decorate consumer registration.
69+
*
70+
* @param handler the original handler
71+
* @return a decorated handler
72+
*/
73+
MessageHandler subscribe(MessageHandler handler);
74+
75+
/**
76+
* Says whether the implementation does nothing or not.
77+
*
78+
* @return true if the implementation is a no-op
79+
*/
80+
default boolean isNoop() {
81+
return this == NO_OP;
82+
}
83+
}

src/main/java/com/rabbitmq/stream/codec/QpidProtonCodec.java

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -32,6 +32,7 @@
3232
import org.apache.qpid.proton.codec.WritableBuffer;
3333

3434
public class QpidProtonCodec implements Codec {
35+
3536
private static final Function<String, String> MESSAGE_ANNOTATIONS_STRING_KEY_EXTRACTOR = k -> k;
3637
private static final Function<Symbol, String> MESSAGE_ANNOTATIONS_SYMBOL_KEY_EXTRACTOR =
3738
Symbol::toString;
@@ -52,7 +53,7 @@ private static Map<String, Object> createMessageAnnotations(
5253
return createMapFromAmqpMap(
5354
MESSAGE_ANNOTATIONS_SYMBOL_KEY_EXTRACTOR, message.getMessageAnnotations().getValue());
5455
} else {
55-
return null;
56+
return new LinkedHashMap<>();
5657
}
5758
}
5859

@@ -258,7 +259,7 @@ public Message decode(byte[] data) {
258259
createMessageAnnotations(message));
259260
}
260261

261-
protected Properties createProperties(org.apache.qpid.proton.message.Message message) {
262+
protected static Properties createProperties(org.apache.qpid.proton.message.Message message) {
262263
if (message.getProperties() != null) {
263264
return new QpidProtonProperties(message.getProperties());
264265
} else {
@@ -504,6 +505,21 @@ public Map<String, Object> getApplicationProperties() {
504505
public Map<String, Object> getMessageAnnotations() {
505506
return messageAnnotations;
506507
}
508+
509+
@Override
510+
public Message annotate(String key, Object value) {
511+
this.messageAnnotations.put(key, value);
512+
return this;
513+
}
514+
515+
@Override
516+
public Message copy() {
517+
return new QpidProtonMessage(
518+
message,
519+
createProperties(message),
520+
createApplicationProperties(message),
521+
createMessageAnnotations(message));
522+
}
507523
}
508524

509525
static class QpidProtonAmqpMessageWrapper implements Message {
@@ -579,6 +595,38 @@ public Map<String, Object> getMessageAnnotations() {
579595
return null;
580596
}
581597
}
598+
599+
@Override
600+
public Message annotate(String key, Object value) {
601+
MessageAnnotations annotations = this.message.getMessageAnnotations();
602+
if (annotations == null) {
603+
annotations = new MessageAnnotations(new LinkedHashMap<>());
604+
this.message.setMessageAnnotations(annotations);
605+
}
606+
annotations.getValue().put(Symbol.getSymbol(key), value);
607+
return this;
608+
}
609+
610+
@Override
611+
public Message copy() {
612+
org.apache.qpid.proton.message.Message copy =
613+
org.apache.qpid.proton.message.Message.Factory.create();
614+
copy.setProperties(this.message.getProperties());
615+
copy.setBody(this.message.getBody());
616+
copy.setApplicationProperties(this.message.getApplicationProperties());
617+
if (this.message.getMessageAnnotations() != null) {
618+
Map<Symbol, Object> annotations = message.getMessageAnnotations().getValue();
619+
Map<Symbol, Object> annotationCopy;
620+
if (annotations == null) {
621+
annotationCopy = null;
622+
} else {
623+
annotationCopy = new LinkedHashMap<>(annotations.size());
624+
annotationCopy.putAll(annotations);
625+
}
626+
copy.setMessageAnnotations(new MessageAnnotations(annotationCopy));
627+
}
628+
return new QpidProtonAmqpMessageWrapper(this.hasPublishingId, this.publishingId, copy);
629+
}
582630
}
583631

584632
// from

src/main/java/com/rabbitmq/stream/codec/QpidProtonMessageBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").

src/main/java/com/rabbitmq/stream/codec/SimpleCodec.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").

0 commit comments

Comments
 (0)