Skip to content

Commit 8bc3f77

Browse files
committed
Refactor observation publishing
Add method to annotate a created message.
1 parent b3ac933 commit 8bc3f77

20 files changed

+139
-160
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -562,7 +562,7 @@
562562
<style>GOOGLE</style>
563563
</googleJavaFormat>
564564
</java>
565-
<ratchetFrom>origin/main</ratchetFrom>
565+
<!-- <ratchetFrom>origin/main</ratchetFrom>-->
566566
<licenseHeader> <!-- specify either content or file, but not both -->
567567
<content>// Copyright (c) $YEAR VMware, Inc. or its affiliates. All rights reserved.
568568
//

src/main/java/com/rabbitmq/stream/Codec.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,6 @@ public interface Codec {
2828

2929
MessageBuilder messageBuilder();
3030

31-
MessageBuilder messageBuilder(String stream);
32-
33-
Codec messageBuilderListener(MessageBuilderListener listener);
34-
35-
Object listenerContext(Message message);
36-
3731
class EncodedMessage {
3832

3933
private final int size;
@@ -52,10 +46,4 @@ public int getSize() {
5246
return size;
5347
}
5448
}
55-
56-
@FunctionalInterface
57-
interface MessageBuilderListener {
58-
59-
Object accept(String stream, MessageBuilder builder);
60-
}
6149
}

src/main/java/com/rabbitmq/stream/Message.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2022 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -85,4 +85,9 @@ public interface Message {
8585
* @return the message annotations
8686
*/
8787
Map<String, Object> getMessageAnnotations();
88+
89+
default Message annotate(String key, Object value) {
90+
this.getMessageAnnotations().put(key, value);
91+
return this;
92+
}
8893
}

src/main/java/com/rabbitmq/stream/ObservationCollector.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,22 @@ public interface ObservationCollector {
1818
ObservationCollector NO_OP =
1919
new ObservationCollector() {
2020
@Override
21-
public Codec register(Codec codec) {
22-
return codec;
21+
public Object prePublish(String stream, Message message) {
22+
return null;
2323
}
2424

2525
@Override
26-
public void publish(Codec codec, Message message) {}
26+
public void published(Object context, Message message) {}
2727

2828
@Override
2929
public MessageHandler subscribe(MessageHandler handler) {
3030
return handler;
3131
}
3232
};
3333

34-
Codec register(Codec codec);
34+
void published(Object context, Message message);
3535

36-
void publish(Codec codec, Message message);
36+
Object prePublish(String stream, Message message);
3737

3838
MessageHandler subscribe(MessageHandler handler);
3939
}

src/main/java/com/rabbitmq/stream/codec/QpidProtonCodec.java

Lines changed: 21 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -32,19 +32,11 @@
3232
import org.apache.qpid.proton.codec.WritableBuffer;
3333

3434
public class QpidProtonCodec implements Codec {
35-
static final MessageBuilderListener NO_OP_MESSAGE_BUILDER_LISTENER = (s, mb) -> null;
35+
3636
private static final Function<String, String> MESSAGE_ANNOTATIONS_STRING_KEY_EXTRACTOR = k -> k;
3737
private static final Function<Symbol, String> MESSAGE_ANNOTATIONS_SYMBOL_KEY_EXTRACTOR =
3838
Symbol::toString;
3939

40-
public QpidProtonCodec() {
41-
this(NO_OP_MESSAGE_BUILDER_LISTENER);
42-
}
43-
44-
public QpidProtonCodec(MessageBuilderListener messageBuilderListener) {
45-
this.messageBuilderListener = messageBuilderListener;
46-
}
47-
4840
private static Map<String, Object> createApplicationProperties(
4941
org.apache.qpid.proton.message.Message message) {
5042
if (message.getApplicationProperties() != null) {
@@ -61,7 +53,7 @@ private static Map<String, Object> createMessageAnnotations(
6153
return createMapFromAmqpMap(
6254
MESSAGE_ANNOTATIONS_SYMBOL_KEY_EXTRACTOR, message.getMessageAnnotations().getValue());
6355
} else {
64-
return null;
56+
return new LinkedHashMap<>();
6557
}
6658
}
6759

@@ -113,22 +105,6 @@ private static Object convertApplicationProperty(Object value) {
113105
}
114106
}
115107

116-
private final MessageBuilderListener messageBuilderListener;
117-
118-
@Override
119-
public Codec messageBuilderListener(MessageBuilderListener listener) {
120-
return new QpidProtonCodec(listener);
121-
}
122-
123-
@Override
124-
public Object listenerContext(Message message) {
125-
if (message instanceof QpidProtonAmqpMessageWrapper) {
126-
return ((QpidProtonAmqpMessageWrapper) message).listenerContext;
127-
} else {
128-
return null;
129-
}
130-
}
131-
132108
@Override
133109
public EncodedMessage encode(Message message) {
134110
org.apache.qpid.proton.message.Message qpidMessage;
@@ -326,12 +302,7 @@ protected Object convertToQpidType(Object value) {
326302

327303
@Override
328304
public MessageBuilder messageBuilder() {
329-
return this.messageBuilder(null);
330-
}
331-
332-
@Override
333-
public MessageBuilder messageBuilder(String stream) {
334-
return new QpidProtonMessageBuilder(stream, this.messageBuilderListener);
305+
return new QpidProtonMessageBuilder();
335306
}
336307

337308
private static final class QpidProtonProperties implements Properties {
@@ -534,27 +505,30 @@ public Map<String, Object> getApplicationProperties() {
534505
public Map<String, Object> getMessageAnnotations() {
535506
return messageAnnotations;
536507
}
508+
509+
@Override
510+
public Message annotate(String key, Object value) {
511+
this.messageAnnotations.put(key, value);
512+
return this;
513+
}
537514
}
538515

539516
static class QpidProtonAmqpMessageWrapper implements Message {
540517

541518
private final boolean hasPublishingId;
542519
private final long publishingId;
543520
private final org.apache.qpid.proton.message.Message message;
544-
private final Object listenerContext;
545521
private Properties properties;
546522
private Map<String, Object> applicationProperties;
547523
private Map<String, Object> messageAnnotations;
548524

549525
QpidProtonAmqpMessageWrapper(
550526
boolean hasPublishingId,
551527
long publishingId,
552-
org.apache.qpid.proton.message.Message message,
553-
Object listenerContext) {
528+
org.apache.qpid.proton.message.Message message) {
554529
this.hasPublishingId = hasPublishingId;
555530
this.publishingId = publishingId;
556531
this.message = message;
557-
this.listenerContext = listenerContext;
558532
}
559533

560534
@Override
@@ -612,6 +586,17 @@ public Map<String, Object> getMessageAnnotations() {
612586
return null;
613587
}
614588
}
589+
590+
@Override
591+
public Message annotate(String key, Object value) {
592+
MessageAnnotations annotations = this.message.getMessageAnnotations();
593+
if (annotations == null) {
594+
annotations = new MessageAnnotations(new LinkedHashMap<>());
595+
this.message.setMessageAnnotations(annotations);
596+
}
597+
annotations.getValue().put(Symbol.getSymbol(key), value);
598+
return this;
599+
}
615600
}
616601

617602
// from

src/main/java/com/rabbitmq/stream/codec/QpidProtonMessageBuilder.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
1414
package com.rabbitmq.stream.codec;
1515

16-
import com.rabbitmq.stream.Codec;
1716
import com.rabbitmq.stream.Message;
1817
import com.rabbitmq.stream.MessageBuilder;
1918
import java.math.BigDecimal;
@@ -34,8 +33,6 @@
3433

3534
class QpidProtonMessageBuilder implements MessageBuilder {
3635

37-
private final String stream;
38-
private final Codec.MessageBuilderListener listener;
3936
private final org.apache.qpid.proton.message.Message message =
4037
org.apache.qpid.proton.message.Message.Factory.create();
4138
private final AtomicBoolean built = new AtomicBoolean(false);
@@ -45,15 +42,9 @@ class QpidProtonMessageBuilder implements MessageBuilder {
4542
private QpidProtonjApplicationPropertiesBuilder applicationPropertiesBuilder;
4643
private QpidProtonjMessageAnnotationsBuilder messageAnnotationsBuilder;
4744

48-
QpidProtonMessageBuilder(String stream, Codec.MessageBuilderListener listener) {
49-
this.stream = stream;
50-
this.listener = listener;
51-
}
52-
5345
@Override
5446
public Message build() {
5547
if (built.compareAndSet(false, true)) {
56-
Object context = listener.accept(this.stream, this);
5748
if (propertiesBuilder != null) {
5849
message.setProperties(propertiesBuilder.properties);
5950
}
@@ -66,7 +57,7 @@ public Message build() {
6657
new MessageAnnotations(messageAnnotationsBuilder.messageAnnotations));
6758
}
6859
return new QpidProtonCodec.QpidProtonAmqpMessageWrapper(
69-
hasPublishingId, publishingId, message, context);
60+
hasPublishingId, publishingId, message);
7061
} else {
7162
throw new IllegalStateException("A message builder can build only one message");
7263
}

src/main/java/com/rabbitmq/stream/codec/SimpleCodec.java

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,21 +37,6 @@ public MessageBuilder messageBuilder() {
3737
return new SimpleMessageBuilder();
3838
}
3939

40-
@Override
41-
public MessageBuilder messageBuilder(String stream) {
42-
return new SimpleMessageBuilder();
43-
}
44-
45-
@Override
46-
public Codec messageBuilderListener(MessageBuilderListener listener) {
47-
return new SimpleCodec();
48-
}
49-
50-
@Override
51-
public Object listenerContext(Message message) {
52-
return null;
53-
}
54-
5540
private static class SimpleMessage implements Message {
5641

5742
private final boolean hasPublishingId;

src/main/java/com/rabbitmq/stream/codec/SwiftMqCodec.java

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -122,21 +122,6 @@ public MessageBuilder messageBuilder() {
122122
return new SwiftMqMessageBuilder();
123123
}
124124

125-
@Override
126-
public MessageBuilder messageBuilder(String stream) {
127-
return null;
128-
}
129-
130-
@Override
131-
public Codec messageBuilderListener(MessageBuilderListener listener) {
132-
return null;
133-
}
134-
135-
@Override
136-
public Object listenerContext(Message message) {
137-
return null;
138-
}
139-
140125
@Override
141126
public EncodedMessage encode(Message message) {
142127
AMQPMessage outboundMessage;
@@ -295,7 +280,7 @@ public EncodedMessage encode(Message message) {
295280
}
296281
}
297282

298-
protected AMQPType convertToSwiftMqType(Object value) {
283+
protected static AMQPType convertToSwiftMqType(Object value) {
299284
if (value instanceof Boolean) {
300285
return ((Boolean) value).booleanValue() ? AMQPBoolean.TRUE : AMQPBoolean.FALSE;
301286
} else if (value instanceof Byte) {
@@ -663,5 +648,25 @@ public Map<String, Object> getMessageAnnotations() {
663648
return null;
664649
}
665650
}
651+
652+
@Override
653+
public Message annotate(String key, Object value) {
654+
MessageAnnotations annotations = this.message.getMessageAnnotations();
655+
Map<AMQPType, AMQPType> map;
656+
try {
657+
if (annotations == null) {
658+
map = new LinkedHashMap<>();
659+
annotations = new MessageAnnotations(map);
660+
this.message.setMessageAnnotations(annotations);
661+
} else {
662+
map = annotations.getValue();
663+
}
664+
map.put(new AMQPSymbol(key), convertToSwiftMqType(value));
665+
annotations.setValue(map);
666+
} catch (IOException e) {
667+
throw new StreamException("Error while annotating SwiftMQ message");
668+
}
669+
return this;
670+
}
666671
}
667672
}

src/main/java/com/rabbitmq/stream/impl/MessageAccumulator.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2020-2021 VMware, Inc. or its affiliates. All rights reserved.
1+
// Copyright (c) 2020-2023 VMware, Inc. or its affiliates. All rights reserved.
22
//
33
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
44
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
@@ -30,12 +30,14 @@ interface AccumulatedEntity {
3030

3131
long time();
3232

33-
long publishindId();
33+
long publishingId();
3434

3535
String filterValue();
3636

3737
Object encodedEntity();
3838

3939
StreamProducer.ConfirmationCallback confirmationCallback();
40+
41+
Object observationContext();
4042
}
4143
}

0 commit comments

Comments
 (0)