Skip to content

Commit 051ef81

Browse files
committed
Support stream filtering in AMQP 0.9.1
1 parent 7bc2615 commit 051ef81

File tree

2 files changed

+175
-18
lines changed

2 files changed

+175
-18
lines changed

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 88 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@
4747
-export([update_stream_conf/2]).
4848
-export([readers/1]).
4949

50-
-export([parse_offset_arg/1]).
50+
-export([parse_offset_arg/1,
51+
filter_spec/1]).
5152

5253
-export([status/2,
5354
tracking_status/2,
@@ -72,7 +73,8 @@
7273
max :: non_neg_integer(),
7374
start_offset = 0 :: non_neg_integer(),
7475
listening_offset = 0 :: non_neg_integer(),
75-
log :: undefined | osiris_log:state()}).
76+
log :: undefined | osiris_log:state(),
77+
reader_options :: osiris:reader_options()}).
7678

7779
-record(stream_client, {stream_id :: string(),
7880
name :: term(),
@@ -83,7 +85,8 @@
8385
soft_limit :: non_neg_integer(),
8486
slow = false :: boolean(),
8587
readers = #{} :: #{term() => #stream{}},
86-
writer_id :: binary()
88+
writer_id :: binary(),
89+
filtering_supported :: boolean()
8790
}).
8891

8992
-import(rabbit_queue_type_util, [args_policy_lookup/3]).
@@ -233,7 +236,8 @@ consume(Q, #{no_ack := true}, _)
233236
consume(Q, #{limiter_active := true}, _State)
234237
when ?amqqueue_is_stream(Q) ->
235238
{error, global_qos_not_supported_for_queue_type};
236-
consume(Q, Spec, QState0) when ?amqqueue_is_stream(Q) ->
239+
consume(Q, Spec,
240+
#stream_client{filtering_supported = FilteringSupported} = QState0) when ?amqqueue_is_stream(Q) ->
237241
%% Messages should include the offset as a custom header.
238242
case check_queue_exists_in_local_node(Q) of
239243
ok ->
@@ -258,7 +262,14 @@ consume(Q, Spec, QState0) when ?amqqueue_is_stream(Q) ->
258262
%% really it should be sent by the stream queue process like classic queues
259263
%% do
260264
maybe_send_reply(ChPid, OkMsg),
261-
begin_stream(QState0, ConsumerTag, OffsetSpec, ConsumerPrefetchCount)
265+
FilterSpec = filter_spec(Args),
266+
case {FilterSpec, FilteringSupported} of
267+
{#{filter_spec := _}, false} ->
268+
{protocol_error, precondition_failed, "Filtering is not supported", []};
269+
_ ->
270+
begin_stream(QState0, ConsumerTag, OffsetSpec,
271+
ConsumerPrefetchCount, FilterSpec)
272+
end
262273
end;
263274
Err ->
264275
Err
@@ -292,6 +303,35 @@ parse_offset_arg({_, V}) ->
292303
parse_offset_arg(V) ->
293304
{error, {invalid_offset_arg, V}}.
294305

306+
filter_spec(Args) ->
307+
Filters = case lists:keysearch(<<"x-stream-filter">>, 1, Args) of
308+
{value, {_, array, FilterValues}} ->
309+
lists:foldl(fun({longstr, V}, Acc) ->
310+
[V] ++ Acc;
311+
(_, Acc) ->
312+
Acc
313+
end, [], FilterValues);
314+
{value, {_, longstr, FilterValue}} ->
315+
[FilterValue];
316+
_ ->
317+
undefined
318+
end,
319+
MatchUnfiltered = case lists:keysearch(<<"x-stream-match-unfiltered">>, 1, Args) of
320+
{value, {_, bool, Match}} when is_list(Filters) ->
321+
Match;
322+
_ when is_list(Filters) ->
323+
false;
324+
_ ->
325+
undefined
326+
end,
327+
case MatchUnfiltered of
328+
undefined ->
329+
#{};
330+
MU ->
331+
#{filter_spec =>
332+
#{filters => Filters, match_unfiltered => MU}}
333+
end.
334+
295335
get_local_pid(#stream_client{local_pid = Pid} = State)
296336
when is_pid(Pid) ->
297337
{Pid, State};
@@ -309,14 +349,14 @@ get_local_pid(#stream_client{stream_id = StreamId,
309349
end.
310350

311351
begin_stream(#stream_client{name = QName, readers = Readers0} = State0,
312-
Tag, Offset, Max) ->
352+
Tag, Offset, Max, Options) ->
313353
{LocalPid, State} = get_local_pid(State0),
314354
case LocalPid of
315355
undefined ->
316356
{error, no_local_stream_replica_available};
317357
_ ->
318358
CounterSpec = {{?MODULE, QName, Tag, self()}, []},
319-
{ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec),
359+
{ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec, Options),
320360
NextOffset = osiris_log:next_offset(Seg0) - 1,
321361
osiris:register_offset_listener(LocalPid, NextOffset),
322362
%% TODO: avoid double calls to the same process
@@ -331,7 +371,8 @@ begin_stream(#stream_client{name = QName, readers = Readers0} = State0,
331371
start_offset = StartOffset,
332372
listening_offset = NextOffset,
333373
log = Seg0,
334-
max = Max},
374+
max = Max,
375+
reader_options = Options},
335376
{ok, State#stream_client{local_pid = LocalPid,
336377
readers = Readers0#{Tag => Str0}}}
337378
end.
@@ -383,7 +424,8 @@ deliver(QSs, #delivery{message = Msg, confirm = Confirm} = Delivery) ->
383424
lists:foldl(
384425
fun({Q, stateless}, {Qs, Actions}) ->
385426
LeaderPid = amqqueue:get_pid(Q),
386-
ok = osiris:write(LeaderPid, msg_to_iodata(Msg)),
427+
ok = osiris:write(LeaderPid,
428+
stream_message(Msg, filtering_supported())),
387429
{Qs, Actions};
388430
({Q, S0}, {Qs, Actions}) ->
389431
{S, As} = deliver(Confirm, Delivery, S0),
@@ -397,8 +439,10 @@ deliver(_Confirm, #delivery{message = Msg, msg_seq_no = MsgId},
397439
next_seq = Seq,
398440
correlation = Correlation0,
399441
soft_limit = SftLmt,
400-
slow = Slow0} = State) ->
401-
ok = osiris:write(LeaderPid, WriterId, Seq, msg_to_iodata(Msg)),
442+
slow = Slow0,
443+
filtering_supported = FilteringSupported} = State) ->
444+
ok = osiris:write(LeaderPid, WriterId, Seq,
445+
stream_message(Msg, FilteringSupported)),
402446
Correlation = case MsgId of
403447
undefined ->
404448
Correlation0;
@@ -415,6 +459,25 @@ deliver(_Confirm, #delivery{message = Msg, msg_seq_no = MsgId},
415459
correlation = Correlation,
416460
slow = Slow}, Actions}.
417461

462+
stream_message(Msg, _FilteringSupported = true) ->
463+
MsgData = msg_to_iodata(Msg),
464+
case filter_header(Msg) of
465+
{_, longstr, Value} ->
466+
{Value, MsgData};
467+
_ ->
468+
MsgData
469+
end;
470+
stream_message(Msg, _FilteringSupported = false) ->
471+
msg_to_iodata(Msg).
472+
473+
filter_header(Msg) ->
474+
basic_header(<<"x-stream-filter-value">>, Msg).
475+
476+
basic_header(Key, #basic_message{content = Content}) ->
477+
Headers = rabbit_basic:extract_headers(Content),
478+
rabbit_basic:header(Key, Headers).
479+
480+
418481
-spec dequeue(_, _, _, _, client()) -> no_return().
419482
dequeue(_, _, _, _, #stream_client{name = Name}) ->
420483
{protocol_error, not_implemented, "basic.get not supported by stream queues ~ts",
@@ -462,12 +525,14 @@ handle_event(_QName, {stream_local_member_change, Pid}, #stream_client{local_pid
462525
handle_event(_QName, {stream_local_member_change, Pid}, State = #stream_client{name = QName,
463526
readers = Readers0}) ->
464527
rabbit_log:debug("Local member change event for ~tp", [QName]),
465-
Readers1 = maps:fold(fun(T, #stream{log = Log0} = S0, Acc) ->
528+
Readers1 = maps:fold(fun(T, #stream{log = Log0, reader_options = Options} = S0, Acc) ->
466529
Offset = osiris_log:next_offset(Log0),
467530
osiris_log:close(Log0),
468531
CounterSpec = {{?MODULE, QName, self()}, []},
469-
rabbit_log:debug("Re-creating Osiris reader for consumer ~tp at offset ~tp", [T, Offset]),
470-
{ok, Log1} = osiris:init_reader(Pid, Offset, CounterSpec),
532+
rabbit_log:debug("Re-creating Osiris reader for consumer ~tp at offset ~tp "
533+
" with options ~tp",
534+
[T, Offset, Options]),
535+
{ok, Log1} = osiris:init_reader(Pid, Offset, CounterSpec, Options),
471536
NextOffset = osiris_log:next_offset(Log1) - 1,
472537
rabbit_log:debug("Registering offset listener at offset ~tp", [NextOffset]),
473538
osiris:register_offset_listener(Pid, NextOffset),
@@ -775,7 +840,8 @@ init(Q) when ?is_amqqueue(Q) ->
775840
name = amqqueue:get_name(Q),
776841
leader = Leader,
777842
writer_id = WriterId,
778-
soft_limit = SoftLimit}};
843+
soft_limit = SoftLimit,
844+
filtering_supported = filtering_supported()}};
779845
{ok, stream_not_found, _} ->
780846
{error, stream_not_found};
781847
{error, coordinator_unavailable} = E ->
@@ -1056,7 +1122,8 @@ notify_decorators(Q) when ?is_amqqueue(Q) ->
10561122

10571123
resend_all(#stream_client{leader = LeaderPid,
10581124
writer_id = WriterId,
1059-
correlation = Corrs} = State) ->
1125+
correlation = Corrs,
1126+
filtering_supported = FilteringSupported} = State) ->
10601127
Msgs = lists:sort(maps:values(Corrs)),
10611128
case Msgs of
10621129
[] -> ok;
@@ -1065,7 +1132,8 @@ resend_all(#stream_client{leader = LeaderPid,
10651132
[Seq, maps:size(Corrs)])
10661133
end,
10671134
[begin
1068-
ok = osiris:write(LeaderPid, WriterId, Seq, msg_to_iodata(Msg))
1135+
ok = osiris:write(LeaderPid, WriterId, Seq,
1136+
stream_message(Msg, FilteringSupported))
10691137
end || {Seq, Msg} <- Msgs],
10701138
State.
10711139

@@ -1098,3 +1166,6 @@ list_with_minimum_quorum() ->
10981166
end, rabbit_amqqueue:list_local_stream_queues()).
10991167

11001168
is_stateful() -> true.
1169+
1170+
filtering_supported() ->
1171+
rabbit_feature_flags:is_enabled(stream_filtering).

deps/rabbit/test/rabbit_stream_queue_SUITE.erl

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,9 @@ all_tests() ->
126126
queue_info,
127127
tracking_status,
128128
restart_stream,
129-
dead_letter_target
129+
dead_letter_target,
130+
filter_spec,
131+
filtering
130132
].
131133

132134
%% -------------------------------------------------------------------
@@ -2391,8 +2393,92 @@ dead_letter_target(Config) ->
23912393
exit(timeout)
23922394
end,
23932395
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
2396+
2397+
filter_spec(_) ->
2398+
[begin
2399+
FilterSpec = rabbit_stream_queue:filter_spec(Args),
2400+
?assert(maps:is_key(filter_spec, FilterSpec)),
2401+
#{filter_spec := #{filters := Filters, match_unfiltered := MatchUnfiltered}} = FilterSpec,
2402+
?assertEqual(lists:sort(ExpectedFilters), lists:sort(Filters)),
2403+
?assertEqual(ExpectedMatchUnfiltered, MatchUnfiltered)
2404+
end || {Args, ExpectedFilters, ExpectedMatchUnfiltered} <-
2405+
[{[{<<"x-stream-filter">>,array,[{longstr,<<"apple">>},{longstr,<<"banana">>}]}],
2406+
[<<"apple">>, <<"banana">>], false},
2407+
{[{<<"x-stream-filter">>,longstr,<<"apple">>}],
2408+
[<<"apple">>], false},
2409+
{[{<<"x-stream-filter">>,longstr,<<"apple">>}, {<<"sac">>,bool,true}],
2410+
[<<"apple">>], false},
2411+
{[{<<"x-stream-filter">>,longstr,<<"apple">>},{<<"x-stream-match-unfiltered">>,bool,true}],
2412+
[<<"apple">>], true}
2413+
]],
2414+
?assertEqual(#{}, rabbit_stream_queue:filter_spec([{<<"foo">>,longstr,<<"bar">>}])),
2415+
?assertEqual(#{}, rabbit_stream_queue:filter_spec([])),
2416+
ok.
2417+
2418+
filtering(Config) ->
2419+
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
2420+
2421+
ChPublish = rabbit_ct_client_helpers:open_channel(Config, Server),
2422+
Q = ?config(queue_name, Config),
2423+
?assertEqual({'queue.declare_ok', Q, 0, 0},
2424+
declare(ChPublish, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
2425+
2426+
#'confirm.select_ok'{} = amqp_channel:call(ChPublish, #'confirm.select'{}),
2427+
amqp_channel:register_confirm_handler(ChPublish, self()),
2428+
Publish = fun(FilterValue) ->
2429+
lists:foreach(fun(_) ->
2430+
Headers = [{<<"x-stream-filter-value">>, longstr, FilterValue}],
2431+
Msg = #amqp_msg{props = #'P_basic'{delivery_mode = 2,
2432+
headers = Headers},
2433+
payload = <<"foo">>},
2434+
ok = amqp_channel:cast(ChPublish,
2435+
#'basic.publish'{routing_key = Q},
2436+
Msg)
2437+
end,lists:seq(0, 100))
2438+
end,
2439+
Publish(<<"apple">>),
2440+
amqp_channel:wait_for_confirms(ChPublish, 5),
2441+
Publish(<<"banana">>),
2442+
amqp_channel:wait_for_confirms(ChPublish, 5),
2443+
Publish(<<"apple">>),
2444+
amqp_channel:wait_for_confirms(ChPublish, 5),
2445+
amqp_channel:close(ChPublish),
2446+
ChConsume = rabbit_ct_client_helpers:open_channel(Config, Server),
2447+
?assertMatch(#'basic.qos_ok'{},
2448+
amqp_channel:call(ChConsume, #'basic.qos'{global = false,
2449+
prefetch_count = 10})),
2450+
2451+
CTag = <<"ctag">>,
2452+
amqp_channel:subscribe(ChConsume, #'basic.consume'{queue = Q,
2453+
no_ack = false,
2454+
consumer_tag = CTag,
2455+
arguments = [{<<"x-stream-offset">>, long, 0},
2456+
{<<"x-stream-filter">>, longstr, <<"banana">>}]},
2457+
self()),
2458+
receive
2459+
#'basic.consume_ok'{consumer_tag = CTag} ->
2460+
ok
2461+
end,
2462+
2463+
receive_filtered_batch(ChConsume, 0, 100),
2464+
amqp_channel:close(ChConsume),
2465+
2466+
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
23942467
%%----------------------------------------------------------------------------
23952468

2469+
receive_filtered_batch(_, Count, ExpectedSize) when Count =:= ExpectedSize ->
2470+
Count;
2471+
receive_filtered_batch(Ch, Count, ExpectedSize) ->
2472+
receive
2473+
{#'basic.deliver'{delivery_tag = DeliveryTag}, #amqp_msg{}} ->
2474+
ok = amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
2475+
multiple = false}),
2476+
receive_filtered_batch(Ch, Count + 1, ExpectedSize)
2477+
after 10000 ->
2478+
flush(),
2479+
exit({not_enough_messages, Count})
2480+
end.
2481+
23962482
delete_queues(Qs) when is_list(Qs) ->
23972483
lists:foreach(fun delete_testcase_queue/1, Qs).
23982484

0 commit comments

Comments
 (0)