Skip to content

Add support for observability #384

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion ci/publish-documentation-to-github-pages.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@

. $(pwd)/release-versions.txt

./mvnw clean test-compile exec:java \
-Dexec.mainClass=io.micrometer.docs.DocsGeneratorCommand \
-Dexec.classpathScope="test" \
-Dexec.args='src/main/java/com/rabbitmq/stream/observation/micrometer .* target/micrometer-observation-docs'

MESSAGE=$(git log -1 --pretty=%B)
./mvnw clean buildnumber:create pre-site --no-transfer-progress
./mvnw buildnumber:create pre-site --no-transfer-progress

./mvnw javadoc:javadoc -Dmaven.javadoc.skip=false --no-transfer-progress

Expand Down
22 changes: 21 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@
<gson.version>2.10.1</gson.version>
<vavr.version>0.10.4</vavr.version>
<paho.version>1.2.5</paho.version>
<micrometer-tracing-test.version>1.1.3</micrometer-tracing-test.version>
<micrometer-docs-generator.version>1.0.2</micrometer-docs-generator.version>
<maven.compiler.plugin.version>3.11.0</maven.compiler.plugin.version>
<maven-surefire-plugin.version>3.1.2</maven-surefire-plugin.version>
<spring-boot-maven-plugin.version>2.7.6</spring-boot-maven-plugin.version>
Expand Down Expand Up @@ -281,7 +283,7 @@
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>${paho.version}</version>
<scope>test</scope>
<scope>test</scope>
</dependency>

<dependency>
Expand Down Expand Up @@ -312,6 +314,20 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-tracing-integration-test</artifactId>
<version>${micrometer-tracing-test.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-docs-generator</artifactId>
<version>${micrometer-docs-generator.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
Expand Down Expand Up @@ -393,6 +409,9 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>${maven-resources-plugin.version}</version>
<configuration>
<propertiesEncoding>${project.build.sourceEncoding}</propertiesEncoding>
</configuration>
</plugin>

<plugin>
Expand Down Expand Up @@ -457,6 +476,7 @@
<idseparator>-</idseparator>
<source-highlighter>coderay</source-highlighter>
<test-examples>../../test/java/com/rabbitmq/stream/docs</test-examples>
<build-directory>${project.build.directory}</build-directory>
</attributes>

</configuration>
Expand Down
26 changes: 26 additions & 0 deletions src/docs/asciidoc/appendixes.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
ifndef::build-directory[:build-directory: ../../../target]
:test-examples: ../../test/java/com/rabbitmq/stream/docs

[appendix]
== Micrometer Observation

It is possible to use https://micrometer.io/docs/observation[Micrometer Observation] to instrument publishing and consuming in the stream Java client.
Micrometer Observation provides https://spring.io/blog/2022/10/12/observability-with-spring-boot-3[metrics, tracing, and log correlation with one single API].

The stream Java client provides an `ObservationCollector` abstraction and an implementation for Micrometer Observation.
The following snippet shows how to create and set up the Micrometer `ObservationCollector` implementation with an existing `ObservationRegistry`:

.Configuring Micrometer Observation
[source,java,indent=0]
--------
include::{test-examples}/EnvironmentUsage.java[tag=micrometer-observation]
--------
<1> Configure Micrometer `ObservationCollector` with builder
<2> Set Micrometer `ObservationRegistry`

The next sections document the conventions, spans, and metrics made available by the instrumentation.
They are automatically generated from the source code with the https://github.com/micrometer-metrics/micrometer-docs-generator[Micrometer documentation generator].

include::{build-directory}/micrometer-observation-docs/_conventions.adoc[]
include::{build-directory}/micrometer-observation-docs/_spans.adoc[]
include::{build-directory}/micrometer-observation-docs/_metrics.adoc[]
5 changes: 4 additions & 1 deletion src/docs/asciidoc/index.adoc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
= RabbitMQ Stream Java Client
:revnumber: {project-version}
:revremark: ({build-number})
:appendix-caption: Appendix
ifndef::imagesdir[:imagesdir: images]
ifndef::sourcedir[:sourcedir: ../../main/java]
:source-highlighter: prettify
Expand Down Expand Up @@ -28,4 +29,6 @@ include::advanced-topics.adoc[]

include::building.adoc[]

include::performance-tool.adoc[]
include::performance-tool.adoc[]

include::appendixes.adoc[]
2 changes: 1 addition & 1 deletion src/main/java/com/rabbitmq/stream/Codec.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ public interface EnvironmentBuilder {
*/
EnvironmentBuilder metricsCollector(MetricsCollector metricsCollector);

EnvironmentBuilder observationCollector(ObservationCollector<?> observationCollector);

/**
* The maximum number of producers allocated to a single connection.
*
Expand Down
29 changes: 28 additions & 1 deletion src/main/java/com/rabbitmq/stream/Message.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
Expand Down Expand Up @@ -85,4 +85,31 @@ public interface Message {
* @return the message annotations
*/
Map<String, Object> getMessageAnnotations();

/**
* Add a message annotation to the message.
*
* @param key the message annotation key
* @param value the message annotation value
* @return the modified message
* @since 0.12.0
*/
default Message annotate(String key, Object value) {
this.getMessageAnnotations().put(key, value);
return this;
}

/**
* Create a copy of the message.
*
* <p>The message copy contains the exact same instances of the original bare message (body,
* properties, application properties), only the message annotations are actually copied and can
* be modified independently.
*
* @return the message copy
* @since 0.12.0
*/
default Message copy() {
return this;
}
}
83 changes: 83 additions & 0 deletions src/main/java/com/rabbitmq/stream/ObservationCollector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
// please see LICENSE-APACHE2.
//
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
// either express or implied. See the LICENSE file for specific language governing
// rights and limitations of this software.
//
// If you have any questions regarding licensing, please contact us at
// [email protected].
package com.rabbitmq.stream;

/**
* API to instrument operations in the stream client. The supported operations are publishing, and
* asynchronous delivery.
*
* <p>Implementations can gather information and send it to tracing backends. This allows e.g.
* following the processing steps of a given message through different systems.
*
* <p>This is considered an SPI and is susceptible to change at any time.
*
* @since 0.12.0
* @see EnvironmentBuilder#observationCollector(ObservationCollector)
* @see com.rabbitmq.stream.observation.micrometer.MicrometerObservationCollectorBuilder
*/
public interface ObservationCollector<T> {

ObservationCollector<Void> NO_OP =
new ObservationCollector<Void>() {
@Override
public Void prePublish(String stream, Message message) {
return null;
}

@Override
public void published(Void context, Message message) {}

@Override
public MessageHandler subscribe(MessageHandler handler) {
return handler;
}
};

/**
* Start observation.
*
* <p>Implementations are expecting to return an observation context that will be passed in to the
* {@link #published(Object, Message)} callback.
*
* @param stream the stream to publish to
* @param message the message to publish
* @return observation context
*/
T prePublish(String stream, Message message);

/**
* Callback when the message is about to be published.
*
* @param context the observation context
* @param message the message to publish
*/
void published(T context, Message message);

/**
* Decorate consumer registration.
*
* @param handler the original handler
* @return a decorated handler
*/
MessageHandler subscribe(MessageHandler handler);

/**
* Says whether the implementation does nothing or not.
*
* @return true if the implementation is a no-op
*/
default boolean isNoop() {
return this == NO_OP;
}
}
54 changes: 51 additions & 3 deletions src/main/java/com/rabbitmq/stream/codec/QpidProtonCodec.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
Expand Down Expand Up @@ -32,6 +32,7 @@
import org.apache.qpid.proton.codec.WritableBuffer;

public class QpidProtonCodec implements Codec {

private static final Function<String, String> MESSAGE_ANNOTATIONS_STRING_KEY_EXTRACTOR = k -> k;
private static final Function<Symbol, String> MESSAGE_ANNOTATIONS_SYMBOL_KEY_EXTRACTOR =
Symbol::toString;
Expand All @@ -52,7 +53,7 @@ private static Map<String, Object> createMessageAnnotations(
return createMapFromAmqpMap(
MESSAGE_ANNOTATIONS_SYMBOL_KEY_EXTRACTOR, message.getMessageAnnotations().getValue());
} else {
return null;
return new LinkedHashMap<>();
}
}

Expand Down Expand Up @@ -258,7 +259,7 @@ public Message decode(byte[] data) {
createMessageAnnotations(message));
}

protected Properties createProperties(org.apache.qpid.proton.message.Message message) {
protected static Properties createProperties(org.apache.qpid.proton.message.Message message) {
if (message.getProperties() != null) {
return new QpidProtonProperties(message.getProperties());
} else {
Expand Down Expand Up @@ -504,6 +505,21 @@ public Map<String, Object> getApplicationProperties() {
public Map<String, Object> getMessageAnnotations() {
return messageAnnotations;
}

@Override
public Message annotate(String key, Object value) {
this.messageAnnotations.put(key, value);
return this;
}

@Override
public Message copy() {
return new QpidProtonMessage(
message,
createProperties(message),
createApplicationProperties(message),
createMessageAnnotations(message));
}
}

static class QpidProtonAmqpMessageWrapper implements Message {
Expand Down Expand Up @@ -579,6 +595,38 @@ public Map<String, Object> getMessageAnnotations() {
return null;
}
}

@Override
public Message annotate(String key, Object value) {
MessageAnnotations annotations = this.message.getMessageAnnotations();
if (annotations == null) {
annotations = new MessageAnnotations(new LinkedHashMap<>());
this.message.setMessageAnnotations(annotations);
}
annotations.getValue().put(Symbol.getSymbol(key), value);
return this;
}

@Override
public Message copy() {
org.apache.qpid.proton.message.Message copy =
org.apache.qpid.proton.message.Message.Factory.create();
copy.setProperties(this.message.getProperties());
copy.setBody(this.message.getBody());
copy.setApplicationProperties(this.message.getApplicationProperties());
if (this.message.getMessageAnnotations() != null) {
Map<Symbol, Object> annotations = message.getMessageAnnotations().getValue();
Map<Symbol, Object> annotationCopy;
if (annotations == null) {
annotationCopy = null;
} else {
annotationCopy = new LinkedHashMap<>(annotations.size());
annotationCopy.putAll(annotations);
}
copy.setMessageAnnotations(new MessageAnnotations(annotationCopy));
}
return new QpidProtonAmqpMessageWrapper(this.hasPublishingId, this.publishingId, copy);
}
}

// from
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/rabbitmq/stream/codec/SimpleCodec.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
Expand Down
Loading