Skip to content

Commit 4bc1e69

Browse files
committed
Speed up AMQP connection and session (de)registration
## What? Prior to this commit connecting 40k AMQP clients with 5 sessions each, i.e. 200k sessions in total, took 7m55s. After to this commit the same scenario takes 1m37s. Additionally, prior to this commit, disconnecting all connections and sessions at once caused the pg process to become overloaded taking ~14 minutes to process its mailbox. After this commit, these same deregistrations take less than 5 seconds. To repro: ```go package main import ( "context" "log" "time" "github.com/Azure/go-amqp" ) func main() { for i := 0; i < 40_000; i++ { if i%1000 == 0 { log.Printf("opened %d connections", i) } conn, err := amqp.Dial( context.TODO(), "amqp://localhost", &amqp.ConnOptions{SASLType: amqp.SASLTypeAnonymous()}) if err != nil { log.Fatal("open connection:", err) } for j := 0; j < 5; j++ { _, err = conn.NewSession(context.TODO(), nil) if err != nil { log.Fatal("begin session:", err) } } } log.Println("opened all connections") time.Sleep(5 * time.Hour) } ``` ## How? This commit uses separate pg scopes (that is processes and ETS tables) to register AMQP connections and AMQP sessions. Since each Pid is now its own group, registration and deregistration is fast. (cherry picked from commit 93d1ac9)
1 parent ea1b672 commit 4bc1e69

File tree

4 files changed

+36
-17
lines changed

4 files changed

+36
-17
lines changed

deps/rabbit/src/rabbit.erl

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,17 @@
2929
base_product_name/0,
3030
base_product_version/0,
3131
motd_file/0,
32-
motd/0]).
32+
motd/0,
33+
pg_local_scope/1]).
3334
%% For CLI, testing and mgmt-agent.
3435
-export([set_log_level/1, log_locations/0, config_files/0]).
3536
-export([is_booted/1, is_booted/0, is_booting/1, is_booting/0]).
3637

3738
%%---------------------------------------------------------------------------
3839
%% Boot steps.
39-
-export([maybe_insert_default_data/0, boot_delegate/0, recover/0, pg_local/0]).
40+
-export([maybe_insert_default_data/0, boot_delegate/0, recover/0,
41+
pg_local_amqp_session/0,
42+
pg_local_amqp_connection/0]).
4043

4144
%% for tests
4245
-export([validate_msg_store_io_batch_size_and_credit_disc_bound/2]).
@@ -263,9 +266,15 @@
263266
{mfa, {rabbit_vhosts, boot, []}},
264267
{requires, notify_cluster}]}).
265268

266-
-rabbit_boot_step({pg_local,
267-
[{description, "local-only pg scope"},
268-
{mfa, {rabbit, pg_local, []}},
269+
-rabbit_boot_step({pg_local_amqp_session,
270+
[{description, "local-only pg scope for AMQP sessions"},
271+
{mfa, {rabbit, pg_local_amqp_session, []}},
272+
{requires, kernel_ready},
273+
{enables, core_initialized}]}).
274+
275+
-rabbit_boot_step({pg_local_amqp_connection,
276+
[{description, "local-only pg scope for AMQP connections"},
277+
{mfa, {rabbit, pg_local_amqp_connection, []}},
269278
{requires, kernel_ready},
270279
{enables, core_initialized}]}).
271280

@@ -1115,11 +1124,18 @@ boot_delegate() ->
11151124
-spec recover() -> 'ok'.
11161125

11171126
recover() ->
1118-
ok = rabbit_vhost:recover(),
1119-
ok.
1127+
ok = rabbit_vhost:recover().
1128+
1129+
pg_local_amqp_session() ->
1130+
PgScope = pg_local_scope(amqp_session),
1131+
rabbit_sup:start_child(pg_amqp_session, pg, [PgScope]).
1132+
1133+
pg_local_amqp_connection() ->
1134+
PgScope = pg_local_scope(amqp_connection),
1135+
rabbit_sup:start_child(pg_amqp_connection, pg, [PgScope]).
11201136

1121-
pg_local() ->
1122-
rabbit_sup:start_child(pg, [node()]).
1137+
pg_local_scope(Prefix) ->
1138+
list_to_atom(io_lib:format("~s_~s", [Prefix, node()])).
11231139

11241140
-spec maybe_insert_default_data() -> 'ok'.
11251141

deps/rabbit/src/rabbit_amqp1_0.erl

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,6 @@
66
%%
77
-module(rabbit_amqp1_0).
88

9-
-define(PROCESS_GROUP_NAME, rabbit_amqp10_connections).
10-
119
-export([list_local/0,
1210
register_connection/1]).
1311

@@ -36,8 +34,11 @@ emit_connection_info_local(Items, Ref, AggregatorPid) ->
3634

3735
-spec list_local() -> [pid()].
3836
list_local() ->
39-
pg:get_local_members(node(), ?PROCESS_GROUP_NAME).
37+
pg:which_groups(pg_scope()).
4038

4139
-spec register_connection(pid()) -> ok.
4240
register_connection(Pid) ->
43-
ok = pg:join(node(), ?PROCESS_GROUP_NAME, Pid).
41+
ok = pg:join(pg_scope(), Pid, Pid).
42+
43+
pg_scope() ->
44+
rabbit:pg_local_scope(amqp_connection).

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@
6969
?V_1_0_SYMBOL_MODIFIED]).
7070
-define(DEFAULT_EXCHANGE_NAME, <<>>).
7171
-define(PROTOCOL, amqp10).
72-
-define(PROCESS_GROUP_NAME, amqp_sessions).
7372
-define(MAX_PERMISSION_CACHE_SIZE, 12).
7473
-define(HIBERNATE_AFTER, 6_000).
7574
-define(CREDIT_REPLY_TIMEOUT, 30_000).
@@ -373,8 +372,8 @@ init({ReaderPid, WriterPid, ChannelNum, MaxFrameSize, User, Vhost, ConnName,
373372
handle_max = HandleMax0}}) ->
374373
process_flag(trap_exit, true),
375374
process_flag(message_queue_data, off_heap),
376-
ok = pg:join(node(), ?PROCESS_GROUP_NAME, self()),
377375

376+
ok = pg:join(pg_scope(), self(), self()),
378377
Alarms0 = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
379378
Alarms = sets:from_list(Alarms0, [{version, 2}]),
380379

@@ -439,7 +438,7 @@ terminate(_Reason, #state{incoming_links = IncomingLinks,
439438

440439
-spec list_local() -> [pid()].
441440
list_local() ->
442-
pg:get_local_members(node(), ?PROCESS_GROUP_NAME).
441+
pg:which_groups(pg_scope()).
443442

444443
-spec conserve_resources(pid(),
445444
rabbit_alarm:resource_alarm_source(),
@@ -3432,6 +3431,9 @@ is_valid_max(Val) ->
34323431
Val > 0 andalso
34333432
Val =< ?UINT_MAX.
34343433

3434+
pg_scope() ->
3435+
rabbit:pg_local_scope(amqp_session).
3436+
34353437
-spec cap_credit(rabbit_queue_type:credit()) ->
34363438
rabbit_queue_type:credit().
34373439
cap_credit(DesiredCredit) ->

deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ init([{Listeners, SslListeners0}]) ->
2929
end,
3030
%% Use separate process group scope per RabbitMQ node. This achieves a local-only
3131
%% process group which requires less memory with millions of connections.
32-
PgScope = list_to_atom(io_lib:format("~s_~s", [?PG_SCOPE, node()])),
32+
PgScope = rabbit:pg_local_scope(?PG_SCOPE),
3333
persistent_term:put(?PG_SCOPE, PgScope),
3434
{ok,
3535
{#{strategy => one_for_all,

0 commit comments

Comments
 (0)