Skip to content

Commit 0c557a6

Browse files
committed
Transform rabbit_amqp1_0_writer into gen_server
Why: Prior to this commit, when clicking on the AMQP 1.0 writer process in observer, the process crashed. Instead of handling all these debug messages of the sys module, it's much better to implement a gen_server. There is no advantage of using a special OTP process over gen_server for the AMQP 1.0 writer. gen_server also provides cleaner format status output. How: Message callbacks return a timeout of 0. After all messages in the inbox are processed, the timeout message is handled by flushing any pending bytes.
1 parent 57bbbf4 commit 0c557a6

File tree

1 file changed

+130
-110
lines changed

1 file changed

+130
-110
lines changed

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_writer.erl

Lines changed: 130 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -6,176 +6,196 @@
66
%%
77

88
-module(rabbit_amqp1_0_writer).
9-
-include("rabbit_amqp1_0.hrl").
9+
-behaviour(gen_server).
1010

11-
-export([start_link/3]).
11+
-include("rabbit_amqp1_0.hrl").
1212

13-
-export([send_command/3,
13+
%% client API
14+
-export([start_link/3,
15+
send_command/3,
1416
send_command/4,
1517
send_command_sync/3,
1618
send_command_and_notify/6,
1719
internal_send_command/3]).
1820

19-
%% internal
20-
-export([mainloop/1, mainloop1/1]).
21+
%% gen_server callbacks
22+
-export([init/1,
23+
handle_call/3,
24+
handle_cast/2,
25+
handle_info/2,
26+
format_status/1]).
2127

22-
-record(wstate, {
28+
-record(state, {
2329
sock :: rabbit_net:socket(),
2430
frame_max,
2531
reader :: pid(),
2632
stats_timer,
27-
pending}).
33+
pending :: iolist()
34+
}).
2835

2936
-define(HIBERNATE_AFTER, 6_000).
37+
-define(CALL_TIMEOUT, 300_000).
3038
-define(AMQP_SASL_FRAME_TYPE, 1).
3139

40+
%%%%%%%%%%%%%%%%%%
41+
%%% client API %%%
42+
%%%%%%%%%%%%%%%%%%
43+
3244
-spec start_link (rabbit_net:socket(), non_neg_integer(), pid()) ->
3345
rabbit_types:ok(pid()).
3446
start_link(Sock, FrameMax, ReaderPid) ->
35-
State = initial_state(Sock, FrameMax, ReaderPid),
36-
{ok, proc_lib:spawn_link(?MODULE, mainloop, [State])}.
47+
Args = {Sock, FrameMax, ReaderPid},
48+
Opts = [{hibernate_after, ?HIBERNATE_AFTER}],
49+
gen_server:start_link(?MODULE, Args, Opts).
3750

3851
-spec send_command(pid(),
3952
rabbit_types:channel_number(),
40-
rabbit_framing:amqp_method_record()) -> 'ok'.
41-
send_command(W, Ch, MethodRecord) ->
42-
W ! {send_command, Ch, MethodRecord},
43-
ok.
53+
rabbit_framing:amqp_method_record()) -> ok.
54+
send_command(Writer, ChannelNum, MethodRecord) ->
55+
Request = {send_command, ChannelNum, MethodRecord},
56+
gen_server:cast(Writer, Request).
4457

4558
-spec send_command(pid(),
4659
rabbit_types:channel_number(),
4760
rabbit_framing:amqp_method_record(),
48-
rabbit_types:content()) -> 'ok'.
49-
send_command(W, Ch, MethodRecord, Content) ->
50-
W ! {send_command, Ch, MethodRecord, Content},
51-
ok.
61+
rabbit_types:content()) -> ok.
62+
send_command(Writer, ChannelNum, MethodRecord, Content) ->
63+
Request = {send_command, ChannelNum, MethodRecord, Content},
64+
gen_server:cast(Writer, Request).
5265

5366
-spec send_command_sync(pid(),
5467
rabbit_types:channel_number(),
55-
rabbit_framing:amqp_method_record()) -> 'ok'.
56-
send_command_sync(W, Ch, MethodRecord) ->
57-
call(W, {send_command_sync, Ch, MethodRecord}).
68+
rabbit_framing:amqp_method_record()) -> ok.
69+
send_command_sync(Writer, ChannelNum, MethodRecord) ->
70+
Request = {send_command, ChannelNum, MethodRecord},
71+
gen_server:call(Writer, Request, ?CALL_TIMEOUT).
5872

5973
-spec send_command_and_notify(pid(),
6074
rabbit_types:channel_number(),
6175
pid(),
6276
pid(),
6377
rabbit_framing:amqp_method_record(),
64-
rabbit_types:content()) -> 'ok'.
65-
send_command_and_notify(W, Ch, Q, SessionPid, MethodRecord, Content) ->
66-
W ! {send_command_and_notify, Ch, Q, SessionPid, MethodRecord, Content},
67-
ok.
78+
rabbit_types:content()) -> ok.
79+
send_command_and_notify(Writer, ChannelNum, QueuePid, SessionPid, MethodRecord, Content) ->
80+
Request = {send_command_and_notify, ChannelNum, QueuePid, SessionPid, MethodRecord, Content},
81+
gen_server:cast(Writer, Request).
6882

6983
-spec internal_send_command(rabbit_net:socket(),
7084
rabbit_framing:amqp_method_record(),
71-
'amqp10_framing' | 'rabbit_amqp1_0_sasl') -> 'ok'.
85+
amqp10_framing | rabbit_amqp1_0_sasl) -> ok.
7286
internal_send_command(Sock, MethodRecord, Protocol) ->
73-
ok = tcp_send(Sock, assemble_frame(0, MethodRecord, Protocol)).
87+
Data = assemble_frame(0, MethodRecord, Protocol),
88+
ok = tcp_send(Sock, Data).
7489

75-
call(Pid, Msg) ->
76-
{ok, Res} = gen:call(Pid, '$gen_call', Msg, infinity),
77-
Res.
90+
%%%%%%%%%%%%%%%%%%%%%%%%%%%%
91+
%%% gen_server callbacks %%%
92+
%%%%%%%%%%%%%%%%%%%%%%%%%%%%
7893

79-
initial_state(Sock, FrameMax, ReaderPid) ->
80-
State = #wstate{sock = Sock,
94+
init({Sock, FrameMax, ReaderPid}) ->
95+
State0 = #state{sock = Sock,
8196
frame_max = FrameMax,
8297
reader = ReaderPid,
8398
pending = []},
8499
%%TODO check stats_timer: When is it enabled and needs rabbit_event:init_stats_timer/2
85-
rabbit_event:init_disabled_stats_timer(State, #wstate.stats_timer).
86-
87-
mainloop(State) ->
88-
% try
89-
mainloop1(State),
90-
%%TODO handle writer failures properly
91-
% catch
92-
% exit:Error -> #wstate{reader = ReaderPid, channel = Channel} = State,
93-
% ReaderPid ! {channel_exit, Channel, Error}
94-
% end,
95-
done.
96-
97-
mainloop1(State = #wstate{pending = []}) ->
98-
receive
99-
Message -> ?MODULE:mainloop1(handle_message(Message, State))
100-
after ?HIBERNATE_AFTER ->
101-
erlang:hibernate(?MODULE, mainloop, [State])
102-
end;
103-
mainloop1(State) ->
104-
receive
105-
Message -> ?MODULE:mainloop1(handle_message(Message, State))
106-
after 0 ->
107-
?MODULE:mainloop1(flush(State))
108-
end.
109-
110-
handle_message({send_command, Ch, MethodRecord}, State) ->
111-
internal_send_command_async(Ch, MethodRecord, State);
112-
handle_message({send_command, Ch, MethodRecord, Content}, State) ->
113-
internal_send_command_async(Ch, MethodRecord, Content, State);
114-
handle_message({'$gen_call', From, {send_command_sync, Ch, MethodRecord}}, State) ->
115-
State1 = flush(internal_send_command_async(Ch, MethodRecord, State)),
116-
gen_server:reply(From, ok),
117-
State1;
118-
handle_message({send_command_and_notify, Ch, QPid, SessionPid, MethodRecord, Content}, State) ->
119-
State1 = internal_send_command_async(Ch, MethodRecord, Content, State),
120-
rabbit_amqqueue:notify_sent(QPid, SessionPid),
121-
State1;
122-
handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) ->
123-
rabbit_amqqueue:notify_sent_queue_down(QPid),
124-
State;
125-
handle_message({inet_reply, _, ok}, State) ->
126-
rabbit_event:ensure_stats_timer(State, #wstate.stats_timer, emit_stats);
127-
handle_message({inet_reply, _, Status}, _State) ->
100+
State = rabbit_event:init_disabled_stats_timer(State0, #state.stats_timer),
101+
{ok, State}.
102+
103+
handle_cast({send_command, ChannelNum, MethodRecord}, State0) ->
104+
State = internal_send_command_async(ChannelNum, MethodRecord, State0),
105+
no_reply(State);
106+
handle_cast({send_command, ChannelNum, MethodRecord, Content}, State0) ->
107+
State = internal_send_command_async(ChannelNum, MethodRecord, Content, State0),
108+
no_reply(State);
109+
handle_cast({send_command_and_notify, ChannelNum, QueuePid, SessionPid, MethodRecord, Content}, State0) ->
110+
State = internal_send_command_async(ChannelNum, MethodRecord, Content, State0),
111+
rabbit_amqqueue:notify_sent(QueuePid, SessionPid),
112+
no_reply(State).
113+
114+
handle_call({send_command, ChannelNum, MethodRecord}, _From, State0) ->
115+
State1 = internal_send_command_async(ChannelNum, MethodRecord, State0),
116+
State = flush(State1),
117+
{reply, ok, State}.
118+
119+
handle_info(timeout, State0) ->
120+
State = flush(State0),
121+
{noreply, State};
122+
handle_info({inet_reply, _, ok}, State0) ->
123+
State = rabbit_event:ensure_stats_timer(State0, #state.stats_timer, emit_stats),
124+
no_reply(State);
125+
handle_info({inet_reply, _, Status}, _State) ->
128126
exit({writer, send_failed, Status});
129-
handle_message(emit_stats, State = #wstate{reader = ReaderPid}) ->
127+
handle_info(emit_stats, State0 = #state{reader = ReaderPid}) ->
130128
ReaderPid ! ensure_stats,
131-
rabbit_event:reset_stats_timer(State, #wstate.stats_timer);
132-
handle_message(Message, _State) ->
133-
exit({writer, message_not_understood, Message}).
129+
State = rabbit_event:reset_stats_timer(State0, #state.stats_timer),
130+
no_reply(State);
131+
handle_info({'DOWN', _MRef, process, QueuePid, _Reason}, State) ->
132+
rabbit_amqqueue:notify_sent_queue_down(QueuePid),
133+
no_reply(State).
134+
135+
format_status(Status) ->
136+
maps:update_with(
137+
state,
138+
fun(#state{sock = Sock,
139+
frame_max = FrameMax,
140+
reader = Reader,
141+
stats_timer = _,
142+
pending = Pending}) ->
143+
#{socket => Sock,
144+
frame_max => FrameMax,
145+
reader => Reader,
146+
pending_bytes => iolist_size(Pending)}
147+
end,
148+
Status).
149+
150+
%%%%%%%%%%%%%%%
151+
%%% Helpers %%%
152+
%%%%%%%%%%%%%%%
153+
154+
no_reply(State) ->
155+
{noreply, State, 0}.
134156

135-
%% Begin 1-0
136-
137-
assemble_frame(Channel, Performative) ->
138-
assemble_frame(Channel, Performative, amqp10_framing).
157+
internal_send_command_async(Channel, MethodRecord,
158+
State = #state{pending = Pending}) ->
159+
Frame = assemble_frame(Channel, MethodRecord),
160+
maybe_flush(State#state{pending = [Frame | Pending]}).
139161

140-
assemble_frame(Channel, Performative, amqp10_framing) ->
141-
?DEBUG("Channel ~tp <-~n~tp",
142-
[Channel, amqp10_framing:pprint(Performative)]),
143-
PerfBin = amqp10_framing:encode_bin(Performative),
144-
amqp10_binary_generator:build_frame(Channel, PerfBin);
145-
assemble_frame(Channel, Performative, rabbit_amqp1_0_sasl) ->
146-
?DEBUG("Channel ~tp <-~n~tp",
147-
[Channel, amqp10_framing:pprint(Performative)]),
148-
PerfBin = amqp10_framing:encode_bin(Performative),
149-
amqp10_binary_generator:build_frame(Channel, ?AMQP_SASL_FRAME_TYPE, PerfBin).
162+
internal_send_command_async(Channel, MethodRecord, Content,
163+
State = #state{frame_max = FrameMax,
164+
pending = Pending}) ->
165+
Frames = assemble_frames(Channel, MethodRecord, Content, FrameMax),
166+
maybe_flush(State#state{pending = [Frames | Pending]}).
150167

151168
%% Note: a transfer record can be followed by a number of other
152169
%% records to make a complete frame but unlike 0-9-1 we may have many
153170
%% content records. However, that's already been handled for us, we're
154171
%% just sending a chunk, so from this perspective it's just a binary.
155172

173+
%%TODO respect FrameMax
156174
assemble_frames(Channel, Performative, Content, _FrameMax) ->
157175
?DEBUG("Channel ~tp <-~n~tp~n followed by ~tp bytes of content",
158176
[Channel, amqp10_framing:pprint(Performative),
159177
iolist_size(Content)]),
160178
PerfBin = amqp10_framing:encode_bin(Performative),
161179
amqp10_binary_generator:build_frame(Channel, [PerfBin, Content]).
162180

163-
%% End 1-0
164-
165-
tcp_send(Sock, Data) ->
166-
rabbit_misc:throw_on_error(inet_error,
167-
fun () -> rabbit_net:send(Sock, Data) end).
181+
assemble_frame(Channel, Performative) ->
182+
assemble_frame(Channel, Performative, amqp10_framing).
168183

169-
internal_send_command_async(Channel, MethodRecord,
170-
State = #wstate{pending = Pending}) ->
171-
Frame = assemble_frame(Channel, MethodRecord),
172-
maybe_flush(State#wstate{pending = [Frame | Pending]}).
184+
assemble_frame(Channel, Performative, amqp10_framing) ->
185+
?DEBUG("Channel ~tp <-~n~tp",
186+
[Channel, amqp10_framing:pprint(Performative)]),
187+
PerfBin = amqp10_framing:encode_bin(Performative),
188+
amqp10_binary_generator:build_frame(Channel, PerfBin);
189+
assemble_frame(Channel, Performative, rabbit_amqp1_0_sasl) ->
190+
?DEBUG("Channel ~tp <-~n~tp",
191+
[Channel, amqp10_framing:pprint(Performative)]),
192+
PerfBin = amqp10_framing:encode_bin(Performative),
193+
amqp10_binary_generator:build_frame(Channel, ?AMQP_SASL_FRAME_TYPE, PerfBin).
173194

174-
internal_send_command_async(Channel, MethodRecord, Content,
175-
State = #wstate{frame_max = FrameMax,
176-
pending = Pending}) ->
177-
Frames = assemble_frames(Channel, MethodRecord, Content, FrameMax),
178-
maybe_flush(State#wstate{pending = [Frames | Pending]}).
195+
tcp_send(Sock, Data) ->
196+
rabbit_misc:throw_on_error(
197+
inet_error,
198+
fun() -> rabbit_net:send(Sock, Data) end).
179199

180200
%% This magic number is the tcp-over-ethernet MSS (1460) minus the
181201
%% minimum size of a AMQP basic.deliver method frame (24) plus basic
@@ -184,17 +204,17 @@ internal_send_command_async(Channel, MethodRecord, Content,
184204
%% TODO doesn't make sense for AMQP 1.0
185205
-define(FLUSH_THRESHOLD, 1414).
186206

187-
maybe_flush(State = #wstate{pending = Pending}) ->
207+
maybe_flush(State = #state{pending = Pending}) ->
188208
case iolist_size(Pending) >= ?FLUSH_THRESHOLD of
189209
true -> flush(State);
190210
false -> State
191211
end.
192212

193-
flush(State = #wstate{pending = []}) ->
213+
flush(State = #state{pending = []}) ->
194214
State;
195-
flush(State = #wstate{sock = Sock, pending = Pending}) ->
215+
flush(State = #state{sock = Sock, pending = Pending}) ->
196216
ok = port_cmd(Sock, lists:reverse(Pending)),
197-
State#wstate{pending = []}.
217+
State#state{pending = []}.
198218

199219
%% gen_tcp:send/2 does a selective receive of {inet_reply, Sock,
200220
%% Status} to obtain the result. That is bad when it is called from

0 commit comments

Comments
 (0)