Skip to content

Commit 06c6a93

Browse files
committed
Add support for observability
WIP References #379
1 parent 9a79038 commit 06c6a93

29 files changed

+976
-60
lines changed

pom.xml

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
<gson.version>2.10.1</gson.version>
7575
<vavr.version>0.10.4</vavr.version>
7676
<paho.version>1.2.5</paho.version>
77+
<micrometer-tracing-test.version>1.1.3</micrometer-tracing-test.version>
7778
<maven.compiler.plugin.version>3.11.0</maven.compiler.plugin.version>
7879
<maven-surefire-plugin.version>3.1.2</maven-surefire-plugin.version>
7980
<spring-boot-maven-plugin.version>2.7.6</spring-boot-maven-plugin.version>
@@ -281,7 +282,7 @@
281282
<groupId>org.eclipse.paho</groupId>
282283
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
283284
<version>${paho.version}</version>
284-
<scope>test</scope>
285+
<scope>test</scope>
285286
</dependency>
286287

287288
<dependency>
@@ -312,6 +313,13 @@
312313
<scope>test</scope>
313314
</dependency>
314315

316+
<dependency>
317+
<groupId>io.micrometer</groupId>
318+
<artifactId>micrometer-tracing-integration-test</artifactId>
319+
<version>${micrometer-tracing-test.version}</version>
320+
<scope>test</scope>
321+
</dependency>
322+
315323
<dependency>
316324
<groupId>org.openjdk.jmh</groupId>
317325
<artifactId>jmh-core</artifactId>
@@ -554,7 +562,7 @@
554562
<style>GOOGLE</style>
555563
</googleJavaFormat>
556564
</java>
557-
<!-- <ratchetFrom>origin/main</ratchetFrom>-->
565+
<ratchetFrom>origin/main</ratchetFrom>
558566
<licenseHeader> <!-- specify either content or file, but not both -->
559567
<content>// Copyright (c) $YEAR VMware, Inc. or its affiliates. All rights reserved.
560568
//

src/main/java/com/rabbitmq/stream/Codec.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -28,6 +28,12 @@ public interface Codec {
2828

2929
MessageBuilder messageBuilder();
3030

31+
MessageBuilder messageBuilder(String stream);
32+
33+
Codec messageBuilderListener(MessageBuilderListener listener);
34+
35+
Object listenerContext(Message message);
36+
3137
class EncodedMessage {
3238

3339
private final int size;
@@ -46,4 +52,10 @@ public int getSize() {
4652
return size;
4753
}
4854
}
55+
56+
@FunctionalInterface
57+
interface MessageBuilderListener {
58+
59+
Object accept(String stream, MessageBuilder builder);
60+
}
4961
}

src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,8 @@ public interface EnvironmentBuilder {
222222
*/
223223
EnvironmentBuilder metricsCollector(MetricsCollector metricsCollector);
224224

225+
EnvironmentBuilder observationCollector(ObservationCollector observationCollector);
226+
225227
/**
226228
* The maximum number of producers allocated to a single connection.
227229
*
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream;
15+
16+
public interface ObservationCollector {
17+
18+
ObservationCollector NO_OP =
19+
new ObservationCollector() {
20+
@Override
21+
public Codec register(Codec codec) {
22+
return codec;
23+
}
24+
25+
@Override
26+
public void publish(Codec codec, Message message) {}
27+
28+
@Override
29+
public MessageHandler subscribe(MessageHandler handler) {
30+
return handler;
31+
}
32+
};
33+
34+
Codec register(Codec codec);
35+
36+
void publish(Codec codec, Message message);
37+
38+
MessageHandler subscribe(MessageHandler handler);
39+
}

src/main/java/com/rabbitmq/stream/codec/QpidProtonCodec.java

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -32,10 +32,19 @@
3232
import org.apache.qpid.proton.codec.WritableBuffer;
3333

3434
public class QpidProtonCodec implements Codec {
35+
private static final MessageBuilderListener NO_OP_MESSAGE_BUILDER_LISTENER = (s, mb) -> null;
3536
private static final Function<String, String> MESSAGE_ANNOTATIONS_STRING_KEY_EXTRACTOR = k -> k;
3637
private static final Function<Symbol, String> MESSAGE_ANNOTATIONS_SYMBOL_KEY_EXTRACTOR =
3738
Symbol::toString;
3839

40+
public QpidProtonCodec() {
41+
this(NO_OP_MESSAGE_BUILDER_LISTENER);
42+
}
43+
44+
public QpidProtonCodec(MessageBuilderListener messageBuilderListener) {
45+
this.messageBuilderListener = messageBuilderListener;
46+
}
47+
3948
private static Map<String, Object> createApplicationProperties(
4049
org.apache.qpid.proton.message.Message message) {
4150
if (message.getApplicationProperties() != null) {
@@ -104,6 +113,22 @@ private static Object convertApplicationProperty(Object value) {
104113
}
105114
}
106115

116+
private final MessageBuilderListener messageBuilderListener;
117+
118+
@Override
119+
public Codec messageBuilderListener(MessageBuilderListener listener) {
120+
return new QpidProtonCodec(listener);
121+
}
122+
123+
@Override
124+
public Object listenerContext(Message message) {
125+
if (message instanceof QpidProtonAmqpMessageWrapper) {
126+
return ((QpidProtonAmqpMessageWrapper) message).listenerContext;
127+
} else {
128+
return null;
129+
}
130+
}
131+
107132
@Override
108133
public EncodedMessage encode(Message message) {
109134
org.apache.qpid.proton.message.Message qpidMessage;
@@ -301,7 +326,12 @@ protected Object convertToQpidType(Object value) {
301326

302327
@Override
303328
public MessageBuilder messageBuilder() {
304-
return new QpidProtonMessageBuilder();
329+
return this.messageBuilder(null);
330+
}
331+
332+
@Override
333+
public MessageBuilder messageBuilder(String stream) {
334+
return new QpidProtonMessageBuilder(stream, this.messageBuilderListener);
305335
}
306336

307337
private static final class QpidProtonProperties implements Properties {
@@ -511,17 +541,20 @@ static class QpidProtonAmqpMessageWrapper implements Message {
511541
private final boolean hasPublishingId;
512542
private final long publishingId;
513543
private final org.apache.qpid.proton.message.Message message;
544+
private final Object listenerContext;
514545
private Properties properties;
515546
private Map<String, Object> applicationProperties;
516547
private Map<String, Object> messageAnnotations;
517548

518549
QpidProtonAmqpMessageWrapper(
519550
boolean hasPublishingId,
520551
long publishingId,
521-
org.apache.qpid.proton.message.Message message) {
552+
org.apache.qpid.proton.message.Message message,
553+
Object listenerContext) {
522554
this.hasPublishingId = hasPublishingId;
523555
this.publishingId = publishingId;
524556
this.message = message;
557+
this.listenerContext = listenerContext;
525558
}
526559

527560
@Override

src/main/java/com/rabbitmq/stream/codec/QpidProtonMessageBuilder.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -13,6 +13,7 @@
1313
1414
package com.rabbitmq.stream.codec;
1515

16+
import com.rabbitmq.stream.Codec;
1617
import com.rabbitmq.stream.Message;
1718
import com.rabbitmq.stream.MessageBuilder;
1819
import java.math.BigDecimal;
@@ -33,6 +34,8 @@
3334

3435
class QpidProtonMessageBuilder implements MessageBuilder {
3536

37+
private final String stream;
38+
private final Codec.MessageBuilderListener listener;
3639
private final org.apache.qpid.proton.message.Message message =
3740
org.apache.qpid.proton.message.Message.Factory.create();
3841
private final AtomicBoolean built = new AtomicBoolean(false);
@@ -42,9 +45,15 @@ class QpidProtonMessageBuilder implements MessageBuilder {
4245
private QpidProtonjApplicationPropertiesBuilder applicationPropertiesBuilder;
4346
private QpidProtonjMessageAnnotationsBuilder messageAnnotationsBuilder;
4447

48+
QpidProtonMessageBuilder(String stream, Codec.MessageBuilderListener listener) {
49+
this.stream = stream;
50+
this.listener = listener;
51+
}
52+
4553
@Override
4654
public Message build() {
4755
if (built.compareAndSet(false, true)) {
56+
Object context = listener.accept(this.stream, this);
4857
if (propertiesBuilder != null) {
4958
message.setProperties(propertiesBuilder.properties);
5059
}
@@ -57,7 +66,7 @@ public Message build() {
5766
new MessageAnnotations(messageAnnotationsBuilder.messageAnnotations));
5867
}
5968
return new QpidProtonCodec.QpidProtonAmqpMessageWrapper(
60-
hasPublishingId, publishingId, message);
69+
hasPublishingId, publishingId, message, context);
6170
} else {
6271
throw new IllegalStateException("A message builder can build only one message");
6372
}

src/main/java/com/rabbitmq/stream/codec/SimpleCodec.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -37,6 +37,21 @@ public MessageBuilder messageBuilder() {
3737
return new SimpleMessageBuilder();
3838
}
3939

40+
@Override
41+
public MessageBuilder messageBuilder(String stream) {
42+
return null;
43+
}
44+
45+
@Override
46+
public Codec messageBuilderListener(MessageBuilderListener listener) {
47+
return null;
48+
}
49+
50+
@Override
51+
public Object listenerContext(Message message) {
52+
return null;
53+
}
54+
4055
private static class SimpleMessage implements Message {
4156

4257
private final boolean hasPublishingId;

src/main/java/com/rabbitmq/stream/codec/SwiftMqCodec.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -122,6 +122,21 @@ public MessageBuilder messageBuilder() {
122122
return new SwiftMqMessageBuilder();
123123
}
124124

125+
@Override
126+
public MessageBuilder messageBuilder(String stream) {
127+
return null;
128+
}
129+
130+
@Override
131+
public Codec messageBuilderListener(MessageBuilderListener listener) {
132+
return null;
133+
}
134+
135+
@Override
136+
public Object listenerContext(Message message) {
137+
return null;
138+
}
139+
125140
@Override
126141
public EncodedMessage encode(Message message) {
127142
AMQPMessage outboundMessage;

src/main/java/com/rabbitmq/stream/impl/SimpleMessageAccumulator.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -13,11 +13,7 @@
1313
1414
package com.rabbitmq.stream.impl;
1515

16-
import com.rabbitmq.stream.Codec;
17-
import com.rabbitmq.stream.ConfirmationHandler;
18-
import com.rabbitmq.stream.ConfirmationStatus;
19-
import com.rabbitmq.stream.Message;
20-
import com.rabbitmq.stream.StreamException;
16+
import com.rabbitmq.stream.*;
2117
import java.util.concurrent.BlockingQueue;
2218
import java.util.concurrent.LinkedBlockingQueue;
2319
import java.util.concurrent.TimeUnit;
@@ -31,18 +27,20 @@ class SimpleMessageAccumulator implements MessageAccumulator {
3127
protected final BlockingQueue<AccumulatedEntity> messages;
3228
protected final Clock clock;
3329
private final int capacity;
34-
private final Codec codec;
30+
protected final Codec codec;
3531
private final int maxFrameSize;
3632
private final ToLongFunction<Message> publishSequenceFunction;
3733
private final Function<Message, String> filterValueExtractor;
34+
final ObservationCollector observationCollector;
3835

3936
SimpleMessageAccumulator(
4037
int capacity,
4138
Codec codec,
4239
int maxFrameSize,
4340
ToLongFunction<Message> publishSequenceFunction,
4441
Function<Message, String> filterValueExtractor,
45-
Clock clock) {
42+
Clock clock,
43+
ObservationCollector observationCollector) {
4644
this.capacity = capacity;
4745
this.messages = new LinkedBlockingQueue<>(capacity);
4846
this.codec = codec;
@@ -51,6 +49,7 @@ class SimpleMessageAccumulator implements MessageAccumulator {
5149
this.filterValueExtractor =
5250
filterValueExtractor == null ? NULL_FILTER_VALUE_EXTRACTOR : filterValueExtractor;
5351
this.clock = clock;
52+
this.observationCollector = observationCollector;
5453
}
5554

5655
public boolean add(Message message, ConfirmationHandler confirmationHandler) {
@@ -79,7 +78,11 @@ public boolean add(Message message, ConfirmationHandler confirmationHandler) {
7978

8079
@Override
8180
public AccumulatedEntity get() {
82-
return this.messages.poll();
81+
AccumulatedEntity entity = this.messages.poll();
82+
if (entity != null) {
83+
this.observationCollector.publish(this.codec, entity.confirmationCallback().message());
84+
}
85+
return entity;
8386
}
8487

8588
@Override
@@ -155,5 +158,10 @@ public int handle(boolean confirmed, short code) {
155158
confirmationHandler.handle(new ConfirmationStatus(message, confirmed, code));
156159
return 1;
157160
}
161+
162+
@Override
163+
public Message message() {
164+
return this.message;
165+
}
158166
}
159167
}

src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,8 @@ public Consumer build() {
219219
};
220220
}
221221

222+
handler = this.environment.observationCollector().subscribe(handler);
223+
222224
Consumer consumer;
223225
if (this.stream != null) {
224226
consumer =

0 commit comments

Comments
 (0)