20
20
tick /2 ,
21
21
version /0 ,
22
22
which_module /1 ,
23
- overview /1 ,
23
+ overview /1 ]).
24
+
25
+ -export ([update_config /2 ,
24
26
policy_changed /1 ]).
25
27
26
28
% % coordinator API
@@ -236,16 +238,38 @@ delete_replica(StreamId, Node) ->
236
238
237
239
policy_changed (Q ) when ? is_amqqueue (Q ) ->
238
240
StreamId = maps :get (name , amqqueue :get_type_state (Q )),
241
+ Config = rabbit_stream_queue :update_stream_conf (Q , #{}),
242
+ case update_config (Q , Config ) of
243
+ {ok , ok , _ } = Res ->
244
+ Res ;
245
+ {error , feature_not_enabled } ->
246
+ % % backwards compatibility
247
+ % % TODO: remove in future
248
+ process_command ({policy_changed , StreamId , #{queue => Q }});
249
+ Err ->
250
+ Err
251
+ end .
252
+
253
+ -spec update_config (amqqueue :amqqueue (), #{atom () => term ()}) ->
254
+ {ok , ok , ra :server_id ()} | {error , not_supported | term ()}.
255
+ update_config (Q , Config )
256
+ when ? is_amqqueue (Q ) andalso is_map (Config ) ->
239
257
case rabbit_feature_flags :is_enabled (stream_update_config_command ) of
240
258
true ->
241
- % % there are the only two configuration keys that are safe to
259
+ % % there are the only a few configuration keys that are safe to
242
260
% % update
243
- Conf = maps :with ([filter_size ,
244
- retention ],
245
- rabbit_stream_queue :update_stream_conf (Q , #{})),
246
- process_command ({update_config , StreamId , Conf });
261
+ StreamId = maps :get (name , amqqueue :get_type_state (Q )),
262
+ case maps :with ([filter_size ,
263
+ retention ,
264
+ writer_mod ,
265
+ replica_mod ], Config ) of
266
+ Conf when map_size (Conf ) > 0 ->
267
+ process_command ({update_config , StreamId , Conf });
268
+ _ ->
269
+ {error , no_updatable_keys }
270
+ end ;
247
271
false ->
248
- process_command ({ policy_changed , StreamId , #{ queue => Q }})
272
+ { error , feature_not_enabled }
249
273
end .
250
274
251
275
sac_state (#? MODULE {single_active_consumer = SacState }) ->
@@ -1016,7 +1040,7 @@ phase_start_replica(StreamId, #{epoch := Epoch,
1016
1040
{error , already_present } ->
1017
1041
% % need to remove child record if this is the case
1018
1042
% % can it ever happen?
1019
- _ = osiris_replica : stop (Node , Conf0 ),
1043
+ _ = osiris : stop_member (Node , Conf0 ),
1020
1044
send_action_failed (StreamId , starting , Args );
1021
1045
{error , {already_started , Pid }} ->
1022
1046
% % TODO: we need to check that the current epoch is the same
@@ -1073,7 +1097,7 @@ phase_delete_member(StreamId, #{node := Node} = Arg, Conf) ->
1073
1097
1074
1098
phase_stop_member (StreamId , #{node := Node , epoch := Epoch } = Arg0 , Conf ) ->
1075
1099
fun () ->
1076
- try osiris : stop_member (Node , Conf ) of
1100
+ try osiris_member : stop (Node , Conf ) of
1077
1101
ok ->
1078
1102
% % get tail
1079
1103
try get_replica_tail (Node , Conf ) of
@@ -1092,13 +1116,7 @@ phase_stop_member(StreamId, #{node := Node, epoch := Epoch} = Arg0, Conf) ->
1092
1116
[? MODULE , StreamId , Node , Epoch , Err ]),
1093
1117
maybe_sleep (Err ),
1094
1118
send_action_failed (StreamId , stopping , Arg0 )
1095
- end ;
1096
- Err ->
1097
- rabbit_log :warning (" ~ts : failed to stop "
1098
- " member ~ts ~w Error: ~w " ,
1099
- [? MODULE , StreamId , Node , Err ]),
1100
- maybe_sleep (Err ),
1101
- send_action_failed (StreamId , stopping , Arg0 )
1119
+ end
1102
1120
catch _ :Err ->
1103
1121
rabbit_log :warning (" ~ts : failed to stop member ~ts ~w Error: ~w " ,
1104
1122
[? MODULE , StreamId , Node , Err ]),
0 commit comments