Skip to content

Commit db22569

Browse files
committed
Remove erlang:port_command/2 hack
as selective receives are efficient in OTP 26: ``` OTP-18431 Application(s): compiler, stdlib Related Id(s): PR-6739 Improved the selective receive optimization, which can now be enabled for references returned from other functions. This greatly improves the performance of gen_server:send_request/3, gen_server:wait_response/2, and similar functions. ```
1 parent 7a10b26 commit db22569

File tree

7 files changed

+49
-133
lines changed

7 files changed

+49
-133
lines changed

deps/rabbit_common/src/rabbit_net.erl

Lines changed: 2 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
-include_lib("kernel/include/net_address.hrl").
1212

1313
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
14-
recv/1, sync_recv/2, async_recv/3, port_command/2, getopts/2,
14+
recv/1, sync_recv/2, async_recv/3, getopts/2,
1515
setopts/2, send/2, close/1, fast_close/1, sockname/1, peername/1,
1616
peercert/1, connection_string/2, socket_ends/2, is_loopback/1,
1717
tcp_host/1, unwrap_socket/1, maybe_get_proxy_socket/1,
@@ -50,15 +50,14 @@
5050
rabbit_types:error(any()).
5151
-spec async_recv(socket(), integer(), timeout()) ->
5252
rabbit_types:ok(any()).
53-
-spec port_command(socket(), iolist()) -> 'true'.
5453
-spec getopts
5554
(socket(),
5655
[atom() |
5756
{raw, non_neg_integer(), non_neg_integer(),
5857
non_neg_integer() | binary()}]) ->
5958
ok_val_or_error(opts()).
6059
-spec setopts(socket(), opts()) -> ok_or_any_error().
61-
-spec send(socket(), binary() | iolist()) -> ok_or_any_error().
60+
-spec send(socket(), iodata()) -> ok_or_any_error().
6261
-spec close(socket()) -> ok_or_any_error().
6362
-spec fast_close(socket()) -> ok_or_any_error().
6463
-spec sockname(socket()) ->
@@ -161,40 +160,6 @@ async_recv(Sock, Length, infinity) when is_port(Sock) ->
161160
async_recv(Sock, Length, Timeout) when is_port(Sock) ->
162161
prim_inet:async_recv(Sock, Length, Timeout).
163162

164-
port_command(Sock, Data) when ?IS_SSL(Sock) ->
165-
case ssl:send(Sock, Data) of
166-
ok -> self() ! {inet_reply, Sock, ok},
167-
true;
168-
{error, Reason} -> erlang:error(Reason)
169-
end;
170-
port_command(Sock, Data) when is_port(Sock) ->
171-
Fun = case persistent_term:get(rabbit_net_tcp_send, undefined) of
172-
undefined ->
173-
Rel = list_to_integer(erlang:system_info(otp_release)),
174-
%% gen_tcp:send/2 does a selective receive of
175-
%% {inet_reply, Sock, Status[, CallerTag]}
176-
F = if Rel >= 26 ->
177-
%% Selective receive is optimised:
178-
%% https://github.com/erlang/otp/issues/6455
179-
fun gen_tcp_send/2;
180-
Rel < 26 ->
181-
%% Avoid costly selective receive.
182-
fun erlang:port_command/2
183-
end,
184-
ok = persistent_term:put(rabbit_net_tcp_send, F),
185-
F;
186-
F ->
187-
F
188-
end,
189-
Fun(Sock, Data).
190-
191-
gen_tcp_send(Sock, Data) ->
192-
case gen_tcp:send(Sock, Data) of
193-
ok -> self() ! {inet_reply, Sock, ok},
194-
true;
195-
{error, Reason} -> erlang:error(Reason)
196-
end.
197-
198163
getopts(Sock, Options) when ?IS_SSL(Sock) ->
199164
ssl:getopts(Sock, Options);
200165
getopts(Sock, Options) when is_port(Sock) ->

deps/rabbit_common/src/rabbit_writer.erl

Lines changed: 9 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -265,10 +265,6 @@ handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
265265
handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) ->
266266
rabbit_amqqueue_common:notify_sent_queue_down(QPid),
267267
State;
268-
handle_message({inet_reply, _, ok}, State) ->
269-
rabbit_event:ensure_stats_timer(State, #wstate.stats_timer, emit_stats);
270-
handle_message({inet_reply, _, Status}, _State) ->
271-
exit({writer, send_failed, Status});
272268
handle_message(emit_stats, State = #wstate{reader = ReaderPid}) ->
273269
ReaderPid ! ensure_stats,
274270
rabbit_event:reset_stats_timer(State, #wstate.stats_timer);
@@ -381,33 +377,15 @@ maybe_flush(State = #wstate{pending = Pending}) ->
381377

382378
internal_flush(State = #wstate{pending = []}) ->
383379
State;
384-
internal_flush(State = #wstate{sock = Sock, pending = Pending}) ->
385-
ok = port_cmd(Sock, lists:reverse(Pending)),
386-
State#wstate{pending = []}.
387-
388-
%% gen_tcp:send/2 does a selective receive of {inet_reply, Sock,
389-
%% Status} to obtain the result. That is bad when it is called from
390-
%% the writer since it requires scanning of the writers possibly quite
391-
%% large message queue.
392-
%%
393-
%% So instead we lift the code from prim_inet:send/2, which is what
394-
%% gen_tcp:send/2 calls, do the first half here and then just process
395-
%% the result code in handle_message/2 as and when it arrives.
396-
%%
397-
%% This means we may end up happily sending data down a closed/broken
398-
%% socket, but that's ok since a) data in the buffers will be lost in
399-
%% any case (so qualitatively we are no worse off than if we used
400-
%% gen_tcp:send/2), and b) we do detect the changed socket status
401-
%% eventually, i.e. when we get round to handling the result code.
402-
%%
403-
%% Also note that the port has bounded buffers and port_command blocks
404-
%% when these are full. So the fact that we process the result
405-
%% asynchronously does not impact flow control.
406-
port_cmd(Sock, Data) ->
407-
true = try rabbit_net:port_command(Sock, Data)
408-
catch error:Error -> exit({writer, send_failed, Error})
409-
end,
410-
ok.
380+
internal_flush(State0 = #wstate{sock = Sock, pending = Pending}) ->
381+
case rabbit_net:send(Sock, lists:reverse(Pending)) of
382+
ok ->
383+
ok;
384+
{error, Reason} ->
385+
exit({writer, send_failed, Reason})
386+
end,
387+
State = State0#wstate{pending = []},
388+
rabbit_event:ensure_stats_timer(State, #wstate.stats_timer, emit_stats).
411389

412390
%% Some processes (channel, writer) can get huge amounts of binary
413391
%% garbage when processing huge messages at high speed (since we only

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_writer.erl

Lines changed: 9 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -142,10 +142,6 @@ handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
142142
handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) ->
143143
rabbit_amqqueue:notify_sent_queue_down(QPid),
144144
State;
145-
handle_message({inet_reply, _, ok}, State) ->
146-
rabbit_event:ensure_stats_timer(State, #wstate.stats_timer, emit_stats);
147-
handle_message({inet_reply, _, Status}, _State) ->
148-
exit({writer, send_failed, Status});
149145
handle_message(emit_stats, State = #wstate{reader = ReaderPid}) ->
150146
ReaderPid ! ensure_stats,
151147
rabbit_event:reset_stats_timer(State, #wstate.stats_timer);
@@ -251,30 +247,12 @@ maybe_flush(State = #wstate{pending = Pending}) ->
251247

252248
flush(State = #wstate{pending = []}) ->
253249
State;
254-
flush(State = #wstate{sock = Sock, pending = Pending}) ->
255-
ok = port_cmd(Sock, lists:reverse(Pending)),
256-
State#wstate{pending = []}.
257-
258-
%% gen_tcp:send/2 does a selective receive of {inet_reply, Sock,
259-
%% Status} to obtain the result. That is bad when it is called from
260-
%% the writer since it requires scanning of the writers possibly quite
261-
%% large message queue.
262-
%%
263-
%% So instead we lift the code from prim_inet:send/2, which is what
264-
%% gen_tcp:send/2 calls, do the first half here and then just process
265-
%% the result code in handle_message/2 as and when it arrives.
266-
%%
267-
%% This means we may end up happily sending data down a closed/broken
268-
%% socket, but that's ok since a) data in the buffers will be lost in
269-
%% any case (so qualitatively we are no worse off than if we used
270-
%% gen_tcp:send/2), and b) we do detect the changed socket status
271-
%% eventually, i.e. when we get round to handling the result code.
272-
%%
273-
%% Also note that the port has bounded buffers and port_command blocks
274-
%% when these are full. So the fact that we process the result
275-
%% asynchronously does not impact flow control.
276-
port_cmd(Sock, Data) ->
277-
true = try rabbit_net:port_command(Sock, Data)
278-
catch error:Error -> exit({writer, send_failed, Error})
279-
end,
280-
ok.
250+
flush(State0 = #wstate{sock = Sock, pending = Pending}) ->
251+
case rabbit_net:send(Sock, lists:reverse(Pending)) of
252+
ok ->
253+
ok;
254+
{error, Reason} ->
255+
exit({writer, send_failed, Reason})
256+
end,
257+
State = State0#wstate{pending = []},
258+
rabbit_event:ensure_stats_timer(State, #wstate.stats_timer, emit_stats).

deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -184,12 +184,6 @@ handle_info({Tag, Sock, Reason}, State = #state{socket = Sock})
184184
when Tag =:= tcp_error; Tag =:= ssl_error ->
185185
network_error(Reason, State);
186186

187-
handle_info({inet_reply, Sock, ok}, State = #state{socket = Sock}) ->
188-
{noreply, State, ?HIBERNATE_AFTER};
189-
190-
handle_info({inet_reply, Sock, {error, Reason}}, State = #state{socket = Sock}) ->
191-
network_error(Reason, State);
192-
193187
handle_info({conserve_resources, Conserve}, State) ->
194188
maybe_process_deferred_recv(
195189
control_throttle(State #state{ conserve = Conserve }));
@@ -325,14 +319,16 @@ process_received_bytes(Bytes, State = #state{socket = Socket,
325319
case ProcState of
326320
connect_packet_unprocessed ->
327321
Send = fun(Data) ->
328-
try rabbit_net:port_command(Socket, Data)
329-
catch error:Error ->
330-
?LOG_ERROR("writing to MQTT socket ~p failed: ~p",
331-
[Socket, Error])
332-
end,
333-
ok
322+
case rabbit_net:send(Socket, Data) of
323+
ok ->
324+
ok;
325+
{error, Reason} ->
326+
?LOG_ERROR("writing to MQTT socket ~p failed: ~p",
327+
[Socket, Reason]),
328+
exit({send_failed, Reason})
329+
end
334330
end,
335-
case rabbit_mqtt_processor:init(Packet, Socket, ConnName, Send) of
331+
try rabbit_mqtt_processor:init(Packet, Socket, ConnName, Send) of
336332
{ok, ProcState1} ->
337333
?LOG_INFO("Accepted MQTT connection ~ts for client ID ~ts",
338334
[ConnName, rabbit_mqtt_processor:info(client_id, ProcState1)]),
@@ -348,9 +344,11 @@ process_received_bytes(Bytes, State = #state{socket = Socket,
348344
?LOG_ERROR("Rejected MQTT connection ~ts with CONNACK return code ~p",
349345
[ConnName, ConnAckReturnCode]),
350346
{stop, shutdown, {_SendWill = false, State}}
347+
catch exit:{send_failed, Reason} ->
348+
network_error(Reason, State)
351349
end;
352350
_ ->
353-
case rabbit_mqtt_processor:process_packet(Packet, ProcState) of
351+
try rabbit_mqtt_processor:process_packet(Packet, ProcState) of
354352
{ok, ProcState1} ->
355353
process_received_bytes(
356354
Rest,
@@ -364,6 +362,8 @@ process_received_bytes(Bytes, State = #state{socket = Socket,
364362
{stop, {shutdown, Reason}, pstate(State, ProcState1)};
365363
{stop, disconnect, ProcState1} ->
366364
{stop, normal, {_SendWill = false, pstate(State, ProcState1)}}
365+
catch exit:{send_failed, Reason} ->
366+
network_error(Reason, State)
367367
end
368368
end;
369369
{error, {cannot_parse, Reason, Stacktrace}} ->

deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ adapter_name(State) ->
4646
#stomp_configuration{},
4747
{SendFun, AdapterInfo, SSLLoginName, PeerAddr})
4848
-> #proc_state{}
49-
when SendFun :: fun((atom(), binary()) -> term()),
49+
when SendFun :: fun((binary()) -> term()),
5050
AdapterInfo :: #amqp_adapter_info{},
5151
SSLLoginName :: atom() | binary(),
5252
PeerAddr :: inet:ip_address().
@@ -1174,7 +1174,7 @@ send_frame(Command, Headers, BodyFragments, State) ->
11741174

11751175
send_frame(Frame, State = #proc_state{send_fun = SendFun,
11761176
trailing_lf = TrailingLF}) ->
1177-
SendFun(async, rabbit_stomp_frame:serialize(Frame, TrailingLF)),
1177+
SendFun(rabbit_stomp_frame:serialize(Frame, TrailingLF)),
11781178
State.
11791179

11801180
send_error_frame(Message, ExtraHeaders, Format, Args, State) ->

deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -125,12 +125,6 @@ handle_info({Tag, Sock}, State=#reader_state{socket=Sock})
125125
handle_info({Tag, Sock, Reason}, State=#reader_state{socket=Sock})
126126
when Tag =:= tcp_error; Tag =:= ssl_error ->
127127
{stop, {inet_error, Reason}, State};
128-
handle_info({inet_reply, _Sock, {error, closed}}, State) ->
129-
{stop, normal, State};
130-
handle_info({inet_reply, _, ok}, State) ->
131-
{noreply, State, hibernate};
132-
handle_info({inet_reply, _, Status}, State) ->
133-
{stop, Status, State};
134128
handle_info(emit_stats, State) ->
135129
{noreply, emit_stats(State), hibernate};
136130
handle_info({conserve_resources, Conserve}, State) ->
@@ -228,7 +222,7 @@ process_received_bytes(Bytes,
228222
{more, ParseState1} ->
229223
{ok, State#reader_state{parse_state = ParseState1}};
230224
{ok, Frame, Rest} ->
231-
case rabbit_stomp_processor:process_frame(Frame, ProcState) of
225+
try rabbit_stomp_processor:process_frame(Frame, ProcState) of
232226
{ok, NewProcState, Conn} ->
233227
PS = rabbit_stomp_frame:initial_state(),
234228
NextState = maybe_block(State, Frame),
@@ -239,6 +233,10 @@ process_received_bytes(Bytes,
239233
{stop, Reason, NewProcState} ->
240234
{stop, Reason,
241235
processor_state(NewProcState, State)}
236+
catch exit:{send_failed, closed} ->
237+
{stop, normal, State};
238+
exit:{send_failed, Reason} ->
239+
{stop, Reason, State}
242240
end;
243241
{error, Reason} ->
244242
%% The parser couldn't parse data. We log the reason right
@@ -371,16 +369,13 @@ log_tls_alert(Alert, ConnName) ->
371369

372370
processor_args(Configuration, Sock) ->
373371
RealSocket = rabbit_net:unwrap_socket(Sock),
374-
SendFun = fun (sync, IoData) ->
375-
%% no messages emitted
376-
catch rabbit_net:send(RealSocket, IoData);
377-
(async, IoData) ->
378-
%% {inet_reply, _, _} will appear soon
379-
%% We ignore certain errors here, as we will be
380-
%% receiving an asynchronous notification of the
381-
%% same (or a related) fault shortly anyway. See
382-
%% bug 21365.
383-
catch rabbit_net:port_command(RealSocket, IoData)
372+
SendFun = fun(IoData) ->
373+
case rabbit_net:send(RealSocket, IoData) of
374+
ok ->
375+
ok;
376+
{error, Reason} ->
377+
exit({send_failed, Reason})
378+
end
384379
end,
385380
{ok, {PeerAddr, _PeerPort}} = rabbit_net:sockname(RealSocket),
386381
{SendFun, adapter_info(Sock),

deps/rabbitmq_web_stomp/src/rabbit_web_stomp_handler.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ close_connection(Pid, Reason) ->
110110

111111
init_processor_state(#state{socket=Sock, peername=PeerAddr, auth_hd=AuthHd}) ->
112112
Self = self(),
113-
SendFun = fun (_Sync, Data) ->
113+
SendFun = fun(Data) ->
114114
Self ! {send, Data},
115115
ok
116116
end,

0 commit comments

Comments
 (0)