Skip to content

Commit 768865f

Browse files
committed
Copy message if necessary for observation collector
When a message is routed to several streams in a super stream producer. References #379
1 parent 3a537dc commit 768865f

File tree

6 files changed

+162
-24
lines changed

6 files changed

+162
-24
lines changed

src/main/java/com/rabbitmq/stream/Message.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,4 +90,8 @@ default Message annotate(String key, Object value) {
9090
this.getMessageAnnotations().put(key, value);
9191
return this;
9292
}
93+
94+
default Message copy() {
95+
return this;
96+
}
9397
}

src/main/java/com/rabbitmq/stream/ObservationCollector.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,8 @@ public MessageHandler subscribe(MessageHandler handler) {
3636
T prePublish(String stream, Message message);
3737

3838
MessageHandler subscribe(MessageHandler handler);
39+
40+
default boolean isNoop() {
41+
return this == NO_OP;
42+
}
3943
}

src/main/java/com/rabbitmq/stream/codec/QpidProtonCodec.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ public Message decode(byte[] data) {
259259
createMessageAnnotations(message));
260260
}
261261

262-
protected Properties createProperties(org.apache.qpid.proton.message.Message message) {
262+
protected static Properties createProperties(org.apache.qpid.proton.message.Message message) {
263263
if (message.getProperties() != null) {
264264
return new QpidProtonProperties(message.getProperties());
265265
} else {
@@ -511,6 +511,15 @@ public Message annotate(String key, Object value) {
511511
this.messageAnnotations.put(key, value);
512512
return this;
513513
}
514+
515+
@Override
516+
public Message copy() {
517+
return new QpidProtonMessage(
518+
message,
519+
createProperties(message),
520+
createApplicationProperties(message),
521+
createMessageAnnotations(message));
522+
}
514523
}
515524

516525
static class QpidProtonAmqpMessageWrapper implements Message {
@@ -597,6 +606,27 @@ public Message annotate(String key, Object value) {
597606
annotations.getValue().put(Symbol.getSymbol(key), value);
598607
return this;
599608
}
609+
610+
@Override
611+
public Message copy() {
612+
org.apache.qpid.proton.message.Message copy =
613+
org.apache.qpid.proton.message.Message.Factory.create();
614+
copy.setProperties(this.message.getProperties());
615+
copy.setBody(this.message.getBody());
616+
copy.setApplicationProperties(this.message.getApplicationProperties());
617+
if (this.message.getMessageAnnotations() != null) {
618+
Map<Symbol, Object> annotations = message.getMessageAnnotations().getValue();
619+
Map<Symbol, Object> annotationCopy;
620+
if (annotations == null) {
621+
annotationCopy = null;
622+
} else {
623+
annotationCopy = new LinkedHashMap<>(annotations.size());
624+
annotationCopy.putAll(annotations);
625+
}
626+
copy.setMessageAnnotations(new MessageAnnotations(annotationCopy));
627+
}
628+
return new QpidProtonAmqpMessageWrapper(this.hasPublishingId, this.publishingId, copy);
629+
}
600630
}
601631

602632
// from

src/main/java/com/rabbitmq/stream/codec/SwiftMqCodec.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ public EncodedMessage encode(Message message) {
276276
outboundMessage.writeContent(output);
277277
return new EncodedMessage(output.getCount(), output.getBuffer());
278278
} catch (IOException e) {
279-
throw new StreamException("Error while writing AMQP 1.0 message to output stream");
279+
throw new StreamException("Error while writing AMQP 1.0 message to output stream", e);
280280
}
281281
}
282282

@@ -355,7 +355,7 @@ protected Message createMessage(byte[] data) {
355355
try {
356356
amqpMessage = new AMQPMessage(data);
357357
} catch (Exception e) {
358-
throw new StreamException("Error while decoding AMQP 1.0 message");
358+
throw new StreamException("Error while decoding AMQP 1.0 message", e);
359359
}
360360

361361
Object body = extractBody(amqpMessage);
@@ -664,9 +664,31 @@ public Message annotate(String key, Object value) {
664664
map.put(new AMQPSymbol(key), convertToSwiftMqType(value));
665665
annotations.setValue(map);
666666
} catch (IOException e) {
667-
throw new StreamException("Error while annotating SwiftMQ message");
667+
throw new StreamException("Error while annotating SwiftMQ message", e);
668668
}
669669
return this;
670670
}
671+
672+
@Override
673+
public Message copy() {
674+
AMQPMessage copy = new AMQPMessage();
675+
copy.setProperties(this.message.getProperties());
676+
if (this.message.getData() != null) {
677+
this.message.getData().forEach(copy::addData);
678+
}
679+
copy.setApplicationProperties(this.message.getApplicationProperties());
680+
MessageAnnotations annotations = this.message.getMessageAnnotations();
681+
if (annotations != null) {
682+
Map<AMQPType, AMQPType> annotationCopy = null;
683+
try {
684+
annotationCopy = new LinkedHashMap<>(annotations.getValue().size());
685+
annotationCopy.putAll(annotations.getValue());
686+
copy.setMessageAnnotations(new MessageAnnotations(annotationCopy));
687+
} catch (IOException e) {
688+
throw new StreamException("Error while copying SwiftMQ message annotations", e);
689+
}
690+
}
691+
return new SwiftMqAmqpMessageWrapper(this.hasPublishingId, this.publishingId, copy);
692+
}
671693
}
672694
}

src/main/java/com/rabbitmq/stream/impl/SuperStreamProducer.java

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,7 @@
1515

1616
import static com.rabbitmq.stream.impl.Utils.namedFunction;
1717

18-
import com.rabbitmq.stream.Codec;
19-
import com.rabbitmq.stream.ConfirmationHandler;
20-
import com.rabbitmq.stream.ConfirmationStatus;
21-
import com.rabbitmq.stream.Constants;
22-
import com.rabbitmq.stream.Message;
23-
import com.rabbitmq.stream.MessageBuilder;
24-
import com.rabbitmq.stream.Producer;
25-
import com.rabbitmq.stream.RoutingStrategy;
18+
import com.rabbitmq.stream.*;
2619
import com.rabbitmq.stream.RoutingStrategy.Metadata;
2720
import java.util.List;
2821
import java.util.Map;
@@ -47,6 +40,13 @@ class SuperStreamProducer implements Producer {
4740
private final Metadata superStreamMetadata;
4841
private final AtomicBoolean closed = new AtomicBoolean(false);
4942

43+
/**
44+
* passthrough, except when observation collector is not no-op and a message must be sent to
45+
* several streams. In this it creates a copy of the message with distinct message annotations for
46+
* each message, so that the collector can populate these messages annotations without collision
47+
*/
48+
private final MessageInterceptor messageInterceptor;
49+
5050
SuperStreamProducer(
5151
StreamProducerBuilder producerBuilder,
5252
String name,
@@ -62,6 +62,10 @@ class SuperStreamProducer implements Producer {
6262
this.producerBuilder = producerBuilder.duplicate();
6363
this.producerBuilder.stream(null);
6464
this.producerBuilder.resetRouting();
65+
this.messageInterceptor =
66+
environment.observationCollector().isNoop()
67+
? (i, msg) -> msg
68+
: (i, msg) -> i == 0 ? msg : msg.copy();
6569
}
6670

6771
@Override
@@ -111,17 +115,12 @@ public void send(Message message, ConfirmationHandler confirmationHandler) {
111115
if (streams.isEmpty()) {
112116
confirmationHandler.handle(
113117
new ConfirmationStatus(message, false, Constants.CODE_NO_ROUTE_FOUND));
118+
} else if (streams.size() == 1) {
119+
producer(streams.get(0)).send(message, confirmationHandler);
114120
} else {
115-
for (String stream : streams) {
116-
Producer producer =
117-
producers.computeIfAbsent(
118-
stream,
119-
stream1 -> {
120-
Producer p =
121-
producerBuilder.duplicate().superStream(null).stream(stream1).build();
122-
return p;
123-
});
124-
producer.send(message, confirmationHandler);
121+
for (int i = 0; i < streams.size(); i++) {
122+
Producer producer = producer(streams.get(i));
123+
producer(streams.get(i)).send(messageInterceptor.apply(i, message), confirmationHandler);
125124
}
126125
}
127126
} else {
@@ -130,6 +129,15 @@ public void send(Message message, ConfirmationHandler confirmationHandler) {
130129
}
131130
}
132131

132+
private Producer producer(String stream) {
133+
return producers.computeIfAbsent(
134+
stream,
135+
stream1 -> {
136+
Producer p = producerBuilder.duplicate().superStream(null).stream(stream1).build();
137+
return p;
138+
});
139+
}
140+
133141
private boolean canSend() {
134142
return !this.closed.get();
135143
}
@@ -191,4 +199,10 @@ public List<String> route(String routingKey) {
191199
routingKey1)));
192200
}
193201
}
202+
203+
@FunctionalInterface
204+
private interface MessageInterceptor {
205+
206+
Message apply(int partitionIndex, Message message);
207+
}
194208
}

src/test/java/com/rabbitmq/stream/codec/CodecsTest.java

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.function.Consumer;
4040
import java.util.function.Function;
4141
import java.util.function.Supplier;
42+
import java.util.function.UnaryOperator;
4243
import java.util.stream.Stream;
4344
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
4445
import org.assertj.core.api.InstanceOfAssertFactories;
@@ -57,8 +58,19 @@ static Iterable<CodecCouple> codecsCouples() {
5758
List<CodecCouple> couples = new ArrayList<>();
5859
for (Codec serializer : codecs) {
5960
for (Codec deserializer : codecs) {
60-
couples.add(new CodecCouple(serializer, deserializer, () -> serializer.messageBuilder()));
61-
couples.add(new CodecCouple(serializer, deserializer, () -> new WrapperMessageBuilder()));
61+
couples.add(new CodecCouple(serializer, deserializer, serializer::messageBuilder));
62+
couples.add(new CodecCouple(serializer, deserializer, WrapperMessageBuilder::new));
63+
}
64+
}
65+
return couples;
66+
}
67+
68+
static Iterable<CodecCouple> codecsCombinations() {
69+
List<Codec> codecs = asList(new QpidProtonCodec(), new SwiftMqCodec());
70+
List<CodecCouple> couples = new ArrayList<>();
71+
for (Codec serializer : codecs) {
72+
for (Codec deserializer : codecs) {
73+
couples.add(new CodecCouple(serializer, deserializer, serializer::messageBuilder));
6274
}
6375
}
6476
return couples;
@@ -559,6 +571,58 @@ void publishingIdShouldNotBeSetOnMessageIfNotSetOnMessageBuilder(MessageBuilder
559571
assertThat(message.getPublishingId()).isEqualTo(0);
560572
}
561573

574+
@ParameterizedTest
575+
@MethodSource("codecsCombinations")
576+
void copy(CodecCouple codecCouple) {
577+
Codec serializer = codecCouple.serializer;
578+
Codec deserializer = codecCouple.deserializer;
579+
byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
580+
581+
Message message =
582+
serializer
583+
.messageBuilder()
584+
.addData(body)
585+
.messageAnnotations()
586+
.entry("foo", "bar")
587+
.messageBuilder()
588+
.build();
589+
Message copy = message.copy();
590+
591+
message.annotate("original", "original value");
592+
copy.annotate("copy", "copy value");
593+
594+
assertThat(message.getMessageAnnotations())
595+
.hasSize(2)
596+
.containsEntry("foo", "bar")
597+
.containsEntry("original", "original value");
598+
599+
assertThat(copy.getMessageAnnotations())
600+
.hasSize(2)
601+
.containsEntry("foo", "bar")
602+
.containsEntry("copy", "copy value");
603+
604+
UnaryOperator<Message> encodeDecode =
605+
msg -> {
606+
EncodedMessage encoded = serializer.encode(msg);
607+
byte[] encodedData = new byte[encoded.getSize()];
608+
System.arraycopy(encoded.getData(), 0, encodedData, 0, encoded.getSize());
609+
return deserializer.decode(encodedData);
610+
};
611+
612+
message = encodeDecode.apply(message);
613+
614+
assertThat(message.getMessageAnnotations())
615+
.hasSize(2)
616+
.containsEntry("foo", "bar")
617+
.containsEntry("original", "original value");
618+
619+
copy = encodeDecode.apply(copy);
620+
assertThat(copy.getMessageAnnotations())
621+
.hasSize(2)
622+
.containsEntry("foo", "bar")
623+
.containsEntry("copy", "copy value");
624+
}
625+
562626
MessageTestConfiguration test(
563627
Function<MessageBuilder, MessageBuilder> messageOperation,
564628
Consumer<Message> messageExpectation) {

0 commit comments

Comments
 (0)