|
17 | 17 | handle_event/3]).
|
18 | 18 | -export([is_recoverable/1,
|
19 | 19 | recover/2,
|
| 20 | + system_recover/1, |
20 | 21 | stop/1,
|
21 | 22 | start_server/1,
|
22 | 23 | restart_server/1,
|
|
97 | 98 | -define(RA_SYSTEM, quorum_queues).
|
98 | 99 | -define(RA_WAL_NAME, ra_log_wal).
|
99 | 100 |
|
| 101 | +-define(INFO(Str, Args), |
| 102 | + rabbit_log:info("[~s:~s/~b] " Str, |
| 103 | + [?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY | Args])). |
| 104 | +-define(WARN(Str, Args), |
| 105 | + rabbit_log:warning("[~s:~s/~b] " Str, |
| 106 | + [?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY | Args])). |
| 107 | + |
| 108 | + |
100 | 109 | -define(STATISTICS_KEYS,
|
101 | 110 | [policy,
|
102 | 111 | operator_policy,
|
@@ -641,6 +650,21 @@ is_recoverable(Q) when ?is_amqqueue(Q) and ?amqqueue_is_quorum(Q) ->
|
641 | 650 | Nodes = get_nodes(Q),
|
642 | 651 | lists:member(Node, Nodes).
|
643 | 652 |
|
| 653 | +system_recover(quorum_queues) -> |
| 654 | + case rabbit:is_booted() of |
| 655 | + true -> |
| 656 | + Queues = rabbit_amqqueue:list_local_quorum_queues(), |
| 657 | + ?INFO("recovering ~b queues", [length(Queues)]), |
| 658 | + {Recovered, Failed} = recover(<<>>, Queues), |
| 659 | + ?INFO("recovered ~b queues, " |
| 660 | + "failed to recover ~b queues", |
| 661 | + [length(Recovered), length(Failed)]), |
| 662 | + ok; |
| 663 | + false -> |
| 664 | + ?INFO("rabbit not booted, skipping queue recovery", []), |
| 665 | + ok |
| 666 | + end. |
| 667 | + |
644 | 668 | -spec recover(binary(), [amqqueue:amqqueue()]) ->
|
645 | 669 | {[amqqueue:amqqueue()], [amqqueue:amqqueue()]}.
|
646 | 670 | recover(_Vhost, Queues) ->
|
|
0 commit comments