Skip to content

Commit e760aed

Browse files
acogoluegnesansd
authored andcommitted
Support stream filtering in AMQP 1.0
Use the x-stream-filter-value message annotation to carry the filter value in a published message. Use the rabbitmq:stream-filter and rabbitmq:stream-match-unfiltered filters when creating a receiver that wants to filter out messages from a stream.
1 parent 7031a0e commit e760aed

File tree

4 files changed

+208
-19
lines changed

4 files changed

+208
-19
lines changed

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -708,7 +708,7 @@ filter_value_type(V)
708708
when is_integer(V) andalso V >= 0 ->
709709
{uint, V};
710710
filter_value_type(VList) when is_list(VList) ->
711-
[filter_value_type(V) || V <- VList];
711+
{list, [filter_value_type(V) || V <- VList]};
712712
filter_value_type({T, _} = V) when is_atom(T) ->
713713
%% looks like an already tagged type, just pass it through
714714
V.
@@ -1215,7 +1215,8 @@ translate_filters_legacy_amqp_no_local_filter_test() ->
12151215
{map,
12161216
[{
12171217
{symbol, <<"apache.org:no-local-filter:list">>},
1218-
{described, {symbol, <<"apache.org:no-local-filter:list">>}, [{utf8, <<"foo">>}, {utf8, <<"bar">>}]}
1218+
{described, {symbol, <<"apache.org:no-local-filter:list">>},
1219+
{list, [{utf8, <<"foo">>}, {utf8, <<"bar">>}]}}
12191220
}]
12201221
} = translate_filters(#{<<"apache.org:no-local-filter:list">> => [<<"foo">>, <<"bar">>]}).
12211222

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -549,19 +549,21 @@ deliver0(MsgId, Msg,
549549
correlation = Correlation,
550550
slow = Slow}, Actions}.
551551

552+
552553
stream_message(Msg, _FilteringSupported = true) ->
553-
MsgData = msg_to_iodata(Msg),
554-
case mc:x_header(<<"x-stream-filter-value">>, Msg) of
554+
ConvertedMsg = mc:convert(mc_amqp, Msg),
555+
MsgData = amqp10_msg_to_iodata(ConvertedMsg),
556+
case mc:x_header(<<"x-stream-filter-value">>, ConvertedMsg) of
555557
undefined ->
556558
MsgData;
557559
{utf8, Value} ->
558560
{Value, MsgData}
559561
end;
560562
stream_message(Msg, _FilteringSupported = false) ->
561-
msg_to_iodata(Msg).
563+
amqp10_msg_to_iodata(mc:convert(mc_amqp, Msg)).
562564

563-
msg_to_iodata(Msg0) ->
564-
Sections = mc:protocol_state(mc:convert(mc_amqp, Msg0)),
565+
amqp10_msg_to_iodata(Msg) ->
566+
Sections = mc:protocol_state(Msg),
565567
mc_amqp:serialize(Sections).
566568

567569
-spec dequeue(_, _, _, _, client()) -> no_return().

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session.erl

Lines changed: 48 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1842,20 +1842,57 @@ encode_frames(T, Msg, MaxContentLen, Transfers) ->
18421842
end.
18431843

18441844
source_filters_to_consumer_args(#'v1_0.source'{filter = {map, KVList}}) ->
1845-
Key = {symbol, <<"rabbitmq:stream-offset-spec">>},
1846-
case keyfind_unpack_described(Key, KVList) of
1847-
{_, {timestamp, Ts}} ->
1848-
[{<<"x-stream-offset">>, timestamp, Ts div 1000}]; %% 0.9.1 uses second based timestamps
1849-
{_, {utf8, Spec}} ->
1850-
[{<<"x-stream-offset">>, longstr, Spec}]; %% next, last, first and "10m" etc
1851-
{_, {_, Offset}} when is_integer(Offset) ->
1852-
[{<<"x-stream-offset">>, long, Offset}]; %% integer offset
1853-
_ ->
1854-
[]
1855-
end;
1845+
source_filters_to_consumer_args(
1846+
[<<"rabbitmq:stream-offset-spec">>,
1847+
<<"rabbitmq:stream-filter">>, <<"rabbitmq:stream-match-unfiltered">>],
1848+
KVList,
1849+
[]);
18561850
source_filters_to_consumer_args(_Source) ->
18571851
[].
18581852

1853+
source_filters_to_consumer_args([], _KVList, Acc) ->
1854+
Acc;
1855+
source_filters_to_consumer_args([<<"rabbitmq:stream-offset-spec">> = H | T], KVList, Acc) ->
1856+
Key = {symbol, H},
1857+
Arg = case keyfind_unpack_described(Key, KVList) of
1858+
{_, {timestamp, Ts}} ->
1859+
[{<<"x-stream-offset">>, timestamp, Ts div 1000}]; %% 0.9.1 uses second based timestamps
1860+
{_, {utf8, Spec}} ->
1861+
[{<<"x-stream-offset">>, longstr, Spec}]; %% next, last, first and "10m" etc
1862+
{_, {_, Offset}} when is_integer(Offset) ->
1863+
[{<<"x-stream-offset">>, long, Offset}]; %% integer offset
1864+
_ ->
1865+
[]
1866+
end,
1867+
source_filters_to_consumer_args(T, KVList, Arg ++ Acc);
1868+
source_filters_to_consumer_args([<<"rabbitmq:stream-filter">> = H | T], KVList, Acc) ->
1869+
Key = {symbol, H},
1870+
Arg = case keyfind_unpack_described(Key, KVList) of
1871+
{_, {list, Filters}} when is_list(Filters) ->
1872+
FilterValues = lists:foldl(fun({utf8, Filter}, Fs) ->
1873+
[{longstr, Filter}] ++ Fs;
1874+
(_, Fs) ->
1875+
Fs
1876+
end, [], Filters),
1877+
[{<<"x-stream-filter">>, array, FilterValues}];
1878+
{_, {utf8, Filter}} ->
1879+
[{<<"x-stream-filter">>, longstr, Filter}];
1880+
_ ->
1881+
[]
1882+
end,
1883+
source_filters_to_consumer_args(T, KVList, Arg ++ Acc);
1884+
source_filters_to_consumer_args([<<"rabbitmq:stream-match-unfiltered">> = H | T], KVList, Acc) ->
1885+
Key = {symbol, H},
1886+
Arg = case keyfind_unpack_described(Key, KVList) of
1887+
{_, {boolean, MU}} ->
1888+
[{<<"x-stream-match-unfiltered">>, bool, MU}];
1889+
_ ->
1890+
[]
1891+
end,
1892+
source_filters_to_consumer_args(T, KVList, Arg ++ Acc);
1893+
source_filters_to_consumer_args([_ | T], KVList, Acc) ->
1894+
source_filters_to_consumer_args(T, KVList, Acc).
1895+
18591896
keyfind_unpack_described(Key, KvList) ->
18601897
%% filterset values _should_ be described values
18611898
%% they aren't always however for historical reasons so we need this bit of

deps/rabbitmq_amqp1_0/test/amqp10_client_SUITE.erl

Lines changed: 150 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ groups() ->
8181
resource_alarm_after_session_begin,
8282
max_message_size_client_to_server,
8383
max_message_size_server_to_client,
84-
receive_transfer_flow_order
84+
receive_transfer_flow_order,
85+
stream_filtering
8586
]},
8687

8788
{cluster_size_3, [shuffle],
@@ -2430,6 +2431,154 @@ queue_and_client_different_nodes(QueueLeaderNode, ClientNode, QueueType, Config)
24302431
ok = rabbit_ct_client_helpers:close_channel(Ch),
24312432
ok = amqp10_client:close_connection(Connection).
24322433

2434+
2435+
stream_filtering(Config) ->
2436+
Host = ?config(rmq_hostname, Config),
2437+
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
2438+
Stream = list_to_binary(atom_to_list(?FUNCTION_NAME) ++ "-" ++ integer_to_list(rand:uniform(10000))),
2439+
Address = <<"/amq/queue/", Stream/binary>>,
2440+
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
2441+
Args = [{<<"x-queue-type">>, longstr, <<"stream">>}],
2442+
amqp_channel:call(Ch, #'queue.declare'{queue = Stream,
2443+
durable = true,
2444+
arguments = Args}),
2445+
2446+
%% we are going to publish several waves of messages with and without filter values.
2447+
%% we will then create subscriptions with various filter options
2448+
%% and make sure we receive only what we asked for and not all the messages.
2449+
2450+
WaveCount = 1000,
2451+
OpnConf = #{address => Host,
2452+
port => Port,
2453+
transfer_limit_margin => 10 * WaveCount,
2454+
container_id => atom_to_binary(?FUNCTION_NAME, utf8),
2455+
sasl => {plain, <<"guest">>, <<"guest">>}},
2456+
2457+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
2458+
{ok, Session} = amqp10_client:begin_session(Connection),
2459+
SenderLinkName = <<"test-sender">>,
2460+
{ok, Sender} = amqp10_client:attach_sender_link(Session,
2461+
SenderLinkName,
2462+
Address),
2463+
2464+
wait_for_credit(Sender),
2465+
2466+
%% logic to publish a wave of messages with or without a filter value
2467+
Publish = fun(FilterValue) ->
2468+
lists:foreach(fun(Seq) ->
2469+
{AppProps, Anns} =
2470+
case FilterValue of
2471+
undefined ->
2472+
{#{}, #{}};
2473+
_ ->
2474+
{#{<<"filter">> => FilterValue},
2475+
#{<<"x-stream-filter-value">> => FilterValue}}
2476+
end,
2477+
FilterBin = rabbit_data_coercion:to_binary(FilterValue),
2478+
SeqBin = rabbit_data_coercion:to_binary(Seq),
2479+
DTag = <<FilterBin/binary, SeqBin/binary>>,
2480+
Msg0 = amqp10_msg:new(DTag, <<"my-body">>, false),
2481+
Msg1 = amqp10_msg:set_application_properties(AppProps, Msg0),
2482+
Msg2 = amqp10_msg:set_message_annotations(Anns, Msg1),
2483+
ok = amqp10_client:send_msg(Sender, Msg2),
2484+
ok = wait_for_settlement(DTag)
2485+
end, lists:seq(1, WaveCount))
2486+
end,
2487+
2488+
%% publishing messages with the "apple" filter value
2489+
Publish("apple"),
2490+
%% publishing messages with no filter value
2491+
Publish(undefined),
2492+
%% publishing messages with the "orange" filter value
2493+
Publish("orange"),
2494+
ok = amqp10_client:detach_link(Sender),
2495+
2496+
% filtering on "apple"
2497+
TerminusDurability = none,
2498+
Properties = #{},
2499+
{ok, AppleReceiver} =
2500+
amqp10_client:attach_receiver_link(Session, <<"test-receiver">>,
2501+
Address, settled,
2502+
TerminusDurability,
2503+
#{<<"rabbitmq:stream-offset-spec">> => <<"first">>,
2504+
<<"rabbitmq:stream-filter">> => <<"apple">>},
2505+
Properties),
2506+
ok = amqp10_client:flow_link_credit(AppleReceiver, 100, 10),
2507+
2508+
AppleMessages = receive_all_messages(AppleReceiver, []),
2509+
%% we should get less than all the waves combined
2510+
?assert(length(AppleMessages) < WaveCount * 3),
2511+
%% client-side filtering
2512+
AppleFilteredMessages =
2513+
lists:filter(fun(Msg) ->
2514+
maps:get(<<"filter">>, amqp10_msg:application_properties(Msg)) =:= <<"apple">>
2515+
end, AppleMessages),
2516+
?assert(length(AppleFilteredMessages) =:= WaveCount),
2517+
ok = amqp10_client:detach_link(AppleReceiver),
2518+
2519+
%% filtering on "apple" and "orange"
2520+
TerminusDurability = none,
2521+
Properties = #{},
2522+
{ok, AppleOrangeReceiver} =
2523+
amqp10_client:attach_receiver_link(Session, <<"test-receiver">>,
2524+
Address, settled,
2525+
TerminusDurability,
2526+
#{<<"rabbitmq:stream-offset-spec">> => <<"first">>,
2527+
<<"rabbitmq:stream-filter">> => [<<"apple">>, <<"orange">>]},
2528+
Properties),
2529+
ok = amqp10_client:flow_link_credit(AppleOrangeReceiver, 100, 10),
2530+
2531+
AppleOrangeMessages = receive_all_messages(AppleOrangeReceiver, []),
2532+
%% we should get less than all the waves combined
2533+
?assert(length(AppleOrangeMessages) < WaveCount * 3),
2534+
%% client-side filtering
2535+
AppleOrangeFilteredMessages =
2536+
lists:filter(fun(Msg) ->
2537+
AP = amqp10_msg:application_properties(Msg),
2538+
maps:get(<<"filter">>, AP) =:= <<"apple">> orelse
2539+
maps:get(<<"filter">>, AP) =:= <<"orange">>
2540+
end, AppleOrangeMessages),
2541+
?assert(length(AppleOrangeFilteredMessages) =:= WaveCount * 2),
2542+
ok = amqp10_client:detach_link(AppleOrangeReceiver),
2543+
2544+
%% filtering on "apple" and messages without a filter value
2545+
{ok, AppleUnfilteredReceiver} =
2546+
amqp10_client:attach_receiver_link(Session, <<"test-receiver">>,
2547+
Address, settled,
2548+
TerminusDurability,
2549+
#{<<"rabbitmq:stream-offset-spec">> => <<"first">>,
2550+
<<"rabbitmq:stream-filter">> => <<"apple">>,
2551+
<<"rabbitmq:stream-match-unfiltered">> => {boolean, true}},
2552+
Properties),
2553+
ok = amqp10_client:flow_link_credit(AppleUnfilteredReceiver, 100, 10),
2554+
2555+
AppleUnfilteredMessages = receive_all_messages(AppleUnfilteredReceiver, []),
2556+
%% we should get less than all the waves combined
2557+
?assert(length(AppleUnfilteredMessages) < WaveCount * 3),
2558+
%% client-side filtering
2559+
AppleUnfilteredFilteredMessages =
2560+
lists:filter(fun(Msg) ->
2561+
AP = amqp10_msg:application_properties(Msg),
2562+
maps:is_key(<<"filter">>, AP) =:= false orelse
2563+
maps:get(<<"filter">>, AP) =:= <<"apple">>
2564+
end, AppleUnfilteredMessages),
2565+
?assert(length(AppleUnfilteredFilteredMessages) =:= WaveCount * 2),
2566+
ok = amqp10_client:detach_link(AppleUnfilteredReceiver),
2567+
2568+
delete_queue(Config, Stream),
2569+
ok = amqp10_client:close_connection(Connection),
2570+
ok.
2571+
2572+
receive_all_messages(Receiver, Acc) ->
2573+
receive
2574+
{amqp10_msg, Receiver, InMsg} ->
2575+
ok = amqp10_client:accept_msg(Receiver, InMsg),
2576+
receive_all_messages(Receiver, [InMsg] ++ Acc)
2577+
after 1000 ->
2578+
Acc
2579+
end.
2580+
2581+
24332582
%% internal
24342583
%%
24352584

0 commit comments

Comments
 (0)