Skip to content

Commit c915d2e

Browse files
Merge pull request #3413 from rabbitmq/stream-reader-close-in-terminate
Stream reader: close osiris logs and sockets in terminate
2 parents 8f207e3 + c8d4838 commit c915d2e

File tree

1 file changed

+14
-30
lines changed

1 file changed

+14
-30
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 14 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
%% The contents of this file are subject to the Mozilla Public License
2-
%% Version 2.0 (the "License"); you may not use this file except in
3-
%% compliance with the License. You may obtain a copy of the License
42
%% at https://www.mozilla.org/en-US/MPL/2.0/
53
%%
64
%% Software distributed under the License is distributed on an "AS IS"
@@ -173,11 +171,15 @@
173171
callback_mode() ->
174172
[state_functions, state_enter].
175173

176-
terminate(Reason, State, StatemData) ->
174+
terminate(Reason, State,
175+
#statem_data{transport = Transport,
176+
connection = #stream_connection{socket = Socket},
177+
connection_state = ConnectionState} = StatemData) ->
178+
close(Transport, Socket, ConnectionState),
177179
rabbit_networking:unregister_non_amqp_connection(self()),
178180
notify_connection_closed(StatemData),
179-
rabbit_log:debug("~p terminating in state '~s' with reason '~p'",
180-
[?MODULE, State, Reason]).
181+
rabbit_log:debug("~s terminating in state '~s' with reason '~W'",
182+
[?MODULE, State, Reason, 10]).
181183

182184
start_link(KeepaliveSup, Transport, Ref, Opts) ->
183185
{ok,
@@ -715,7 +717,6 @@ open(info, {OK, S, Data},
715717
#stream_connection{connection_step = Step} = Connection1,
716718
case Step of
717719
closing ->
718-
close(Transport, S, State),
719720
stop;
720721
close_sent ->
721722
rabbit_log_connection:debug("Transitioned to close_sent"),
@@ -800,8 +801,7 @@ open(info, {'DOWN', MonitorRef, process, _OsirisPid, _Reason},
800801
connection_state = State1}};
801802
open(info, heartbeat_send,
802803
#statem_data{transport = Transport,
803-
connection = #stream_connection{socket = S} = Connection,
804-
connection_state = State}) ->
804+
connection = #stream_connection{socket = S} = Connection}) ->
805805
Frame = rabbit_stream_core:frame(heartbeat),
806806
case catch send(Transport, S, Frame) of
807807
ok ->
@@ -810,16 +810,12 @@ open(info, heartbeat_send,
810810
rabbit_log_connection:info("Heartbeat send error ~p, closing connection",
811811
[Unexpected]),
812812
_C1 = demonitor_all_streams(Connection),
813-
close(Transport, S, State),
814813
stop
815814
end;
816815
open(info, heartbeat_timeout,
817-
#statem_data{transport = Transport,
818-
connection = #stream_connection{socket = S} = Connection,
819-
connection_state = State}) ->
816+
#statem_data{connection = #stream_connection{} = Connection}) ->
820817
rabbit_log_connection:debug("Heartbeat timeout, closing connection"),
821818
_C1 = demonitor_all_streams(Connection),
822-
close(Transport, S, State),
823819
stop;
824820
open(info, {infos, From},
825821
#statem_data{connection =
@@ -852,14 +848,11 @@ open({call, From}, {publishers_info, Items},
852848
{keep_state_and_data,
853849
{reply, From, publishers_infos(Items, Connection)}};
854850
open({call, From}, {shutdown, Explanation},
855-
#statem_data{transport = Transport,
856-
connection = #stream_connection{socket = S} = Connection,
857-
connection_state = State}) ->
851+
#statem_data{connection = Connection}) ->
858852
% likely closing call from the management plugin
859853
rabbit_log_connection:info("Forcing stream connection ~p closing: ~p",
860854
[self(), Explanation]),
861855
demonitor_all_streams(Connection),
862-
close(Transport, S, State),
863856
{stop_and_reply, normal, {reply, From, ok}};
864857
open(cast,
865858
{queue_event, _, {osiris_written, _, undefined, CorrelationList}},
@@ -1059,14 +1052,9 @@ close_sent(enter, _OldState,
10591052
#configuration{connection_negotiation_step_timeout =
10601053
StateTimeout}}) ->
10611054
{keep_state_and_data, {state_timeout, StateTimeout, close}};
1062-
close_sent(state_timeout, close,
1063-
#statem_data{transport = Transport,
1064-
connection = #stream_connection{socket = Socket},
1065-
connection_state = State}) ->
1066-
rabbit_log_connection:warning("Closing connection because of timeout in state "
1067-
"'~s' likely due to lack of client action.",
1055+
close_sent(state_timeout, close, #statem_data{}) ->
1056+
rabbit_log_connection:warning("Closing connection because of timeout in state '~s' likely due to lack of client action.",
10681057
[?FUNCTION_NAME]),
1069-
close(Transport, Socket, State),
10701058
stop;
10711059
close_sent(info, {tcp, S, Data},
10721060
#statem_data{transport = Transport,
@@ -1081,7 +1069,6 @@ close_sent(info, {tcp, S, Data},
10811069
[?FUNCTION_NAME, Step]),
10821070
case Step of
10831071
closing_done ->
1084-
close(Transport, S, State1),
10851072
stop;
10861073
_ ->
10871074
Transport:setopts(S, [{active, once}]),
@@ -1093,12 +1080,9 @@ close_sent(info, {tcp_closed, S}, _StatemData) ->
10931080
rabbit_log_connection:debug("Stream protocol connection socket ~w closed [~w]",
10941081
[S, self()]),
10951082
stop;
1096-
close_sent(info, {tcp_error, S, Reason},
1097-
#statem_data{transport = Transport, connection_state = State}) ->
1098-
rabbit_log_connection:error("Stream protocol connection socket error: ~p [~w] "
1099-
"[~w]",
1083+
close_sent(info, {tcp_error, S, Reason}, #statem_data{}) ->
1084+
rabbit_log_connection:error("Stream protocol connection socket error: ~p [~w] [~w]",
11001085
[Reason, S, self()]),
1101-
close(Transport, S, State),
11021086
stop;
11031087
close_sent(info, {resource_alarm, IsThereAlarm},
11041088
StatemData = #statem_data{connection = Connection}) ->

0 commit comments

Comments
 (0)