Skip to content

Commit 628ba71

Browse files
kjnilssonmergify[bot]
authored andcommitted
Stream coordinator: add update_config/2 function.
To allow config updates without going through a policy update. (cherry picked from commit f872372)
1 parent e003dbb commit 628ba71

File tree

1 file changed

+34
-16
lines changed

1 file changed

+34
-16
lines changed

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
tick/2,
2121
version/0,
2222
which_module/1,
23-
overview/1,
23+
overview/1]).
24+
25+
-export([update_config/2,
2426
policy_changed/1]).
2527

2628
%% coordinator API
@@ -236,16 +238,38 @@ delete_replica(StreamId, Node) ->
236238

237239
policy_changed(Q) when ?is_amqqueue(Q) ->
238240
StreamId = maps:get(name, amqqueue:get_type_state(Q)),
241+
Config = rabbit_stream_queue:update_stream_conf(Q, #{}),
242+
case update_config(Q, Config) of
243+
{ok, ok, _} = Res ->
244+
Res;
245+
{error, feature_not_enabled} ->
246+
%% backwards compatibility
247+
%% TODO: remove in future
248+
process_command({policy_changed, StreamId, #{queue => Q}});
249+
Err ->
250+
Err
251+
end.
252+
253+
-spec update_config(amqqueue:amqqueue(), #{atom() => term()}) ->
254+
{ok, ok, ra:server_id()} | {error, not_supported | term()}.
255+
update_config(Q, Config)
256+
when ?is_amqqueue(Q) andalso is_map(Config) ->
239257
case rabbit_feature_flags:is_enabled(stream_update_config_command) of
240258
true ->
241-
%% there are the only two configuration keys that are safe to
259+
%% there are the only a few configuration keys that are safe to
242260
%% update
243-
Conf = maps:with([filter_size,
244-
retention],
245-
rabbit_stream_queue:update_stream_conf(Q, #{})),
246-
process_command({update_config, StreamId, Conf});
261+
StreamId = maps:get(name, amqqueue:get_type_state(Q)),
262+
case maps:with([filter_size,
263+
retention,
264+
writer_mod,
265+
replica_mod], Config) of
266+
Conf when map_size(Conf) > 0 ->
267+
process_command({update_config, StreamId, Conf});
268+
_ ->
269+
{error, no_updatable_keys}
270+
end;
247271
false ->
248-
process_command({policy_changed, StreamId, #{queue => Q}})
272+
{error, feature_not_enabled}
249273
end.
250274

251275
sac_state(#?MODULE{single_active_consumer = SacState}) ->
@@ -1016,7 +1040,7 @@ phase_start_replica(StreamId, #{epoch := Epoch,
10161040
{error, already_present} ->
10171041
%% need to remove child record if this is the case
10181042
%% can it ever happen?
1019-
_ = osiris_replica:stop(Node, Conf0),
1043+
_ = osiris:stop_member(Node, Conf0),
10201044
send_action_failed(StreamId, starting, Args);
10211045
{error, {already_started, Pid}} ->
10221046
%% TODO: we need to check that the current epoch is the same
@@ -1073,7 +1097,7 @@ phase_delete_member(StreamId, #{node := Node} = Arg, Conf) ->
10731097

10741098
phase_stop_member(StreamId, #{node := Node, epoch := Epoch} = Arg0, Conf) ->
10751099
fun() ->
1076-
try osiris:stop_member(Node, Conf) of
1100+
try osiris_member:stop(Node, Conf) of
10771101
ok ->
10781102
%% get tail
10791103
try get_replica_tail(Node, Conf) of
@@ -1092,13 +1116,7 @@ phase_stop_member(StreamId, #{node := Node, epoch := Epoch} = Arg0, Conf) ->
10921116
[?MODULE, StreamId, Node, Epoch, Err]),
10931117
maybe_sleep(Err),
10941118
send_action_failed(StreamId, stopping, Arg0)
1095-
end;
1096-
Err ->
1097-
rabbit_log:warning("~ts: failed to stop "
1098-
"member ~ts ~w Error: ~w",
1099-
[?MODULE, StreamId, Node, Err]),
1100-
maybe_sleep(Err),
1101-
send_action_failed(StreamId, stopping, Arg0)
1119+
end
11021120
catch _:Err ->
11031121
rabbit_log:warning("~ts: failed to stop member ~ts ~w Error: ~w",
11041122
[?MODULE, StreamId, Node, Err]),

0 commit comments

Comments
 (0)