Skip to content

Commit d40110b

Browse files
Merge pull request #11407 from rabbitmq/mk-make-sure-virtual-hosts-are-started-on-all-nodes-after-definition-import
Periodically reconcile virtual host processes
2 parents 2b39724 + c03d90f commit d40110b

File tree

13 files changed

+372
-30
lines changed

13 files changed

+372
-30
lines changed

.github/workflows/templates/test.template.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ jobs:
6969
run: |
7070
echo "value=bazel-repo-cache-${{ hashFiles('MODULE.bazel') }}" | tee -a $GITHUB_OUTPUT
7171
- name: AUTHENTICATE TO GOOGLE CLOUD
72-
uses: google-github-actions/[email protected].2
72+
uses: google-github-actions/[email protected].3
7373
with:
7474
credentials_json: ${{ secrets.REMOTE_CACHE_CREDENTIALS_JSON }}
7575
- name: REPO CACHE

deps/rabbit/BUILD.bazel

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,11 @@ _APP_ENV = """[
142142
{dead_letter_worker_publisher_confirm_timeout, 180000},
143143
144144
%% EOL date for the current release series, if known/announced
145-
{release_series_eol_date, none}
145+
{release_series_eol_date, none},
146+
147+
{vhost_process_reconciliation_run_interval, 30},
148+
%% for testing
149+
{vhost_process_reconciliation_enabled, true}
146150
]
147151
"""
148152

deps/rabbit/Makefile

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -119,12 +119,15 @@ define PROJECT_ENV
119119
%% interval at which connection/channel tracking executes post operations
120120
{tracking_execution_timeout, 15000},
121121
{stream_messages_soft_limit, 256},
122-
{track_auth_attempt_source, false},
123-
{credentials_obfuscation_fallback_secret, <<"nocookie">>},
124-
{dead_letter_worker_consumer_prefetch, 32},
125-
{dead_letter_worker_publisher_confirm_timeout, 180000},
126-
%% EOL date for the current release series, if known/announced
127-
{release_series_eol_date, none}
122+
{track_auth_attempt_source, false},
123+
{credentials_obfuscation_fallback_secret, <<"nocookie">>},
124+
{dead_letter_worker_consumer_prefetch, 32},
125+
{dead_letter_worker_publisher_confirm_timeout, 180000},
126+
%% EOL date for the current release series, if known/announced
127+
{release_series_eol_date, none},
128+
{vhost_process_reconciliation_run_interval, 30},
129+
%% for testing
130+
{vhost_process_reconciliation_enabled, true}
128131
]
129132
endef
130133

deps/rabbit/app.bzl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ def all_beam_files(name = "all_beam_files"):
234234
"src/rabbit_vhost_sup.erl",
235235
"src/rabbit_vhost_sup_sup.erl",
236236
"src/rabbit_vhost_sup_wrapper.erl",
237+
"src/rabbit_vhosts.erl",
237238
"src/rabbit_vm.erl",
238239
"src/supervised_lifecycle.erl",
239240
"src/tcp_listener.erl",
@@ -493,6 +494,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
493494
"src/rabbit_vhost_sup.erl",
494495
"src/rabbit_vhost_sup_sup.erl",
495496
"src/rabbit_vhost_sup_wrapper.erl",
497+
"src/rabbit_vhosts.erl",
496498
"src/rabbit_vm.erl",
497499
"src/supervised_lifecycle.erl",
498500
"src/tcp_listener.erl",
@@ -775,6 +777,7 @@ def all_srcs(name = "all_srcs"):
775777
"src/rabbit_vhost_sup.erl",
776778
"src/rabbit_vhost_sup_sup.erl",
777779
"src/rabbit_vhost_sup_wrapper.erl",
780+
"src/rabbit_vhosts.erl",
778781
"src/rabbit_vm.erl",
779782
"src/supervised_lifecycle.erl",
780783
"src/tcp_listener.erl",

deps/rabbit/src/rabbit.erl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,15 @@
267267
{mfa, {logger, debug, ["'networking' boot step skipped and moved to end of startup", [], #{domain => ?RMQLOG_DOMAIN_GLOBAL}]}},
268268
{requires, notify_cluster}]}).
269269

270+
%% This mechanism is necessary in environments where a cluster is formed in parallel,
271+
%% which is the case with many container orchestration tools.
272+
%% In such scenarios, a virtual host can be declared before the cluster is formed and all
273+
%% cluster members are known, e.g. via definition import.
274+
-rabbit_boot_step({virtual_host_reconciliation,
275+
[{description, "makes sure all virtual host have running processes on all nodes"},
276+
{mfa, {rabbit_vhosts, boot, []}},
277+
{requires, notify_cluster}]}).
278+
270279
-rabbit_boot_step({pg_local,
271280
[{description, "local-only pg scope"},
272281
{mfa, {rabbit, pg_local, []}},

deps/rabbit/src/rabbit_node_monitor.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -897,6 +897,7 @@ handle_live_rabbit(Node) ->
897897
true -> ok;
898898
false -> on_node_up_using_mnesia(Node)
899899
end,
900+
ok = rabbit_vhosts:on_node_up(Node),
900901
ok = rabbit_quorum_queue_periodic_membership_reconciliation:on_node_up(Node).
901902

902903
on_node_up_using_mnesia(Node) ->

deps/rabbit/src/rabbit_vhosts.erl

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2024 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
%% This module exists to avoid circular module dependencies between
9+
%% several other virtual host-related modules.
10+
-module(rabbit_vhosts).
11+
12+
-define(PERSISTENT_TERM_COUNTER_KEY, rabbit_vhosts_reconciliation_run_counter).
13+
14+
%% API
15+
16+
-export([
17+
list_names/0,
18+
exists/1,
19+
boot/0,
20+
reconcile/0,
21+
reconcile_once/0,
22+
is_reconciliation_enabled/0,
23+
disable_reconciliation/0,
24+
enable_reconciliation/0,
25+
start_processes_for_all/0,
26+
start_on_all_nodes/2,
27+
on_node_up/1
28+
]).
29+
30+
%% Same as rabbit_vhost:exists/1.
31+
-spec exists(vhost:name()) -> boolean().
32+
exists(VirtualHost) ->
33+
rabbit_db_vhost:exists(VirtualHost).
34+
35+
%% Same as rabbit_vhost:list_names/0.
36+
-spec list_names() -> [vhost:name()].
37+
list_names() -> rabbit_db_vhost:list().
38+
39+
-spec boot() -> 'ok'.
40+
boot() ->
41+
_ = start_processes_for_all(),
42+
_ = increment_run_counter(),
43+
_ = case is_reconciliation_enabled() of
44+
false -> ok;
45+
true -> maybe_start_timer(reconcile)
46+
end,
47+
ok.
48+
49+
%% Performs a round of virtual host process reconciliation and sets up a timer to
50+
%% re-run this operation unless it has been run 10 or more times since cluster boot.
51+
%% See start_processes_for_all/1.
52+
-spec reconcile() -> 'ok'.
53+
reconcile() ->
54+
case is_reconciliation_enabled() of
55+
false -> ok;
56+
true ->
57+
_ = reconcile_once(),
58+
_ = maybe_start_timer(?FUNCTION_NAME),
59+
ok
60+
end.
61+
62+
%% Performs a round of virtual host process reconciliation but does not schedule any future runs.
63+
%% See start_processes_for_all/1.
64+
-spec reconcile_once() -> 'ok'.
65+
reconcile_once() ->
66+
rabbit_log:debug("Will reconcile virtual host processes on all cluster members..."),
67+
_ = start_processes_for_all(),
68+
_ = increment_run_counter(),
69+
N = get_run_counter(),
70+
rabbit_log:debug("Done with virtual host processes reconciliation (run ~tp)", [N]),
71+
ok.
72+
73+
-spec on_node_up(Node :: node()) -> 'ok'.
74+
on_node_up(_Node) ->
75+
case is_reconciliation_enabled() of
76+
false -> ok;
77+
true ->
78+
DelayInSeconds = 10,
79+
Delay = DelayInSeconds * 1000,
80+
rabbit_log:debug("Will reschedule virtual host process reconciliation after ~b seconds", [DelayInSeconds]),
81+
_ = timer:apply_after(Delay, ?MODULE, reconcile_once, []),
82+
ok
83+
end.
84+
85+
-spec is_reconciliation_enabled() -> boolean().
86+
is_reconciliation_enabled() ->
87+
application:get_env(rabbit, vhost_process_reconciliation_enabled, true).
88+
89+
-spec enable_reconciliation() -> 'ok'.
90+
enable_reconciliation() ->
91+
%% reset the auto-stop counter
92+
persistent_term:put(?PERSISTENT_TERM_COUNTER_KEY, 0),
93+
application:set_env(rabbit, vhost_process_reconciliation_enabled, true).
94+
95+
-spec disable_reconciliation() -> 'ok'.
96+
disable_reconciliation() ->
97+
application:set_env(rabbit, vhost_process_reconciliation_enabled, false).
98+
99+
-spec reconciliation_interval() -> non_neg_integer().
100+
reconciliation_interval() ->
101+
application:get_env(rabbit, vhost_process_reconciliation_run_interval, 30).
102+
103+
%% Starts a virtual host process on every specified nodes.
104+
%% Only exists to allow for "virtual host process repair"
105+
%% in clusters where nodes a booted in parallel and seeded
106+
%% (e.g. using definitions) at the same time.
107+
%%
108+
%% In that case, during virtual host insertion into the schema database,
109+
%% some processes predictably won't be started on the yet-to-be-discovered nodes.
110+
-spec start_processes_for_all([node()]) -> 'ok'.
111+
start_processes_for_all(Nodes) ->
112+
Names = list_names(),
113+
N = length(Names),
114+
rabbit_log:debug("Will make sure that processes of ~p virtual hosts are running on all reachable cluster nodes", [N]),
115+
[begin
116+
try
117+
start_on_all_nodes(VH, Nodes)
118+
catch
119+
_:Err:_Stacktrace ->
120+
rabbit_log:error("Could not reconcile virtual host ~ts: ~tp", [VH, Err])
121+
end
122+
end || VH <- Names],
123+
ok.
124+
125+
-spec start_processes_for_all() -> 'ok'.
126+
start_processes_for_all() ->
127+
start_processes_for_all(rabbit_nodes:list_reachable()).
128+
129+
%% Same as rabbit_vhost_sup_sup:start_on_all_nodes/2.
130+
-spec start_on_all_nodes(vhost:name(), [node()]) -> 'ok'.
131+
start_on_all_nodes(VirtualHost, Nodes) ->
132+
_ = rabbit_vhost_sup_sup:start_on_all_nodes(VirtualHost, Nodes),
133+
ok.
134+
135+
%%
136+
%% Implementation
137+
%%
138+
139+
-spec get_run_counter() -> non_neg_integer().
140+
get_run_counter() ->
141+
persistent_term:get(?PERSISTENT_TERM_COUNTER_KEY, 0).
142+
143+
-spec increment_run_counter() -> non_neg_integer().
144+
increment_run_counter() ->
145+
N = get_run_counter(),
146+
persistent_term:put(?PERSISTENT_TERM_COUNTER_KEY, N + 1),
147+
N.
148+
149+
-spec maybe_start_timer(atom()) -> ok | {ok, timer:tref()} | {error, any()}.
150+
maybe_start_timer(FunName) ->
151+
N = get_run_counter(),
152+
DelayInSeconds = reconciliation_interval(),
153+
case N >= 10 of
154+
true ->
155+
%% Stop after ten runs
156+
rabbit_log:debug("Will stop virtual host process reconciliation after ~tp runs", [N]),
157+
ok;
158+
false ->
159+
case is_reconciliation_enabled() of
160+
false -> ok;
161+
true ->
162+
Delay = DelayInSeconds * 1000,
163+
rabbit_log:debug("Will reschedule virtual host process reconciliation after ~b seconds", [DelayInSeconds]),
164+
timer:apply_after(Delay, ?MODULE, FunName, [])
165+
end
166+
end.

deps/rabbit/test/rabbitmqctl_integration_SUITE.erl

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,11 @@ create_n_node_cluster(Config0, NumNodes) ->
6767
Config1 = rabbit_ct_helpers:set_config(
6868
Config0, [{rmq_nodes_count, NumNodes},
6969
{rmq_nodes_clustered, true}]),
70-
rabbit_ct_helpers:run_steps(Config1,
70+
Config2 = rabbit_ct_helpers:merge_app_env(
71+
Config1, {rabbit, [
72+
{vhost_process_reconciliation_enabled, false}
73+
]}),
74+
rabbit_ct_helpers:run_steps(Config2,
7175
rabbit_ct_broker_helpers:setup_steps() ++
7276
rabbit_ct_client_helpers:setup_steps()).
7377

@@ -100,9 +104,12 @@ end_per_group(_, Config) ->
100104
Config.
101105

102106
init_per_testcase(list_queues_stopped, Config0) ->
103-
%% Start node 3 to crash it's queues
107+
%% Start node 3 to kill a few virtual hosts on it
104108
rabbit_ct_broker_helpers:start_node(Config0, 2),
105-
%% Make vhost "down" on nodes 2 and 3
109+
%% Disable virtual host reconciliation
110+
rabbit_ct_broker_helpers:rpc(Config0, 1, rabbit_vhosts, disable_reconciliation, []),
111+
rabbit_ct_broker_helpers:rpc(Config0, 2, rabbit_vhosts, disable_reconciliation, []),
112+
%% Terminate default virtual host's processes on nodes 2 and 3
106113
ok = rabbit_ct_broker_helpers:force_vhost_failure(Config0, 1, <<"/">>),
107114
ok = rabbit_ct_broker_helpers:force_vhost_failure(Config0, 2, <<"/">>),
108115

@@ -118,6 +125,7 @@ end_per_testcase(Testcase, Config0) ->
118125
%%----------------------------------------------------------------------------
119126
%% Test cases
120127
%%----------------------------------------------------------------------------
128+
121129
list_queues_local(Config) ->
122130
Node1Queues = lists:nth(1, ?config(per_node_queues, Config)),
123131
Node2Queues = lists:nth(2, ?config(per_node_queues, Config)),
@@ -141,23 +149,13 @@ list_queues_offline(Config) ->
141149
ok.
142150

143151
list_queues_stopped(Config) ->
144-
Node1Queues = lists:nth(1, ?config(per_node_queues, Config)),
145-
Node2Queues = lists:nth(2, ?config(per_node_queues, Config)),
146-
Node3Queues = lists:nth(3, ?config(per_node_queues, Config)),
147-
148-
Expected =
149-
lists:sort([ {Q, <<"running">>} || Q <- Node1Queues ] ++
150-
%% Node is running. Vhost is down
151-
[ {Q, <<"stopped">>} || Q <- Node2Queues ] ++
152-
%% Node is not running. Vhost is down
153-
[ {Q, <<"down">>} || Q <- Node3Queues ]),
154-
155-
?awaitMatch(
156-
Expected,
157-
lists:sort(
158-
[ {Name, State}
159-
|| [Name, State] <- rabbit_ct_broker_helpers:rabbitmqctl_list(Config, 0, ["list_queues", "name", "state", "--no-table-headers"]) ]),
160-
30_000).
152+
rabbit_ct_helpers:await_condition(fun() ->
153+
Listed = rabbit_ct_broker_helpers:rabbitmqctl_list(Config, 0, ["list_queues", "name", "state", "--no-table-headers"]),
154+
%% We expect some queue replicas to be reported as running, some as down and some as stopped,
155+
%% and that CLI tools are capable of handling and formatting such rows. MK.
156+
ReplicaStates = lists:usort([State|| [_Name, State] <- Listed]),
157+
ReplicaStates =:= [<<"down">>, <<"running">>, <<"stopped">>]
158+
end, 30_000).
161159

162160
%%----------------------------------------------------------------------------
163161
%% Helpers

deps/rabbitmq_auth_backend_http/BUILD.bazel

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,9 @@ all_srcs(name = "all_srcs")
4343

4444
test_suite_beam_files(name = "test_suite_beam_files")
4545

46+
# gazelle:erlang_app_extra_app crypto
4647
# gazelle:erlang_app_extra_app inets
48+
# gazelle:erlang_app_extra_app ssl
4749
# gazelle:erlang_app_extra_app public_key
4850
# gazelle:erlang_app_dep rabbit
4951

@@ -106,7 +108,7 @@ rabbitmq_integration_suite(
106108
"test/auth_http_mock.beam",
107109
],
108110
deps = [
109-
"@cowboy//:erlang_app"
111+
"@cowboy//:erlang_app",
110112
],
111113
)
112114

0 commit comments

Comments
 (0)