Skip to content

Commit d84d666

Browse files
committed
Remove one supervisory level
Given that we now have 1 writer per AMQP 1.0 connection, this commit changes the supervisor hierarchy such that only 1 additional process (rabbit_amqp1_0_session) is created per AMQP 1.0 session.
1 parent 823ae44 commit d84d666

File tree

9 files changed

+67
-122
lines changed

9 files changed

+67
-122
lines changed

deps/rabbitmq_amqp1_0/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ rabbitmq_app(
5050
priv = [":priv"],
5151
deps = [
5252
"//deps/amqp10_common:erlang_app",
53-
"//deps/amqp_client:erlang_app",
5453
"//deps/rabbit:erlang_app",
5554
"//deps/rabbit_common:erlang_app",
5655
],

deps/rabbitmq_amqp1_0/app.bzl

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ def all_beam_files(name = "all_beam_files"):
1414
"src/rabbit_amqp1_0_reader.erl",
1515
"src/rabbit_amqp1_0_session.erl",
1616
"src/rabbit_amqp1_0_session_sup.erl",
17-
"src/rabbit_amqp1_0_session_sup_sup.erl",
1817
"src/rabbit_amqp1_0_util.erl",
1918
"src/rabbit_amqp1_0_writer.erl",
2019
],
@@ -24,7 +23,6 @@ def all_beam_files(name = "all_beam_files"):
2423
erlc_opts = "//:erlc_opts",
2524
deps = [
2625
"//deps/amqp10_common:erlang_app",
27-
"//deps/amqp_client:erlang_app",
2826
"//deps/rabbit_common:erlang_app",
2927
"//deps/rabbitmq_cli:erlang_app",
3028
],
@@ -45,7 +43,6 @@ def all_test_beam_files(name = "all_test_beam_files"):
4543
"src/rabbit_amqp1_0_reader.erl",
4644
"src/rabbit_amqp1_0_session.erl",
4745
"src/rabbit_amqp1_0_session_sup.erl",
48-
"src/rabbit_amqp1_0_session_sup_sup.erl",
4946
"src/rabbit_amqp1_0_util.erl",
5047
"src/rabbit_amqp1_0_writer.erl",
5148
],
@@ -55,7 +52,6 @@ def all_test_beam_files(name = "all_test_beam_files"):
5552
erlc_opts = "//:test_erlc_opts",
5653
deps = [
5754
"//deps/amqp10_common:erlang_app",
58-
"//deps/amqp_client:erlang_app",
5955
"//deps/rabbit_common:erlang_app",
6056
"//deps/rabbitmq_cli:erlang_app",
6157
],
@@ -86,7 +82,6 @@ def all_srcs(name = "all_srcs"):
8682
"src/rabbit_amqp1_0_reader.erl",
8783
"src/rabbit_amqp1_0_session.erl",
8884
"src/rabbit_amqp1_0_session_sup.erl",
89-
"src/rabbit_amqp1_0_session_sup_sup.erl",
9085
"src/rabbit_amqp1_0_util.erl",
9186
"src/rabbit_amqp1_0_writer.erl",
9287
],

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl

Lines changed: 35 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,10 @@
3333
parent,
3434
sock :: rabbit_net:socket(),
3535
connection, callback, recv_len, pending_recv,
36-
connection_state, heartbeater, helper_sup,
37-
channel_sup_sup_pid, buf, buf_len, throttle, proxy_socket,
36+
connection_state, heartbeater,
37+
helper_sup :: pid(),
38+
session_sup :: rabbit_types:option(pid()),
39+
buf, buf_len, throttle, proxy_socket,
3840
tracked_channels,
3941
writer :: rabbit_types:option(pid())
4042
}).
@@ -630,25 +632,23 @@ start_1_0_connection(amqp,
630632

631633
start_1_0_connection0(Mode, State = #v1{connection = Connection,
632634
helper_sup = HelperSup}) ->
633-
ChannelSupSupPid =
634-
case Mode of
635-
sasl ->
636-
undefined;
637-
amqp ->
638-
StartMFA = {rabbit_amqp1_0_session_sup_sup, start_link, []},
639-
ChildSpec = #{id => channel_sup_sup,
640-
start => StartMFA,
641-
restart => transient,
642-
significant => true,
643-
shutdown => infinity,
644-
type => supervisor,
645-
modules => [rabbit_amqp1_0_session_sup_sup]},
646-
{ok, Pid} = supervisor:start_child(HelperSup, ChildSpec),
647-
Pid
648-
end,
635+
SessionSup = case Mode of
636+
sasl ->
637+
undefined;
638+
amqp ->
639+
ChildSpec = #{id => session_sup,
640+
start => {rabbit_amqp1_0_session_sup, start_link, [self()]},
641+
restart => transient,
642+
significant => true,
643+
shutdown => infinity,
644+
type => supervisor,
645+
modules => [rabbit_amqp1_0_session_sup]},
646+
{ok, Pid} = supervisor:start_child(HelperSup, ChildSpec),
647+
Pid
648+
end,
649649
switch_callback(State#v1{connection = Connection#v1_connection{
650650
timeout_sec = ?NORMAL_TIMEOUT},
651-
channel_sup_sup_pid = ChannelSupSupPid,
651+
session_sup = SessionSup,
652652
connection_state = starting},
653653
{frame_header_1_0, Mode}, 8).
654654

@@ -734,9 +734,10 @@ auth_phase_1_0(Response,
734734
handshake, 8)
735735
end.
736736

737-
track_channel(Channel, ChFrPid, State) ->
738-
rabbit_log:debug("AMQP 1.0 opened channel = ~tp " , [{Channel, ChFrPid}]),
739-
State#v1{tracked_channels = maps:put(Channel, ChFrPid, State#v1.tracked_channels)}.
737+
track_channel(ChannelNum, SessionPid, State) ->
738+
rabbit_log:debug("AMQP 1.0 created session process ~p for channel number ~b",
739+
[SessionPid, ChannelNum]),
740+
State#v1{tracked_channels = maps:put(ChannelNum, SessionPid, State#v1.tracked_channels)}.
740741

741742
untrack_channel(Channel, State) ->
742743
case maps:take(Channel, State#v1.tracked_channels) of
@@ -747,8 +748,8 @@ untrack_channel(Channel, State) ->
747748
end.
748749

749750
send_to_new_1_0_session(
750-
Channel, Frame,
751-
#v1{channel_sup_sup_pid = ChanSupSup,
751+
ChannelNum, Frame,
752+
#v1{session_sup = SessionSup,
752753
connection = #v1_connection{frame_max = FrameMax0,
753754
hostname = Hostname,
754755
user = User},
@@ -758,23 +759,20 @@ send_to_new_1_0_session(
758759
unlimited -> unlimited;
759760
_ -> FrameMax0 - 8
760761
end,
761-
ChildArg = {amqp10_framing,
762-
Channel,
763-
FrameMax,
764-
self(),
765-
WriterPid,
766-
User,
767-
vhost(Hostname)},
768-
%% The equivalent, start_channel is in channel_sup_sup.
769-
case rabbit_amqp1_0_session_sup_sup:start_session(ChanSupSup, ChildArg) of
770-
{ok, _ChSupPid, ChFrPid} ->
771-
erlang:monitor(process, ChFrPid),
772-
ModifiedState = track_channel(Channel, ChFrPid, State),
762+
ChildArgs = [WriterPid,
763+
ChannelNum,
764+
FrameMax,
765+
User,
766+
vhost(Hostname)],
767+
case rabbit_amqp1_0_session_sup:start_session(SessionSup, ChildArgs) of
768+
{ok, SessionPid} ->
769+
erlang:monitor(process, SessionPid),
770+
ModifiedState = track_channel(ChannelNum, SessionPid, State),
773771
rabbit_log_connection:info(
774772
"AMQP 1.0 connection ~tp: "
775773
"user '~ts' authenticated and granted access to vhost '~ts'",
776774
[self(), User#user.username, vhost(Hostname)]),
777-
ok = rabbit_amqp1_0_session:process_frame(ChFrPid, Frame),
775+
ok = rabbit_amqp1_0_session:process_frame(SessionPid, Frame),
778776
ModifiedState;
779777
{error, {not_allowed, _}} ->
780778
rabbit_log:error("AMQP 1.0: user '~ts' is not allowed to access virtual host '~ts'",

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session.erl

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
% [2.8.10]
3636
-type sequence_no() :: 0..16#ffffffff.
3737

38-
-export([start_link/1]).
38+
-export([start_link/6]).
3939
-export([process_frame/2,
4040
get_info/1]).
4141
-export([init/1,
@@ -115,7 +115,8 @@
115115
% confirmed = [] :: [[non_neg_integer(),...]]
116116
}).
117117

118-
start_link(Args) ->
118+
start_link(ReaderPid, WriterPid, ChannelNum, FrameMax, User, Vhost) ->
119+
Args = {ReaderPid, WriterPid, ChannelNum, FrameMax, User, Vhost},
119120
gen_server:start_link(?MODULE, Args, []).
120121

121122
get_info(Pid) ->
@@ -125,7 +126,7 @@ process_frame(Pid, Frame) ->
125126
credit_flow:send(Pid),
126127
gen_server:cast(Pid, {frame, Frame, self()}).
127128

128-
init({Channel, ReaderPid, WriterPid, User, Vhost, FrameMax}) ->
129+
init({ReaderPid, WriterPid, ChannelNum, FrameMax, User, Vhost}) ->
129130
process_flag(trap_exit, true),
130131
%% TODO tick_timer with consumer_timeout and permission expiry as done in channel?
131132
% put(permission_cache_can_expire, rabbit_access_control:permission_cache_can_expire(User)),
@@ -134,9 +135,8 @@ init({Channel, ReaderPid, WriterPid, User, Vhost, FrameMax}) ->
134135
frame_max = FrameMax,
135136
user = User,
136137
vhost = Vhost,
137-
channel_num = Channel,
138-
next_publish_id = 0
139-
}}.
138+
channel_num = ChannelNum,
139+
next_publish_id = 0}}.
140140

141141
terminate(_Reason, _State) ->
142142
ok.

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_sup.erl

Lines changed: 24 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -10,41 +10,31 @@
1010

1111
-include_lib("rabbit_common/include/rabbit.hrl").
1212

13-
-export([start_link/1]).
14-
-export([init/1]).
13+
%% client API
14+
-export([start_link/1,
15+
start_session/2]).
1516

16-
-export_type([start_link_arg/0]).
17-
-type start_link_arg() :: {amqp10_framing,
18-
rabbit_channel:channel_number(),
19-
non_neg_integer() | 'unlimited',
20-
pid(),
21-
pid(),
22-
rabbit_types:user(),
23-
rabbit_types:vhost()}.
17+
%% supervisor callback
18+
-export([init/1]).
2419

25-
-spec start_link(start_link_arg()) ->
26-
{'ok', pid(), pid()} | {'error', term()}.
27-
start_link({amqp10_framing, Channel, FrameMax, ReaderPid, WriterPid, User, VHost}) ->
28-
{ok, SupPid} = supervisor:start_link(?MODULE, []),
29-
SessionSpec = #{id => channel,
30-
start =>
31-
{rabbit_amqp1_0_session, start_link,
32-
[{Channel, ReaderPid, WriterPid, User, VHost, FrameMax}]},
33-
restart => transient,
34-
significant => true,
35-
shutdown => ?WORKER_WAIT,
36-
type => worker,
37-
modules => [rabbit_amqp1_0_session]},
38-
case supervisor:start_child(SupPid, SessionSpec) of
39-
{ok, ChannelPid} ->
40-
{ok, SupPid, ChannelPid};
41-
{error, Reason} ->
42-
{error, Reason}
43-
end.
20+
-spec start_link(Reader :: pid()) ->
21+
supervisor:startlink_ret().
22+
start_link(ReaderPid) ->
23+
supervisor:start_link(?MODULE, ReaderPid).
4424

45-
init([]) ->
46-
SupFlags = #{strategy => one_for_all,
25+
init(ReaderPid) ->
26+
SupFlags = #{strategy => simple_one_for_one,
4727
intensity => 0,
48-
period => 1,
49-
auto_shutdown => any_significant},
50-
{ok, {SupFlags, []}}.
28+
period => 1},
29+
ChildSpec = #{id => amqp1_0_session,
30+
start => {rabbit_amqp1_0_session, start_link, [ReaderPid]},
31+
restart => temporary,
32+
shutdown => ?WORKER_WAIT,
33+
type => worker,
34+
modules => [rabbit_amqp1_0_session]},
35+
{ok, {SupFlags, [ChildSpec]}}.
36+
37+
-spec start_session(pid(), list()) ->
38+
supervisor:startchild_ret().
39+
start_session(SessionSupPid, Args) ->
40+
supervisor:start_child(SessionSupPid, Args).

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session_sup_sup.erl

Lines changed: 0 additions & 36 deletions
This file was deleted.

deps/rabbitmq_amqp1_0/test/system_SUITE_data/fsharp-tests/Program.fs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ module Test =
435435
receiver.Close()
436436
with
437437
| :? Amqp.AmqpException as ae ->
438-
assertEqual (ae.Error.Condition) (Symbol cond)
438+
assertEqual (Symbol cond) (ae.Error.Condition)
439439
| _ -> failwith "invalid expection thrown"
440440

441441
let authFailure uri =

deps/rabbitmq_mqtt/app.bzl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,5 +360,5 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
360360
outs = ["test/protocol_interop_SUITE.beam"],
361361
app_name = "rabbitmq_mqtt",
362362
erlc_opts = "//:test_erlc_opts",
363-
deps = ["//deps/amqp_client:erlang_app", "//deps/rabbitmq_stomp:erlang_app"],
363+
deps = ["//deps/amqp10_common:erlang_app", "//deps/amqp_client:erlang_app", "//deps/rabbitmq_stomp:erlang_app"],
364364
)

moduleindex.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -853,7 +853,6 @@ rabbitmq_amqp1_0:
853853
- rabbit_amqp1_0_reader
854854
- rabbit_amqp1_0_session
855855
- rabbit_amqp1_0_session_sup
856-
- rabbit_amqp1_0_session_sup_sup
857856
- rabbit_amqp1_0_util
858857
- rabbit_amqp1_0_writer
859858
rabbitmq_auth_backend_cache:

0 commit comments

Comments
 (0)