Skip to content

Commit 32fd84b

Browse files
committed
Terminate channels and queue collector after reader
What? To not risk any regressions, keep the behaviour of RabbitMQ 3.x where channel processes and connection helper processes such as rabbit_queue_collector and rabbit_heartbeat are terminated after the rabbit_reader process. For example, when RabbitMQ terminates with SIGTERM, we want exclusive queues to be deleted synchronously (as in 3.x). Prior to this commit: 1. java -jar target/perf-test.jar -x 0 -y 1 2. ./sbin/rabbitmqctl stop_app resulted in the following crash: ``` crasher: initial call: rabbit_reader:init/2 pid: <0.2389.0> registered_name: [] exception exit: {noproc, {gen_server,call,[<0.2391.0>,delete_all,infinity]}} in function gen_server:call/3 (gen_server.erl, line 419) in call from rabbit_reader:close_connection/1 (rabbit_reader.erl, line 683) in call from rabbit_reader:send_error_on_channel0_and_close/4 (rabbit_reader.erl, line 1668) in call from rabbit_reader:handle_dependent_exit/3 (rabbit_reader.erl, line 710) in call from rabbit_reader:mainloop/4 (rabbit_reader.erl, line 530) in call from rabbit_reader:run/1 (rabbit_reader.erl, line 452) in call from rabbit_reader:start_connection/4 (rabbit_reader.erl, line 351) ``` because rabbit_queue_collector was terminated before rabbit_reader. This commit fixes this crash. How? Any Erlang supervisor, including the rabbit_connection_sup supervisor, terminates its children in the reverse of the start order. Since we want channel and queue collector processes - children of rabbit_connection_helper_sup - to be terminated after the reader process, we must start rabbit_connection_helper_sup before the reader process. Since rabbit_connection_sup - the ranch_protocol implementation - does not know yet whether it will supervise an AMQP 0.9.1 or AMQP 1.0 connection, it creates rabbit_connection_helper_sup for each AMQP protocol version, removing the superfluous one as soon as the protocol version negotiation is completed. Spawning and deleting this additional process has a negligible effect on performance. 
The whole problem is that the rabbit_connection_helper_sup differs in its supervisor flags for AMQP 0.9.1 and AMQP 1.0 when it is started because for Native AMQP 1.0 in 4.0 we remove the unnecessary rabbit_amqp1_0_session_sup_sup supervisor level. Therefore, we achieve our goal: * in Native AMQP 1.0, 1 additional Erlang process is created per session * in AMQP 1.0 in 3.x, 15 additional Erlang processes are created per session
1 parent f2a1a28 commit 32fd84b

File tree

3 files changed

+67
-47
lines changed

3 files changed

+67
-47
lines changed

deps/rabbit/src/rabbit_amqp_reader.erl

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -78,17 +78,17 @@
7878
%%--------------------------------------------------------------------------
7979

8080
unpack_from_0_9_1(
81-
{Sock,RecvLen, PendingRecv, Buf, BufLen, ProxySocket,
81+
{Sock,RecvLen, PendingRecv, SupPid, Buf, BufLen, ProxySocket,
8282
ConnectionName, Host, PeerHost, Port, PeerPort, ConnectedAt},
83-
Parent, ConnectionHelperSupPid, HandshakeTimeout) ->
83+
Parent, HandshakeTimeout) ->
8484
#v1{parent = Parent,
8585
sock = Sock,
8686
callback = handshake,
8787
recv_len = RecvLen,
8888
pending_recv = PendingRecv,
8989
connection_state = pre_init,
9090
heartbeater = none,
91-
helper_sup = ConnectionHelperSupPid,
91+
helper_sup = SupPid,
9292
buf = Buf,
9393
buf_len = BufLen,
9494
proxy_socket = ProxySocket,
@@ -612,13 +612,8 @@ handle_input(Callback, Data, _State) ->
612612
init(Mode, PackedState) ->
613613
{ok, HandshakeTimeout} = application:get_env(rabbit, handshake_timeout),
614614
{parent, Parent} = erlang:process_info(self(), parent),
615-
ConnectionHelperSupFlags = #{strategy => one_for_all,
616-
intensity => 0,
617-
period => 1,
618-
auto_shutdown => any_significant},
619-
{ok, ConnectionHelperSupPid} = rabbit_connection_sup:start_connection_helper_sup(
620-
Parent, ConnectionHelperSupFlags),
621-
State0 = unpack_from_0_9_1(PackedState, Parent, ConnectionHelperSupPid, HandshakeTimeout),
615+
ok = rabbit_connection_sup:remove_connection_helper_sup(Parent, helper_sup_amqp_091),
616+
State0 = unpack_from_0_9_1(PackedState, Parent, HandshakeTimeout),
622617
State = start_1_0_connection(Mode, State0),
623618
%% By invoking recvloop here we become 1.0.
624619
recvloop(sys:debug_options([]), State).

deps/rabbit/src/rabbit_connection_sup.erl

Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
-module(rabbit_connection_sup).
99

10-
%% Supervisor for a (network) AMQP 0-9-1 client connection.
10+
%% Supervisor for a (network) AMQP client connection.
1111
%%
1212
%% Supervises
1313
%%
@@ -21,7 +21,7 @@
2121

2222
-export([start_link/3,
2323
reader/1,
24-
start_connection_helper_sup/2
24+
remove_connection_helper_sup/2
2525
]).
2626

2727
-export([init/1]).
@@ -35,12 +35,48 @@
3535

3636
start_link(Ref, _Transport, _Opts) ->
3737
{ok, SupPid} = supervisor:start_link(?MODULE, []),
38+
%% We need to get channels in the hierarchy here so they get shut
39+
%% down after the reader, so the reader gets a chance to terminate
40+
%% them cleanly. But for 1.0 readers we can't start the real
41+
%% ch_sup_sup (because we don't know if we will be 0-9-1 or 1.0) -
42+
%% so we add another supervisor into the hierarchy.
43+
%%
44+
%% This supervisor also acts as an intermediary for heartbeaters and
45+
%% the queue collector process, since these must not be siblings of the
46+
%% reader due to the potential for deadlock if they are added/restarted
47+
%% whilst the supervision tree is shutting down.
48+
ChildSpec = #{restart => transient,
49+
significant => true,
50+
shutdown => infinity,
51+
type => supervisor},
52+
{ok, HelperSup091} =
53+
supervisor:start_child(
54+
SupPid,
55+
ChildSpec#{
56+
id => helper_sup_amqp_091,
57+
start => {rabbit_connection_helper_sup, start_link,
58+
[#{strategy => one_for_one,
59+
intensity => 10,
60+
period => 10,
61+
auto_shutdown => any_significant}]}}
62+
),
63+
{ok, HelperSup10} =
64+
supervisor:start_child(
65+
SupPid,
66+
ChildSpec#{
67+
id => helper_sup_amqp_10,
68+
start => {rabbit_connection_helper_sup, start_link,
69+
[#{strategy => one_for_all,
70+
intensity => 0,
71+
period => 1,
72+
auto_shutdown => any_significant}]}}
73+
),
3874
{ok, ReaderPid} =
3975
supervisor:start_child(
4076
SupPid,
4177
#{
4278
id => reader,
43-
start => {rabbit_reader, start_link, [Ref]},
79+
start => {rabbit_reader, start_link, [{HelperSup091, HelperSup10}, Ref]},
4480
restart => transient,
4581
significant => true,
4682
shutdown => ?WORKER_WAIT,
@@ -51,23 +87,13 @@ start_link(Ref, _Transport, _Opts) ->
5187
{ok, SupPid, ReaderPid}.
5288

5389
-spec reader(pid()) -> pid().
54-
5590
reader(Pid) ->
5691
hd(rabbit_misc:find_child(Pid, reader)).
5792

58-
-spec start_connection_helper_sup(pid(), supervisor:sup_flags()) ->
59-
supervisor:startchild_ret().
60-
start_connection_helper_sup(ConnectionSupPid, ConnectionHelperSupFlags) ->
61-
supervisor:start_child(
62-
ConnectionSupPid,
63-
#{
64-
id => helper_sup,
65-
start => {rabbit_connection_helper_sup, start_link, [ConnectionHelperSupFlags]},
66-
restart => transient,
67-
significant => true,
68-
shutdown => infinity,
69-
type => supervisor
70-
}).
93+
-spec remove_connection_helper_sup(pid(), helper_sup_amqp_091 | helper_sup_amqp_10) -> ok.
94+
remove_connection_helper_sup(ConnectionSupPid, ConnectionHelperId) ->
95+
ok = supervisor:terminate_child(ConnectionSupPid, ConnectionHelperId),
96+
ok = supervisor:delete_child(ConnectionSupPid, ConnectionHelperId).
7197

7298
%%--------------------------------------------------------------------------
7399

deps/rabbit/src/rabbit_reader.erl

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,12 @@
4343
-include_lib("rabbit_common/include/rabbit_framing.hrl").
4444
-include_lib("rabbit_common/include/rabbit.hrl").
4545

46-
-export([start_link/1, info_keys/0, info/1, info/2, force_event_refresh/2,
46+
-export([start_link/2, info_keys/0, info/1, info/2, force_event_refresh/2,
4747
shutdown/2]).
4848

4949
-export([system_continue/3, system_terminate/4, system_code_change/4]).
5050

51-
-export([init/2, mainloop/4, recvloop/4]).
51+
-export([init/3, mainloop/4, recvloop/4]).
5252

5353
-export([conserve_resources/3, server_properties/1]).
5454

@@ -78,7 +78,9 @@
7878
%% pre_init | securing | running | blocking | blocked | closing | closed | {become, F}
7979
connection_state,
8080
%% see comment in rabbit_connection_sup:start_link/3
81-
helper_sup,
81+
helper_sup :: {HelperSupAmqp091 :: pid(),
82+
HelperSupAmqp10 :: pid()} % pre version negotiation
83+
| pid(), % post version negotiation
8284
%% takes care of cleaning up exclusive queues,
8385
%% see rabbit_queue_collector
8486
queue_collector,
@@ -145,25 +147,25 @@
145147

146148
%%--------------------------------------------------------------------------
147149

148-
-spec start_link(ranch:ref()) ->
150+
-spec start_link({pid(), pid()}, ranch:ref()) ->
149151
rabbit_types:ok(pid()).
150-
start_link(Ref) ->
151-
Pid = proc_lib:spawn_link(?MODULE, init, [self(), Ref]),
152+
start_link(HelperSups, Ref) ->
153+
Pid = proc_lib:spawn_link(?MODULE, init, [self(), HelperSups, Ref]),
152154
{ok, Pid}.
153155

154156
-spec shutdown(pid(), string()) -> 'ok'.
155157

156158
shutdown(Pid, Explanation) ->
157159
gen_server:call(Pid, {shutdown, Explanation}, infinity).
158160

159-
-spec init(pid(), ranch:ref()) ->
161+
-spec init(pid(), {pid(), pid()}, ranch:ref()) ->
160162
no_return().
161-
init(Parent, Ref) ->
163+
init(Parent, HelperSups, Ref) ->
162164
?LG_PROCESS_TYPE(reader),
163165
{ok, Sock} = rabbit_networking:handshake(Ref,
164166
application:get_env(rabbit, proxy_protocol, false)),
165167
Deb = sys:debug_options([]),
166-
start_connection(Parent, Ref, Deb, Sock).
168+
start_connection(Parent, HelperSups, Ref, Deb, Sock).
167169

168170
-spec system_continue(_,_,{[binary()], non_neg_integer(), #v1{}}) -> any().
169171

@@ -290,10 +292,10 @@ socket_op(Sock, Fun) ->
290292
exit(normal)
291293
end.
292294

293-
-spec start_connection(pid(), ranch:ref(), any(), rabbit_net:socket()) ->
295+
-spec start_connection(pid(), {pid(), pid()}, ranch:ref(), any(), rabbit_net:socket()) ->
294296
no_return().
295297

296-
start_connection(Parent, RanchRef, Deb, Sock) ->
298+
start_connection(Parent, HelperSups, RanchRef, Deb, Sock) ->
297299
process_flag(trap_exit, true),
298300
RealSocket = rabbit_net:unwrap_socket(Sock),
299301
Name = case rabbit_net:connection_string(Sock, inbound) of
@@ -336,7 +338,7 @@ start_connection(Parent, RanchRef, Deb, Sock) ->
336338
pending_recv = false,
337339
connection_state = pre_init,
338340
queue_collector = undefined, %% started on tune-ok
339-
helper_sup = none,
341+
helper_sup = HelperSups,
340342
heartbeater = none,
341343
channel_sup_sup_pid = none,
342344
channel_count = 0,
@@ -1104,13 +1106,9 @@ start_091_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
11041106
Protocol,
11051107
#v1{parent = Parent,
11061108
sock = Sock,
1109+
helper_sup = {HelperSup091, _HelperSup10},
11071110
connection = Connection} = State0) ->
1108-
ConnectionHelperSupFlags = #{strategy => one_for_one,
1109-
intensity => 10,
1110-
period => 10,
1111-
auto_shutdown => any_significant},
1112-
{ok, ConnectionHelperSupPid} = rabbit_connection_sup:start_connection_helper_sup(
1113-
Parent, ConnectionHelperSupFlags),
1111+
ok = rabbit_connection_sup:remove_connection_helper_sup(Parent, helper_sup_amqp_10),
11141112
rabbit_networking:register_connection(self()),
11151113
Start = #'connection.start'{
11161114
version_major = ProtocolMajor,
@@ -1123,7 +1121,7 @@ start_091_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
11231121
timeout_sec = ?NORMAL_TIMEOUT,
11241122
protocol = Protocol},
11251123
connection_state = starting,
1126-
helper_sup = ConnectionHelperSupPid},
1124+
helper_sup = HelperSup091},
11271125
switch_callback(State, frame_header, 7).
11281126

11291127
-spec refuse_connection(rabbit_net:socket(), any()) -> no_return().
@@ -1647,6 +1645,7 @@ become_10(Id, State = #v1{sock = Sock}) ->
16471645
pack_for_1_0(Buf, BufLen, #v1{sock = Sock,
16481646
recv_len = RecvLen,
16491647
pending_recv = PendingRecv,
1648+
helper_sup = {_HelperSup091, HelperSup10},
16501649
proxy_socket = ProxySocket,
16511650
connection = #connection{
16521651
name = Name,
@@ -1655,7 +1654,7 @@ pack_for_1_0(Buf, BufLen, #v1{sock = Sock,
16551654
port = Port,
16561655
peer_port = PeerPort,
16571656
connected_at = ConnectedAt}}) ->
1658-
{Sock, RecvLen, PendingRecv, Buf, BufLen, ProxySocket,
1657+
{Sock, RecvLen, PendingRecv, HelperSup10, Buf, BufLen, ProxySocket,
16591658
Name, Host, PeerHost, Port, PeerPort, ConnectedAt}.
16601659

16611660
respond_and_close(State, Channel, Protocol, Reason, LogErr) ->

0 commit comments

Comments
 (0)