Skip to content

Commit 7bc2615

Browse files
committed
Add stream filtering feature flag
1 parent 065844a commit 7bc2615

File tree

6 files changed

+189
-38
lines changed

6 files changed

+189
-38
lines changed

deps/rabbit/src/rabbit_core_ff.erl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,3 +113,10 @@
113113
stability => stable,
114114
depends_on => [stream_single_active_consumer]
115115
}}).
116+
117+
-rabbit_feature_flag(
118+
{stream_filtering,
119+
#{desc => "Support for stream filtering.",
120+
stability => stable,
121+
depends_on => [stream_queue]
122+
}}).

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 73 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,8 @@
9595
outstanding_requests :: #{integer() => #request{}},
9696
deliver_version :: rabbit_stream_core:command_version(),
9797
request_timeout :: pos_integer(),
98-
outstanding_requests_timer :: undefined | erlang:reference()}).
98+
outstanding_requests_timer :: undefined | erlang:reference(),
99+
filtering_supported :: boolean()}).
99100
-record(configuration,
100101
{initial_credits :: integer(),
101102
credits_required_for_unblocking :: integer(),
@@ -257,7 +258,8 @@ init([KeepaliveSup,
257258
correlation_id_sequence = 0,
258259
outstanding_requests = #{},
259260
request_timeout = RequestTimeout,
260-
deliver_version = DeliverVersion},
261+
deliver_version = DeliverVersion,
262+
filtering_supported = rabbit_stream_utils:filtering_supported()},
261263
State =
262264
#stream_connection_state{consumers = #{},
263265
blocked = false,
@@ -269,6 +271,13 @@ init([KeepaliveSup,
269271
application:get_env(rabbitmq_stream,
270272
connection_negotiation_step_timeout,
271273
10_000),
274+
Config = #configuration{
275+
initial_credits = InitialCredits,
276+
credits_required_for_unblocking = CreditsRequiredBeforeUnblocking,
277+
frame_max = FrameMax,
278+
heartbeat = Heartbeat,
279+
connection_negotiation_step_timeout = ConnectionNegotiationStepTimeout
280+
},
272281
% gen_statem process has its start_link call not return until the init function returns.
273282
% This is problematic, because we won't be able to call ranch:handshake/2
274283
% from the init callback as this would cause a deadlock to happen.
@@ -280,20 +289,7 @@ init([KeepaliveSup,
280289
#statem_data{transport = Transport,
281290
connection = Connection,
282291
connection_state = State,
283-
config =
284-
#configuration{initial_credits
285-
=
286-
InitialCredits,
287-
credits_required_for_unblocking
288-
=
289-
CreditsRequiredBeforeUnblocking,
290-
frame_max =
291-
FrameMax,
292-
heartbeat =
293-
Heartbeat,
294-
connection_negotiation_step_timeout
295-
=
296-
ConnectionNegotiationStepTimeout}});
292+
config = Config});
297293
{Error, Reason} ->
298294
rabbit_net:fast_close(RealSocket),
299295
rabbit_log_connection:warning("Closing connection because of ~tp ~tp",
@@ -1753,6 +1749,31 @@ handle_frame_post_auth(Transport,
17531749
{publish, PublisherId, MessageCount, Messages}) ->
17541750
handle_frame_post_auth(Transport, Connection, State,
17551751
{publish, ?VERSION_1, PublisherId, MessageCount, Messages});
1752+
handle_frame_post_auth(Transport,
1753+
#stream_connection{filtering_supported = false,
1754+
publishers = Publishers,
1755+
socket = S} = Connection,
1756+
State,
1757+
{publish_v2, PublisherId, MessageCount, Messages}) ->
1758+
case Publishers of
1759+
#{PublisherId := #publisher{message_counters = Counters}} ->
1760+
increase_messages_received(Counters, MessageCount),
1761+
increase_messages_errored(Counters, MessageCount),
1762+
ok;
1763+
_ ->
1764+
ok
1765+
end,
1766+
rabbit_global_counters:increase_protocol_counter(stream,
1767+
?PRECONDITION_FAILED,
1768+
1),
1769+
PublishingIds = publishing_ids_from_messages(?VERSION_2, Messages),
1770+
Command = {publish_error,
1771+
PublisherId,
1772+
?RESPONSE_CODE_PRECONDITION_FAILED,
1773+
PublishingIds},
1774+
Frame = rabbit_stream_core:frame(Command),
1775+
send(Transport, S, Frame),
1776+
{Connection, State};
17561777
handle_frame_post_auth(Transport,
17571778
Connection,
17581779
State,
@@ -1904,15 +1925,42 @@ handle_frame_post_auth(Transport,
19041925
{Connection0, State}
19051926
end;
19061927
handle_frame_post_auth(Transport,
1907-
#stream_connection{name = ConnName,
1908-
socket = Socket,
1909-
stream_subscriptions =
1910-
StreamSubscriptions,
1911-
virtual_host = VirtualHost,
1912-
user = User,
1913-
send_file_oct = SendFileOct,
1914-
transport = ConnTransport} =
1915-
Connection,
1928+
#stream_connection{filtering_supported = false} = Connection,
1929+
State,
1930+
{request, CorrelationId,
1931+
{subscribe,
1932+
SubscriptionId, _, _, _, Properties}} = Request) ->
1933+
case rabbit_stream_utils:filter_defined(Properties) of
1934+
true ->
1935+
rabbit_log:warning("Cannot create subcription ~tp, it defines a filter "
1936+
"and filtering is not active",
1937+
[SubscriptionId]),
1938+
response(Transport,
1939+
Connection,
1940+
subscribe,
1941+
CorrelationId,
1942+
?RESPONSE_CODE_PRECONDITION_FAILED),
1943+
rabbit_global_counters:increase_protocol_counter(stream,
1944+
?PRECONDITION_FAILED,
1945+
1),
1946+
{Connection, State};
1947+
false ->
1948+
handle_frame_post_auth(Transport, {ok, Connection}, State, Request)
1949+
end;
1950+
handle_frame_post_auth(Transport, #stream_connection{} = Connection, State,
1951+
{request, _,
1952+
{subscribe,
1953+
_, _, _, _, _}} = Request) ->
1954+
handle_frame_post_auth(Transport, {ok, Connection}, State, Request);
1955+
handle_frame_post_auth(Transport,
1956+
{ok, #stream_connection{
1957+
name = ConnName,
1958+
socket = Socket,
1959+
stream_subscriptions = StreamSubscriptions,
1960+
virtual_host = VirtualHost,
1961+
user = User,
1962+
send_file_oct = SendFileOct,
1963+
transport = ConnTransport} = Connection},
19161964
#stream_connection_state{consumers = Consumers} = State,
19171965
{request, CorrelationId,
19181966
{subscribe,

deps/rabbitmq_stream/src/rabbit_stream_utils.erl

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@
2929
sort_partitions/1,
3030
strip_cr_lf/1,
3131
consumer_activity_status/2,
32+
filter_defined/1,
3233
filter_spec/1,
33-
command_versions/0]).
34+
command_versions/0,
35+
filtering_supported/0]).
3436

3537
-define(MAX_PERMISSION_CACHE_SIZE, 12).
3638

@@ -302,6 +304,15 @@ consumer_activity_status(Active, Properties) ->
302304
waiting
303305
end.
304306

307+
filter_defined(SubscriptionProperties) when is_map(SubscriptionProperties) ->
308+
lists:any(fun(<<"filter.",_/binary>>) ->
309+
true;
310+
(_) ->
311+
false
312+
end, maps:keys(SubscriptionProperties));
313+
filter_defined(_) ->
314+
false.
315+
305316
filter_spec(Properties) ->
306317
Filters = maps:fold(fun(<<"filter.",_/binary>>, V, Acc) ->
307318
[V] ++ Acc;
@@ -323,8 +334,14 @@ filter_spec(Properties) ->
323334
end.
324335

325336
command_versions() ->
337+
PublishMaxVersion = case filtering_supported() of
338+
false ->
339+
?VERSION_1;
340+
true ->
341+
?VERSION_2
342+
end,
326343
[{declare_publisher, ?VERSION_1, ?VERSION_1},
327-
{publish, ?VERSION_1, ?VERSION_2},
344+
{publish, ?VERSION_1, PublishMaxVersion},
328345
{query_publisher_sequence, ?VERSION_1, ?VERSION_1},
329346
{delete_publisher, ?VERSION_1, ?VERSION_1},
330347
{subscribe, ?VERSION_1, ?VERSION_1},
@@ -340,3 +357,6 @@ command_versions() ->
340357
{route, ?VERSION_1, ?VERSION_1},
341358
{partitions, ?VERSION_1, ?VERSION_1},
342359
{stream_stats, ?VERSION_1, ?VERSION_1}].
360+
361+
filtering_supported() ->
362+
rabbit_feature_flags:is_enabled(stream_filtering).

deps/rabbitmq_stream/test/commands_SUITE.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -698,6 +698,7 @@ subscribe(S, SubId, Stream, SubProperties, C) ->
698698
SubId,
699699
Stream,
700700
SubProperties,
701+
?RESPONSE_CODE_OK,
701702
C).
702703

703704
subscribe(S, SubId, Stream, C) ->

deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl

Lines changed: 74 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ all() ->
3333

3434
groups() ->
3535
[{single_node, [],
36-
[test_stream,
36+
[filtering_ff, %% must stay at the top, feature flag disabled for this one
37+
test_stream,
3738
test_stream_tls,
3839
test_publish_v2,
3940
test_gc_consumers,
@@ -72,6 +73,22 @@ init_per_group(Group, Config)
7273
{rabbitmq_ct_tls_verify, verify_none},
7374
{rabbitmq_stream, verify_none}
7475
]),
76+
%% filtering feature flag disabled for the first test,
77+
%% then enabled in the end_per_testcase function
78+
ExtraSetupSteps =
79+
case Group of
80+
single_node ->
81+
[fun(StepConfig) ->
82+
rabbit_ct_helpers:merge_app_env(StepConfig,
83+
{rabbit,
84+
[{forced_feature_flags_on_init,
85+
[stream_queue,
86+
stream_sac_coordinator_unblock_group,
87+
stream_single_active_consumer]}]})
88+
end];
89+
_ ->
90+
[]
91+
end,
7592
rabbit_ct_helpers:run_setup_steps(
7693
Config1,
7794
[fun(StepConfig) ->
@@ -86,6 +103,7 @@ init_per_group(Group, Config)
86103
[{connection_negotiation_step_timeout,
87104
500}]})
88105
end]
106+
++ ExtraSetupSteps
89107
++ rabbit_ct_broker_helpers:setup_steps());
90108
init_per_group(cluster = Group, Config) ->
91109
Config1 = rabbit_ct_helpers:set_config(
@@ -123,6 +141,13 @@ init_per_testcase(close_connection_on_consumer_update_timeout = TestCase, Config
123141
init_per_testcase(TestCase, Config) ->
124142
rabbit_ct_helpers:testcase_started(Config, TestCase).
125143

144+
end_per_testcase(filtering_ff = TestCase, Config) ->
145+
_ = rabbit_ct_broker_helpers:rpc(Config,
146+
0,
147+
rabbit_feature_flags,
148+
enable,
149+
[stream_filtering]),
150+
rabbit_ct_helpers:testcase_finished(Config, TestCase);
126151
end_per_testcase(close_connection_on_consumer_update_timeout = TestCase, Config) ->
127152
ok = rabbit_ct_broker_helpers:rpc(Config,
128153
0,
@@ -133,6 +158,32 @@ end_per_testcase(close_connection_on_consumer_update_timeout = TestCase, Config)
133158
end_per_testcase(TestCase, Config) ->
134159
rabbit_ct_helpers:testcase_finished(Config, TestCase).
135160

161+
filtering_ff(Config) ->
162+
Stream = atom_to_binary(?FUNCTION_NAME, utf8),
163+
Transport = gen_tcp,
164+
Port = get_stream_port(Config),
165+
Opts = [{active, false}, {mode, binary}],
166+
{ok, S} = Transport:connect("localhost", Port, Opts),
167+
C0 = rabbit_stream_core:init(0),
168+
C1 = test_peer_properties(Transport, S, C0),
169+
C2 = test_authenticate(Transport, S, C1),
170+
C3 = test_create_stream(Transport, S, Stream, C2),
171+
PublisherId = 42,
172+
C4 = test_declare_publisher(Transport, S, PublisherId, Stream, C3),
173+
Body = <<"hello">>,
174+
C5 = test_publish_confirm(Transport, S, publish_v2, PublisherId, Body,
175+
publish_error, C4),
176+
SubscriptionId = 42,
177+
C6 = test_subscribe(Transport, S, SubscriptionId, Stream,
178+
#{<<"filter.0">> => <<"foo">>},
179+
?RESPONSE_CODE_PRECONDITION_FAILED,
180+
C5),
181+
182+
C7 = test_delete_stream(Transport, S, Stream, C6),
183+
_C8 = test_close(Transport, S, C7),
184+
closed = wait_for_socket_close(Transport, S, 10),
185+
ok.
186+
136187
test_global_counters(Config) ->
137188
Stream = atom_to_binary(?FUNCTION_NAME, utf8),
138189
test_server(gen_tcp, Stream, Config),
@@ -188,11 +239,14 @@ test_publish_v2(Config) ->
188239
PublisherId = 42,
189240
C4 = test_declare_publisher(Transport, S, PublisherId, Stream, C3),
190241
Body = <<"hello">>,
191-
C5 = test_publish_confirm(Transport, S, publish_v2, PublisherId, Body, C4),
192-
C6 = test_publish_confirm(Transport, S, publish_v2, PublisherId, Body, C5),
242+
C5 = test_publish_confirm(Transport, S, publish_v2, PublisherId, Body,
243+
publish_confirm, C4),
244+
C6 = test_publish_confirm(Transport, S, publish_v2, PublisherId, Body,
245+
publish_confirm, C5),
193246
SubscriptionId = 42,
194247
C7 = test_subscribe(Transport, S, SubscriptionId, Stream,
195248
#{<<"filter.0">> => <<"foo">>},
249+
?RESPONSE_CODE_OK,
196250
C6),
197251
C8 = test_deliver(Transport, S, SubscriptionId, 0, Body, C7),
198252
C8b = test_deliver(Transport, S, SubscriptionId, 1, Body, C8),
@@ -419,6 +473,7 @@ close_connection_on_consumer_update_timeout(Config) ->
419473
C4 = test_subscribe(Transport, S, SubId, Stream,
420474
#{<<"single-active-consumer">> => <<"true">>,
421475
<<"name">> => <<"foo">>},
476+
?RESPONSE_CODE_OK,
422477
C3),
423478
{Cmd, _C5} = receive_commands(Transport, S, C4),
424479
?assertMatch({request, _, {consumer_update, SubId, true}}, Cmd),
@@ -681,18 +736,21 @@ test_declare_publisher(Transport, S, PublisherId, Stream, C0) ->
681736
C.
682737

683738
test_publish_confirm(Transport, S, PublisherId, Body, C0) ->
684-
test_publish_confirm(Transport, S, publish, PublisherId, Body, C0).
739+
test_publish_confirm(Transport, S, publish, PublisherId, Body,
740+
publish_confirm, C0).
685741

686-
test_publish_confirm(Transport, S, publish = PublishCmd, PublisherId, Body, C0) ->
742+
test_publish_confirm(Transport, S, publish = PublishCmd, PublisherId, Body,
743+
ExpectedConfirmCommand,C0) ->
687744
BodySize = byte_size(Body),
688745
Messages = [<<1:64, 0:1, BodySize:31, Body:BodySize/binary>>],
689746
PublishFrame =
690747
rabbit_stream_core:frame({PublishCmd, PublisherId, 1, Messages}),
691748
ok = Transport:send(S, PublishFrame),
692749
{Cmd, C} = receive_commands(Transport, S, C0),
693-
?assertMatch({publish_confirm, PublisherId, [1]}, Cmd),
750+
?assertMatch({ExpectedConfirmCommand, PublisherId, [1]}, Cmd),
694751
C;
695-
test_publish_confirm(Transport, S, publish_v2 = PublishCmd, PublisherId, Body, C0) ->
752+
test_publish_confirm(Transport, S, publish_v2 = PublishCmd, PublisherId, Body,
753+
ExpectedConfirmCommand, C0) ->
696754
BodySize = byte_size(Body),
697755
FilterValue = <<"foo">>,
698756
FilterValueSize = byte_size(FilterValue),
@@ -702,7 +760,12 @@ test_publish_confirm(Transport, S, publish_v2 = PublishCmd, PublisherId, Body, C
702760
rabbit_stream_core:frame({PublishCmd, PublisherId, 1, Messages}),
703761
ok = Transport:send(S, PublishFrame),
704762
{Cmd, C} = receive_commands(Transport, S, C0),
705-
?assertMatch({publish_confirm, PublisherId, [1]}, Cmd),
763+
case ExpectedConfirmCommand of
764+
publish_confirm ->
765+
?assertMatch({ExpectedConfirmCommand, PublisherId, [1]}, Cmd);
766+
publish_error ->
767+
?assertMatch({ExpectedConfirmCommand, PublisherId, _, [1]}, Cmd)
768+
end,
706769
C.
707770

708771
test_subscribe(Transport, S, SubscriptionId, Stream, C0) ->
@@ -711,21 +774,23 @@ test_subscribe(Transport, S, SubscriptionId, Stream, C0) ->
711774
SubscriptionId,
712775
Stream,
713776
#{<<"random">> => <<"thing">>},
777+
?RESPONSE_CODE_OK,
714778
C0).
715779

716780
test_subscribe(Transport,
717781
S,
718782
SubscriptionId,
719783
Stream,
720784
SubscriptionProperties,
785+
ExpectedResponseCode,
721786
C0) ->
722787
SubCmd =
723788
{request, 1,
724789
{subscribe, SubscriptionId, Stream, 0, 10, SubscriptionProperties}},
725790
SubscribeFrame = rabbit_stream_core:frame(SubCmd),
726791
ok = Transport:send(S, SubscribeFrame),
727792
{Cmd, C} = receive_commands(Transport, S, C0),
728-
?assertMatch({response, 1, {subscribe, ?RESPONSE_CODE_OK}}, Cmd),
793+
?assertMatch({response, 1, {subscribe, ExpectedResponseCode}}, Cmd),
729794
C.
730795

731796
test_unsubscribe(Transport, Socket, SubscriptionId, C0) ->

0 commit comments

Comments
 (0)