Skip to content

Commit eb24d9d

Browse files
Merge pull request #10636 from rabbitmq/mergify/bp/v3.13.x/pr-10569
Configurable writer module (backport #10569)
2 parents 2233d65 + 24997d5 commit eb24d9d

File tree

3 files changed

+40
-25
lines changed

3 files changed

+40
-25
lines changed

MODULE.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ bazel_dep(
4343

4444
bazel_dep(
4545
name = "rabbitmq_osiris",
46-
version = "1.7.2",
46+
version = "1.8.0",
4747
repo_name = "osiris",
4848
)
4949

deps/rabbit/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers amqp_client meck prop
139139
PLT_APPS += mnesia
140140

141141
dep_syslog = git https://github.com/schlagert/syslog 4.0.0
142-
dep_osiris = git https://github.com/rabbitmq/osiris v1.7.2
142+
dep_osiris = git https://github.com/rabbitmq/osiris v1.8.0
143143
dep_systemd = hex 0.6.1
144144

145145
dep_seshat = git https://github.com/rabbitmq/seshat v0.6.1

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 38 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
tick/2,
2121
version/0,
2222
which_module/1,
23-
overview/1,
23+
overview/1]).
24+
25+
-export([update_config/2,
2426
policy_changed/1]).
2527

2628
%% coordinator API
@@ -236,16 +238,38 @@ delete_replica(StreamId, Node) ->
236238

237239
policy_changed(Q) when ?is_amqqueue(Q) ->
238240
StreamId = maps:get(name, amqqueue:get_type_state(Q)),
241+
Config = rabbit_stream_queue:update_stream_conf(Q, #{}),
242+
case update_config(Q, Config) of
243+
{ok, ok, _} = Res ->
244+
Res;
245+
{error, feature_not_enabled} ->
246+
%% backwards compatibility
247+
%% TODO: remove in future
248+
process_command({policy_changed, StreamId, #{queue => Q}});
249+
Err ->
250+
Err
251+
end.
252+
253+
-spec update_config(amqqueue:amqqueue(), #{atom() => term()}) ->
254+
{ok, ok, ra:server_id()} | {error, not_supported | term()}.
255+
update_config(Q, Config)
256+
when ?is_amqqueue(Q) andalso is_map(Config) ->
239257
case rabbit_feature_flags:is_enabled(stream_update_config_command) of
240258
true ->
241-
%% there are the only two configuration keys that are safe to
259+
%% there are the only a few configuration keys that are safe to
242260
%% update
243-
Conf = maps:with([filter_size,
244-
retention],
245-
rabbit_stream_queue:update_stream_conf(Q, #{})),
246-
process_command({update_config, StreamId, Conf});
261+
StreamId = maps:get(name, amqqueue:get_type_state(Q)),
262+
case maps:with([filter_size,
263+
retention,
264+
writer_mod,
265+
replica_mod], Config) of
266+
Conf when map_size(Conf) > 0 ->
267+
process_command({update_config, StreamId, Conf});
268+
_ ->
269+
{error, no_updatable_keys}
270+
end;
247271
false ->
248-
process_command({policy_changed, StreamId, #{queue => Q}})
272+
{error, feature_not_enabled}
249273
end.
250274

251275
sac_state(#?MODULE{single_active_consumer = SacState}) ->
@@ -1016,7 +1040,7 @@ phase_start_replica(StreamId, #{epoch := Epoch,
10161040
{error, already_present} ->
10171041
%% need to remove child record if this is the case
10181042
%% can it ever happen?
1019-
_ = osiris_replica:stop(Node, Conf0),
1043+
_ = osiris:stop_member(Node, Conf0),
10201044
send_action_failed(StreamId, starting, Args);
10211045
{error, {already_started, Pid}} ->
10221046
%% TODO: we need to check that the current epoch is the same
@@ -1049,7 +1073,7 @@ phase_delete_member(StreamId, #{node := Node} = Arg, Conf) ->
10491073
fun() ->
10501074
case rabbit_nodes:is_member(Node) of
10511075
true ->
1052-
try osiris_server_sup:delete_child(Node, Conf) of
1076+
try osiris:delete_member(Node, Conf) of
10531077
ok ->
10541078
rabbit_log:info("~ts: Member deleted for ~ts : on node ~ts",
10551079
[?MODULE, StreamId, Node]),
@@ -1071,10 +1095,9 @@ phase_delete_member(StreamId, #{node := Node} = Arg, Conf) ->
10711095
end
10721096
end.
10731097

1074-
phase_stop_member(StreamId, #{node := Node,
1075-
epoch := Epoch} = Arg0, Conf) ->
1098+
phase_stop_member(StreamId, #{node := Node, epoch := Epoch} = Arg0, Conf) ->
10761099
fun() ->
1077-
try osiris_server_sup:stop_child(Node, StreamId) of
1100+
try osiris_member:stop(Node, Conf) of
10781101
ok ->
10791102
%% get tail
10801103
try get_replica_tail(Node, Conf) of
@@ -1093,13 +1116,7 @@ phase_stop_member(StreamId, #{node := Node,
10931116
[?MODULE, StreamId, Node, Epoch, Err]),
10941117
maybe_sleep(Err),
10951118
send_action_failed(StreamId, stopping, Arg0)
1096-
end;
1097-
Err ->
1098-
rabbit_log:warning("~ts: failed to stop "
1099-
"member ~ts ~w Error: ~w",
1100-
[?MODULE, StreamId, Node, Err]),
1101-
maybe_sleep(Err),
1102-
send_action_failed(StreamId, stopping, Arg0)
1119+
end
11031120
catch _:Err ->
11041121
rabbit_log:warning("~ts: failed to stop member ~ts ~w Error: ~w",
11051122
[?MODULE, StreamId, Node, Err]),
@@ -1108,10 +1125,9 @@ phase_stop_member(StreamId, #{node := Node,
11081125
end
11091126
end.
11101127

1111-
phase_start_writer(StreamId, #{epoch := Epoch,
1112-
node := Node} = Args0, Conf) ->
1128+
phase_start_writer(StreamId, #{epoch := Epoch, node := Node} = Args0, Conf) ->
11131129
fun() ->
1114-
try osiris_writer:start(Conf) of
1130+
try osiris:start_writer(Conf) of
11151131
{ok, Pid} ->
11161132
Args = Args0#{epoch => Epoch, pid => Pid},
11171133
rabbit_log:info("~ts: started writer ~ts on ~w in ~b",
@@ -2019,7 +2035,6 @@ make_writer_conf(Node, #stream{epoch = Epoch,
20192035
replica_nodes => lists:delete(Node, Nodes),
20202036
epoch => Epoch}.
20212037

2022-
20232038
find_leader(Members) ->
20242039
case lists:partition(
20252040
fun ({_, #member{target = deleted}}) ->

0 commit comments

Comments
 (0)