Skip to content

Commit 6e7d34d

Browse files
Merge pull request #11820 from rabbitmq/mergify/bp/v4.0.x/pr-11809
QQ: use a dedicated function for queue recovery after Ra system restart. (backport #11809)
2 parents cba540b + 216f2e8 commit 6e7d34d

File tree

2 files changed

+23
-1
lines changed

2 files changed

+23
-1
lines changed

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
handle_event/3]).
1818
-export([is_recoverable/1,
1919
recover/2,
20+
system_recover/1,
2021
stop/1,
2122
start_server/1,
2223
restart_server/1,
@@ -97,6 +98,11 @@
9798
-define(RA_SYSTEM, quorum_queues).
9899
-define(RA_WAL_NAME, ra_log_wal).
99100

101+
-define(INFO(Str, Args),
102+
rabbit_log:info("[~s:~s/~b] " Str,
103+
[?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY | Args])).
104+
105+
100106
-define(STATISTICS_KEYS,
101107
[policy,
102108
operator_policy,
@@ -641,6 +647,21 @@ is_recoverable(Q) when ?is_amqqueue(Q) and ?amqqueue_is_quorum(Q) ->
641647
Nodes = get_nodes(Q),
642648
lists:member(Node, Nodes).
643649

650+
system_recover(quorum_queues) ->
651+
case rabbit:is_booted() of
652+
true ->
653+
Queues = rabbit_amqqueue:list_local_quorum_queues(),
654+
?INFO("recovering ~b queues", [length(Queues)]),
655+
{Recovered, Failed} = recover(<<>>, Queues),
656+
?INFO("recovered ~b queues, "
657+
"failed to recover ~b queues",
658+
[length(Recovered), length(Failed)]),
659+
ok;
660+
false ->
661+
?INFO("rabbit not booted, skipping queue recovery", []),
662+
ok
663+
end.
664+
644665
-spec recover(binary(), [amqqueue:amqqueue()]) ->
645666
{[amqqueue:amqqueue()], [amqqueue:amqqueue()]}.
646667
recover(_Vhost, Queues) ->

deps/rabbit/src/rabbit_ra_systems.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,8 @@ get_config(quorum_queues = RaSystem) ->
130130
wal_max_entries => WalMaxEntries,
131131
segment_compute_checksums => SegmentChecksums,
132132
compress_mem_tables => CompressMemTables,
133-
server_recovery_strategy => registered};
133+
server_recovery_strategy => {rabbit_quorum_queue,
134+
system_recover, []}};
134135
get_config(coordination = RaSystem) ->
135136
DefaultConfig = get_default_config(),
136137
CoordDataDir = filename:join(

0 commit comments

Comments
 (0)