Skip to content

Commit 53ba9ee

Browse files
committed
Simplify
1 parent e760aed commit 53ba9ee

File tree

3 files changed

+94
-110
lines changed

3 files changed

+94
-110
lines changed

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -549,22 +549,21 @@ deliver0(MsgId, Msg,
549549
correlation = Correlation,
550550
slow = Slow}, Actions}.
551551

552-
553-
stream_message(Msg, _FilteringSupported = true) ->
554-
ConvertedMsg = mc:convert(mc_amqp, Msg),
555-
MsgData = amqp10_msg_to_iodata(ConvertedMsg),
556-
case mc:x_header(<<"x-stream-filter-value">>, ConvertedMsg) of
557-
undefined ->
558-
MsgData;
559-
{utf8, Value} ->
560-
{Value, MsgData}
561-
end;
562-
stream_message(Msg, _FilteringSupported = false) ->
563-
amqp10_msg_to_iodata(mc:convert(mc_amqp, Msg)).
564-
565-
amqp10_msg_to_iodata(Msg) ->
566-
Sections = mc:protocol_state(Msg),
567-
mc_amqp:serialize(Sections).
552+
stream_message(Msg, FilteringSupported) ->
553+
McAmqp = mc:convert(mc_amqp, Msg),
554+
Sections = mc:protocol_state(McAmqp),
555+
MsgData = mc_amqp:serialize(Sections),
556+
case FilteringSupported of
557+
true ->
558+
case mc:x_header(<<"x-stream-filter-value">>, McAmqp) of
559+
undefined ->
560+
MsgData;
561+
{utf8, Value} ->
562+
{Value, MsgData}
563+
end;
564+
false ->
565+
MsgData
566+
end.
568567

569568
-spec dequeue(_, _, _, _, client()) -> no_return().
570569
dequeue(_, _, _, _, #stream_client{name = Name}) ->

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session.erl

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1844,7 +1844,8 @@ encode_frames(T, Msg, MaxContentLen, Transfers) ->
18441844
source_filters_to_consumer_args(#'v1_0.source'{filter = {map, KVList}}) ->
18451845
source_filters_to_consumer_args(
18461846
[<<"rabbitmq:stream-offset-spec">>,
1847-
<<"rabbitmq:stream-filter">>, <<"rabbitmq:stream-match-unfiltered">>],
1847+
<<"rabbitmq:stream-filter">>,
1848+
<<"rabbitmq:stream-match-unfiltered">>],
18481849
KVList,
18491850
[]);
18501851
source_filters_to_consumer_args(_Source) ->
@@ -1868,13 +1869,13 @@ source_filters_to_consumer_args([<<"rabbitmq:stream-offset-spec">> = H | T], KVL
18681869
source_filters_to_consumer_args([<<"rabbitmq:stream-filter">> = H | T], KVList, Acc) ->
18691870
Key = {symbol, H},
18701871
Arg = case keyfind_unpack_described(Key, KVList) of
1871-
{_, {list, Filters}} when is_list(Filters) ->
1872-
FilterValues = lists:foldl(fun({utf8, Filter}, Fs) ->
1873-
[{longstr, Filter}] ++ Fs;
1874-
(_, Fs) ->
1875-
Fs
1876-
end, [], Filters),
1877-
[{<<"x-stream-filter">>, array, FilterValues}];
1872+
{_, {list, Filters0}} when is_list(Filters0) ->
1873+
Filters = lists:foldl(fun({utf8, Filter}, L) ->
1874+
[{longstr, Filter} | L];
1875+
(_, L) ->
1876+
L
1877+
end, [], Filters0),
1878+
[{<<"x-stream-filter">>, array, Filters}];
18781879
{_, {utf8, Filter}} ->
18791880
[{<<"x-stream-filter">>, longstr, Filter}];
18801881
_ ->

deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl

Lines changed: 70 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -577,10 +577,9 @@ roundtrip_with_drain(Config, QueueType, QName)
577577
% create a receiver link
578578
TerminusDurability = none,
579579
Filter = consume_from_first(QueueType),
580-
Properties = #{},
581580
{ok, Receiver} = amqp10_client:attach_receiver_link(
582581
Session, <<"test-receiver">>, Address, unsettled,
583-
TerminusDurability, Filter, Properties),
582+
TerminusDurability, Filter),
584583

585584
% grant credit and drain
586585
ok = amqp10_client:flow_link_credit(Receiver, 1, never, true),
@@ -2433,36 +2432,28 @@ queue_and_client_different_nodes(QueueLeaderNode, ClientNode, QueueType, Config)
24332432

24342433

24352434
stream_filtering(Config) ->
2436-
Host = ?config(rmq_hostname, Config),
2437-
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
2438-
Stream = list_to_binary(atom_to_list(?FUNCTION_NAME) ++ "-" ++ integer_to_list(rand:uniform(10000))),
2435+
Stream = atom_to_binary(?FUNCTION_NAME),
24392436
Address = <<"/amq/queue/", Stream/binary>>,
2440-
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
2441-
Args = [{<<"x-queue-type">>, longstr, <<"stream">>}],
2442-
amqp_channel:call(Ch, #'queue.declare'{queue = Stream,
2443-
durable = true,
2444-
arguments = Args}),
2445-
2446-
%% we are going to publish several waves of messages with and without filter values.
2447-
%% we will then create subscriptions with various filter options
2448-
%% and make sure we receive only what we asked for and not all the messages.
2449-
2450-
WaveCount = 1000,
2451-
OpnConf = #{address => Host,
2452-
port => Port,
2453-
transfer_limit_margin => 10 * WaveCount,
2454-
container_id => atom_to_binary(?FUNCTION_NAME, utf8),
2455-
sasl => {plain, <<"guest">>, <<"guest">>}},
2437+
Ch = rabbit_ct_client_helpers:open_channel(Config),
2438+
amqp_channel:call(Ch, #'queue.declare'{
2439+
queue = Stream,
2440+
durable = true,
2441+
arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]}),
2442+
ok = rabbit_ct_client_helpers:close_channel(Ch),
24562443

2444+
OpnConf = connection_config(Config),
24572445
{ok, Connection} = amqp10_client:open_connection(OpnConf),
24582446
{ok, Session} = amqp10_client:begin_session(Connection),
24592447
SenderLinkName = <<"test-sender">>,
24602448
{ok, Sender} = amqp10_client:attach_sender_link(Session,
24612449
SenderLinkName,
24622450
Address),
2463-
24642451
wait_for_credit(Sender),
24652452

2453+
%% We are going to publish several waves of messages with and without filter values.
2454+
%% We will then create subscriptions with various filter options
2455+
%% and make sure we receive only what we asked for and not all the messages.
2456+
WaveCount = 1000,
24662457
%% logic to publish a wave of messages with or without a filter value
24672458
Publish = fun(FilterValue) ->
24682459
lists:foreach(fun(Seq) ->
@@ -2475,7 +2466,7 @@ stream_filtering(Config) ->
24752466
#{<<"x-stream-filter-value">> => FilterValue}}
24762467
end,
24772468
FilterBin = rabbit_data_coercion:to_binary(FilterValue),
2478-
SeqBin = rabbit_data_coercion:to_binary(Seq),
2469+
SeqBin = integer_to_binary(Seq),
24792470
DTag = <<FilterBin/binary, SeqBin/binary>>,
24802471
Msg0 = amqp10_msg:new(DTag, <<"my-body">>, false),
24812472
Msg1 = amqp10_msg:set_application_properties(AppProps, Msg0),
@@ -2485,103 +2476,96 @@ stream_filtering(Config) ->
24852476
end, lists:seq(1, WaveCount))
24862477
end,
24872478

2488-
%% publishing messages with the "apple" filter value
2489-
Publish("apple"),
2490-
%% publishing messages with no filter value
2479+
%% Publish messages with the "apple" filter value.
2480+
Publish(<<"apple">>),
2481+
%% Publish messages with no filter value.
24912482
Publish(undefined),
2492-
%% publishing messages with the "orange" filter value
2493-
Publish("orange"),
2483+
%% Publish messages with the "orange" filter value.
2484+
Publish(<<"orange">>),
24942485
ok = amqp10_client:detach_link(Sender),
24952486

24962487
% filtering on "apple"
24972488
TerminusDurability = none,
2498-
Properties = #{},
2499-
{ok, AppleReceiver} =
2500-
amqp10_client:attach_receiver_link(Session, <<"test-receiver">>,
2501-
Address, settled,
2502-
TerminusDurability,
2503-
#{<<"rabbitmq:stream-offset-spec">> => <<"first">>,
2504-
<<"rabbitmq:stream-filter">> => <<"apple">>},
2505-
Properties),
2489+
{ok, AppleReceiver} = amqp10_client:attach_receiver_link(
2490+
Session,
2491+
<<"test-receiver-1">>,
2492+
Address,
2493+
unsettled,
2494+
TerminusDurability,
2495+
#{<<"rabbitmq:stream-offset-spec">> => <<"first">>,
2496+
<<"rabbitmq:stream-filter">> => <<"apple">>}),
25062497
ok = amqp10_client:flow_link_credit(AppleReceiver, 100, 10),
2507-
25082498
AppleMessages = receive_all_messages(AppleReceiver, []),
25092499
%% we should get less than all the waves combined
25102500
?assert(length(AppleMessages) < WaveCount * 3),
25112501
%% client-side filtering
2512-
AppleFilteredMessages =
2513-
lists:filter(fun(Msg) ->
2514-
maps:get(<<"filter">>, amqp10_msg:application_properties(Msg)) =:= <<"apple">>
2515-
end, AppleMessages),
2516-
?assert(length(AppleFilteredMessages) =:= WaveCount),
2502+
AppleFilteredMessages = lists:filter(fun(Msg) ->
2503+
AP = amqp10_msg:application_properties(Msg),
2504+
maps:get(<<"filter">>, AP) =:= <<"apple">>
2505+
end, AppleMessages),
2506+
?assertEqual(WaveCount, length(AppleFilteredMessages)),
25172507
ok = amqp10_client:detach_link(AppleReceiver),
25182508

25192509
%% filtering on "apple" and "orange"
2520-
TerminusDurability = none,
2521-
Properties = #{},
2522-
{ok, AppleOrangeReceiver} =
2523-
amqp10_client:attach_receiver_link(Session, <<"test-receiver">>,
2524-
Address, settled,
2525-
TerminusDurability,
2526-
#{<<"rabbitmq:stream-offset-spec">> => <<"first">>,
2527-
<<"rabbitmq:stream-filter">> => [<<"apple">>, <<"orange">>]},
2528-
Properties),
2510+
{ok, AppleOrangeReceiver} = amqp10_client:attach_receiver_link(
2511+
Session,
2512+
<<"test-receiver-2">>,
2513+
Address,
2514+
unsettled,
2515+
TerminusDurability,
2516+
#{<<"rabbitmq:stream-offset-spec">> => <<"first">>,
2517+
<<"rabbitmq:stream-filter">> => [<<"apple">>, <<"orange">>]}),
25292518
ok = amqp10_client:flow_link_credit(AppleOrangeReceiver, 100, 10),
2530-
25312519
AppleOrangeMessages = receive_all_messages(AppleOrangeReceiver, []),
25322520
%% we should get less than all the waves combined
25332521
?assert(length(AppleOrangeMessages) < WaveCount * 3),
25342522
%% client-side filtering
2535-
AppleOrangeFilteredMessages =
2536-
lists:filter(fun(Msg) ->
2537-
AP = amqp10_msg:application_properties(Msg),
2538-
maps:get(<<"filter">>, AP) =:= <<"apple">> orelse
2539-
maps:get(<<"filter">>, AP) =:= <<"orange">>
2540-
end, AppleOrangeMessages),
2541-
?assert(length(AppleOrangeFilteredMessages) =:= WaveCount * 2),
2523+
AppleOrangeFilteredMessages = lists:filter(fun(Msg) ->
2524+
AP = amqp10_msg:application_properties(Msg),
2525+
Filter = maps:get(<<"filter">>, AP),
2526+
Filter =:= <<"apple">> orelse Filter =:= <<"orange">>
2527+
end, AppleOrangeMessages),
2528+
?assertEqual(WaveCount * 2, length(AppleOrangeFilteredMessages)),
25422529
ok = amqp10_client:detach_link(AppleOrangeReceiver),
25432530

25442531
%% filtering on "apple" and messages without a filter value
2545-
{ok, AppleUnfilteredReceiver} =
2546-
amqp10_client:attach_receiver_link(Session, <<"test-receiver">>,
2547-
Address, settled,
2548-
TerminusDurability,
2549-
#{<<"rabbitmq:stream-offset-spec">> => <<"first">>,
2550-
<<"rabbitmq:stream-filter">> => <<"apple">>,
2551-
<<"rabbitmq:stream-match-unfiltered">> => {boolean, true}},
2552-
Properties),
2532+
{ok, AppleUnfilteredReceiver} = amqp10_client:attach_receiver_link(
2533+
Session,
2534+
<<"test-receiver-3">>,
2535+
Address,
2536+
unsettled,
2537+
TerminusDurability,
2538+
#{<<"rabbitmq:stream-offset-spec">> => <<"first">>,
2539+
<<"rabbitmq:stream-filter">> => <<"apple">>,
2540+
<<"rabbitmq:stream-match-unfiltered">> => {boolean, true}}),
25532541
ok = amqp10_client:flow_link_credit(AppleUnfilteredReceiver, 100, 10),
25542542

25552543
AppleUnfilteredMessages = receive_all_messages(AppleUnfilteredReceiver, []),
25562544
%% we should get less than all the waves combined
25572545
?assert(length(AppleUnfilteredMessages) < WaveCount * 3),
25582546
%% client-side filtering
2559-
AppleUnfilteredFilteredMessages =
2560-
lists:filter(fun(Msg) ->
2561-
AP = amqp10_msg:application_properties(Msg),
2562-
maps:is_key(<<"filter">>, AP) =:= false orelse
2563-
maps:get(<<"filter">>, AP) =:= <<"apple">>
2564-
end, AppleUnfilteredMessages),
2565-
?assert(length(AppleUnfilteredFilteredMessages) =:= WaveCount * 2),
2547+
AppleUnfilteredFilteredMessages = lists:filter(fun(Msg) ->
2548+
AP = amqp10_msg:application_properties(Msg),
2549+
not maps:is_key(<<"filter">>, AP) orelse
2550+
maps:get(<<"filter">>, AP) =:= <<"apple">>
2551+
end, AppleUnfilteredMessages),
2552+
?assertEqual(WaveCount * 2, length(AppleUnfilteredFilteredMessages)),
25662553
ok = amqp10_client:detach_link(AppleUnfilteredReceiver),
25672554

25682555
delete_queue(Config, Stream),
2569-
ok = amqp10_client:close_connection(Connection),
2570-
ok.
2571-
2572-
receive_all_messages(Receiver, Acc) ->
2573-
receive
2574-
{amqp10_msg, Receiver, InMsg} ->
2575-
ok = amqp10_client:accept_msg(Receiver, InMsg),
2576-
receive_all_messages(Receiver, [InMsg] ++ Acc)
2577-
after 1000 ->
2578-
Acc
2579-
end.
2580-
2556+
ok = amqp10_client:close_connection(Connection).
25812557

25822558
%% internal
25832559
%%
25842560

2561+
receive_all_messages(Receiver, Acc) ->
2562+
receive {amqp10_msg, Receiver, Msg} ->
2563+
ok = amqp10_client:accept_msg(Receiver, Msg),
2564+
receive_all_messages(Receiver, [Msg | Acc])
2565+
after 500 ->
2566+
lists:reverse(Acc)
2567+
end.
2568+
25852569
connection_config(Config) ->
25862570
connection_config(0, Config).
25872571

0 commit comments

Comments
 (0)