Skip to content

Commit 1257162

Browse files
Import definitions concurrently
…or rather, import some definition categories concurrently, namely users, virtual hosts, queues, exchanges and bindings. For some workloads this leads to a 20% to 70% reduction in definition import time. Note that for virtual hosts, most of the creation process steps cannot be made concurrent or significantly optimised without compromising the observed atomicity of HTTP API and CLI operations, so concurrent import both makes less of a difference and is the only realistic way of speeding up the process for virtual hosts. This introduces a dedicated work pool for import operations to avoid overloading the default pool, in particular on node boot when definitions can be imported concurrently with on disk data recovery steps which use the default pool heavily.
1 parent a636060 commit 1257162

File tree

2 files changed

+95
-26
lines changed

2 files changed

+95
-26
lines changed

src/rabbit.erl

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@
8888
{enables, worker_pool}]}).
8989

9090
-rabbit_boot_step({worker_pool,
91-
[{description, "worker pool"},
91+
[{description, "default worker pool"},
9292
{mfa, {rabbit_sup, start_supervisor_child,
9393
[worker_pool_sup]}},
9494
{requires, pre_boot},
@@ -229,6 +229,11 @@
229229
[{description, "ready to communicate with peers and clients"},
230230
{requires, [core_initialized, recovery, routing_ready]}]}).
231231

232+
-rabbit_boot_step({definition_import_worker_pool,
233+
[{description, "dedicated worker pool for definition import"},
234+
{mfa, {rabbit_definitions, boot, []}},
235+
{requires, pre_flight}]}).
236+
232237
-rabbit_boot_step({cluster_name,
233238
[{description, "sets cluster name if configured"},
234239
{mfa, {rabbit_nodes, boot, []}},

src/rabbit_definitions.erl

Lines changed: 89 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
-module(rabbit_definitions).
1818
-include_lib("rabbit_common/include/rabbit.hrl").
1919

20+
-export([boot/0]).
2021
%% automatic import on boot
2122
-export([maybe_load_definitions/0, maybe_load_definitions_from/2]).
2223
%% import
@@ -58,6 +59,12 @@
5859

5960
-export_type([definition_object/0, definition_list/0, definition_category/0, definitions/0]).
6061

62+
-define(IMPORT_WORK_POOL, definition_import_pool).
63+
64+
boot() ->
65+
PoolSize = application:get_env(rabbit, definition_import_work_pool_size, rabbit_runtime:guess_number_of_cpu_cores()),
66+
rabbit_sup:start_supervisor_child(definition_import_pool_sup, worker_pool_sup, [PoolSize, ?IMPORT_WORK_POOL]).
67+
6168
maybe_load_definitions() ->
6269
%% this feature was a part of rabbitmq-management for a long time,
6370
%% so we check rabbit_management.load_definitions for backward compatibility.
@@ -224,20 +231,22 @@ apply_defs(Map, ActingUser, VHost) when is_binary(VHost) ->
224231
apply_defs(Map, ActingUser, SuccessFun) when is_function(SuccessFun) ->
225232
Version = maps:get(rabbitmq_version, Map, maps:get(rabbit_version, Map, undefined)),
226233
try
227-
for_all(users, ActingUser, Map,
234+
concurrent_for_all(users, ActingUser, Map,
228235
fun(User, _Username) ->
229236
rabbit_auth_backend_internal:put_user(User, Version, ActingUser)
230237
end),
231-
for_all(vhosts, ActingUser, Map, fun add_vhost/2),
238+
concurrent_for_all(vhosts, ActingUser, Map, fun add_vhost/2),
232239
validate_limits(Map),
233-
for_all(permissions, ActingUser, Map, fun add_permission/2),
234-
for_all(topic_permissions, ActingUser, Map, fun add_topic_permission/2),
235-
for_all(parameters, ActingUser, Map, fun add_parameter/2),
236-
for_all(global_parameters, ActingUser, Map, fun add_global_parameter/2),
237-
for_all(policies, ActingUser, Map, fun add_policy/2),
238-
for_all(queues, ActingUser, Map, fun add_queue/2),
239-
for_all(exchanges, ActingUser, Map, fun add_exchange/2),
240-
for_all(bindings, ActingUser, Map, fun add_binding/2),
240+
concurrent_for_all(permissions, ActingUser, Map, fun add_permission/2),
241+
concurrent_for_all(topic_permissions, ActingUser, Map, fun add_topic_permission/2),
242+
sequential_for_all(parameters, ActingUser, Map, fun add_parameter/2),
243+
sequential_for_all(global_parameters, ActingUser, Map, fun add_global_parameter/2),
244+
%% importing policies concurrently can be unsafe as queues will be getting
245+
%% potentially out of order notifications of applicable policy changes
246+
sequential_for_all(policies, ActingUser, Map, fun add_policy/2),
247+
concurrent_for_all(queues, ActingUser, Map, fun add_queue/2),
248+
concurrent_for_all(exchanges, ActingUser, Map, fun add_exchange/2),
249+
concurrent_for_all(bindings, ActingUser, Map, fun add_binding/2),
241250
SuccessFun(),
242251
ok
243252
catch {error, E} -> {error, E};
@@ -254,11 +263,13 @@ apply_defs(Map, ActingUser, SuccessFun, VHost) when is_binary(VHost) ->
254263
[VHost, ActingUser]),
255264
try
256265
validate_limits(Map, VHost),
257-
for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3),
258-
for_all(policies, ActingUser, Map, VHost, fun add_policy/3),
259-
for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
260-
for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
261-
for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
266+
sequential_for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3),
267+
%% importing policies concurrently can be unsafe as queues will be getting
268+
%% potentially out of order notifications of applicable policy changes
269+
sequential_for_all(policies, ActingUser, Map, VHost, fun add_policy/3),
270+
concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
271+
concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
272+
concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
262273
SuccessFun()
263274
catch {error, E} -> {error, format(E)};
264275
exit:E -> {error, format(E)}
@@ -275,17 +286,19 @@ apply_defs(Map, ActingUser, SuccessFun, ErrorFun, VHost) ->
275286
[VHost, ActingUser]),
276287
try
277288
validate_limits(Map, VHost),
278-
for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3),
279-
for_all(policies, ActingUser, Map, VHost, fun add_policy/3),
280-
for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
281-
for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
282-
for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
289+
sequential_for_all(parameters, ActingUser, Map, VHost, fun add_parameter/3),
290+
%% importing policies concurrently can be unsafe as queues will be getting
291+
%% potentially out of order notifications of applicable policy changes
292+
sequential_for_all(policies, ActingUser, Map, VHost, fun add_policy/3),
293+
concurrent_for_all(queues, ActingUser, Map, VHost, fun add_queue/3),
294+
concurrent_for_all(exchanges, ActingUser, Map, VHost, fun add_exchange/3),
295+
concurrent_for_all(bindings, ActingUser, Map, VHost, fun add_binding/3),
283296
SuccessFun()
284297
catch {error, E} -> ErrorFun(format(E));
285298
exit:E -> ErrorFun(format(E))
286299
end.
287300

288-
for_all(Category, ActingUser, Definitions, Fun) ->
301+
sequential_for_all(Category, ActingUser, Definitions, Fun) ->
289302
case maps:get(rabbit_data_coercion:to_atom(Category), Definitions, undefined) of
290303
undefined -> ok;
291304
List ->
@@ -295,14 +308,12 @@ for_all(Category, ActingUser, Definitions, Fun) ->
295308
end,
296309
[begin
297310
%% keys are expected to be atoms
298-
Atomized = maps:fold(fun (K, V, Acc) ->
299-
maps:put(rabbit_data_coercion:to_atom(K), V, Acc)
300-
end, #{}, M),
311+
Atomized = atomize_keys(M),
301312
Fun(Atomized, ActingUser)
302313
end || M <- List, is_map(M)]
303314
end.
304315

305-
for_all(Name, ActingUser, Definitions, VHost, Fun) ->
316+
sequential_for_all(Name, ActingUser, Definitions, VHost, Fun) ->
306317

307318
case maps:get(rabbit_data_coercion:to_atom(Name), Definitions, undefined) of
308319
undefined -> ok;
@@ -311,6 +322,57 @@ for_all(Name, ActingUser, Definitions, VHost, Fun) ->
311322
M <- List, is_map(M)]
312323
end.
313324

325+
concurrent_for_all(Category, ActingUser, Definitions, Fun) ->
326+
case maps:get(rabbit_data_coercion:to_atom(Category), Definitions, undefined) of
327+
undefined -> ok;
328+
List ->
329+
case length(List) of
330+
0 -> ok;
331+
N -> rabbit_log:info("Importing ~p ~s...", [N, human_readable_category_name(Category)])
332+
end,
333+
{ok, Gatherer} = gatherer:start_link(),
334+
[begin
335+
%% keys are expected to be atoms
336+
Atomized = atomize_keys(M),
337+
ok = gatherer:fork(Gatherer),
338+
worker_pool:submit_async(
339+
?IMPORT_WORK_POOL,
340+
fun() ->
341+
Fun(Atomized, ActingUser),
342+
gatherer:finish(Gatherer)
343+
end)
344+
end || M <- List, is_map(M)],
345+
gatherer:out(Gatherer),
346+
gatherer:stop(Gatherer)
347+
end.
348+
349+
concurrent_for_all(Name, ActingUser, Definitions, VHost, Fun) ->
350+
case maps:get(rabbit_data_coercion:to_atom(Name), Definitions, undefined) of
351+
undefined -> ok;
352+
List ->
353+
{ok, Gatherer} = gatherer:start_link(),
354+
[begin
355+
%% keys are expected to be atoms
356+
Atomized = M = atomize_keys(M),
357+
ok = gatherer:fork(Gatherer),
358+
worker_pool:submit_async(
359+
?IMPORT_WORK_POOL,
360+
fun() ->
361+
Fun(VHost, Atomized, ActingUser),
362+
gatherer:finish(Gatherer)
363+
end)
364+
end || M <- List, is_map(M)],
365+
gatherer:out(Gatherer),
366+
gatherer:stop(Gatherer)
367+
end.
368+
369+
-spec atomize_keys(#{any() => any()}) -> #{atom() => any()}.
370+
371+
atomize_keys(M) ->
372+
maps:fold(fun(K, V, Acc) ->
373+
maps:put(rabbit_data_coercion:to_atom(K), V, Acc)
374+
end, #{}, M).
375+
314376
-spec human_readable_category_name(definition_category()) -> string().
315377

316378
human_readable_category_name(topic_permissions) -> "topic permissions";
@@ -390,6 +452,8 @@ add_policy(VHost, Param, Username) ->
390452
exit(rabbit_data_coercion:to_binary(rabbit_misc:escape_html_tags(E ++ S)))
391453
end.
392454

455+
-spec add_vhost(map(), rabbit_types:username()) -> ok.
456+
393457
add_vhost(VHost, ActingUser) ->
394458
VHostName = maps:get(name, VHost, undefined),
395459
VHostTrace = maps:get(tracing, VHost, undefined),

0 commit comments

Comments
 (0)