Skip to content

Remove erlang:port_command/2 hack #9810

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 2 additions & 37 deletions deps/rabbit_common/src/rabbit_net.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
-include_lib("kernel/include/net_address.hrl").

-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
recv/1, sync_recv/2, async_recv/3, port_command/2, getopts/2,
recv/1, sync_recv/2, async_recv/3, getopts/2,
setopts/2, send/2, close/1, fast_close/1, sockname/1, peername/1,
peercert/1, connection_string/2, socket_ends/2, is_loopback/1,
tcp_host/1, unwrap_socket/1, maybe_get_proxy_socket/1,
Expand Down Expand Up @@ -50,15 +50,14 @@
rabbit_types:error(any()).
-spec async_recv(socket(), integer(), timeout()) ->
rabbit_types:ok(any()).
-spec port_command(socket(), iolist()) -> 'true'.
-spec getopts
(socket(),
[atom() |
{raw, non_neg_integer(), non_neg_integer(),
non_neg_integer() | binary()}]) ->
ok_val_or_error(opts()).
-spec setopts(socket(), opts()) -> ok_or_any_error().
-spec send(socket(), binary() | iolist()) -> ok_or_any_error().
-spec send(socket(), iodata()) -> ok_or_any_error().
-spec close(socket()) -> ok_or_any_error().
-spec fast_close(socket()) -> ok_or_any_error().
-spec sockname(socket()) ->
Expand Down Expand Up @@ -161,40 +160,6 @@ async_recv(Sock, Length, infinity) when is_port(Sock) ->
async_recv(Sock, Length, Timeout) when is_port(Sock) ->
prim_inet:async_recv(Sock, Length, Timeout).

port_command(Sock, Data) when ?IS_SSL(Sock) ->
case ssl:send(Sock, Data) of
ok -> self() ! {inet_reply, Sock, ok},
true;
{error, Reason} -> erlang:error(Reason)
end;
port_command(Sock, Data) when is_port(Sock) ->
Fun = case persistent_term:get(rabbit_net_tcp_send, undefined) of
undefined ->
Rel = list_to_integer(erlang:system_info(otp_release)),
%% gen_tcp:send/2 does a selective receive of
%% {inet_reply, Sock, Status[, CallerTag]}
F = if Rel >= 26 ->
%% Selective receive is optimised:
%% https://github.com/erlang/otp/issues/6455
fun gen_tcp_send/2;
Rel < 26 ->
%% Avoid costly selective receive.
fun erlang:port_command/2
end,
ok = persistent_term:put(rabbit_net_tcp_send, F),
F;
F ->
F
end,
Fun(Sock, Data).

gen_tcp_send(Sock, Data) ->
case gen_tcp:send(Sock, Data) of
ok -> self() ! {inet_reply, Sock, ok},
true;
{error, Reason} -> erlang:error(Reason)
end.

getopts(Sock, Options) when ?IS_SSL(Sock) ->
ssl:getopts(Sock, Options);
getopts(Sock, Options) when is_port(Sock) ->
Expand Down
40 changes: 9 additions & 31 deletions deps/rabbit_common/src/rabbit_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,6 @@ handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) ->
rabbit_amqqueue_common:notify_sent_queue_down(QPid),
State;
handle_message({inet_reply, _, ok}, State) ->
rabbit_event:ensure_stats_timer(State, #wstate.stats_timer, emit_stats);
handle_message({inet_reply, _, Status}, _State) ->
exit({writer, send_failed, Status});
handle_message(emit_stats, State = #wstate{reader = ReaderPid}) ->
ReaderPid ! ensure_stats,
rabbit_event:reset_stats_timer(State, #wstate.stats_timer);
Expand Down Expand Up @@ -384,33 +380,15 @@ maybe_flush(State = #wstate{pending = Pending}) ->

internal_flush(State = #wstate{pending = []}) ->
State;
internal_flush(State = #wstate{sock = Sock, pending = Pending}) ->
ok = port_cmd(Sock, lists:reverse(Pending)),
State#wstate{pending = []}.

%% gen_tcp:send/2 does a selective receive of {inet_reply, Sock,
%% Status} to obtain the result. That is bad when it is called from
%% the writer since it requires scanning of the writers possibly quite
%% large message queue.
%%
%% So instead we lift the code from prim_inet:send/2, which is what
%% gen_tcp:send/2 calls, do the first half here and then just process
%% the result code in handle_message/2 as and when it arrives.
%%
%% This means we may end up happily sending data down a closed/broken
%% socket, but that's ok since a) data in the buffers will be lost in
%% any case (so qualitatively we are no worse off than if we used
%% gen_tcp:send/2), and b) we do detect the changed socket status
%% eventually, i.e. when we get round to handling the result code.
%%
%% Also note that the port has bounded buffers and port_command blocks
%% when these are full. So the fact that we process the result
%% asynchronously does not impact flow control.
port_cmd(Sock, Data) ->
true = try rabbit_net:port_command(Sock, Data)
catch error:Error -> exit({writer, send_failed, Error})
end,
ok.
internal_flush(State0 = #wstate{sock = Sock, pending = Pending}) ->
case rabbit_net:send(Sock, lists:reverse(Pending)) of
ok ->
ok;
{error, Reason} ->
exit({writer, send_failed, Reason})
end,
State = State0#wstate{pending = []},
rabbit_event:ensure_stats_timer(State, #wstate.stats_timer, emit_stats).

%% Some processes (channel, writer) can get huge amounts of binary
%% garbage when processing huge messages at high speed (since we only
Expand Down
40 changes: 9 additions & 31 deletions deps/rabbitmq_amqp1_0/src/rabbit_amqp1_0_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,6 @@ handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) ->
rabbit_amqqueue:notify_sent_queue_down(QPid),
State;
handle_message({inet_reply, _, ok}, State) ->
rabbit_event:ensure_stats_timer(State, #wstate.stats_timer, emit_stats);
handle_message({inet_reply, _, Status}, _State) ->
exit({writer, send_failed, Status});
handle_message(emit_stats, State = #wstate{reader = ReaderPid}) ->
ReaderPid ! ensure_stats,
rabbit_event:reset_stats_timer(State, #wstate.stats_timer);
Expand Down Expand Up @@ -251,30 +247,12 @@ maybe_flush(State = #wstate{pending = Pending}) ->

flush(State = #wstate{pending = []}) ->
State;
flush(State = #wstate{sock = Sock, pending = Pending}) ->
ok = port_cmd(Sock, lists:reverse(Pending)),
State#wstate{pending = []}.

%% gen_tcp:send/2 does a selective receive of {inet_reply, Sock,
%% Status} to obtain the result. That is bad when it is called from
%% the writer since it requires scanning of the writers possibly quite
%% large message queue.
%%
%% So instead we lift the code from prim_inet:send/2, which is what
%% gen_tcp:send/2 calls, do the first half here and then just process
%% the result code in handle_message/2 as and when it arrives.
%%
%% This means we may end up happily sending data down a closed/broken
%% socket, but that's ok since a) data in the buffers will be lost in
%% any case (so qualitatively we are no worse off than if we used
%% gen_tcp:send/2), and b) we do detect the changed socket status
%% eventually, i.e. when we get round to handling the result code.
%%
%% Also note that the port has bounded buffers and port_command blocks
%% when these are full. So the fact that we process the result
%% asynchronously does not impact flow control.
port_cmd(Sock, Data) ->
true = try rabbit_net:port_command(Sock, Data)
catch error:Error -> exit({writer, send_failed, Error})
end,
ok.
flush(State0 = #wstate{sock = Sock, pending = Pending}) ->
case rabbit_net:send(Sock, lists:reverse(Pending)) of
ok ->
ok;
{error, Reason} ->
exit({writer, send_failed, Reason})
end,
State = State0#wstate{pending = []},
rabbit_event:ensure_stats_timer(State, #wstate.stats_timer, emit_stats).
21 changes: 8 additions & 13 deletions deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,6 @@ handle_info({Tag, Sock, Reason}, State = #state{socket = Sock})
when Tag =:= tcp_error; Tag =:= ssl_error ->
network_error(Reason, State);

handle_info({inet_reply, Sock, ok}, State = #state{socket = Sock}) ->
{noreply, State, ?HIBERNATE_AFTER};

handle_info({inet_reply, Sock, {error, Reason}}, State = #state{socket = Sock}) ->
network_error(Reason, State);

handle_info({conserve_resources, Conserve}, State) ->
maybe_process_deferred_recv(
control_throttle(State #state{ conserve = Conserve }));
Expand Down Expand Up @@ -335,13 +329,14 @@ process_received_bytes(Bytes, State = #state{socket = Socket,
case ProcState of
connect_packet_unprocessed ->
Send = fun(Data) ->
try rabbit_net:port_command(Socket, Data)
catch error:Reason ->
?LOG_ERROR("writing to MQTT socket ~p failed: ~p",
[Socket, Reason]),
exit({send_failed, Reason})
end,
ok
case rabbit_net:send(Socket, Data) of
ok ->
ok;
{error, Reason} ->
?LOG_ERROR("writing to MQTT socket ~p failed: ~p",
[Socket, Reason]),
exit({send_failed, Reason})
end
end,
try rabbit_mqtt_processor:init(Packet, Socket, ConnName, Send) of
{ok, ProcState1} ->
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ adapter_name(State) ->
#stomp_configuration{},
{SendFun, AdapterInfo, SSLLoginName, PeerAddr})
-> #proc_state{}
when SendFun :: fun((atom(), binary()) -> term()),
when SendFun :: fun((binary()) -> term()),
AdapterInfo :: #amqp_adapter_info{},
SSLLoginName :: atom() | binary(),
PeerAddr :: inet:ip_address().
Expand Down Expand Up @@ -1174,7 +1174,7 @@ send_frame(Command, Headers, BodyFragments, State) ->

send_frame(Frame, State = #proc_state{send_fun = SendFun,
trailing_lf = TrailingLF}) ->
SendFun(async, rabbit_stomp_frame:serialize(Frame, TrailingLF)),
SendFun(rabbit_stomp_frame:serialize(Frame, TrailingLF)),
State.

send_error_frame(Message, ExtraHeaders, Format, Args, State) ->
Expand Down
29 changes: 12 additions & 17 deletions deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,6 @@ handle_info({Tag, Sock}, State=#reader_state{socket=Sock})
handle_info({Tag, Sock, Reason}, State=#reader_state{socket=Sock})
when Tag =:= tcp_error; Tag =:= ssl_error ->
{stop, {inet_error, Reason}, State};
handle_info({inet_reply, _Sock, {error, closed}}, State) ->
{stop, normal, State};
handle_info({inet_reply, _, ok}, State) ->
{noreply, State, hibernate};
handle_info({inet_reply, _, Status}, State) ->
{stop, Status, State};
handle_info(emit_stats, State) ->
{noreply, emit_stats(State), hibernate};
handle_info({conserve_resources, Conserve}, State) ->
Expand Down Expand Up @@ -259,7 +253,7 @@ process_received_bytes(Bytes,
log_reason({network_error, {frame_too_big, {FrameLength1, MaxFrameSize}}}, State),
{stop, normal, State};
false ->
case rabbit_stomp_processor:process_frame(Frame, ProcState) of
try rabbit_stomp_processor:process_frame(Frame, ProcState) of
{ok, NewProcState, Conn} ->
PS = rabbit_stomp_frame:initial_state(),
NextState = maybe_block(State, Frame),
Expand All @@ -271,6 +265,10 @@ process_received_bytes(Bytes,
{stop, Reason, NewProcState} ->
{stop, Reason,
processor_state(NewProcState, State)}
catch exit:{send_failed, closed} ->
{stop, normal, State};
exit:{send_failed, Reason} ->
{stop, Reason, State}
end
end;
{error, Reason} ->
Expand Down Expand Up @@ -404,16 +402,13 @@ log_tls_alert(Alert, ConnName) ->

processor_args(Configuration, Sock) ->
RealSocket = rabbit_net:unwrap_socket(Sock),
SendFun = fun (sync, IoData) ->
%% no messages emitted
catch rabbit_net:send(RealSocket, IoData);
(async, IoData) ->
%% {inet_reply, _, _} will appear soon
%% We ignore certain errors here, as we will be
%% receiving an asynchronous notification of the
%% same (or a related) fault shortly anyway. See
%% bug 21365.
catch rabbit_net:port_command(RealSocket, IoData)
SendFun = fun(IoData) ->
case rabbit_net:send(RealSocket, IoData) of
ok ->
ok;
{error, Reason} ->
exit({send_failed, Reason})
end
end,
{ok, {PeerAddr, _PeerPort}} = rabbit_net:sockname(RealSocket),
{SendFun, adapter_info(Sock),
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_web_stomp/src/rabbit_web_stomp_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ close_connection(Pid, Reason) ->

init_processor_state(#state{socket=Sock, peername=PeerAddr, auth_hd=AuthHd}) ->
Self = self(),
SendFun = fun (_Sync, Data) ->
SendFun = fun(Data) ->
Self ! {send, Data},
ok
end,
Expand Down