Skip to content

Commit 13f7ab0

Browse files
acogoluegnesmergify-bot
authored andcommitted
Re-issue monitors to clean up stale listeners
References #4133 (cherry picked from commit 6ab1158)
1 parent 1ec796b commit 13f7ab0

File tree

2 files changed

+39
-15
lines changed

2 files changed

+39
-15
lines changed

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -529,20 +529,35 @@ apply(Meta, {machine_version, From = 1, To = 2}, State = #?MODULE{streams = Stre
529529
monitors = Monitors0}) ->
530530
rabbit_log:info("Stream coordinator machine version changes from ~p to ~p, updating state.",
531531
[From, To]),
532-
Streams1 = maps:fold(fun(S, #stream{listeners = L0} = S0, StreamAcc) ->
533-
L1 = maps:fold(fun(ListPid, LeaderPid, LAcc) ->
534-
LAcc#{{ListPid, leader} => LeaderPid}
535-
end, #{}, L0),
536-
StreamAcc#{S => S0#stream{listeners = L1}}
537-
end, #{}, Streams0),
538-
Monitors1 = maps:fold(fun(P, {StreamId, listener}, Acc) ->
532+
%% conversion from old state to new state
533+
%% additional operation: the stream listeners are never collected in the previous version
534+
%% so we'll emit monitors for all listener PIDs
535+
%% this way we'll get the DOWN event for dead listener PIDs and
536+
%% we'll clean the stream listeners the in the DOWN event callback
537+
538+
%% transform the listeners of each stream and accumulate listener PIDs
539+
{Streams1, Listeners} =
540+
maps:fold(fun(S, #stream{listeners = L0} = S0, {StreamAcc, GlobalListAcc}) ->
541+
{L1, GlobalListAcc1} = maps:fold(fun(ListPid, LeaderPid, {LAcc, GLAcc}) ->
542+
{LAcc#{{ListPid, leader} => LeaderPid},
543+
GLAcc#{ListPid => S}}
544+
end, {#{}, GlobalListAcc}, L0),
545+
{StreamAcc#{S => S0#stream{listeners = L1}}, GlobalListAcc1}
546+
end, {#{}, #{}}, Streams0),
547+
%% accumulate monitors for the map and create the effects to emit the monitors
548+
{ExtraMonitors, Effects} = maps:fold(fun(P, StreamId, {MAcc, EAcc}) ->
549+
{MAcc#{P => {StreamId, listener}},
550+
[{monitor, process, P} | EAcc]}
551+
end, {#{}, []}, Listeners),
552+
Monitors1 = maps:merge(Monitors0, ExtraMonitors),
553+
Monitors2 = maps:fold(fun(P, {StreamId, listener}, Acc) ->
539554
Acc#{P => {#{StreamId => ok}, listener}};
540555
(P, V, Acc) ->
541556
Acc#{P => V}
542-
end, #{}, Monitors0),
557+
end, #{}, Monitors1),
543558
return(Meta, State#?MODULE{streams = Streams1,
544-
monitors = Monitors1,
545-
listeners = undefined}, ok, []);
559+
monitors = Monitors2,
560+
listeners = undefined}, ok, Effects);
546561
apply(Meta, {machine_version, From, To}, State) ->
547562
rabbit_log:info("Stream coordinator machine version changes from ~p to ~p, no state changes required.",
548563
[From, To]),

deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -200,22 +200,31 @@ listeners(_) ->
200200
machine_version_from_1_to_2(_) ->
201201
S = <<"stream">>,
202202
LeaderPid = spawn(fun() -> ok end),
203-
ListPid = spawn(fun() -> ok end),
203+
ListPid = spawn(fun() -> ok end), %% simulate a dead listener (not cleaned up)
204+
DeadListPid = spawn(fun() -> ok end),
204205
State0 = #?STATE{streams = #{S =>
205-
#stream{listeners = #{ListPid => LeaderPid}}},
206+
#stream{listeners = #{ListPid => LeaderPid,
207+
DeadListPid => LeaderPid}}},
206208
monitors = #{ListPid => {S, listener}}},
207209

208-
{State1, ok, []} = apply_cmd(#{index => 42}, {machine_version, 1, 2}, State0),
210+
{State1, ok, Effects} = apply_cmd(#{index => 42}, {machine_version, 1, 2}, State0),
209211

210212
Stream1 = maps:get(S, State1#?STATE.streams),
211213
?assertEqual(
212-
#{{ListPid, leader} => LeaderPid},
214+
#{{ListPid, leader} => LeaderPid,
215+
{DeadListPid, leader} => LeaderPid}, %% should be cleaned up on DOWN event
213216
Stream1#stream.listeners
214217
),
215218
?assertEqual(
216-
#{ListPid => {#{S => ok}, listener}},
219+
#{ListPid => {#{S => ok}, listener},
220+
DeadListPid => {#{S => ok}, listener}},
217221
State1#?STATE.monitors
218222
),
223+
?assertEqual(
224+
[{monitor, process, DeadListPid}, %% will trigger an immediate DOWN event
225+
{monitor, process, ListPid}],
226+
Effects
227+
),
219228
ok.
220229

221230
new_stream(_) ->

0 commit comments

Comments
 (0)