Skip to content

Commit b84fc52

Browse files
committed
Add test for super stream creation/deletion
References rabbitmq/rabbitmq-server#9813
1 parent 7f8afbe commit b84fc52

File tree

13 files changed

+295
-204
lines changed

13 files changed

+295
-204
lines changed

.github/workflows/test-pr.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ jobs:
2424
cache: 'maven'
2525
- name: Start broker
2626
run: ci/start-broker.sh
27+
env:
28+
RABBITMQ_IMAGE: 'pivotalrabbitmq/rabbitmq:super-stream-frames-otp-max-bazel'
2729
- name: Test
2830
run: |
2931
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -687,6 +687,7 @@ Response createSuperStream(
687687
"Partitions and routing keys of a super stream must have "
688688
+ "the same number of elements");
689689
}
690+
arguments = arguments == null ? Collections.emptyMap() : arguments;
690691
int length =
691692
2
692693
+ 2
@@ -831,6 +832,10 @@ public Response delete(String stream) {
831832
}
832833
}
833834

835+
Map<String, StreamMetadata> metadata(List<String> streams) {
836+
return this.metadata(streams.toArray(new String[] {}));
837+
}
838+
834839
public Map<String, StreamMetadata> metadata(String... streams) {
835840
if (streams == null || streams.length == 0) {
836841
throw new IllegalArgumentException("At least one stream must be specified");

src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java

Lines changed: 3 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -13,37 +13,7 @@
1313
1414
package com.rabbitmq.stream.impl;
1515

16-
import static com.rabbitmq.stream.Constants.COMMAND_CLOSE;
17-
import static com.rabbitmq.stream.Constants.COMMAND_CONSUMER_UPDATE;
18-
import static com.rabbitmq.stream.Constants.COMMAND_CREATE_STREAM;
19-
import static com.rabbitmq.stream.Constants.COMMAND_CREDIT;
20-
import static com.rabbitmq.stream.Constants.COMMAND_DECLARE_PUBLISHER;
21-
import static com.rabbitmq.stream.Constants.COMMAND_DELETE_PUBLISHER;
22-
import static com.rabbitmq.stream.Constants.COMMAND_DELETE_STREAM;
23-
import static com.rabbitmq.stream.Constants.COMMAND_DELIVER;
24-
import static com.rabbitmq.stream.Constants.COMMAND_EXCHANGE_COMMAND_VERSIONS;
25-
import static com.rabbitmq.stream.Constants.COMMAND_HEARTBEAT;
26-
import static com.rabbitmq.stream.Constants.COMMAND_METADATA;
27-
import static com.rabbitmq.stream.Constants.COMMAND_METADATA_UPDATE;
28-
import static com.rabbitmq.stream.Constants.COMMAND_OPEN;
29-
import static com.rabbitmq.stream.Constants.COMMAND_PARTITIONS;
30-
import static com.rabbitmq.stream.Constants.COMMAND_PEER_PROPERTIES;
31-
import static com.rabbitmq.stream.Constants.COMMAND_PUBLISH_CONFIRM;
32-
import static com.rabbitmq.stream.Constants.COMMAND_PUBLISH_ERROR;
33-
import static com.rabbitmq.stream.Constants.COMMAND_QUERY_OFFSET;
34-
import static com.rabbitmq.stream.Constants.COMMAND_QUERY_PUBLISHER_SEQUENCE;
35-
import static com.rabbitmq.stream.Constants.COMMAND_ROUTE;
36-
import static com.rabbitmq.stream.Constants.COMMAND_SASL_AUTHENTICATE;
37-
import static com.rabbitmq.stream.Constants.COMMAND_SASL_HANDSHAKE;
38-
import static com.rabbitmq.stream.Constants.COMMAND_STREAM_STATS;
39-
import static com.rabbitmq.stream.Constants.COMMAND_SUBSCRIBE;
40-
import static com.rabbitmq.stream.Constants.COMMAND_TUNE;
41-
import static com.rabbitmq.stream.Constants.COMMAND_UNSUBSCRIBE;
42-
import static com.rabbitmq.stream.Constants.RESPONSE_CODE_OK;
43-
import static com.rabbitmq.stream.Constants.RESPONSE_CODE_SASL_CHALLENGE;
44-
import static com.rabbitmq.stream.Constants.RESPONSE_CODE_STREAM_NOT_AVAILABLE;
45-
import static com.rabbitmq.stream.Constants.VERSION_1;
46-
import static com.rabbitmq.stream.Constants.VERSION_2;
16+
import static com.rabbitmq.stream.Constants.*;
4717
import static com.rabbitmq.stream.impl.Utils.encodeResponseCode;
4818

4919
import com.rabbitmq.stream.ChunkChecksum;
@@ -128,6 +98,8 @@ class ServerFrameHandler {
12898
handlers.put(COMMAND_CONSUMER_UPDATE, new ConsumerUpdateFrameHandler());
12999
handlers.put(COMMAND_EXCHANGE_COMMAND_VERSIONS, new ExchangeCommandVersionsFrameHandler());
130100
handlers.put(COMMAND_STREAM_STATS, new StreamStatsFrameHandler());
101+
handlers.put(COMMAND_CREATE_SUPER_STREAM, RESPONSE_FRAME_HANDLER);
102+
handlers.put(COMMAND_DELETE_SUPER_STREAM, RESPONSE_FRAME_HANDLER);
131103
HANDLERS = new FrameHandler[maxCommandKey + 1][];
132104
handlers
133105
.entrySet()

src/test/java/com/rabbitmq/stream/Host.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.rabbitmq.stream;
1515

1616
import static java.lang.String.format;
17+
import static java.util.Arrays.asList;
1718

1819
import com.google.common.reflect.TypeToken;
1920
import com.google.gson.Gson;
@@ -151,12 +152,24 @@ public static void addUser(String username, String password) throws IOException
151152
rabbitmqctl(format("add_user %s %s", username, password));
152153
}
153154

155+
public static void setPermissions(String username, List<String> permissions) throws IOException {
156+
setPermissions(username, "/", permissions);
157+
}
158+
154159
public static void setPermissions(String username, String vhost, String permission)
155160
throws IOException {
161+
setPermissions(username, vhost, asList(permission, permission, permission));
162+
}
163+
164+
public static void setPermissions(String username, String vhost, List<String> permissions)
165+
throws IOException {
166+
if (permissions.size() != 3) {
167+
throw new IllegalArgumentException();
168+
}
156169
rabbitmqctl(
157170
format(
158171
"set_permissions --vhost %s %s '%s' '%s' '%s'",
159-
vhost, username, permission, permission, permission));
172+
vhost, username, permissions.get(0), permissions.get(1), permissions.get(2)));
160173
}
161174

162175
public static void changePassword(String username, String newPassword) throws IOException {
@@ -180,7 +193,8 @@ public static void setEnv(String parameter, String value) throws IOException {
180193
}
181194

182195
public static String rabbitmqctlCommand() {
183-
String rabbitmqCtl = System.getProperty("rabbitmqctl.bin");
196+
String rabbitmqCtl = "/home/acogoluegnes/prog/rabbitmq/rabbitmq-server/sbin/rabbitmqctl";
197+
// String rabbitmqCtl = System.getProperty("rabbitmqctl.bin");
184198
if (rabbitmqCtl == null) {
185199
throw new IllegalStateException("Please define the rabbitmqctl.bin system property");
186200
}

src/test/java/com/rabbitmq/stream/impl/FilteringTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public class FilteringTest {
5858
Environment environment;
5959

6060
String stream;
61+
ClientFactory cf;
6162

6263
@BeforeEach
6364
void init() throws Exception {
@@ -278,9 +279,9 @@ void superStream(TestInfo info) throws Exception {
278279
() -> {
279280
String superStream = TestUtils.streamName(info);
280281
int partitionCount = 3;
281-
Connection connection = new ConnectionFactory().newConnection();
282+
Client configurationClient = cf.get();
282283
try {
283-
TestUtils.declareSuperStreamTopology(connection, superStream, partitionCount);
284+
declareSuperStreamTopology(configurationClient, superStream, partitionCount);
284285

285286
Producer producer =
286287
environment
@@ -362,8 +363,7 @@ void superStream(TestInfo info) throws Exception {
362363
CONDITION_TIMEOUT, () -> filteredConsumedMessageCount.get() == expectedCount);
363364
assertThat(receivedMessageCount).hasValueLessThan(messageCount * 2);
364365
} finally {
365-
TestUtils.deleteSuperStreamTopology(connection, superStream, partitionCount);
366-
connection.close();
366+
deleteSuperStreamTopology(configurationClient, superStream);
367367
}
368368
});
369369
}

src/test/java/com/rabbitmq/stream/impl/RoutePartitionsTest.java

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -36,21 +36,20 @@ public class RoutePartitionsTest {
3636

3737
TestUtils.ClientFactory cf;
3838

39-
Connection connection;
39+
Client configurationClient;
4040

4141
int partitions = 3;
4242
String superStream;
4343

4444
@BeforeEach
45-
void init(TestInfo info) throws Exception {
46-
connection = new ConnectionFactory().newConnection();
45+
void init(TestInfo info) {
46+
configurationClient = cf.get();
4747
superStream = TestUtils.streamName(info);
4848
}
4949

5050
@AfterEach
51-
void tearDown() throws Exception {
52-
deleteSuperStreamTopology(connection, superStream, partitions);
53-
connection.close();
51+
void tearDown() {
52+
deleteSuperStreamTopology(configurationClient, superStream);
5453
}
5554

5655
@Test
@@ -64,25 +63,23 @@ void partitionsShouldReturnEmptyListWhenExchangeDoesNotExist() {
6463
}
6564

6665
@Test
67-
void routeShouldReturnNullWhenNoStreamForRoutingKey() throws Exception {
68-
declareSuperStreamTopology(connection, superStream, partitions);
66+
void routeShouldReturnNullWhenNoStreamForRoutingKey() {
67+
declareSuperStreamTopology(configurationClient, superStream, partitions);
6968

7069
Client client = cf.get();
7170
assertThat(client.route("0", superStream)).hasSize(1).contains(superStream + "-0");
7271
assertThat(client.route("42", superStream)).isEmpty();
7372
}
7473

7574
@Test
76-
void partitionsShouldReturnEmptyListWhenThereIsNoBinding() throws Exception {
77-
declareSuperStreamTopology(connection, superStream, 0);
78-
75+
void partitionsShouldReturnEmptyListWhenSuperStreamDoesNotExist() {
7976
Client client = cf.get();
8077
assertThat(client.partitions(superStream)).isEmpty();
8178
}
8279

8380
@Test
84-
void routeTopologyWithPartitionCount() throws Exception {
85-
declareSuperStreamTopology(connection, superStream, 3);
81+
void routeTopologyWithPartitionCount() {
82+
declareSuperStreamTopology(configurationClient, superStream, 3);
8683

8784
Client client = cf.get();
8885
List<String> streams = client.partitions(superStream);
@@ -97,8 +94,10 @@ void routeTopologyWithPartitionCount() throws Exception {
9794

9895
@Test
9996
void routeReturnsMultipleStreamsIfMultipleBindingsForSameKey() throws Exception {
100-
declareSuperStreamTopology(connection, superStream, 3);
101-
connection.createChannel().queueBind(superStream + "-1", superStream, "0");
97+
declareSuperStreamTopology(configurationClient, superStream, 3);
98+
try (Connection connection = new ConnectionFactory().newConnection()) {
99+
connection.createChannel().queueBind(superStream + "-1", superStream, "0");
100+
}
102101
Client client = cf.get();
103102
List<String> streams = client.partitions(superStream);
104103
assertThat(streams)
@@ -114,10 +113,12 @@ void routeReturnsMultipleStreamsIfMultipleBindingsForSameKey() throws Exception
114113

115114
@Test
116115
void partitionsAndRouteShouldNotReturnNonStreamQueue() throws Exception {
117-
declareSuperStreamTopology(connection, superStream, 3);
118-
Channel channel = connection.createChannel();
119-
String nonStreamQueue = channel.queueDeclare().getQueue();
120-
connection.createChannel().queueBind(nonStreamQueue, superStream, "not-a-stream");
116+
declareSuperStreamTopology(configurationClient, superStream, 3);
117+
try (Connection connection = new ConnectionFactory().newConnection()) {
118+
Channel channel = connection.createChannel();
119+
String nonStreamQueue = channel.queueDeclare().getQueue();
120+
connection.createChannel().queueBind(nonStreamQueue, superStream, "not-a-stream");
121+
}
121122
Client client = cf.get();
122123
List<String> streams = client.partitions(superStream);
123124
assertThat(streams)
@@ -131,9 +132,9 @@ void partitionsAndRouteShouldNotReturnNonStreamQueue() throws Exception {
131132
}
132133

133134
@Test
134-
void partitionsReturnsCorrectOrder() throws Exception {
135+
void partitionsReturnsCorrectOrder() {
135136
String[] partitionNames = {"z", "y", "x"};
136-
declareSuperStreamTopology(connection, superStream, partitionNames);
137+
declareSuperStreamTopology(configurationClient, superStream, partitionNames);
137138
try {
138139
Client client = cf.get();
139140
List<String> streams = client.partitions(superStream);
@@ -142,7 +143,7 @@ void partitionsReturnsCorrectOrder() throws Exception {
142143
.containsSequence(
143144
Arrays.stream(partitionNames).map(p -> superStream + "-" + p).toArray(String[]::new));
144145
} finally {
145-
deleteSuperStreamTopology(connection, superStream, partitionNames);
146+
deleteSuperStreamTopology(configurationClient, superStream);
146147
}
147148
}
148149
}

src/test/java/com/rabbitmq/stream/impl/SacClientTest.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,6 @@
2626
import static java.util.stream.Collectors.toList;
2727
import static org.assertj.core.api.Assertions.assertThat;
2828

29-
import com.rabbitmq.client.Connection;
30-
import com.rabbitmq.client.ConnectionFactory;
3129
import com.rabbitmq.stream.Constants;
3230
import com.rabbitmq.stream.Host;
3331
import com.rabbitmq.stream.OffsetSpecification;
@@ -230,9 +228,9 @@ void singleActiveConsumerShouldRolloverWhenAnotherJoinsPartition(TestInfo info)
230228
Map<Byte, AtomicInteger> receivedMessages = receivedMessages(2);
231229
String superStream = streamName(info);
232230
String consumerName = "foo";
233-
Connection c = new ConnectionFactory().newConnection();
231+
Client configurationClient = cf.get();
234232
try {
235-
TestUtils.declareSuperStreamTopology(c, superStream, 3);
233+
declareSuperStreamTopology(configurationClient, superStream, 3);
236234
// working with the second partition
237235
String partition = superStream + "-1";
238236

@@ -330,8 +328,7 @@ void singleActiveConsumerShouldRolloverWhenAnotherJoinsPartition(TestInfo info)
330328
.isEqualTo(messageCount * 2);
331329

332330
} finally {
333-
TestUtils.deleteSuperStreamTopology(c, superStream, 3);
334-
c.close();
331+
deleteSuperStreamTopology(configurationClient, superStream);
335332
}
336333
}
337334

@@ -340,11 +337,11 @@ void singleActiveConsumersShouldSpreadOnSuperStreamPartitions(TestInfo info) thr
340337
Map<Byte, Boolean> consumerStates = consumerStates(3 * 3);
341338
String superStream = streamName(info);
342339
String consumerName = "foo";
343-
Connection c = new ConnectionFactory().newConnection();
340+
Client configurationClient = cf.get();
344341
// subscription distribution
345342
// client 1: 0, 1, 2 / client 2: 3, 4, 5, / client 3: 6, 7, 8
346343
try {
347-
declareSuperStreamTopology(c, superStream, 3);
344+
declareSuperStreamTopology(configurationClient, superStream, 3);
348345
List<String> partitions =
349346
IntStream.range(0, 3).mapToObj(i -> superStream + "-" + i).collect(toList());
350347
ConsumerUpdateListener consumerUpdateListener =
@@ -422,7 +419,7 @@ void singleActiveConsumersShouldSpreadOnSuperStreamPartitions(TestInfo info) thr
422419
client.set(client3);
423420
partitions.forEach(unsubscriptionCallback);
424421
} finally {
425-
deleteSuperStreamTopology(c, superStream, 3);
422+
deleteSuperStreamTopology(configurationClient, superStream);
426423
}
427424
}
428425

@@ -485,10 +482,10 @@ void superStreamRebalancingShouldWorkWhilePublishing(TestInfo info) throws Excep
485482
Map<Byte, Boolean> consumerStates = consumerStates(3 * 3);
486483
String superStream = streamName(info);
487484
String consumerName = "foo";
488-
Connection c = new ConnectionFactory().newConnection();
485+
Client configurationClient = cf.get();
489486
AtomicBoolean keepPublishing = new AtomicBoolean(true);
490487
try {
491-
declareSuperStreamTopology(c, superStream, 3);
488+
declareSuperStreamTopology(configurationClient, superStream, 3);
492489
// we use the second partition because a rebalancing occurs
493490
// when the second consumer joins
494491
String partitionInUse = superStream + "-1";
@@ -584,8 +581,7 @@ void superStreamRebalancingShouldWorkWhilePublishing(TestInfo info) throws Excep
584581
assertThat(response).is(ok());
585582
} finally {
586583
keepPublishing.set(false);
587-
deleteSuperStreamTopology(c, superStream, 3);
588-
c.close();
584+
deleteSuperStreamTopology(configurationClient, superStream);
589585
}
590586
}
591587

0 commit comments

Comments
 (0)