Skip to content

Commit f8b3132

Browse files
committed
Add support for distributed tracing
With Micrometer Observation and Tracing.
1 parent 9ff8f85 commit f8b3132

25 files changed

+1189
-39
lines changed

pom.xml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,12 @@
4545
<slf4j.version>1.7.36</slf4j.version>
4646
<logback.version>1.2.13</logback.version>
4747
<protonj2.version>1.0.0-M21-SNAPSHOT</protonj2.version>
48+
<micrometer.version>1.13.0</micrometer.version>
4849
<junit.jupiter.version>5.10.2</junit.jupiter.version>
4950
<assertj.version>3.26.0</assertj.version>
5051
<mockito.version>5.12.0</mockito.version>
5152
<amqp-client.version>5.20.0</amqp-client.version>
52-
<micrometer.version>1.13.0</micrometer.version>
53+
<micrometer-tracing-test.version>1.2.5</micrometer-tracing-test.version>
5354
<maven.compiler.plugin.version>3.13.0</maven.compiler.plugin.version>
5455
<maven-surefire-plugin.version>3.2.5</maven-surefire-plugin.version>
5556
<maven-resources-plugin.version>3.3.1</maven-resources-plugin.version>
@@ -166,6 +167,14 @@
166167
<scope>test</scope>
167168
</dependency>
168169

170+
<dependency>
171+
<groupId>io.micrometer</groupId>
172+
<artifactId>micrometer-tracing-integration-test</artifactId>
173+
<version>${micrometer-tracing-test.version}</version>
174+
<scope>test</scope>
175+
<optional>true</optional>
176+
</dependency>
177+
169178
<dependency>
170179
<groupId>com.github.spotbugs</groupId>
171180
<artifactId>spotbugs-annotations</artifactId>

src/main/java/com/rabbitmq/model/Message.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.math.BigDecimal;
2121
import java.util.UUID;
22+
import java.util.function.BiConsumer;
2223

2324
public interface Message {
2425

@@ -136,6 +137,12 @@ public interface Message {
136137
// TODO support iteration over message application properties
137138

138139
// TODO support message annotations
140+
Object annotation(String key);
141+
142+
Message annotation(String key, String value);
143+
144+
Message forEachAnnotation(BiConsumer<String, Object> action);
145+
139146
// TODO support message headers
140147

141148
MessageAddressBuilder toAddress();
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
// If you have any questions regarding licensing, please contact us at
17+
18+
package com.rabbitmq.model;
19+
20+
import java.util.function.Function;
21+
22+
public interface ObservationCollector {
23+
24+
<T> T publish(
25+
String exchange,
26+
String routingKey,
27+
Message message,
28+
ConnectionInfo connectionInfo,
29+
Function<Message, T> publishCall);
30+
31+
Consumer.MessageHandler subscribe(String queue, Consumer.MessageHandler handler);
32+
33+
interface ConnectionInfo {
34+
35+
String peerAddress();
36+
37+
int peerPort();
38+
}
39+
}

src/main/java/com/rabbitmq/model/amqp/AmqpConnection.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static java.util.Collections.singletonMap;
2424

2525
import com.rabbitmq.model.*;
26+
import com.rabbitmq.model.ObservationCollector;
2627
import com.rabbitmq.model.metrics.MetricsCollector;
2728
import java.time.Duration;
2829
import java.util.ArrayList;
@@ -479,6 +480,10 @@ MetricsCollector metricsCollector() {
479480
return this.environment.metricsCollector();
480481
}
481482

483+
ObservationCollector observationCollector() {
484+
return this.environment.observationCollector();
485+
}
486+
482487
Publisher createPublisher(AmqpPublisherBuilder builder) {
483488
// TODO copy the builder properties to create the publisher
484489
AmqpPublisher publisher = new AmqpPublisher(builder);
@@ -544,6 +549,10 @@ private String currentConnectionLabel() {
544549
}
545550
}
546551

552+
Address connectionAddress() {
553+
return this.connectionAddress;
554+
}
555+
547556
private void close(Throwable cause) {
548557
if (this.closed.compareAndSet(false, true)) {
549558
this.state(CLOSING, cause);

src/main/java/com/rabbitmq/model/amqp/AmqpConsumer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,11 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
7272
super(builder.listeners());
7373
this.id = ID_SEQUENCE.getAndIncrement();
7474
this.initialCredits = builder.initialCredits();
75-
this.messageHandler = builder.messageHandler();
75+
this.messageHandler =
76+
builder
77+
.connection()
78+
.observationCollector()
79+
.subscribe(builder.queue(), builder.messageHandler());
7680
this.address = "/queue/" + builder.queue();
7781
this.nativeReceiver = createNativeReceiver(builder.connection().nativeSession(), this.address);
7882
this.initStateFromNativeReceiver(this.nativeReceiver);

src/main/java/com/rabbitmq/model/amqp/AmqpEnvironment.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.rabbitmq.model.amqp;
1919

2020
import com.rabbitmq.model.*;
21+
import com.rabbitmq.model.ObservationCollector;
2122
import com.rabbitmq.model.metrics.MetricsCollector;
2223
import com.rabbitmq.model.metrics.NoOpMetricsCollector;
2324
import java.util.ArrayList;
@@ -49,11 +50,13 @@ class AmqpEnvironment implements Environment {
4950
private volatile ScheduledFuture<?> clockRefreshFuture;
5051
private final AtomicBoolean clockRefreshSet = new AtomicBoolean(false);
5152
private final MetricsCollector metricsCollector;
53+
private final ObservationCollector observationCollector;
5254

5355
AmqpEnvironment(
5456
ExecutorService executorService,
5557
DefaultConnectionSettings<?> connectionSettings,
56-
MetricsCollector metricsCollector) {
58+
MetricsCollector metricsCollector,
59+
ObservationCollector observationCollector) {
5760
this.id = ID_SEQUENCE.getAndIncrement();
5861
connectionSettings.copyTo(this.connectionSettings);
5962
this.connectionSettings.consolidate();
@@ -71,6 +74,8 @@ class AmqpEnvironment implements Environment {
7174
Executors.newScheduledThreadPool(0, Utils.defaultThreadFactory());
7275
this.metricsCollector =
7376
metricsCollector == null ? NoOpMetricsCollector.INSTANCE : metricsCollector;
77+
this.observationCollector =
78+
observationCollector == null ? Utils.NO_OP_OBSERVATION_COLLECTOR : observationCollector;
7479
}
7580

7681
DefaultConnectionSettings<?> connectionSettings() {
@@ -128,6 +133,10 @@ MetricsCollector metricsCollector() {
128133
return this.metricsCollector;
129134
}
130135

136+
ObservationCollector observationCollector() {
137+
return this.observationCollector;
138+
}
139+
131140
void addConnection(AmqpConnection connection) {
132141
this.connections.add(connection);
133142
}

src/main/java/com/rabbitmq/model/amqp/AmqpEnvironmentBuilder.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.rabbitmq.model.ConnectionSettings;
2121
import com.rabbitmq.model.Environment;
2222
import com.rabbitmq.model.EnvironmentBuilder;
23+
import com.rabbitmq.model.ObservationCollector;
2324
import com.rabbitmq.model.metrics.MetricsCollector;
2425
import com.rabbitmq.model.metrics.NoOpMetricsCollector;
2526
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -31,6 +32,7 @@ public class AmqpEnvironmentBuilder implements EnvironmentBuilder {
3132
new DefaultEnvironmentConnectionSettings(this);
3233
private ExecutorService executorService;
3334
private MetricsCollector metricsCollector = NoOpMetricsCollector.INSTANCE;
35+
private ObservationCollector observationCollector = Utils.NO_OP_OBSERVATION_COLLECTOR;
3436

3537
public AmqpEnvironmentBuilder() {}
3638

@@ -44,14 +46,20 @@ public AmqpEnvironmentBuilder metricsCollector(MetricsCollector metricsCollector
4446
return this;
4547
}
4648

49+
public AmqpEnvironmentBuilder observationCollector(ObservationCollector observationCollector) {
50+
this.observationCollector = observationCollector;
51+
return this;
52+
}
53+
4754
@SuppressFBWarnings("EI_EXPOSE_REP")
4855
public EnvironmentConnectionSettings connectionSettings() {
4956
return this.connectionSettings;
5057
}
5158

5259
@Override
5360
public Environment build() {
54-
return new AmqpEnvironment(executorService, connectionSettings, metricsCollector);
61+
return new AmqpEnvironment(
62+
executorService, connectionSettings, metricsCollector, observationCollector);
5563
}
5664

5765
public interface EnvironmentConnectionSettings

src/main/java/com/rabbitmq/model/amqp/AmqpMessage.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,13 +179,13 @@ public Message correlationId(String correlationId) {
179179

180180
@Override
181181
public Message correlationId(long correlationId) {
182-
callOnDelegate(m -> m.correlationId(correlationId));
182+
callOnDelegate(m -> m.correlationId(UnsignedLong.valueOf(correlationId)));
183183
return this;
184184
}
185185

186186
@Override
187187
public Message correlationId(byte[] correlationId) {
188-
callOnDelegate(m -> m.correlationId(correlationId));
188+
callOnDelegate(m -> m.correlationId(new Binary(correlationId)));
189189
return this;
190190
}
191191

@@ -354,6 +354,23 @@ public boolean hasProperties() {
354354
return returnFromDelegate(org.apache.qpid.protonj2.client.Message::hasProperties);
355355
}
356356

357+
@Override
358+
public Object annotation(String key) {
359+
return returnFromDelegate(m -> m.annotation(key));
360+
}
361+
362+
@Override
363+
public Message annotation(String key, String value) {
364+
callOnDelegate(m -> m.annotation(key, value));
365+
return this;
366+
}
367+
368+
@Override
369+
public Message forEachAnnotation(BiConsumer<String, Object> action) {
370+
callOnDelegate(m -> m.forEachAnnotation(action));
371+
return this;
372+
}
373+
357374
@Override
358375
public Object removeProperty(String key) {
359376
return returnFromDelegate(m -> m.removeProperty(key));

src/main/java/com/rabbitmq/model/amqp/AmqpPublisher.java

Lines changed: 49 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@
2020
import static com.rabbitmq.model.Resource.State.OPEN;
2121

2222
import com.rabbitmq.model.Message;
23+
import com.rabbitmq.model.ObservationCollector;
2324
import com.rabbitmq.model.Publisher;
2425
import com.rabbitmq.model.metrics.MetricsCollector;
2526
import java.util.concurrent.*;
2627
import java.util.concurrent.atomic.AtomicBoolean;
2728
import java.util.concurrent.atomic.AtomicLong;
29+
import java.util.function.Function;
2830
import org.apache.qpid.protonj2.client.*;
2931
import org.apache.qpid.protonj2.client.exceptions.ClientException;
3032
import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException;
@@ -44,17 +46,38 @@ final class AmqpPublisher extends ResourceBase implements Publisher {
4446
private final AmqpConnection connection;
4547
private final AtomicBoolean closed = new AtomicBoolean(false);
4648
private final MetricsCollector metricsCollector;
49+
private final ObservationCollector observationCollector;
50+
private final Function<Message, Tracker> publishCall;
51+
private final DefaultAddressBuilder.DestinationSpec destinationSpec;
52+
private volatile ObservationCollector.ConnectionInfo connectionInfo;
4753

4854
AmqpPublisher(AmqpPublisherBuilder builder) {
4955
super(builder.listeners());
5056
this.id = ID_SEQUENCE.getAndIncrement();
5157
this.executorService = builder.connection().executorService();
5258
this.address = builder.address();
59+
this.destinationSpec = builder.destination();
5360
this.connection = builder.connection();
5461
this.sender = this.createSender(builder.connection().nativeSession(), this.address);
5562
this.metricsCollector = this.connection.metricsCollector();
63+
this.observationCollector = this.connection.observationCollector();
5664
this.state(OPEN);
5765
this.metricsCollector.openPublisher();
66+
this.publishCall =
67+
msg -> {
68+
try {
69+
org.apache.qpid.protonj2.client.Message<?> nativeMessage =
70+
((AmqpMessage) msg).nativeMessage();
71+
return this.sender.send(nativeMessage.durable(true));
72+
} catch (ClientIllegalStateException e) {
73+
// the link is closed
74+
this.close(ExceptionUtils.convert(e));
75+
throw ExceptionUtils.convert(e);
76+
} catch (ClientException e) {
77+
throw ExceptionUtils.convert(e);
78+
}
79+
};
80+
this.connectionInfo = new Utils.ObservationConnectionInfo(this.connection.connectionAddress());
5881
}
5982

6083
@Override
@@ -70,40 +93,35 @@ public Message message(byte[] body) {
7093
@Override
7194
public void publish(Message message, Callback callback) {
7295
checkOpen();
73-
try {
74-
org.apache.qpid.protonj2.client.Message<?> nativeMessage =
75-
((AmqpMessage) message).nativeMessage();
76-
Tracker tracker = this.sender.send(nativeMessage.durable(true));
77-
this.executorService.submit(
78-
() -> {
79-
Status status;
80-
try {
81-
tracker.settlementFuture().get();
82-
status =
83-
tracker.remoteState() == DeliveryState.accepted()
84-
? Status.ACCEPTED
85-
: Status.FAILED;
86-
} catch (InterruptedException | ExecutionException e) {
87-
status = Status.FAILED;
88-
}
89-
DefaultContext defaultContext = new DefaultContext(message, status);
90-
this.metricsCollector.publishDisposition(
91-
status == Status.ACCEPTED
92-
? MetricsCollector.PublishDisposition.ACCEPTED
93-
: MetricsCollector.PublishDisposition.FAILED);
94-
callback.handle(defaultContext);
95-
});
96-
this.metricsCollector.publish();
97-
} catch (ClientIllegalStateException e) {
98-
// the link is closed
99-
this.close(ExceptionUtils.convert(e));
100-
throw ExceptionUtils.convert(e);
101-
} catch (ClientException e) {
102-
throw ExceptionUtils.convert(e);
103-
}
96+
Tracker tracker =
97+
this.observationCollector.publish(
98+
destinationSpec.exchange(),
99+
destinationSpec.routingKey(),
100+
message,
101+
this.connectionInfo,
102+
publishCall);
103+
this.executorService.submit(
104+
() -> {
105+
Status status;
106+
try {
107+
tracker.settlementFuture().get();
108+
status =
109+
tracker.remoteState() == DeliveryState.accepted() ? Status.ACCEPTED : Status.FAILED;
110+
} catch (InterruptedException | ExecutionException e) {
111+
status = Status.FAILED;
112+
}
113+
DefaultContext defaultContext = new DefaultContext(message, status);
114+
this.metricsCollector.publishDisposition(
115+
status == Status.ACCEPTED
116+
? MetricsCollector.PublishDisposition.ACCEPTED
117+
: MetricsCollector.PublishDisposition.FAILED);
118+
callback.handle(defaultContext);
119+
});
120+
this.metricsCollector.publish();
104121
}
105122

106123
void recoverAfterConnectionFailure() {
124+
this.connectionInfo = new Utils.ObservationConnectionInfo(this.connection.connectionAddress());
107125
this.sender = this.createSender(this.connection.nativeSession(false), this.address);
108126
}
109127

src/main/java/com/rabbitmq/model/amqp/AmqpPublisherBuilder.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,8 @@ AddressBuilder<?> addressBuilder() {
8080
String address() {
8181
return this.addressBuilder.address();
8282
}
83+
84+
DefaultAddressBuilder.DestinationSpec destination() {
85+
return this.addressBuilder.destination();
86+
}
8387
}

0 commit comments

Comments
 (0)