Skip to content

Commit 2eb9b5f

Browse files
committed
Make sure partitions and binding keys counts are equal
1 parent 9beda57 commit 2eb9b5f

File tree

3 files changed

+29
-10
lines changed

3 files changed

+29
-10
lines changed

deps/rabbitmq_stream/src/rabbit_stream_manager.erl

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ handle_call({create_super_stream,
229229
BindingKeys,
230230
Username},
231231
_From, State) ->
232-
case validate_super_stream_creation(VirtualHost, Name, Partitions) of
232+
case validate_super_stream_creation(VirtualHost, Name, Partitions, BindingKeys) of
233233
{error, Reason} ->
234234
{reply, {error, Reason}, State};
235235
ok ->
@@ -655,7 +655,10 @@ super_stream_partitions(VirtualHost, SuperStream) ->
655655
{error, stream_not_found}
656656
end.
657657

658-
validate_super_stream_creation(VirtualHost, Name, Partitions) ->
658+
validate_super_stream_creation(_VirtualHost, _Name, Partitions, BindingKeys)
659+
when length(Partitions) =/= length(BindingKeys) ->
660+
{error, {validation_failed, "There must be the same number of partitions and binding keys"}};
661+
validate_super_stream_creation(VirtualHost, Name, Partitions, _BindingKeys) ->
659662
case exchange_exists(VirtualHost, Name) of
660663
{error, validation_failed} ->
661664
{error,

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ test_super_stream_creation_deletion(Config) ->
332332
ok = T:send(S, PartitionsFrame),
333333
{Cmd2, _} = receive_commands(T, S, C),
334334
?assertMatch({response, 1, {partitions, ?RESPONSE_CODE_OK, Partitions}},
335-
Cmd2),
335+
Cmd2),
336336
[begin
337337
RouteFrame = frame({request, 1, {route, Rk, Ss}}),
338338
ok = T:send(S, RouteFrame),
@@ -351,7 +351,15 @@ test_super_stream_creation_deletion(Config) ->
351351
ok = T:send(S, PartitionsFrame),
352352
{Cmd4, _} = receive_commands(T, S, C),
353353
?assertMatch({response, 1, {partitions, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, []}},
354-
Cmd4),
354+
Cmd4),
355+
356+
%% not the same number of partitions and binding keys
357+
SsCreationBadFrame = frame({request, 1, {create_super_stream, Ss,
358+
[<<"s1">>, <<"s2">>], [<<"bk1">>], #{}}}),
359+
ok = T:send(S, SsCreationBadFrame),
360+
{Cmd5, _} = receive_commands(T, S, C),
361+
?assertMatch({response, 1, {create_super_stream, ?RESPONSE_CODE_PRECONDITION_FAILED}},
362+
Cmd5),
355363

356364
test_close(T, S, C),
357365
closed = wait_for_socket_close(T, S, 10),

deps/rabbitmq_stream/test/rabbit_stream_manager_SUITE.erl

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,14 +99,14 @@ lookup_member(Config) ->
9999
?assertEqual({ok, deleted}, delete_stream(Config, Stream)).
100100

101101
manage_super_stream(Config) ->
102-
% create super stream
102+
%% create super stream
103103
?assertEqual(ok,
104104
create_super_stream(Config,
105105
<<"invoices">>,
106106
[<<"invoices-0">>, <<"invoices-1">>,
107107
<<"invoices-2">>],
108108
[<<"0">>, <<"1">>, <<"2">>])),
109-
% get the correct partitions
109+
%% get the correct partitions
110110
?assertEqual({ok,
111111
[<<"invoices-0">>, <<"invoices-1">>, <<"invoices-2">>]},
112112
partitions(Config, <<"invoices">>)),
@@ -117,21 +117,21 @@ manage_super_stream(Config) ->
117117
<- [{<<"invoices-0">>, <<"0">>}, {<<"invoices-1">>, <<"1">>},
118118
{<<"invoices-2">>, <<"2">>}]],
119119

120-
% get an error if trying to re-create it
120+
%% get an error if trying to re-create it
121121
?assertMatch({error, _},
122122
create_super_stream(Config,
123123
<<"invoices">>,
124124
[<<"invoices-0">>, <<"invoices-1">>,
125125
<<"invoices-2">>],
126126
[<<"0">>, <<"1">>, <<"2">>])),
127127

128-
% can delete it
128+
%% can delete it
129129
?assertEqual(ok, delete_super_stream(Config, <<"invoices">>)),
130130

131-
% create a stream with the same name as a potential partition
131+
%% create a stream with the same name as a potential partition
132132
?assertMatch({ok, _}, create_stream(Config, <<"invoices-1">>)),
133133

134-
% cannot create the super stream because a partition already exists
134+
%% cannot create the super stream because a partition already exists
135135
?assertMatch({error, _},
136136
create_super_stream(Config,
137137
<<"invoices">>,
@@ -140,6 +140,14 @@ manage_super_stream(Config) ->
140140
[<<"0">>, <<"1">>, <<"2">>])),
141141

142142
?assertMatch({ok, _}, delete_stream(Config, <<"invoices-1">>)),
143+
144+
%% not the same number of partitions and binding keys
145+
?assertMatch({error, {validation_failed, _}},
146+
create_super_stream(Config,
147+
<<"invoices">>,
148+
[<<"invoices-0">>, <<"invoices-1">>],
149+
[<<"0">>])),
150+
143151
ok.
144152

145153
partition_index(Config) ->

0 commit comments

Comments
 (0)