46
46
filter_quorum_critical /1 , filter_quorum_critical /2 ,
47
47
all_replica_states /0 ]).
48
48
-export ([is_policy_applicable /2 ]).
49
+ -export ([repair_amqqueue_nodes /1 ,
50
+ repair_amqqueue_nodes /2
51
+ ]).
49
52
50
53
-include_lib (" stdlib/include/qlc.hrl" ).
51
54
-include (" rabbit.hrl" ).
@@ -376,7 +379,38 @@ repair_leader_record(QName, Self) ->
376
379
end ,
377
380
ok .
378
381
379
-
382
+ repair_amqqueue_nodes (VHost , QueueName ) ->
383
+ QName = # resource {virtual_host = VHost , name = QueueName , kind = queue },
384
+ repair_amqqueue_nodes (QName ).
385
+
386
+ -spec repair_amqqueue_nodes (rabbit_types :r ('queue' ) | amqqueue :amqqueue ()) ->
387
+ ok | repaired .
388
+ repair_amqqueue_nodes (QName = # resource {}) ->
389
+ {ok , Q0 } = rabbit_amqqueue :lookup (QName ),
390
+ repair_amqqueue_nodes (Q0 );
391
+ repair_amqqueue_nodes (Q0 ) ->
392
+ QName = amqqueue :get_name (Q0 ),
393
+ Leader = amqqueue :get_pid (Q0 ),
394
+ {ok , Members , _ } = ra :members (Leader ),
395
+ RaNodes = [N || {_ , N } <- Members ],
396
+ #{nodes := Nodes } = amqqueue :get_type_state (Q0 ),
397
+ case lists :sort (RaNodes ) =:= lists :sort (Nodes ) of
398
+ true ->
399
+ % % up to date
400
+ ok ;
401
+ false ->
402
+ % % update amqqueue record
403
+ Fun = fun (Q ) ->
404
+ TS0 = amqqueue :get_type_state (Q ),
405
+ TS = TS0 #{nodes => RaNodes },
406
+ amqqueue :set_type_state (Q , TS )
407
+ end ,
408
+ rabbit_misc :execute_mnesia_transaction (
409
+ fun () ->
410
+ rabbit_amqqueue :update (QName , Fun )
411
+ end ),
412
+ repaired
413
+ end .
380
414
381
415
reductions (Name ) ->
382
416
try
@@ -899,8 +933,8 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
899
933
% % deleting the last member is not allowed
900
934
{error , last_node };
901
935
Members ->
902
- case ra :leave_and_delete_server (Members , ServerId ) of
903
- ok ->
936
+ case ra :remove_member (Members , ServerId ) of
937
+ { ok , _ , _Leader } ->
904
938
Fun = fun (Q1 ) ->
905
939
update_type_state (
906
940
Q1 ,
@@ -910,8 +944,15 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
910
944
end ,
911
945
rabbit_misc :execute_mnesia_transaction (
912
946
fun () -> rabbit_amqqueue :update (QName , Fun ) end ),
913
- ok ;
914
- timeout ->
947
+ case ra :force_delete_server (ServerId ) of
948
+ ok ->
949
+ ok ;
950
+ {error , _ } = Err ->
951
+ Err ;
952
+ Err ->
953
+ {error , Err }
954
+ end ;
955
+ {timeout , _ } ->
915
956
{error , timeout };
916
957
E ->
917
958
E
0 commit comments