Skip to content

Commit ab2c565

Browse files
authored
Merge pull request #3423 from rabbitmq/streams-sort-partitions
Sort stream partitions using binding parameter
2 parents b140395 + faee725 commit ab2c565

File tree

5 files changed

+129
-10
lines changed

5 files changed

+129
-10
lines changed

deps/rabbitmq_stream/BUILD.bazel

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ suites = [
8080
PACKAGE,
8181
name = "config_schema_SUITE",
8282
),
83+
rabbitmq_integration_suite(
84+
PACKAGE,
85+
name = "rabbit_stream_utils_SUITE",
86+
),
8387
rabbitmq_integration_suite(
8488
PACKAGE,
8589
name = "rabbit_stream_SUITE",

deps/rabbitmq_stream/src/rabbit_stream_manager.erl

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -369,8 +369,10 @@ handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From,
369369
[] ->
370370
{ok, no_route};
371371
Routes ->
372-
%% FIXME filter non-stream resources
373-
{ok, [Stream || #resource{name = Stream} <- Routes]}
372+
{ok,
373+
[Stream
374+
|| #resource{name = Stream} = R <- Routes,
375+
is_resource_stream_queue(R)]}
374376
end
375377
catch
376378
exit:Error ->
@@ -383,9 +385,13 @@ handle_call({partitions, VirtualHost, SuperStream}, _From, State) ->
383385
ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream),
384386
Res = try
385387
rabbit_exchange:lookup_or_die(ExchangeName),
386-
%% FIXME make sure queue is a stream
387-
%% TODO bindings could be sorted by partition number, by using a binding argument
388-
%% this would make the spreading of messages stable
388+
UnorderedBindings =
389+
[Binding
390+
|| Binding = #binding{destination = D}
391+
<- rabbit_binding:list_for_source(ExchangeName),
392+
is_resource_stream_queue(D)],
393+
OrderedBindings =
394+
rabbit_stream_utils:sort_partitions(UnorderedBindings),
389395
{ok,
390396
lists:foldl(fun (#binding{destination =
391397
#resource{kind = queue, name = Q}},
@@ -394,7 +400,7 @@ handle_call({partitions, VirtualHost, SuperStream}, _From, State) ->
394400
(_Binding, Acc) ->
395401
Acc
396402
end,
397-
[], rabbit_binding:list_for_source(ExchangeName))}
403+
[], OrderedBindings)}
398404
catch
399405
exit:Error ->
400406
rabbit_log:error("Error while looking up exchange ~p, ~p",
@@ -445,3 +451,13 @@ is_stream_queue(Q) ->
445451
_ ->
446452
false
447453
end.
454+
455+
is_resource_stream_queue(#resource{kind = queue} = Resource) ->
456+
case rabbit_amqqueue:lookup(Resource) of
457+
{ok, Q} ->
458+
is_stream_queue(Q);
459+
_ ->
460+
false
461+
end;
462+
is_resource_stream_queue(_) ->
463+
false.

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,8 @@ callback_mode() ->
174174
terminate(Reason, State,
175175
#statem_data{transport = Transport,
176176
connection = #stream_connection{socket = Socket},
177-
connection_state = ConnectionState} = StatemData) ->
177+
connection_state = ConnectionState} =
178+
StatemData) ->
178179
close(Transport, Socket, ConnectionState),
179180
rabbit_networking:unregister_non_amqp_connection(self()),
180181
notify_connection_closed(StatemData),
@@ -1053,7 +1054,8 @@ close_sent(enter, _OldState,
10531054
StateTimeout}}) ->
10541055
{keep_state_and_data, {state_timeout, StateTimeout, close}};
10551056
close_sent(state_timeout, close, #statem_data{}) ->
1056-
rabbit_log_connection:warning("Closing connection because of timeout in state '~s' likely due to lack of client action.",
1057+
rabbit_log_connection:warning("Closing connection because of timeout in state "
1058+
"'~s' likely due to lack of client action.",
10571059
[?FUNCTION_NAME]),
10581060
stop;
10591061
close_sent(info, {tcp, S, Data},
@@ -1081,7 +1083,8 @@ close_sent(info, {tcp_closed, S}, _StatemData) ->
10811083
[S, self()]),
10821084
stop;
10831085
close_sent(info, {tcp_error, S, Reason}, #statem_data{}) ->
1084-
rabbit_log_connection:error("Stream protocol connection socket error: ~p [~w] [~w]",
1086+
rabbit_log_connection:error("Stream protocol connection socket error: ~p [~w] "
1087+
"[~w]",
10851088
[Reason, S, self()]),
10861089
stop;
10871090
close_sent(info, {resource_alarm, IsThereAlarm},

deps/rabbitmq_stream/src/rabbit_stream_utils.erl

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,13 @@
2525
check_configure_permitted/3,
2626
check_write_permitted/3,
2727
check_read_permitted/3,
28-
extract_stream_list/2]).
28+
extract_stream_list/2,
29+
sort_partitions/1]).
2930

3031
-define(MAX_PERMISSION_CACHE_SIZE, 12).
3132

33+
-include_lib("rabbit_common/include/rabbit.hrl").
34+
3235
enforce_correct_stream_name(Name) ->
3336
% from rabbit_channel
3437
StrippedName =
@@ -213,3 +216,23 @@ extract_stream_list(<<>>, Streams) ->
213216
extract_stream_list(<<Length:16, Stream:Length/binary, Rest/binary>>,
214217
Streams) ->
215218
extract_stream_list(Rest, [Stream | Streams]).
219+
220+
-spec sort_partitions([#binding{}]) -> [#binding{}].
221+
sort_partitions(Partitions) ->
222+
lists:sort(fun(#binding{args = Args1}, #binding{args = Args2}) ->
223+
Arg1 =
224+
rabbit_misc:table_lookup(Args1,
225+
<<"x-stream-partition-order">>),
226+
Arg2 =
227+
rabbit_misc:table_lookup(Args2,
228+
<<"x-stream-partition-order">>),
229+
case {Arg1, Arg2} of
230+
{{_, Order1}, {_, Order2}} ->
231+
rabbit_data_coercion:to_integer(Order1)
232+
=< rabbit_data_coercion:to_integer(Order2);
233+
{undefined, {_, _Order2}} -> false;
234+
{{_, _Order1}, undefined} -> true;
235+
_ -> true
236+
end
237+
end,
238+
Partitions).
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
-module(rabbit_stream_utils_SUITE).
2+
3+
-compile(nowarn_export_all).
4+
-compile(export_all).
5+
6+
-include_lib("eunit/include/eunit.hrl").
7+
-include_lib("rabbit_common/include/rabbit.hrl").
8+
9+
%%%===================================================================
10+
%%% Common Test callbacks
11+
%%%===================================================================
12+
13+
all() ->
14+
[{group, tests}].
15+
16+
suite() ->
17+
[{timetrap, {seconds, 30}}].
18+
19+
groups() ->
20+
[{tests, [], [sort_partitions]}].
21+
22+
init_per_suite(Config) ->
23+
Config.
24+
25+
end_per_suite(_Config) ->
26+
ok.
27+
28+
group(_GroupName) ->
29+
[].
30+
31+
init_per_group(_GroupName, Config) ->
32+
Config.
33+
34+
end_per_group(_GroupName, _Config) ->
35+
ok.
36+
37+
init_per_testcase(_TestCase, Config) ->
38+
Config.
39+
40+
end_per_testcase(_TestCase, _Config) ->
41+
ok.
42+
43+
%%%===================================================================
44+
%%% Test cases
45+
%%%===================================================================
46+
47+
sort_partitions(_Config) ->
48+
[] = rabbit_stream_utils:sort_partitions([]),
49+
?assertEqual([<<"a">>, <<"b">>, <<"c">>],
50+
[S
51+
|| #binding{destination = #resource{name = S}}
52+
<- rabbit_stream_utils:sort_partitions([binding(<<"c">>,
53+
2),
54+
binding(<<"b">>,
55+
1),
56+
binding(<<"a">>,
57+
0)])]),
58+
?assertEqual([<<"a">>, <<"c">>, <<"no-order-field">>],
59+
[S
60+
|| #binding{destination = #resource{name = S}}
61+
<- rabbit_stream_utils:sort_partitions([binding(<<"c">>,
62+
10),
63+
binding(<<"no-order-field">>),
64+
binding(<<"a">>,
65+
0)])]),
66+
ok.
67+
68+
binding(Destination, Order) ->
69+
#binding{destination = #resource{name = Destination},
70+
args = [{<<"x-stream-partition-order">>, signedint, Order}]}.
71+
72+
binding(Destination) ->
73+
#binding{destination = #resource{name = Destination}, args = []}.

0 commit comments

Comments
 (0)