Skip to content

Commit b58ce23

Browse files
committed
Support super stream creation/deletion
1 parent feecf46 commit b58ce23

File tree

2 files changed

+143
-71
lines changed

2 files changed

+143
-71
lines changed

src/main/java/com/rabbitmq/stream/Constants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ public final class Constants {
7272
public static final short COMMAND_CONSUMER_UPDATE = 26;
7373
public static final short COMMAND_EXCHANGE_COMMAND_VERSIONS = 27;
7474
public static final short COMMAND_STREAM_STATS = 28;
75+
public static final short COMMAND_CREATE_SUPER_STREAM = 29;
76+
public static final short COMMAND_DELETE_SUPER_STREAM = 30;
7577

7678
public static final short VERSION_1 = 1;
7779
public static final short VERSION_2 = 2;

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 141 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
import static com.rabbitmq.stream.impl.Utils.noOpConsumer;
2323
import static java.lang.String.format;
2424
import static java.lang.String.join;
25+
import static java.util.Arrays.asList;
2526
import static java.util.concurrent.TimeUnit.SECONDS;
27+
import static java.util.stream.StreamSupport.stream;
2628

2729
import com.rabbitmq.stream.AuthenticationFailureException;
2830
import com.rabbitmq.stream.ByteCapacity;
@@ -83,16 +85,10 @@
8385
import java.net.ConnectException;
8486
import java.net.InetSocketAddress;
8587
import java.net.SocketAddress;
88+
import java.nio.charset.Charset;
8689
import java.nio.charset.StandardCharsets;
8790
import java.time.Duration;
88-
import java.util.ArrayList;
89-
import java.util.Collections;
90-
import java.util.HashMap;
91-
import java.util.HashSet;
92-
import java.util.List;
93-
import java.util.Map;
94-
import java.util.Objects;
95-
import java.util.Set;
91+
import java.util.*;
9692
import java.util.concurrent.ConcurrentHashMap;
9793
import java.util.concurrent.ConcurrentMap;
9894
import java.util.concurrent.CopyOnWriteArrayList;
@@ -127,6 +123,7 @@
127123
*/
128124
public class Client implements AutoCloseable {
129125

126+
private static final Charset CHARSET = StandardCharsets.UTF_8;
130127
public static final int DEFAULT_PORT = 5552;
131128
public static final int DEFAULT_TLS_PORT = 5551;
132129
static final OutboundEntityWriteCallback OUTBOUND_MESSAGE_WRITE_CALLBACK =
@@ -446,12 +443,7 @@ int maxFrameSize() {
446443
}
447444

448445
private Map<String, String> peerProperties() {
449-
int clientPropertiesSize = 4; // size of the map, always there
450-
if (!clientProperties.isEmpty()) {
451-
for (Map.Entry<String, String> entry : clientProperties.entrySet()) {
452-
clientPropertiesSize += 2 + entry.getKey().length() + 2 + entry.getValue().length();
453-
}
454-
}
446+
int clientPropertiesSize = mapSize(this.clientProperties);
455447
int length = 2 + 2 + 4 + clientPropertiesSize;
456448
int correlationId = correlationSequence.incrementAndGet();
457449
try {
@@ -460,13 +452,7 @@ private Map<String, String> peerProperties() {
460452
bb.writeShort(encodeRequestCode(COMMAND_PEER_PROPERTIES));
461453
bb.writeShort(VERSION_1);
462454
bb.writeInt(correlationId);
463-
bb.writeInt(clientProperties.size());
464-
for (Map.Entry<String, String> entry : clientProperties.entrySet()) {
465-
bb.writeShort(entry.getKey().length())
466-
.writeBytes(entry.getKey().getBytes(StandardCharsets.UTF_8))
467-
.writeShort(entry.getValue().length())
468-
.writeBytes(entry.getValue().getBytes(StandardCharsets.UTF_8));
469-
}
455+
writeMap(bb, this.clientProperties);
470456
OutstandingRequest<Map<String, String>> request = outstandingRequest();
471457
outstandingRequests.put(correlationId, request);
472458
channel.writeAndFlush(bb);
@@ -540,7 +526,7 @@ private SaslAuthenticateResponse sendSaslAuthenticate(
540526
bb.writeShort(VERSION_1);
541527
bb.writeInt(correlationId);
542528
bb.writeShort(saslMechanism.getName().length());
543-
bb.writeBytes(saslMechanism.getName().getBytes(StandardCharsets.UTF_8));
529+
bb.writeBytes(saslMechanism.getName().getBytes(CHARSET));
544530
if (challengeResponse == null) {
545531
bb.writeInt(-1);
546532
} else {
@@ -570,7 +556,7 @@ private Map<String, String> open(String virtualHost) {
570556
bb.writeShort(VERSION_1);
571557
bb.writeInt(correlationId);
572558
bb.writeShort(virtualHost.length());
573-
bb.writeBytes(virtualHost.getBytes(StandardCharsets.UTF_8));
559+
bb.writeBytes(virtualHost.getBytes(CHARSET));
574560
OutstandingRequest<OpenResponse> request = outstandingRequest();
575561
outstandingRequests.put(correlationId, request);
576562
channel.writeAndFlush(bb);
@@ -612,7 +598,7 @@ private void sendClose(short code, String reason) {
612598
bb.writeInt(correlationId);
613599
bb.writeShort(code);
614600
bb.writeShort(reason.length());
615-
bb.writeBytes(reason.getBytes(StandardCharsets.UTF_8));
601+
bb.writeBytes(reason.getBytes(CHARSET));
616602
OutstandingRequest<Response> request = outstandingRequest();
617603
outstandingRequests.put(correlationId, request);
618604
channel.writeAndFlush(bb);
@@ -662,10 +648,7 @@ public Response create(String stream) {
662648
}
663649

664650
public Response create(String stream, Map<String, String> arguments) {
665-
int length = 2 + 2 + 4 + 2 + stream.length() + 4;
666-
for (Map.Entry<String, String> argument : arguments.entrySet()) {
667-
length = length + 2 + argument.getKey().length() + 2 + argument.getValue().length();
668-
}
651+
int length = 2 + 2 + 4 + 2 + stream.length() + mapSize(arguments);
669652
int correlationId = correlationSequence.incrementAndGet();
670653
try {
671654
ByteBuf bb = allocate(length + 4);
@@ -674,14 +657,8 @@ public Response create(String stream, Map<String, String> arguments) {
674657
bb.writeShort(VERSION_1);
675658
bb.writeInt(correlationId);
676659
bb.writeShort(stream.length());
677-
bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8));
678-
bb.writeInt(arguments.size());
679-
for (Map.Entry<String, String> argument : arguments.entrySet()) {
680-
bb.writeShort(argument.getKey().length());
681-
bb.writeBytes(argument.getKey().getBytes(StandardCharsets.UTF_8));
682-
bb.writeShort(argument.getValue().length());
683-
bb.writeBytes(argument.getValue().getBytes(StandardCharsets.UTF_8));
684-
}
660+
bb.writeBytes(stream.getBytes(CHARSET));
661+
writeMap(bb, arguments);
685662
OutstandingRequest<Response> request = outstandingRequest();
686663
outstandingRequests.put(correlationId, request);
687664
channel.writeAndFlush(bb);
@@ -696,6 +673,116 @@ public Response create(String stream, Map<String, String> arguments) {
696673
}
697674
}
698675

676+
Response createSuperStream(
677+
String superStream,
678+
List<String> partitions,
679+
List<String> routingKeys,
680+
Map<String, String> arguments) {
681+
if (partitions.isEmpty() || routingKeys.isEmpty()) {
682+
throw new IllegalArgumentException(
683+
"Partitions and routing keys of a super stream cannot be empty");
684+
}
685+
if (partitions.size() != routingKeys.size()) {
686+
throw new IllegalArgumentException(
687+
"Partitions and routing keys of a super stream must have "
688+
+ "the same number of elements");
689+
}
690+
int length =
691+
2
692+
+ 2
693+
+ 4
694+
+ 2
695+
+ superStream.length()
696+
+ collectionSize(partitions)
697+
+ collectionSize(routingKeys)
698+
+ mapSize(arguments);
699+
int correlationId = correlationSequence.incrementAndGet();
700+
try {
701+
ByteBuf bb = allocate(length + 4);
702+
bb.writeInt(length);
703+
bb.writeShort(encodeRequestCode(COMMAND_CREATE_SUPER_STREAM));
704+
bb.writeShort(VERSION_1);
705+
bb.writeInt(correlationId);
706+
bb.writeShort(superStream.length());
707+
bb.writeBytes(superStream.getBytes(CHARSET));
708+
writeCollection(bb, partitions);
709+
writeCollection(bb, routingKeys);
710+
writeMap(bb, arguments);
711+
OutstandingRequest<Response> request = outstandingRequest();
712+
outstandingRequests.put(correlationId, request);
713+
channel.writeAndFlush(bb);
714+
request.block();
715+
return request.response.get();
716+
} catch (StreamException e) {
717+
outstandingRequests.remove(correlationId);
718+
throw e;
719+
} catch (RuntimeException e) {
720+
outstandingRequests.remove(correlationId);
721+
throw new StreamException(format("Error while creating super stream '%s'", superStream), e);
722+
}
723+
}
724+
725+
Response deleteSuperStream(String superStream) {
726+
int length = 2 + 2 + 4 + 2 + superStream.length();
727+
int correlationId = correlationSequence.incrementAndGet();
728+
try {
729+
ByteBuf bb = allocate(length + 4);
730+
bb.writeInt(length);
731+
bb.writeShort(encodeRequestCode(COMMAND_DELETE_SUPER_STREAM));
732+
bb.writeShort(VERSION_1);
733+
bb.writeInt(correlationId);
734+
bb.writeShort(superStream.length());
735+
bb.writeBytes(superStream.getBytes(CHARSET));
736+
OutstandingRequest<Response> request = outstandingRequest();
737+
outstandingRequests.put(correlationId, request);
738+
channel.writeAndFlush(bb);
739+
request.block();
740+
return request.response.get();
741+
} catch (StreamException e) {
742+
outstandingRequests.remove(correlationId);
743+
throw e;
744+
} catch (RuntimeException e) {
745+
outstandingRequests.remove(correlationId);
746+
throw new StreamException(format("Error while deleting stream '%s'", superStream), e);
747+
}
748+
}
749+
750+
private static int collectionSize(Collection<String> elements) {
751+
return 4 + elements.stream().mapToInt(v -> 2 + v.length()).sum();
752+
}
753+
754+
private static int arraySize(String... elements) {
755+
return 4 + collectionSize(asList(elements));
756+
}
757+
758+
private static int mapSize(Map<String, String> elements) {
759+
return 4
760+
+ elements.entrySet().stream()
761+
.mapToInt(e -> 2 + e.getKey().length() + 2 + e.getValue().length())
762+
.sum();
763+
}
764+
765+
private static ByteBuf writeCollection(ByteBuf bb, Collection<String> elements) {
766+
bb.writeInt(elements.size());
767+
elements.forEach(e -> bb.writeShort(e.length()).writeBytes(e.getBytes(CHARSET)));
768+
return bb;
769+
}
770+
771+
private static ByteBuf writeArray(ByteBuf bb, String... elements) {
772+
return writeCollection(bb, asList(elements));
773+
}
774+
775+
private static ByteBuf writeMap(ByteBuf bb, Map<String, String> elements) {
776+
bb.writeInt(elements.size());
777+
elements.forEach(
778+
(key, value) ->
779+
bb.writeShort(key.length())
780+
.writeBytes(key.getBytes(CHARSET))
781+
.writeShort(value.length())
782+
.writeBytes(value.getBytes(CHARSET)));
783+
return bb;
784+
}
785+
699786
ByteBuf allocate(ByteBufAllocator allocator, int capacity) {
700787
if (frameSizeCopped && capacity > this.maxFrameSize()) {
701788
throw new IllegalArgumentException(
@@ -729,7 +816,7 @@ public Response delete(String stream) {
729816
bb.writeShort(VERSION_1);
730817
bb.writeInt(correlationId);
731818
bb.writeShort(stream.length());
732-
bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8));
819+
bb.writeBytes(stream.getBytes(CHARSET));
733820
OutstandingRequest<Response> request = outstandingRequest();
734821
outstandingRequests.put(correlationId, request);
735822
channel.writeAndFlush(bb);
@@ -748,23 +835,15 @@ public Map<String, StreamMetadata> metadata(String... streams) {
748835
if (streams == null || streams.length == 0) {
749836
throw new IllegalArgumentException("At least one stream must be specified");
750837
}
751-
int length = 2 + 2 + 4 + 4; // API code, version, correlation ID, size of array
752-
for (String stream : streams) {
753-
length += 2;
754-
length += stream.length();
755-
}
838+
int length = 2 + 2 + 4 + arraySize(streams); // API code, version, correlation ID, array size
756839
int correlationId = correlationSequence.incrementAndGet();
757840
try {
758841
ByteBuf bb = allocate(length + 4);
759842
bb.writeInt(length);
760843
bb.writeShort(encodeRequestCode(COMMAND_METADATA));
761844
bb.writeShort(VERSION_1);
762845
bb.writeInt(correlationId);
763-
bb.writeInt(streams.length);
764-
for (String stream : streams) {
765-
bb.writeShort(stream.length());
766-
bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8));
767-
}
846+
writeArray(bb, streams);
768847
OutstandingRequest<Map<String, StreamMetadata>> request = outstandingRequest();
769848
outstandingRequests.put(correlationId, request);
770849
channel.writeAndFlush(bb);
@@ -800,10 +879,10 @@ public Response declarePublisher(byte publisherId, String publisherReference, St
800879
bb.writeByte(publisherId);
801880
bb.writeShort(publisherReferenceSize);
802881
if (publisherReferenceSize > 0) {
803-
bb.writeBytes(publisherReference.getBytes(StandardCharsets.UTF_8));
882+
bb.writeBytes(publisherReference.getBytes(CHARSET));
804883
}
805884
bb.writeShort(stream.length());
806-
bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8));
885+
bb.writeBytes(stream.getBytes(CHARSET));
807886
OutstandingRequest<Response> request = outstandingRequest();
808887
outstandingRequests.put(correlationId, request);
809888
channel.writeAndFlush(bb);
@@ -1142,10 +1221,7 @@ public Response subscribe(
11421221
}
11431222
int propertiesSize = 0;
11441223
if (properties != null && !properties.isEmpty()) {
1145-
propertiesSize = 4; // size of the map
1146-
for (Map.Entry<String, String> entry : properties.entrySet()) {
1147-
propertiesSize += 2 + entry.getKey().length() + 2 + entry.getValue().length();
1148-
}
1224+
propertiesSize = mapSize(properties);
11491225
}
11501226
length += propertiesSize;
11511227
int correlationId = correlationSequence.getAndIncrement();
@@ -1157,20 +1233,14 @@ public Response subscribe(
11571233
bb.writeInt(correlationId);
11581234
bb.writeByte(subscriptionId);
11591235
bb.writeShort(stream.length());
1160-
bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8));
1236+
bb.writeBytes(stream.getBytes(CHARSET));
11611237
bb.writeShort(offsetSpecification.getType());
11621238
if (offsetSpecification.isOffset() || offsetSpecification.isTimestamp()) {
11631239
bb.writeLong(offsetSpecification.getOffset());
11641240
}
11651241
bb.writeShort(initialCredits);
11661242
if (properties != null && !properties.isEmpty()) {
1167-
bb.writeInt(properties.size());
1168-
for (Map.Entry<String, String> entry : properties.entrySet()) {
1169-
bb.writeShort(entry.getKey().length())
1170-
.writeBytes(entry.getKey().getBytes(StandardCharsets.UTF_8))
1171-
.writeShort(entry.getValue().length())
1172-
.writeBytes(entry.getValue().getBytes(StandardCharsets.UTF_8));
1173-
}
1243+
writeMap(bb, properties);
11741244
}
11751245
OutstandingRequest<Response> request = outstandingRequest();
11761246
outstandingRequests.put(correlationId, request);
@@ -1205,9 +1275,9 @@ public void storeOffset(String reference, String stream, long offset) {
12051275
bb.writeShort(encodeRequestCode(COMMAND_STORE_OFFSET));
12061276
bb.writeShort(VERSION_1);
12071277
bb.writeShort(reference.length());
1208-
bb.writeBytes(reference.getBytes(StandardCharsets.UTF_8));
1278+
bb.writeBytes(reference.getBytes(CHARSET));
12091279
bb.writeShort(stream.length());
1210-
bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8));
1280+
bb.writeBytes(stream.getBytes(CHARSET));
12111281
bb.writeLong(offset);
12121282
channel.writeAndFlush(bb);
12131283
}
@@ -1230,9 +1300,9 @@ public QueryOffsetResponse queryOffset(String reference, String stream) {
12301300
bb.writeShort(VERSION_1);
12311301
bb.writeInt(correlationId);
12321302
bb.writeShort(reference.length());
1233-
bb.writeBytes(reference.getBytes(StandardCharsets.UTF_8));
1303+
bb.writeBytes(reference.getBytes(CHARSET));
12341304
bb.writeShort(stream.length());
1235-
bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8));
1305+
bb.writeBytes(stream.getBytes(CHARSET));
12361306
OutstandingRequest<QueryOffsetResponse> request = outstandingRequest();
12371307
outstandingRequests.put(correlationId, request);
12381308
channel.writeAndFlush(bb);
@@ -1271,9 +1341,9 @@ public long queryPublisherSequence(String publisherReference, String stream) {
12711341
bb.writeShort(VERSION_1);
12721342
bb.writeInt(correlationId);
12731343
bb.writeShort(publisherReference.length());
1274-
bb.writeBytes(publisherReference.getBytes(StandardCharsets.UTF_8));
1344+
bb.writeBytes(publisherReference.getBytes(CHARSET));
12751345
bb.writeShort(stream.length());
1276-
bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8));
1346+
bb.writeBytes(stream.getBytes(CHARSET));
12771347
OutstandingRequest<QueryPublisherSequenceResponse> request = outstandingRequest();
12781348
outstandingRequests.put(correlationId, request);
12791349
channel.writeAndFlush(bb);
@@ -1436,9 +1506,9 @@ public List<String> route(String routingKey, String superStream) {
14361506
bb.writeShort(VERSION_1);
14371507
bb.writeInt(correlationId);
14381508
bb.writeShort(routingKey.length());
1439-
bb.writeBytes(routingKey.getBytes(StandardCharsets.UTF_8));
1509+
bb.writeBytes(routingKey.getBytes(CHARSET));
14401510
bb.writeShort(superStream.length());
1441-
bb.writeBytes(superStream.getBytes(StandardCharsets.UTF_8));
1511+
bb.writeBytes(superStream.getBytes(CHARSET));
14421512
OutstandingRequest<List<String>> request = outstandingRequest();
14431513
outstandingRequests.put(correlationId, request);
14441514
channel.writeAndFlush(bb);
@@ -1471,7 +1541,7 @@ public List<String> partitions(String superStream) {
14711541
bb.writeShort(VERSION_1);
14721542
bb.writeInt(correlationId);
14731543
bb.writeShort(superStream.length());
1474-
bb.writeBytes(superStream.getBytes(StandardCharsets.UTF_8));
1544+
bb.writeBytes(superStream.getBytes(CHARSET));
14751545
OutstandingRequest<List<String>> request = outstandingRequest();
14761546
outstandingRequests.put(correlationId, request);
14771547
channel.writeAndFlush(bb);
@@ -1532,7 +1602,7 @@ StreamStatsResponse streamStats(String stream) {
15321602
bb.writeShort(VERSION_1);
15331603
bb.writeInt(correlationId);
15341604
bb.writeShort(stream.length());
1535-
bb.writeBytes(stream.getBytes(StandardCharsets.UTF_8));
1605+
bb.writeBytes(stream.getBytes(CHARSET));
15361606
OutstandingRequest<StreamStatsResponse> request = outstandingRequest();
15371607
outstandingRequests.put(correlationId, request);
15381608
channel.writeAndFlush(bb);

0 commit comments

Comments
 (0)