Skip to content

Commit dad0025

Browse files
committed
Perform stream reader cleanup in terminate
Otherwise metrics will not get cleaned up correctly when processes crash. It's also tidier to do this in a single place, in terminate/3 Pair: @kjnilsson Signed-off-by: Gerhard Lazu <[email protected]>
1 parent 8dc0240 commit dad0025

File tree

1 file changed

+13
-36
lines changed

1 file changed

+13
-36
lines changed

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 13 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,9 @@
173173
callback_mode() ->
174174
[state_functions, state_enter].
175175

176-
terminate(Reason, State, _StatemData) ->
176+
terminate(Reason, State, StatemData) ->
177+
rabbit_networking:unregister_non_amqp_connection(self()),
178+
notify_connection_closed(StatemData),
177179
rabbit_log:debug("~p terminating in state '~s' with reason '~p'", [?MODULE, State, Reason]).
178180

179181
start_link(KeepaliveSup, Transport, Ref, Opts) ->
@@ -656,8 +658,6 @@ open(info,
656658
case Step of
657659
closing ->
658660
close(Transport, S, State),
659-
rabbit_networking:unregister_non_amqp_connection(self()),
660-
notify_connection_closed(Connection1, State1),
661661
stop;
662662
close_sent ->
663663
rabbit_log_connection:debug("Transitioned to close_sent"),
@@ -697,24 +697,14 @@ open(info,
697697
connection = Connection1,
698698
connection_state = State2}}
699699
end;
700-
open(info, {Closed, Socket}, #statem_data{
701-
connection = Connection,
702-
connection_state = State
703-
})
700+
open(info, {Closed, Socket}, #statem_data{connection = Connection})
704701
when Closed =:= tcp_closed; Closed =:= ssl_closed ->
705702
demonitor_all_streams(Connection),
706-
rabbit_networking:unregister_non_amqp_connection(self()),
707-
notify_connection_closed(Connection, State),
708703
rabbit_log_connection:warning("Socket ~w closed [~w]", [Socket, self()]),
709704
stop;
710-
open(info, {Error, Socket, Reason}, #statem_data{
711-
connection = Connection,
712-
connection_state = State
713-
})
705+
open(info, {Error, Socket, Reason}, #statem_data{connection = Connection})
714706
when Error =:= tcp_error; Error =:= ssl_error ->
715707
demonitor_all_streams(Connection),
716-
rabbit_networking:unregister_non_amqp_connection(self()),
717-
notify_connection_closed(Connection, State),
718708
rabbit_log_connection:error("Socket error ~p [~w] [~w]", [Reason, Socket, self()]),
719709
stop;
720710
open(info,
@@ -813,8 +803,6 @@ open({call, From}, {shutdown, Explanation}, #statem_data{
813803
rabbit_log_connection:info("Forcing stream connection ~p closing: ~p",
814804
[self(), Explanation]),
815805
demonitor_all_streams(Connection),
816-
rabbit_networking:unregister_non_amqp_connection(self()),
817-
notify_connection_closed(Connection, State),
818806
close(Transport, S, State),
819807
{stop_and_reply, normal, {reply, From, ok}};
820808
open(cast,
@@ -1013,15 +1001,13 @@ close_sent(enter, _OldState, #statem_data{
10131001
{keep_state_and_data, {state_timeout, StateTimeout, close}};
10141002
close_sent(state_timeout, close, #statem_data{
10151003
transport = Transport,
1016-
connection = #stream_connection{socket = Socket} = Connection,
1004+
connection = #stream_connection{socket = Socket},
10171005
connection_state = State
10181006
}) ->
10191007
rabbit_log_connection:warning(
10201008
"Closing connection because of timeout in state '~s' likely due to lack of client action.",
10211009
[?FUNCTION_NAME]),
10221010
close(Transport, Socket, State),
1023-
rabbit_networking:unregister_non_amqp_connection(self()),
1024-
notify_connection_closed(Connection, State),
10251011
stop;
10261012
close_sent(info, {tcp, S, Data}, #statem_data{
10271013
transport = Transport,
@@ -1037,8 +1023,6 @@ close_sent(info, {tcp, S, Data}, #statem_data{
10371023
case Step of
10381024
closing_done ->
10391025
close(Transport, S, State1),
1040-
rabbit_networking:unregister_non_amqp_connection(self()),
1041-
notify_connection_closed(Connection1, State1),
10421026
stop;
10431027
_ ->
10441028
Transport:setopts(S, [{active, once}]),
@@ -1047,23 +1031,15 @@ close_sent(info, {tcp, S, Data}, #statem_data{
10471031
connection_state = State1
10481032
}}
10491033
end;
1050-
close_sent(info, {tcp_closed, S}, #statem_data{
1051-
connection = Connection,
1052-
connection_state = State
1053-
}) ->
1054-
rabbit_networking:unregister_non_amqp_connection(self()),
1055-
notify_connection_closed(Connection, State),
1034+
close_sent(info, {tcp_closed, S}, _StatemData) ->
10561035
rabbit_log_connection:debug("Stream protocol connection socket ~w closed [~w]", [S, self()]),
10571036
stop;
10581037
close_sent(info, {tcp_error, S, Reason}, #statem_data{
10591038
transport = Transport,
1060-
connection = Connection,
10611039
connection_state = State
10621040
}) ->
10631041
rabbit_log_connection:error("Stream protocol connection socket error: ~p [~w] [~w]", [Reason, S, self()]),
10641042
close(Transport, S, State),
1065-
rabbit_networking:unregister_non_amqp_connection(self()),
1066-
notify_connection_closed(Connection, State),
10671043
stop;
10681044
close_sent(info,{resource_alarm, IsThereAlarm},
10691045
StatemData = #statem_data{connection = Connection}) ->
@@ -2290,11 +2266,12 @@ handle_frame_post_auth(Transport,
22902266
rabbit_global_counters:increase_protocol_counter(stream, ?UNKNOWN_FRAME, 1),
22912267
{Connection#stream_connection{connection_step = close_sent}, State}.
22922268

2293-
notify_connection_closed(#stream_connection{name = Name,
2294-
publishers = Publishers} =
2295-
Connection,
2296-
#stream_connection_state{consumers = Consumers} =
2297-
ConnectionState) ->
2269+
notify_connection_closed(
2270+
#statem_data{connection = #stream_connection{
2271+
name = Name,
2272+
publishers = Publishers} = Connection,
2273+
connection_state = #stream_connection_state{
2274+
consumers = Consumers} = ConnectionState}) ->
22982275
rabbit_core_metrics:connection_closed(self()),
22992276
[rabbit_stream_metrics:consumer_cancelled(self(),
23002277
stream_r(S, Connection), SubId)

0 commit comments

Comments
 (0)