Skip to content

Commit cad067f

Browse files
committed
Remove erlang:port_command/2 hack
as selective receives are efficient in OTP 26: ``` OTP-18431 Application(s): compiler, stdlib Related Id(s): PR-6739 Improved the selective receive optimization, which can now be enabled for references returned from other functions. This greatly improves the performance of gen_server:send_request/3, gen_server:wait_response/2, and similar functions. ```
1 parent 61557d0 commit cad067f

File tree

7 files changed

+43
-132
lines changed

7 files changed

+43
-132
lines changed

deps/rabbit_common/src/rabbit_net.erl

Lines changed: 2 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
-include_lib("kernel/include/net_address.hrl").
1212

1313
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
14-
recv/1, sync_recv/2, async_recv/3, port_command/2, getopts/2,
14+
recv/1, sync_recv/2, async_recv/3, getopts/2,
1515
setopts/2, send/2, close/1, fast_close/1, sockname/1, peername/1,
1616
peercert/1, connection_string/2, socket_ends/2, is_loopback/1,
1717
tcp_host/1, unwrap_socket/1, maybe_get_proxy_socket/1,
@@ -50,15 +50,14 @@
5050
rabbit_types:error(any()).
5151
-spec async_recv(socket(), integer(), timeout()) ->
5252
rabbit_types:ok(any()).
53-
-spec port_command(socket(), iolist()) -> 'true'.
5453
-spec getopts
5554
(socket(),
5655
[atom() |
5756
{raw, non_neg_integer(), non_neg_integer(),
5857
non_neg_integer() | binary()}]) ->
5958
ok_val_or_error(opts()).
6059
-spec setopts(socket(), opts()) -> ok_or_any_error().
61-
-spec send(socket(), binary() | iolist()) -> ok_or_any_error().
60+
-spec send(socket(), iodata()) -> ok_or_any_error().
6261
-spec close(socket()) -> ok_or_any_error().
6362
-spec fast_close(socket()) -> ok_or_any_error().
6463
-spec sockname(socket()) ->
@@ -161,40 +160,6 @@ async_recv(Sock, Length, infinity) when is_port(Sock) ->
161160
async_recv(Sock, Length, Timeout) when is_port(Sock) ->
162161
prim_inet:async_recv(Sock, Length, Timeout).
163162

164-
port_command(Sock, Data) when ?IS_SSL(Sock) ->
165-
case ssl:send(Sock, Data) of
166-
ok -> self() ! {inet_reply, Sock, ok},
167-
true;
168-
{error, Reason} -> erlang:error(Reason)
169-
end;
170-
port_command(Sock, Data) when is_port(Sock) ->
171-
Fun = case persistent_term:get(rabbit_net_tcp_send, undefined) of
172-
undefined ->
173-
Rel = list_to_integer(erlang:system_info(otp_release)),
174-
%% gen_tcp:send/2 does a selective receive of
175-
%% {inet_reply, Sock, Status[, CallerTag]}
176-
F = if Rel >= 26 ->
177-
%% Selective receive is optimised:
178-
%% https://github.com/erlang/otp/issues/6455
179-
fun gen_tcp_send/2;
180-
Rel < 26 ->
181-
%% Avoid costly selective receive.
182-
fun erlang:port_command/2
183-
end,
184-
ok = persistent_term:put(rabbit_net_tcp_send, F),
185-
F;
186-
F ->
187-
F
188-
end,
189-
Fun(Sock, Data).
190-
191-
gen_tcp_send(Sock, Data) ->
192-
case gen_tcp:send(Sock, Data) of
193-
ok -> self() ! {inet_reply, Sock, ok},
194-
true;
195-
{error, Reason} -> erlang:error(Reason)
196-
end.
197-
198163
getopts(Sock, Options) when ?IS_SSL(Sock) ->
199164
ssl:getopts(Sock, Options);
200165
getopts(Sock, Options) when is_port(Sock) ->

deps/rabbit_common/src/rabbit_writer.erl

Lines changed: 9 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -268,10 +268,6 @@ handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
268268
handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) ->
269269
rabbit_amqqueue_common:notify_sent_queue_down(QPid),
270270
State;
271-
handle_message({inet_reply, _, ok}, State) ->
272-
rabbit_event:ensure_stats_timer(State, #wstate.stats_timer, emit_stats);
273-
handle_message({inet_reply, _, Status}, _State) ->
274-
exit({writer, send_failed, Status});
275271
handle_message(emit_stats, State = #wstate{reader = ReaderPid}) ->
276272
ReaderPid ! ensure_stats,
277273
rabbit_event:reset_stats_timer(State, #wstate.stats_timer);
@@ -384,33 +380,15 @@ maybe_flush(State = #wstate{pending = Pending}) ->
384380

385381
internal_flush(State = #wstate{pending = []}) ->
386382
State;
387-
internal_flush(State = #wstate{sock = Sock, pending = Pending}) ->
388-
ok = port_cmd(Sock, lists:reverse(Pending)),
389-
State#wstate{pending = []}.
390-
391-
%% gen_tcp:send/2 does a selective receive of {inet_reply, Sock,
392-
%% Status} to obtain the result. That is bad when it is called from
393-
%% the writer since it requires scanning of the writers possibly quite
394-
%% large message queue.
395-
%%
396-
%% So instead we lift the code from prim_inet:send/2, which is what
397-
%% gen_tcp:send/2 calls, do the first half here and then just process
398-
%% the result code in handle_message/2 as and when it arrives.
399-
%%
400-
%% This means we may end up happily sending data down a closed/broken
401-
%% socket, but that's ok since a) data in the buffers will be lost in
402-
%% any case (so qualitatively we are no worse off than if we used
403-
%% gen_tcp:send/2), and b) we do detect the changed socket status
404-
%% eventually, i.e. when we get round to handling the result code.
405-
%%
406-
%% Also note that the port has bounded buffers and port_command blocks
407-
%% when these are full. So the fact that we process the result
408-
%% asynchronously does not impact flow control.
409-
port_cmd(Sock, Data) ->
410-
true = try rabbit_net:port_command(Sock, Data)
411-
catch error:Error -> exit({writer, send_failed, Error})
412-
end,
413-
ok.
383+
internal_flush(State0 = #wstate{sock = Sock, pending = Pending}) ->
384+
case rabbit_net:send(Sock, lists:reverse(Pending)) of
385+
ok ->
386+
ok;
387+
{error, Reason} ->
388+
exit({writer, send_failed, Reason})
389+
end,
390+
State = State0#wstate{pending = []},
391+
rabbit_event:ensure_stats_timer(State, #wstate.stats_timer, emit_stats).
414392

415393
%% Some processes (channel, writer) can get huge amounts of binary
416394
%% garbage when processing huge messages at high speed (since we only

deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_writer.erl

Lines changed: 9 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -142,10 +142,6 @@ handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
142142
handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) ->
143143
rabbit_amqqueue:notify_sent_queue_down(QPid),
144144
State;
145-
handle_message({inet_reply, _, ok}, State) ->
146-
rabbit_event:ensure_stats_timer(State, #wstate.stats_timer, emit_stats);
147-
handle_message({inet_reply, _, Status}, _State) ->
148-
exit({writer, send_failed, Status});
149145
handle_message(emit_stats, State = #wstate{reader = ReaderPid}) ->
150146
ReaderPid ! ensure_stats,
151147
rabbit_event:reset_stats_timer(State, #wstate.stats_timer);
@@ -251,30 +247,12 @@ maybe_flush(State = #wstate{pending = Pending}) ->
251247

252248
flush(State = #wstate{pending = []}) ->
253249
State;
254-
flush(State = #wstate{sock = Sock, pending = Pending}) ->
255-
ok = port_cmd(Sock, lists:reverse(Pending)),
256-
State#wstate{pending = []}.
257-
258-
%% gen_tcp:send/2 does a selective receive of {inet_reply, Sock,
259-
%% Status} to obtain the result. That is bad when it is called from
260-
%% the writer since it requires scanning of the writers possibly quite
261-
%% large message queue.
262-
%%
263-
%% So instead we lift the code from prim_inet:send/2, which is what
264-
%% gen_tcp:send/2 calls, do the first half here and then just process
265-
%% the result code in handle_message/2 as and when it arrives.
266-
%%
267-
%% This means we may end up happily sending data down a closed/broken
268-
%% socket, but that's ok since a) data in the buffers will be lost in
269-
%% any case (so qualitatively we are no worse off than if we used
270-
%% gen_tcp:send/2), and b) we do detect the changed socket status
271-
%% eventually, i.e. when we get round to handling the result code.
272-
%%
273-
%% Also note that the port has bounded buffers and port_command blocks
274-
%% when these are full. So the fact that we process the result
275-
%% asynchronously does not impact flow control.
276-
port_cmd(Sock, Data) ->
277-
true = try rabbit_net:port_command(Sock, Data)
278-
catch error:Error -> exit({writer, send_failed, Error})
279-
end,
280-
ok.
250+
flush(State0 = #wstate{sock = Sock, pending = Pending}) ->
251+
case rabbit_net:send(Sock, lists:reverse(Pending)) of
252+
ok ->
253+
ok;
254+
{error, Reason} ->
255+
exit({writer, send_failed, Reason})
256+
end,
257+
State = State0#wstate{pending = []},
258+
rabbit_event:ensure_stats_timer(State, #wstate.stats_timer, emit_stats).

deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -193,12 +193,6 @@ handle_info({Tag, Sock, Reason}, State = #state{socket = Sock})
193193
when Tag =:= tcp_error; Tag =:= ssl_error ->
194194
network_error(Reason, State);
195195

196-
handle_info({inet_reply, Sock, ok}, State = #state{socket = Sock}) ->
197-
{noreply, State, ?HIBERNATE_AFTER};
198-
199-
handle_info({inet_reply, Sock, {error, Reason}}, State = #state{socket = Sock}) ->
200-
network_error(Reason, State);
201-
202196
handle_info({conserve_resources, Conserve}, State) ->
203197
maybe_process_deferred_recv(
204198
control_throttle(State #state{ conserve = Conserve }));
@@ -335,13 +329,14 @@ process_received_bytes(Bytes, State = #state{socket = Socket,
335329
case ProcState of
336330
connect_packet_unprocessed ->
337331
Send = fun(Data) ->
338-
try rabbit_net:port_command(Socket, Data)
339-
catch error:Reason ->
340-
?LOG_ERROR("writing to MQTT socket ~p failed: ~p",
341-
[Socket, Reason]),
342-
exit({send_failed, Reason})
343-
end,
344-
ok
332+
case rabbit_net:send(Socket, Data) of
333+
ok ->
334+
ok;
335+
{error, Reason} ->
336+
?LOG_ERROR("writing to MQTT socket ~p failed: ~p",
337+
[Socket, Reason]),
338+
exit({send_failed, Reason})
339+
end
345340
end,
346341
try rabbit_mqtt_processor:init(Packet, Socket, ConnName, Send) of
347342
{ok, ProcState1} ->

deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ adapter_name(State) ->
4646
#stomp_configuration{},
4747
{SendFun, AdapterInfo, SSLLoginName, PeerAddr})
4848
-> #proc_state{}
49-
when SendFun :: fun((atom(), binary()) -> term()),
49+
when SendFun :: fun((binary()) -> term()),
5050
AdapterInfo :: #amqp_adapter_info{},
5151
SSLLoginName :: atom() | binary(),
5252
PeerAddr :: inet:ip_address().
@@ -1174,7 +1174,7 @@ send_frame(Command, Headers, BodyFragments, State) ->
11741174

11751175
send_frame(Frame, State = #proc_state{send_fun = SendFun,
11761176
trailing_lf = TrailingLF}) ->
1177-
SendFun(async, rabbit_stomp_frame:serialize(Frame, TrailingLF)),
1177+
SendFun(rabbit_stomp_frame:serialize(Frame, TrailingLF)),
11781178
State.
11791179

11801180
send_error_frame(Message, ExtraHeaders, Format, Args, State) ->

deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -140,12 +140,6 @@ handle_info({Tag, Sock}, State=#reader_state{socket=Sock})
140140
handle_info({Tag, Sock, Reason}, State=#reader_state{socket=Sock})
141141
when Tag =:= tcp_error; Tag =:= ssl_error ->
142142
{stop, {inet_error, Reason}, State};
143-
handle_info({inet_reply, _Sock, {error, closed}}, State) ->
144-
{stop, normal, State};
145-
handle_info({inet_reply, _, ok}, State) ->
146-
{noreply, State, hibernate};
147-
handle_info({inet_reply, _, Status}, State) ->
148-
{stop, Status, State};
149143
handle_info(emit_stats, State) ->
150144
{noreply, emit_stats(State), hibernate};
151145
handle_info({conserve_resources, Conserve}, State) ->
@@ -259,7 +253,7 @@ process_received_bytes(Bytes,
259253
log_reason({network_error, {frame_too_big, {FrameLength1, MaxFrameSize}}}, State),
260254
{stop, normal, State};
261255
false ->
262-
case rabbit_stomp_processor:process_frame(Frame, ProcState) of
256+
try rabbit_stomp_processor:process_frame(Frame, ProcState) of
263257
{ok, NewProcState, Conn} ->
264258
PS = rabbit_stomp_frame:initial_state(),
265259
NextState = maybe_block(State, Frame),
@@ -271,6 +265,10 @@ process_received_bytes(Bytes,
271265
{stop, Reason, NewProcState} ->
272266
{stop, Reason,
273267
processor_state(NewProcState, State)}
268+
catch exit:{send_failed, closed} ->
269+
{stop, normal, State};
270+
exit:{send_failed, Reason} ->
271+
{stop, Reason, State}
274272
end
275273
end;
276274
{error, Reason} ->
@@ -404,16 +402,13 @@ log_tls_alert(Alert, ConnName) ->
404402

405403
processor_args(Configuration, Sock) ->
406404
RealSocket = rabbit_net:unwrap_socket(Sock),
407-
SendFun = fun (sync, IoData) ->
408-
%% no messages emitted
409-
catch rabbit_net:send(RealSocket, IoData);
410-
(async, IoData) ->
411-
%% {inet_reply, _, _} will appear soon
412-
%% We ignore certain errors here, as we will be
413-
%% receiving an asynchronous notification of the
414-
%% same (or a related) fault shortly anyway. See
415-
%% bug 21365.
416-
catch rabbit_net:port_command(RealSocket, IoData)
405+
SendFun = fun(IoData) ->
406+
case rabbit_net:send(RealSocket, IoData) of
407+
ok ->
408+
ok;
409+
{error, Reason} ->
410+
exit({send_failed, Reason})
411+
end
417412
end,
418413
{ok, {PeerAddr, _PeerPort}} = rabbit_net:sockname(RealSocket),
419414
{SendFun, adapter_info(Sock),

deps/rabbitmq_web_stomp/src/rabbit_web_stomp_handler.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ close_connection(Pid, Reason) ->
127127

128128
init_processor_state(#state{socket=Sock, peername=PeerAddr, auth_hd=AuthHd}) ->
129129
Self = self(),
130-
SendFun = fun (_Sync, Data) ->
130+
SendFun = fun(Data) ->
131131
Self ! {send, Data},
132132
ok
133133
end,

0 commit comments

Comments
 (0)