Skip to content

Commit 540aee1

Browse files
committed
Use 1 writer process per AMQP 1.0 connection
AMQP 0.9.1 uses a separate rabbit_writer Erlang process per AMQP 0.9.1 channel. Prior to this commit, AMQP 1.0 used a separate rabbit_amqp1_0_writer Erlang process per AMQP 1.0 session. Advantage of single writer proc per session (prior to this commit): * High parallelism for serialising packets if multiple sessions within a connection write heavily at the same time. This commit uses a single writer process per AMQP 1.0 connection that is shared across all AMQP 1.0 sessions. Advantages of single writer proc per connection (this commit): * Lower memory usage with hundreds of thousands of AMQP 1.0 sessions * Less TCP and IP header overhead given that the single writer process can accumulate across all sessions bytes worth a MSS before flushing the socket. In other words, this commit decides that a reader / writer process pair per AMQP 1.0 connection is good enough for bi-directional TRANSFER flows. Having a writer per session is too heavy. The final goal by previous commits and follow-up commits is to reduce the total number of Erlang processes to allow hundreds of thousands of AMQP clients to connect while keeping resource usage in RabbitMQ at a low level. We still ensure high thoughput by having separate reader, writer, and session processes.
1 parent f8bdd33 commit 540aee1

File tree

6 files changed

+240
-301
lines changed

6 files changed

+240
-301
lines changed

deps/rabbit_common/src/rabbit_net.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@
3232
{raw, non_neg_integer(), non_neg_integer(), binary()}].
3333
-type hostname() :: inet:hostname().
3434
-type ip_port() :: inet:port_number().
35-
-type rabbit_proxy_socket() :: {'rabbit_proxy_socket', ranch_transport:socket(), ranch_proxy_header:proxy_info()}.
35+
-type proxy_socket() :: {'rabbit_proxy_socket', ranch_transport:socket(), ranch_proxy_header:proxy_info()}.
3636
% -type host_or_ip() :: binary() | inet:ip_address().
3737
-spec is_ssl(socket()) -> boolean().
3838
-spec ssl_info(socket()) -> 'nossl' | ok_val_or_error([{atom(), any()}]).
39-
-spec proxy_ssl_info(socket(), rabbit_proxy_socket() | 'undefined') -> 'nossl' | ok_val_or_error([{atom(), any()}]).
39+
-spec proxy_ssl_info(socket(), proxy_socket() | 'undefined') -> 'nossl' | ok_val_or_error([{atom(), any()}]).
4040
-spec controlling_process(socket(), pid()) -> ok_or_any_error().
4141
-spec getstat(socket(), [stat_option()]) ->
4242
ok_val_or_error([{stat_option(), integer()}]).

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_reader.erl

Lines changed: 99 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,16 @@
2828

2929
%%--------------------------------------------------------------------------
3030

31-
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
32-
connection_state, heartbeater, helper_sup,
33-
channel_sup_sup_pid, buf, buf_len, throttle, proxy_socket,
34-
tracked_channels}).
31+
-record(v1,
32+
{
33+
parent,
34+
sock :: rabbit_net:socket(),
35+
connection, callback, recv_len, pending_recv,
36+
connection_state, heartbeater, helper_sup,
37+
channel_sup_sup_pid, buf, buf_len, throttle, proxy_socket,
38+
tracked_channels,
39+
writer :: rabbit_types:option(pid())
40+
}).
3541

3642
-record(v1_connection, {name :: binary(),
3743
host,
@@ -385,64 +391,62 @@ parse_1_0_frame(Payload, _Channel) ->
385391
_ -> {Perf, Rest}
386392
end.
387393

388-
handle_1_0_connection_frame(#'v1_0.open'{ max_frame_size = ClientFrameMax,
389-
channel_max = ClientChannelMax,
390-
idle_time_out = IdleTimeout,
391-
hostname = Hostname },
392-
State = #v1{
393-
connection_state = starting,
394-
connection = Connection,
395-
throttle = Throttle,
396-
helper_sup = HelperSupPid,
397-
sock = Sock}) ->
394+
handle_1_0_connection_frame(
395+
#'v1_0.open'{max_frame_size = ClientFrameMax,
396+
channel_max = ClientChannelMax,
397+
idle_time_out = IdleTimeout,
398+
hostname = Hostname },
399+
#v1{connection_state = starting,
400+
connection = Connection,
401+
throttle = Throttle,
402+
helper_sup = HelperSupPid,
403+
sock = Sock} = State0) ->
398404
ClientHeartbeatSec = case IdleTimeout of
399-
undefined -> 0;
405+
undefined -> 0;
400406
{uint, Interval} -> Interval div 1000
401407
end,
402-
FrameMax = case ClientFrameMax of
403-
undefined -> unlimited;
404-
{_, FM} -> FM
405-
end,
408+
FrameMax = case ClientFrameMax of
409+
undefined -> unlimited;
410+
{_, FM} -> FM
411+
end,
406412
{ok, HeartbeatSec} = application:get_env(rabbit, heartbeat),
407-
State1 =
408-
if (FrameMax =/= unlimited) and (FrameMax < ?FRAME_1_0_MIN_SIZE) ->
409-
protocol_error(?V_1_0_AMQP_ERROR_FRAME_SIZE_TOO_SMALL,
410-
"frame_max=~w < ~w min size",
411-
[FrameMax, ?FRAME_1_0_MIN_SIZE]);
412-
true ->
413-
SendFun =
414-
fun() ->
415-
Frame =
416-
amqp10_binary_generator:build_heartbeat_frame(),
417-
catch rabbit_net:send(Sock, Frame)
418-
end,
419-
420-
Parent = self(),
421-
ReceiveFun =
422-
fun() ->
423-
Parent ! heartbeat_timeout
424-
end,
425-
%% [2.4.5] the value in idle-time-out SHOULD be half the peer's
426-
%% actual timeout threshold
427-
ReceiverHeartbeatSec = lists:min([HeartbeatSec * 2, 4294967]),
428-
%% TODO: only start heartbeat receive timer at next next frame
429-
Heartbeater =
430-
rabbit_heartbeat:start(HelperSupPid, Sock,
431-
ClientHeartbeatSec, SendFun,
432-
ReceiverHeartbeatSec, ReceiveFun),
433-
State#v1{connection_state = running,
434-
connection = Connection#v1_connection{
435-
frame_max = FrameMax,
436-
hostname = Hostname},
437-
heartbeater = Heartbeater}
438-
end,
413+
if (FrameMax =/= unlimited) and (FrameMax < ?FRAME_1_0_MIN_SIZE) ->
414+
protocol_error(?V_1_0_AMQP_ERROR_FRAME_SIZE_TOO_SMALL,
415+
"frame_max=~w < ~w min size",
416+
[FrameMax, ?FRAME_1_0_MIN_SIZE]);
417+
true ->
418+
ok
419+
end,
420+
421+
SendFun = fun() ->
422+
Frame = amqp10_binary_generator:build_heartbeat_frame(),
423+
catch rabbit_net:send(Sock, Frame)
424+
end,
425+
Parent = self(),
426+
ReceiveFun = fun() ->
427+
Parent ! heartbeat_timeout
428+
end,
429+
%% [2.4.5] the value in idle-time-out SHOULD be half the peer's
430+
%% actual timeout threshold
431+
ReceiverHeartbeatSec = lists:min([HeartbeatSec * 2, 4294967]),
432+
%% TODO: only start heartbeat receive timer at next next frame
433+
Heartbeater = rabbit_heartbeat:start(
434+
HelperSupPid, Sock,
435+
ClientHeartbeatSec, SendFun,
436+
ReceiverHeartbeatSec, ReceiveFun),
437+
State1 = State0#v1{connection_state = running,
438+
connection = Connection#v1_connection{
439+
frame_max = FrameMax,
440+
hostname = Hostname},
441+
heartbeater = Heartbeater},
442+
State2 = start_writer(State1),
439443
HostnameVal = case Hostname of
440-
undefined -> undefined;
441-
null -> undefined;
442-
{utf8, Val} -> Val
444+
undefined -> undefined;
445+
null -> undefined;
446+
{utf8, Val} -> Val
443447
end,
444448
rabbit_log:debug("AMQP 1.0 connection.open frame: hostname = ~ts, extracted vhost = ~ts, idle_timeout = ~tp" ,
445-
[HostnameVal, vhost(Hostname), HeartbeatSec * 1000]),
449+
[HostnameVal, vhost(Hostname), HeartbeatSec * 1000]),
446450
%% TODO enforce channel_max
447451
ok = send_on_channel0(
448452
Sock,
@@ -452,20 +456,35 @@ handle_1_0_connection_frame(#'v1_0.open'{ max_frame_size = ClientFrameMax,
452456
container_id = {utf8, rabbit_nodes:cluster_name()},
453457
properties = server_properties()}),
454458
Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
459+
State = State2#v1{throttle = Throttle#throttle{alarmed_by = Conserve}},
455460

456-
Infos = infos(?EVENT_KEYS, State1),
461+
Infos = infos(?EVENT_KEYS, State),
457462
ok = rabbit_core_metrics:connection_created(
458463
proplists:get_value(pid, Infos),
459464
Infos),
460465
ok = rabbit_event:notify(connection_created, Infos),
461466
ok = rabbit_networking:register_connection(self()),
462467

463-
control_throttle(
464-
State1#v1{throttle = Throttle#throttle{alarmed_by = Conserve}});
468+
control_throttle(State);
465469

466470
handle_1_0_connection_frame(_Frame, State) ->
467471
maybe_close(State#v1{connection_state = closing}).
468472

473+
start_writer(#v1{helper_sup = SupPid,
474+
sock = Sock,
475+
connection = #v1_connection{frame_max = FrameMax}} = State) ->
476+
ChildSpec = #{id => writer,
477+
start => {rabbit_amqp1_0_writer, start_link,
478+
[Sock, FrameMax, self()]},
479+
restart => transient,
480+
significant => true,
481+
shutdown => ?WORKER_WAIT,
482+
type => worker,
483+
modules => [rabbit_amqp1_0_writer]
484+
},
485+
{ok, Pid} = supervisor:start_child(SupPid, ChildSpec),
486+
State#v1{writer = Pid}.
487+
469488
handle_1_0_session_frame(Channel, Frame, State) ->
470489
case maps:get(Channel, State#v1.tracked_channels, undefined) of
471490
undefined ->
@@ -640,8 +659,7 @@ send_on_channel0(Sock, Method) ->
640659
send_on_channel0(Sock, Method, amqp10_framing).
641660

642661
send_on_channel0(Sock, Method, Framing) ->
643-
ok = rabbit_amqp1_0_writer:internal_send_command(
644-
Sock, 0, Method, Framing).
662+
ok = rabbit_amqp1_0_writer:internal_send_command(Sock, Method, Framing).
645663

646664
%% End 1-0
647665

@@ -728,23 +746,27 @@ untrack_channel(Channel, State) ->
728746
error -> State
729747
end.
730748

731-
send_to_new_1_0_session(Channel, Frame, State) ->
732-
#v1{sock = Sock,
733-
channel_sup_sup_pid = ChanSupSup,
734-
connection = #v1_connection{frame_max = FrameMax,
735-
hostname = Hostname,
736-
user = User}
737-
} = State,
738-
%% Note: the equivalent, start_channel is in channel_sup_sup
739-
740-
case rabbit_amqp1_0_session_sup_sup:start_session(
741-
%% NB subtract fixed frame header size
742-
ChanSupSup, {amqp10_framing, Sock, Channel,
743-
case FrameMax of
744-
unlimited -> unlimited;
745-
_ -> FrameMax - 8
746-
end,
747-
self(), User, vhost(Hostname)}) of
749+
send_to_new_1_0_session(
750+
Channel, Frame,
751+
#v1{channel_sup_sup_pid = ChanSupSup,
752+
connection = #v1_connection{frame_max = FrameMax0,
753+
hostname = Hostname,
754+
user = User},
755+
writer = WriterPid} = State) ->
756+
%% Subtract fixed frame header size.
757+
FrameMax = case FrameMax0 of
758+
unlimited -> unlimited;
759+
_ -> FrameMax0 - 8
760+
end,
761+
ChildArg = {amqp10_framing,
762+
Channel,
763+
FrameMax,
764+
self(),
765+
WriterPid,
766+
User,
767+
vhost(Hostname)},
768+
%% The equivalent, start_channel is in channel_sup_sup.
769+
case rabbit_amqp1_0_session_sup_sup:start_session(ChanSupSup, ChildArg) of
748770
{ok, _ChSupPid, ChFrPid} ->
749771
erlang:monitor(process, ChFrPid),
750772
ModifiedState = track_channel(Channel, ChFrPid, State),

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_session.erl

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -162,16 +162,17 @@ handle_info({'EXIT', _Pid, Reason}, State) ->
162162

163163
handle_cast({frame, Frame, FlowPid},
164164
#state{reader_pid = ReaderPid,
165-
writer_pid = Sock} = State) ->
165+
writer_pid = WriterPid,
166+
channel_num = Ch} = State) ->
166167
credit_flow:ack(FlowPid),
167168
try handle_control(Frame, State) of
168169
{reply, Replies, NewState} when is_list(Replies) ->
169170
lists:foreach(fun (Reply) ->
170-
rabbit_amqp1_0_writer:send_command(Sock, Reply)
171+
rabbit_amqp1_0_writer:send_command(WriterPid, Ch, Reply)
171172
end, Replies),
172173
{noreply, NewState};
173174
{reply, Reply, NewState} ->
174-
rabbit_amqp1_0_writer:send_command(Sock, Reply),
175+
rabbit_amqp1_0_writer:send_command(WriterPid, Ch, Reply),
175176
{noreply, NewState};
176177
{noreply, _} = NoreplyNewstate ->
177178
NoreplyNewstate;
@@ -182,17 +183,18 @@ handle_cast({frame, Frame, FlowPid},
182183
End = #'v1_0.end'{ error = Reason },
183184
rabbit_log:warning("Closing session for connection ~tp:~n~tp",
184185
[ReaderPid, Reason]),
185-
ok = rabbit_amqp1_0_writer:send_command_sync(Sock, End),
186+
ok = rabbit_amqp1_0_writer:send_command_sync(WriterPid, Ch, End),
186187
{stop, normal, State};
187188
exit:normal ->
188189
{stop, normal, State};
189190
_:Reason:Stacktrace ->
190191
{stop, {Reason, Stacktrace}, State}
191192
end;
192193
handle_cast({queue_event, _, _} = QEvent,
193-
#state{writer_pid = WriterPid} = State0) ->
194+
#state{writer_pid = WriterPid,
195+
channel_num = Ch} = State0) ->
194196
{Reply, State} = handle_queue_event(QEvent, State0),
195-
[rabbit_amqp1_0_writer:send_command(WriterPid, F) ||
197+
[rabbit_amqp1_0_writer:send_command(WriterPid, Ch, F) ||
196198
F <- flow_fields(Reply, State)],
197199
%% TODO noreply_coalesce(State) as done in rabbit_channel to process further msgs
198200
%% from the mailbox before sending the disposition with confirms / rejects
@@ -443,6 +445,7 @@ handle_control(Flow = #'v1_0.flow'{handle = Handle}, State0) ->
443445
handle_control(#'v1_0.detach'{handle = Handle,
444446
closed = Closed},
445447
#state{writer_pid = WriterPid,
448+
channel_num = Ch,
446449
queue_states = QStates0,
447450
vhost = Vhost,
448451
user = #user{username = Username}} = State0) ->
@@ -469,15 +472,16 @@ handle_control(#'v1_0.detach'{handle = Handle,
469472
State0
470473
end
471474
end,
472-
ok = rabbit_amqp1_0_writer:send_command_sync(WriterPid,
473-
#'v1_0.detach'{handle = Handle,
474-
closed = Closed}),
475+
ok = rabbit_amqp1_0_writer:send_command_sync(
476+
WriterPid, Ch, #'v1_0.detach'{handle = Handle,
477+
closed = Closed}),
475478
{noreply, State};
476479

477-
handle_control(#'v1_0.end'{}, #state{writer_pid = Sock,
480+
handle_control(#'v1_0.end'{}, #state{writer_pid = WriterPid,
481+
channel_num = Ch,
478482
queue_states = QStates}) ->
479483
ok = rabbit_queue_type:close(QStates),
480-
ok = rabbit_amqp1_0_writer:send_command_sync(Sock, #'v1_0.end'{}),
484+
ok = rabbit_amqp1_0_writer:send_command_sync(WriterPid, Ch, #'v1_0.end'{}),
481485
stop;
482486

483487
handle_control(Frame, _State) ->
@@ -488,6 +492,7 @@ handle_control(Frame, _State) ->
488492
send_pending_transfers(#state{outgoing_window = LocalSpace,
489493
remote_incoming_window = RemoteSpace,
490494
writer_pid = WriterPid,
495+
channel_num = Ch,
491496
pending_transfers = Buf0,
492497
queue_states = QStates} = State0)
493498
when RemoteSpace > 0 andalso LocalSpace > 0 ->
@@ -506,12 +511,12 @@ send_pending_transfers(#state{outgoing_window = LocalSpace,
506511
{ok, rabbit_classic_queue} ->
507512
fun(T, C) ->
508513
rabbit_amqp1_0_writer:send_command_and_notify(
509-
WriterPid, QPid, self(), T, C)
514+
WriterPid, Ch, QPid, self(), T, C)
510515
end;
511516
{ok, _QType} ->
512517
fun(T, C) ->
513518
rabbit_amqp1_0_writer:send_command(
514-
WriterPid, T, C)
519+
WriterPid, Ch, T, C)
515520
end
516521
end,
517522
%% rabbit_basic:maybe_gc_large_msg(Content, GCThreshold)
@@ -532,10 +537,11 @@ send_pending_transfers(#state{outgoing_window = LocalSpace,
532537
end
533538
end;
534539
send_pending_transfers(#state{remote_incoming_window = RemoteSpace,
535-
writer_pid = WriterPid} = State0)
540+
writer_pid = WriterPid,
541+
channel_num = Ch} = State0)
536542
when RemoteSpace > 0 ->
537543
{Flow = #'v1_0.flow'{}, State} = bump_outgoing_window(State0),
538-
rabbit_amqp1_0_writer:send_command(WriterPid, flow_fields(Flow, State)),
544+
rabbit_amqp1_0_writer:send_command(WriterPid, Ch, flow_fields(Flow, State)),
539545
send_pending_transfers(State);
540546
send_pending_transfers(State) ->
541547
State.

0 commit comments

Comments
 (0)