Skip to content

Commit a9eb8b5

Browse files
authored
Merge pull request #8207 from rabbitmq/stream-chunk-filtering
Stream server side filtering
2 parents 168b031 + b89976a commit a9eb8b5

17 files changed

+716
-121
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
%% This Source Code Form is subject to the terms of the Mozilla Public
1+
% This Source Code Form is subject to the terms of the Mozilla Public
22
%% License, v. 2.0. If a copy of the MPL was not distributed with this
33
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
44
%%
@@ -834,26 +834,27 @@ check_arguments_key(QueueName, QueueType, Args, InvalidArgs) ->
834834
end, Args).
835835

836836
declare_args() ->
837-
[{<<"x-expires">>, fun check_expires_arg/2},
838-
{<<"x-message-ttl">>, fun check_message_ttl_arg/2},
839-
{<<"x-dead-letter-exchange">>, fun check_dlxname_arg/2},
837+
[{<<"x-expires">>, fun check_expires_arg/2},
838+
{<<"x-message-ttl">>, fun check_message_ttl_arg/2},
839+
{<<"x-dead-letter-exchange">>, fun check_dlxname_arg/2},
840840
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
841-
{<<"x-dead-letter-strategy">>, fun check_dlxstrategy_arg/2},
842-
{<<"x-max-length">>, fun check_non_neg_int_arg/2},
843-
{<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2},
844-
{<<"x-max-in-memory-length">>, fun check_non_neg_int_arg/2},
845-
{<<"x-max-in-memory-bytes">>, fun check_non_neg_int_arg/2},
846-
{<<"x-max-priority">>, fun check_max_priority_arg/2},
847-
{<<"x-overflow">>, fun check_overflow/2},
848-
{<<"x-queue-mode">>, fun check_queue_mode/2},
849-
{<<"x-queue-version">>, fun check_queue_version/2},
850-
{<<"x-single-active-consumer">>, fun check_single_active_consumer_arg/2},
851-
{<<"x-queue-type">>, fun check_queue_type/2},
852-
{<<"x-quorum-initial-group-size">>, fun check_initial_cluster_size_arg/2},
853-
{<<"x-max-age">>, fun check_max_age_arg/2},
854-
{<<"x-stream-max-segment-size-bytes">>, fun check_non_neg_int_arg/2},
855-
{<<"x-initial-cluster-size">>, fun check_initial_cluster_size_arg/2},
856-
{<<"x-queue-leader-locator">>, fun check_queue_leader_locator_arg/2}].
841+
{<<"x-dead-letter-strategy">>, fun check_dlxstrategy_arg/2},
842+
{<<"x-max-length">>, fun check_non_neg_int_arg/2},
843+
{<<"x-max-length-bytes">>, fun check_non_neg_int_arg/2},
844+
{<<"x-max-in-memory-length">>, fun check_non_neg_int_arg/2},
845+
{<<"x-max-in-memory-bytes">>, fun check_non_neg_int_arg/2},
846+
{<<"x-max-priority">>, fun check_max_priority_arg/2},
847+
{<<"x-overflow">>, fun check_overflow/2},
848+
{<<"x-queue-mode">>, fun check_queue_mode/2},
849+
{<<"x-queue-version">>, fun check_queue_version/2},
850+
{<<"x-single-active-consumer">>, fun check_single_active_consumer_arg/2},
851+
{<<"x-queue-type">>, fun check_queue_type/2},
852+
{<<"x-quorum-initial-group-size">>, fun check_initial_cluster_size_arg/2},
853+
{<<"x-max-age">>, fun check_max_age_arg/2},
854+
{<<"x-stream-max-segment-size-bytes">>, fun check_non_neg_int_arg/2},
855+
{<<"x-stream-filter-size-bytes">>, fun check_non_neg_int_arg/2},
856+
{<<"x-initial-cluster-size">>, fun check_initial_cluster_size_arg/2},
857+
{<<"x-queue-leader-locator">>, fun check_queue_leader_locator_arg/2}].
857858

858859
consume_args() -> [{<<"x-priority">>, fun check_int_arg/2},
859860
{<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2},

deps/rabbit/src/rabbit_core_ff.erl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,3 +113,10 @@
113113
stability => stable,
114114
depends_on => [stream_single_active_consumer]
115115
}}).
116+
117+
-rabbit_feature_flag(
118+
{stream_filtering,
119+
#{desc => "Support for stream filtering.",
120+
stability => stable,
121+
depends_on => [stream_queue]
122+
}}).

deps/rabbit/src/rabbit_policies.erl

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ register() ->
4444
{policy_validator, <<"delivery-limit">>},
4545
{policy_validator, <<"max-age">>},
4646
{policy_validator, <<"stream-max-segment-size-bytes">>},
47+
{policy_validator, <<"stream-filter-size-bytes">>},
4748
{policy_validator, <<"queue-leader-locator">>},
4849
{policy_validator, <<"initial-cluster-size">>},
4950
{operator_policy_validator, <<"expires">>},
@@ -195,7 +196,13 @@ validate_policy0(<<"stream-max-segment-size-bytes">>, Value)
195196
when is_integer(Value), Value >= 0, Value =< ?MAX_STREAM_MAX_SEGMENT_SIZE ->
196197
ok;
197198
validate_policy0(<<"stream-max-segment-size-bytes">>, Value) ->
198-
{error, "~tp is not a valid segment size", [Value]}.
199+
{error, "~tp is not a valid segment size", [Value]};
200+
201+
validate_policy0(<<"stream-filter-size-bytes">>, Value)
202+
when is_integer(Value), Value >= 16, Value =< 255 ->
203+
ok;
204+
validate_policy0(<<"stream-filter-size-bytes">>, Value) ->
205+
{error, "~tp is not a valid filter size. Valid range is 16-255", [Value]}.
199206

200207
merge_policy_value(<<"message-ttl">>, Val, OpVal) -> min(Val, OpVal);
201208
merge_policy_value(<<"max-length">>, Val, OpVal) -> min(Val, OpVal);

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 115 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@
4747
-export([update_stream_conf/2]).
4848
-export([readers/1]).
4949

50-
-export([parse_offset_arg/1]).
50+
-export([parse_offset_arg/1,
51+
filter_spec/1]).
5152

5253
-export([status/2,
5354
tracking_status/2,
@@ -72,7 +73,8 @@
7273
max :: non_neg_integer(),
7374
start_offset = 0 :: non_neg_integer(),
7475
listening_offset = 0 :: non_neg_integer(),
75-
log :: undefined | osiris_log:state()}).
76+
log :: undefined | osiris_log:state(),
77+
reader_options :: map()}).
7678

7779
-record(stream_client, {stream_id :: string(),
7880
name :: term(),
@@ -83,7 +85,8 @@
8385
soft_limit :: non_neg_integer(),
8486
slow = false :: boolean(),
8587
readers = #{} :: #{term() => #stream{}},
86-
writer_id :: binary()
88+
writer_id :: binary(),
89+
filtering_supported :: boolean()
8790
}).
8891

8992
-import(rabbit_queue_type_util, [args_policy_lookup/3]).
@@ -110,7 +113,9 @@ declare(Q0, _Node) when ?amqqueue_is_stream(Q0) ->
110113
[fun rabbit_queue_type_util:check_auto_delete/1,
111114
fun rabbit_queue_type_util:check_exclusive/1,
112115
fun rabbit_queue_type_util:check_non_durable/1,
113-
fun rabbit_stream_queue:check_max_segment_size_bytes/1],
116+
fun check_max_segment_size_bytes/1,
117+
fun check_filter_size/1
118+
],
114119
Q0) of
115120
ok ->
116121
create_stream(Q0);
@@ -130,6 +135,18 @@ check_max_segment_size_bytes(Q) ->
130135
ok
131136
end.
132137

138+
check_filter_size(Q) ->
139+
Args = amqqueue:get_arguments(Q),
140+
case rabbit_misc:table_lookup(Args, <<"x-stream-filter-size-bytes">>) of
141+
undefined ->
142+
ok;
143+
{_Type, Val} when Val > 255 orelse Val < 16 ->
144+
{protocol_error, precondition_failed,
145+
"Invalid value for x-stream-filter-size-bytes", []};
146+
_ ->
147+
ok
148+
end.
149+
133150
create_stream(Q0) ->
134151
Arguments = amqqueue:get_arguments(Q0),
135152
QName = amqqueue:get_name(Q0),
@@ -219,7 +236,8 @@ consume(Q, #{no_ack := true}, _)
219236
consume(Q, #{limiter_active := true}, _State)
220237
when ?amqqueue_is_stream(Q) ->
221238
{error, global_qos_not_supported_for_queue_type};
222-
consume(Q, Spec, QState0) when ?amqqueue_is_stream(Q) ->
239+
consume(Q, Spec,
240+
#stream_client{filtering_supported = FilteringSupported} = QState0) when ?amqqueue_is_stream(Q) ->
223241
%% Messages should include the offset as a custom header.
224242
case check_queue_exists_in_local_node(Q) of
225243
ok ->
@@ -244,7 +262,14 @@ consume(Q, Spec, QState0) when ?amqqueue_is_stream(Q) ->
244262
%% really it should be sent by the stream queue process like classic queues
245263
%% do
246264
maybe_send_reply(ChPid, OkMsg),
247-
begin_stream(QState0, ConsumerTag, OffsetSpec, ConsumerPrefetchCount)
265+
FilterSpec = filter_spec(Args),
266+
case {FilterSpec, FilteringSupported} of
267+
{#{filter_spec := _}, false} ->
268+
{protocol_error, precondition_failed, "Filtering is not supported", []};
269+
_ ->
270+
begin_stream(QState0, ConsumerTag, OffsetSpec,
271+
ConsumerPrefetchCount, FilterSpec)
272+
end
248273
end;
249274
Err ->
250275
Err
@@ -278,6 +303,35 @@ parse_offset_arg({_, V}) ->
278303
parse_offset_arg(V) ->
279304
{error, {invalid_offset_arg, V}}.
280305

306+
filter_spec(Args) ->
307+
Filters = case lists:keysearch(<<"x-stream-filter">>, 1, Args) of
308+
{value, {_, array, FilterValues}} ->
309+
lists:foldl(fun({longstr, V}, Acc) ->
310+
[V] ++ Acc;
311+
(_, Acc) ->
312+
Acc
313+
end, [], FilterValues);
314+
{value, {_, longstr, FilterValue}} ->
315+
[FilterValue];
316+
_ ->
317+
undefined
318+
end,
319+
MatchUnfiltered = case lists:keysearch(<<"x-stream-match-unfiltered">>, 1, Args) of
320+
{value, {_, bool, Match}} when is_list(Filters) ->
321+
Match;
322+
_ when is_list(Filters) ->
323+
false;
324+
_ ->
325+
undefined
326+
end,
327+
case MatchUnfiltered of
328+
undefined ->
329+
#{};
330+
MU ->
331+
#{filter_spec =>
332+
#{filters => Filters, match_unfiltered => MU}}
333+
end.
334+
281335
get_local_pid(#stream_client{local_pid = Pid} = State)
282336
when is_pid(Pid) ->
283337
{Pid, State};
@@ -295,14 +349,14 @@ get_local_pid(#stream_client{stream_id = StreamId,
295349
end.
296350

297351
begin_stream(#stream_client{name = QName, readers = Readers0} = State0,
298-
Tag, Offset, Max) ->
352+
Tag, Offset, Max, Options) ->
299353
{LocalPid, State} = get_local_pid(State0),
300354
case LocalPid of
301355
undefined ->
302356
{error, no_local_stream_replica_available};
303357
_ ->
304358
CounterSpec = {{?MODULE, QName, Tag, self()}, []},
305-
{ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec),
359+
{ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec, Options),
306360
NextOffset = osiris_log:next_offset(Seg0) - 1,
307361
osiris:register_offset_listener(LocalPid, NextOffset),
308362
%% TODO: avoid double calls to the same process
@@ -317,7 +371,8 @@ begin_stream(#stream_client{name = QName, readers = Readers0} = State0,
317371
start_offset = StartOffset,
318372
listening_offset = NextOffset,
319373
log = Seg0,
320-
max = Max},
374+
max = Max,
375+
reader_options = Options},
321376
{ok, State#stream_client{local_pid = LocalPid,
322377
readers = Readers0#{Tag => Str0}}}
323378
end.
@@ -369,7 +424,8 @@ deliver(QSs, #delivery{message = Msg, confirm = Confirm} = Delivery) ->
369424
lists:foldl(
370425
fun({Q, stateless}, {Qs, Actions}) ->
371426
LeaderPid = amqqueue:get_pid(Q),
372-
ok = osiris:write(LeaderPid, msg_to_iodata(Msg)),
427+
ok = osiris:write(LeaderPid,
428+
stream_message(Msg, filtering_supported())),
373429
{Qs, Actions};
374430
({Q, S0}, {Qs, Actions}) ->
375431
{S, As} = deliver(Confirm, Delivery, S0),
@@ -383,8 +439,10 @@ deliver(_Confirm, #delivery{message = Msg, msg_seq_no = MsgId},
383439
next_seq = Seq,
384440
correlation = Correlation0,
385441
soft_limit = SftLmt,
386-
slow = Slow0} = State) ->
387-
ok = osiris:write(LeaderPid, WriterId, Seq, msg_to_iodata(Msg)),
442+
slow = Slow0,
443+
filtering_supported = FilteringSupported} = State) ->
444+
ok = osiris:write(LeaderPid, WriterId, Seq,
445+
stream_message(Msg, FilteringSupported)),
388446
Correlation = case MsgId of
389447
undefined ->
390448
Correlation0;
@@ -401,6 +459,25 @@ deliver(_Confirm, #delivery{message = Msg, msg_seq_no = MsgId},
401459
correlation = Correlation,
402460
slow = Slow}, Actions}.
403461

462+
stream_message(Msg, _FilteringSupported = true) ->
463+
MsgData = msg_to_iodata(Msg),
464+
case filter_header(Msg) of
465+
{_, longstr, Value} ->
466+
{Value, MsgData};
467+
_ ->
468+
MsgData
469+
end;
470+
stream_message(Msg, _FilteringSupported = false) ->
471+
msg_to_iodata(Msg).
472+
473+
filter_header(Msg) ->
474+
basic_header(<<"x-stream-filter-value">>, Msg).
475+
476+
basic_header(Key, #basic_message{content = Content}) ->
477+
Headers = rabbit_basic:extract_headers(Content),
478+
rabbit_basic:header(Key, Headers).
479+
480+
404481
-spec dequeue(_, _, _, _, client()) -> no_return().
405482
dequeue(_, _, _, _, #stream_client{name = Name}) ->
406483
{protocol_error, not_implemented, "basic.get not supported by stream queues ~ts",
@@ -448,12 +525,14 @@ handle_event(_QName, {stream_local_member_change, Pid}, #stream_client{local_pid
448525
handle_event(_QName, {stream_local_member_change, Pid}, State = #stream_client{name = QName,
449526
readers = Readers0}) ->
450527
rabbit_log:debug("Local member change event for ~tp", [QName]),
451-
Readers1 = maps:fold(fun(T, #stream{log = Log0} = S0, Acc) ->
528+
Readers1 = maps:fold(fun(T, #stream{log = Log0, reader_options = Options} = S0, Acc) ->
452529
Offset = osiris_log:next_offset(Log0),
453530
osiris_log:close(Log0),
454531
CounterSpec = {{?MODULE, QName, self()}, []},
455-
rabbit_log:debug("Re-creating Osiris reader for consumer ~tp at offset ~tp", [T, Offset]),
456-
{ok, Log1} = osiris:init_reader(Pid, Offset, CounterSpec),
532+
rabbit_log:debug("Re-creating Osiris reader for consumer ~tp at offset ~tp "
533+
" with options ~tp",
534+
[T, Offset, Options]),
535+
{ok, Log1} = osiris:init_reader(Pid, Offset, CounterSpec, Options),
457536
NextOffset = osiris_log:next_offset(Log1) - 1,
458537
rabbit_log:debug("Registering offset listener at offset ~tp", [NextOffset]),
459538
osiris:register_offset_listener(Pid, NextOffset),
@@ -761,7 +840,8 @@ init(Q) when ?is_amqqueue(Q) ->
761840
name = amqqueue:get_name(Q),
762841
leader = Leader,
763842
writer_id = WriterId,
764-
soft_limit = SoftLimit}};
843+
soft_limit = SoftLimit,
844+
filtering_supported = filtering_supported()}};
765845
{ok, stream_not_found, _} ->
766846
{error, stream_not_found};
767847
{error, coordinator_unavailable} = E ->
@@ -865,34 +945,29 @@ delete_replica(VHost, Name, Node) ->
865945
make_stream_conf(Q) ->
866946
QName = amqqueue:get_name(Q),
867947
Name = stream_name(QName),
868-
%% MaxLength = args_policy_lookup(<<"max-length">>, policy_precedence/2, Q),
869-
MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun policy_precedence/2, Q),
870-
MaxAge = max_age(args_policy_lookup(<<"max-age">>, fun policy_precedence/2, Q)),
871-
MaxSegmentSizeBytes = args_policy_lookup(<<"stream-max-segment-size-bytes">>, fun policy_precedence/2, Q),
872948
Formatter = {?MODULE, format_osiris_event, [QName]},
873-
Retention = lists:filter(fun({_, R}) ->
874-
R =/= undefined
875-
end, [{max_bytes, MaxBytes},
876-
{max_age, MaxAge}]),
877-
add_if_defined(max_segment_size_bytes, MaxSegmentSizeBytes,
878-
#{reference => QName,
879-
name => Name,
880-
retention => Retention,
881-
event_formatter => Formatter,
882-
epoch => 1}).
949+
update_stream_conf(Q, #{reference => QName,
950+
name => Name,
951+
event_formatter => Formatter,
952+
epoch => 1}).
883953

884954
update_stream_conf(undefined, #{} = Conf) ->
885955
Conf;
886956
update_stream_conf(Q, #{} = Conf) when ?is_amqqueue(Q) ->
887957
MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun policy_precedence/2, Q),
888958
MaxAge = max_age(args_policy_lookup(<<"max-age">>, fun policy_precedence/2, Q)),
889-
MaxSegmentSizeBytes = args_policy_lookup(<<"stream-max-segment-size-bytes">>, fun policy_precedence/2, Q),
959+
MaxSegmentSizeBytes = args_policy_lookup(<<"stream-max-segment-size-bytes">>,
960+
fun policy_precedence/2, Q),
961+
FilterSizeBytes = args_policy_lookup(<<"stream-filter-size-bytes">>,
962+
fun policy_precedence/2, Q),
890963
Retention = lists:filter(fun({_, R}) ->
891964
R =/= undefined
892965
end, [{max_bytes, MaxBytes},
893966
{max_age, MaxAge}]),
894-
add_if_defined(max_segment_size_bytes, MaxSegmentSizeBytes,
895-
Conf#{retention => Retention}).
967+
add_if_defined(
968+
filter_size, FilterSizeBytes,
969+
add_if_defined(max_segment_size_bytes, MaxSegmentSizeBytes,
970+
Conf#{retention => Retention})).
896971

897972
add_if_defined(_, undefined, Map) ->
898973
Map;
@@ -1047,7 +1122,8 @@ notify_decorators(Q) when ?is_amqqueue(Q) ->
10471122

10481123
resend_all(#stream_client{leader = LeaderPid,
10491124
writer_id = WriterId,
1050-
correlation = Corrs} = State) ->
1125+
correlation = Corrs,
1126+
filtering_supported = FilteringSupported} = State) ->
10511127
Msgs = lists:sort(maps:values(Corrs)),
10521128
case Msgs of
10531129
[] -> ok;
@@ -1056,7 +1132,8 @@ resend_all(#stream_client{leader = LeaderPid,
10561132
[Seq, maps:size(Corrs)])
10571133
end,
10581134
[begin
1059-
ok = osiris:write(LeaderPid, WriterId, Seq, msg_to_iodata(Msg))
1135+
ok = osiris:write(LeaderPid, WriterId, Seq,
1136+
stream_message(Msg, FilteringSupported))
10601137
end || {Seq, Msg} <- Msgs],
10611138
State.
10621139

@@ -1089,3 +1166,6 @@ list_with_minimum_quorum() ->
10891166
end, rabbit_amqqueue:list_local_stream_queues()).
10901167

10911168
is_stateful() -> true.
1169+
1170+
filtering_supported() ->
1171+
rabbit_feature_flags:is_enabled(stream_filtering).

0 commit comments

Comments
 (0)