Skip to content

Commit 04c94e9

Browse files
committed
Require all stable feature flags added up to 3.13.0
Since feature flag `message_containers` introduced in 3.13.0 is required in 4.0, we can also require all other feature flags introduced in or before 3.13.0 and remove their compatibility code for 4.0: * restart_streams * stream_sac_coordinator_unblock_group * stream_filtering * stream_update_config_command
1 parent 6985886 commit 04c94e9

11 files changed

+78
-287
lines changed

deps/rabbit/src/rabbit_core_ff.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,22 +101,22 @@
101101
{restart_streams,
102102
#{desc => "Support for restarting streams with optional preferred next leader argument."
103103
"Used to implement stream leader rebalancing",
104-
stability => stable,
104+
stability => required,
105105
depends_on => [stream_queue]
106106
}}).
107107

108108
-rabbit_feature_flag(
109109
{stream_sac_coordinator_unblock_group,
110110
#{desc => "Bug fix to unblock a group of consumers in a super stream partition",
111111
doc_url => "https://github.com/rabbitmq/rabbitmq-server/issues/7743",
112-
stability => stable,
112+
stability => required,
113113
depends_on => [stream_single_active_consumer]
114114
}}).
115115

116116
-rabbit_feature_flag(
117117
{stream_filtering,
118118
#{desc => "Support for stream filtering.",
119-
stability => stable,
119+
stability => required,
120120
depends_on => [stream_queue]
121121
}}).
122122

@@ -153,7 +153,7 @@
153153
{stream_update_config_command,
154154
#{desc => "A new internal command that is used to update streams as "
155155
"part of a policy.",
156-
stability => stable,
156+
stability => required,
157157
depends_on => [stream_queue]
158158
}}).
159159

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 19 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -174,19 +174,14 @@ restart_stream(QRes, Options)
174174
restart_stream(Q, Options)
175175
when ?is_amqqueue(Q) andalso
176176
?amqqueue_is_stream(Q) ->
177-
case rabbit_feature_flags:is_enabled(restart_streams) of
178-
true ->
179-
rabbit_log:info("restarting stream ~s in vhost ~s with options ~p",
180-
[maps:get(name, amqqueue:get_type_state(Q)), amqqueue:get_vhost(Q), Options]),
181-
#{name := StreamId} = amqqueue:get_type_state(Q),
182-
case process_command({restart_stream, StreamId, Options}) of
183-
{ok, {ok, LeaderPid}, _} ->
184-
{ok, node(LeaderPid)};
185-
Err ->
186-
Err
187-
end;
188-
false ->
189-
{error, {feature_flag_not_enabled, restart_stream}}
177+
rabbit_log:info("restarting stream ~s in vhost ~s with options ~p",
178+
[maps:get(name, amqqueue:get_type_state(Q)), amqqueue:get_vhost(Q), Options]),
179+
#{name := StreamId} = amqqueue:get_type_state(Q),
180+
case process_command({restart_stream, StreamId, Options}) of
181+
{ok, {ok, LeaderPid}, _} ->
182+
{ok, node(LeaderPid)};
183+
Err ->
184+
Err
190185
end.
191186

192187
delete_stream(Q, ActingUser)
@@ -254,22 +249,17 @@ policy_changed(Q) when ?is_amqqueue(Q) ->
254249
{ok, ok, ra:server_id()} | {error, not_supported | term()}.
255250
update_config(Q, Config)
256251
when ?is_amqqueue(Q) andalso is_map(Config) ->
257-
case rabbit_feature_flags:is_enabled(stream_update_config_command) of
258-
true ->
259-
%% there are the only a few configuration keys that are safe to
260-
%% update
261-
StreamId = maps:get(name, amqqueue:get_type_state(Q)),
262-
case maps:with([filter_size,
263-
retention,
264-
writer_mod,
265-
replica_mod], Config) of
266-
Conf when map_size(Conf) > 0 ->
267-
process_command({update_config, StreamId, Conf});
268-
_ ->
269-
{error, no_updatable_keys}
270-
end;
271-
false ->
272-
{error, feature_not_enabled}
252+
%% there are the only a few configuration keys that are safe to
253+
%% update
254+
StreamId = maps:get(name, amqqueue:get_type_state(Q)),
255+
case maps:with([filter_size,
256+
retention,
257+
writer_mod,
258+
replica_mod], Config) of
259+
Conf when map_size(Conf) > 0 ->
260+
process_command({update_config, StreamId, Conf});
261+
_ ->
262+
{error, no_updatable_keys}
273263
end.
274264

275265
sac_state(#?MODULE{single_active_consumer = SacState}) ->

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 27 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,7 @@
9696
soft_limit :: non_neg_integer(),
9797
slow = false :: boolean(),
9898
readers = #{} :: #{rabbit_types:ctag() => #stream{}},
99-
writer_id :: binary(),
100-
filtering_supported :: boolean()
99+
writer_id :: binary()
101100
}).
102101

103102
-import(rabbit_queue_type_util, [args_policy_lookup/3]).
@@ -286,8 +285,7 @@ consume(Q, #{no_ack := true,
286285
consume(Q, #{limiter_active := true}, _State)
287286
when ?amqqueue_is_stream(Q) ->
288287
{error, global_qos_not_supported_for_queue_type};
289-
consume(Q, Spec,
290-
#stream_client{filtering_supported = FilteringSupported} = QState0)
288+
consume(Q, Spec, #stream_client{} = QState0)
291289
when ?amqqueue_is_stream(Q) ->
292290
%% Messages should include the offset as a custom header.
293291
case get_local_pid(QState0) of
@@ -307,26 +305,19 @@ consume(Q, Spec,
307305
{error, _} = Err ->
308306
Err;
309307
{ok, OffsetSpec} ->
310-
FilterSpec = filter_spec(Args),
311-
case {FilterSpec, FilteringSupported} of
312-
{#{filter_spec := _}, false} ->
313-
{protocol_error, precondition_failed,
314-
"Filtering is not supported", []};
315-
_ ->
316-
ConsumerPrefetchCount = case Mode of
317-
{simple_prefetch, C} -> C;
318-
_ -> 0
319-
end,
320-
AckRequired = not NoAck,
321-
rabbit_core_metrics:consumer_created(
322-
ChPid, ConsumerTag, ExclusiveConsume, AckRequired,
323-
QName, ConsumerPrefetchCount, false, up, Args),
324-
%% reply needs to be sent before the stream
325-
%% begins sending
326-
maybe_send_reply(ChPid, OkMsg),
327-
_ = rabbit_stream_coordinator:register_local_member_listener(Q),
328-
begin_stream(QState, ConsumerTag, OffsetSpec, Mode, AckRequired, FilterSpec)
329-
end
308+
ConsumerPrefetchCount = case Mode of
309+
{simple_prefetch, C} -> C;
310+
_ -> 0
311+
end,
312+
AckRequired = not NoAck,
313+
rabbit_core_metrics:consumer_created(
314+
ChPid, ConsumerTag, ExclusiveConsume, AckRequired,
315+
QName, ConsumerPrefetchCount, false, up, Args),
316+
%% reply needs to be sent before the stream
317+
%% begins sending
318+
maybe_send_reply(ChPid, OkMsg),
319+
_ = rabbit_stream_coordinator:register_local_member_listener(Q),
320+
begin_stream(QState, ConsumerTag, OffsetSpec, Mode, AckRequired, filter_spec(Args))
330321
end;
331322
{undefined, _} ->
332323
{protocol_error, precondition_failed,
@@ -510,8 +501,7 @@ deliver(QSs, Msg, Options) ->
510501
lists:foldl(
511502
fun({Q, stateless}, {Qs, Actions}) ->
512503
LeaderPid = amqqueue:get_pid(Q),
513-
ok = osiris:write(LeaderPid,
514-
stream_message(Msg, filtering_supported())),
504+
ok = osiris:write(LeaderPid, stream_message(Msg)),
515505
{Qs, Actions};
516506
({Q, S0}, {Qs, Actions0}) ->
517507
{S, Actions} = deliver0(maps:get(correlation, Options, undefined),
@@ -526,11 +516,9 @@ deliver0(MsgId, Msg,
526516
next_seq = Seq,
527517
correlation = Correlation0,
528518
soft_limit = SftLmt,
529-
slow = Slow0,
530-
filtering_supported = FilteringSupported} = State,
519+
slow = Slow0} = State,
531520
Actions0) ->
532-
ok = osiris:write(LeaderPid, WriterId, Seq,
533-
stream_message(Msg, FilteringSupported)),
521+
ok = osiris:write(LeaderPid, WriterId, Seq, stream_message(Msg)),
534522
Correlation = case MsgId of
535523
undefined ->
536524
Correlation0;
@@ -547,19 +535,14 @@ deliver0(MsgId, Msg,
547535
correlation = Correlation,
548536
slow = Slow}, Actions}.
549537

550-
stream_message(Msg, FilteringSupported) ->
538+
stream_message(Msg) ->
551539
McAmqp = mc:convert(mc_amqp, Msg),
552540
MsgData = mc:protocol_state(McAmqp),
553-
case FilteringSupported of
554-
true ->
555-
case mc:x_header(<<"x-stream-filter-value">>, McAmqp) of
556-
undefined ->
557-
MsgData;
558-
{utf8, Value} ->
559-
{Value, MsgData}
560-
end;
561-
false ->
562-
MsgData
541+
case mc:x_header(<<"x-stream-filter-value">>, McAmqp) of
542+
undefined ->
543+
MsgData;
544+
{utf8, Value} ->
545+
{Value, MsgData}
563546
end.
564547

565548
-spec dequeue(_, _, _, _, client()) -> no_return().
@@ -936,8 +919,7 @@ init(Q) when ?is_amqqueue(Q) ->
936919
name = amqqueue:get_name(Q),
937920
leader = Leader,
938921
writer_id = WriterId,
939-
soft_limit = SoftLimit,
940-
filtering_supported = filtering_supported()}};
922+
soft_limit = SoftLimit}};
941923
{ok, stream_not_found, _} ->
942924
{error, stream_not_found};
943925
{error, coordinator_unavailable} = E ->
@@ -1294,8 +1276,7 @@ notify_decorators(Q) when ?is_amqqueue(Q) ->
12941276

12951277
resend_all(#stream_client{leader = LeaderPid,
12961278
writer_id = WriterId,
1297-
correlation = Corrs,
1298-
filtering_supported = FilteringSupported} = State) ->
1279+
correlation = Corrs} = State) ->
12991280
Msgs = lists:sort(maps:values(Corrs)),
13001281
case Msgs of
13011282
[] -> ok;
@@ -1304,8 +1285,7 @@ resend_all(#stream_client{leader = LeaderPid,
13041285
[Seq, maps:size(Corrs)])
13051286
end,
13061287
[begin
1307-
ok = osiris:write(LeaderPid, WriterId, Seq,
1308-
stream_message(Msg, FilteringSupported))
1288+
ok = osiris:write(LeaderPid, WriterId, Seq, stream_message(Msg))
13091289
end || {Seq, Msg} <- Msgs],
13101290
State.
13111291

@@ -1340,9 +1320,6 @@ list_with_minimum_quorum() ->
13401320

13411321
is_stateful() -> true.
13421322

1343-
filtering_supported() ->
1344-
rabbit_feature_flags:is_enabled(stream_filtering).
1345-
13461323
get_nodes(Q) when ?is_amqqueue(Q) ->
13471324
#{nodes := Nodes} = amqqueue:get_type_state(Q),
13481325
Nodes.

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -629,32 +629,12 @@ handle_consumer_removal(Group0, Consumer, Stream, ConsumerName) ->
629629
end
630630
end.
631631

632-
message_type() ->
633-
case has_unblock_group_support() of
634-
true ->
635-
map;
636-
false ->
637-
tuple
638-
end.
639-
640632
notify_consumer_effect(Pid, SubId, Stream, Name, Active) ->
641633
notify_consumer_effect(Pid, SubId, Stream, Name, Active, false).
642634

643635
notify_consumer_effect(Pid, SubId, Stream, Name, Active, SteppingDown) ->
644-
notify_consumer_effect(Pid, SubId, Stream, Name, Active, SteppingDown, message_type()).
636+
notify_consumer_effect(Pid, SubId, Stream, Name, Active, SteppingDown, map).
645637

646-
notify_consumer_effect(Pid, SubId, _Stream, _Name, Active, false = _SteppingDown, tuple) ->
647-
mod_call_effect(Pid,
648-
{sac,
649-
{{subscription_id, SubId},
650-
{active, Active},
651-
{extra, []}}});
652-
notify_consumer_effect(Pid, SubId, _Stream, _Name, Active, true = _SteppingDown, tuple) ->
653-
mod_call_effect(Pid,
654-
{sac,
655-
{{subscription_id, SubId},
656-
{active, Active},
657-
{extra, [{stepping_down, true}]}}});
658638
notify_consumer_effect(Pid, SubId, Stream, Name, Active, false = _SteppingDown, map) ->
659639
mod_call_effect(Pid,
660640
{sac, #{subscription_id => SubId,
@@ -776,6 +756,3 @@ mod_call_effect(Pid, Msg) ->
776756
send_message(ConnectionPid, Msg) ->
777757
ConnectionPid ! Msg,
778758
ok.
779-
780-
has_unblock_group_support() ->
781-
rabbit_feature_flags:is_enabled(stream_sac_coordinator_unblock_group).

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3138,7 +3138,6 @@ global_counters(Config) ->
31383138
ok = amqp10_client:close_connection(Connection).
31393139

31403140
stream_filtering(Config) ->
3141-
ok = rabbit_ct_broker_helpers:enable_feature_flag(Config, ?FUNCTION_NAME),
31423141
Stream = atom_to_binary(?FUNCTION_NAME),
31433142
Address = rabbitmq_amqp_address:queue(Stream),
31443143
Ch = rabbit_ct_client_helpers:open_channel(Config),

0 commit comments

Comments
 (0)