Skip to content

Commit 2816238

Browse files
acogoluegnesmergify-bot
authored andcommitted
Fix stream coordinator v1-to-v2 machine state change
Monitors were not converted. References #4133 (cherry picked from commit df61e44) (cherry picked from commit 1ec796b)
1 parent 7d2dafb commit 2816238

File tree

3 files changed

+44
-5
lines changed

3 files changed

+44
-5
lines changed

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
{delete_replica, stream_id(), #{node := node()}} |
7474
{policy_changed, stream_id(), #{queue := amqqueue:amqqueue()}} |
7575
{register_listener, #{pid := pid(),
76+
node := node(),
7677
stream_id := stream_id(),
7778
type := leader | local_member}} |
7879
{action_failed, stream_id(), #{index := ra:index(),
@@ -524,13 +525,27 @@ apply(Meta, {nodeup, Node} = Cmd,
524525
end, {Streams0, Effects0}, Streams0),
525526
return(Meta, State#?MODULE{monitors = Monitors,
526527
streams = Streams}, ok, Effects);
527-
apply(Meta, {machine_version, 1, 2}, State = #?MODULE{streams = Streams0}) ->
528-
Streams1 = maps:fold(fun(ListPid, LeaderPid, Acc) ->
529-
Acc#{{ListPid, leader} => LeaderPid}
528+
apply(Meta, {machine_version, From = 1, To = 2}, State = #?MODULE{streams = Streams0,
529+
monitors = Monitors0}) ->
530+
rabbit_log:info("Stream coordinator machine version changes from ~p to ~p, updating state.",
531+
[From, To]),
532+
Streams1 = maps:fold(fun(S, #stream{listeners = L0} = S0, StreamAcc) ->
533+
L1 = maps:fold(fun(ListPid, LeaderPid, LAcc) ->
534+
LAcc#{{ListPid, leader} => LeaderPid}
535+
end, #{}, L0),
536+
StreamAcc#{S => S0#stream{listeners = L1}}
530537
end, #{}, Streams0),
538+
Monitors1 = maps:fold(fun(P, {StreamId, listener}, Acc) ->
539+
Acc#{P => {#{StreamId => ok}, listener}};
540+
(P, V, Acc) ->
541+
Acc#{P => V}
542+
end, #{}, Monitors0),
531543
return(Meta, State#?MODULE{streams = Streams1,
544+
monitors = Monitors1,
532545
listeners = undefined}, ok, []);
533-
apply(Meta, {machine_version, _From, _To}, State) ->
546+
apply(Meta, {machine_version, From, To}, State) ->
547+
rabbit_log:info("Stream coordinator machine version changes from ~p to ~p, no state changes required.",
548+
[From, To]),
534549
return(Meta, State, ok, []);
535550
apply(Meta, UnkCmd, State) ->
536551
rabbit_log:debug("~s: unknown command ~W",

deps/rabbit/src/rabbit_stream_coordinator.hrl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,9 @@
5757
}).
5858

5959
-record(rabbit_stream_coordinator, {streams = #{} :: #{stream_id() => #stream{}},
60-
monitors = #{} :: #{pid() => {stream_id(), monitor_role()}},
60+
monitors = #{} :: #{pid() => {stream_id() | %% v0 & v1
61+
#{stream_id() => ok}, %% v2
62+
monitor_role()}},
6163
%% not used as of v2
6264
listeners = #{} :: undefined | #{stream_id() =>
6365
#{pid() := queue_ref()}},

deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ all() ->
2525
all_tests() ->
2626
[
2727
listeners,
28+
machine_version_from_1_to_2,
2829
new_stream,
2930
leader_down,
3031
leader_down_scenario_1,
@@ -195,6 +196,27 @@ listeners(_) ->
195196

196197
ok.
197198

199+
machine_version_from_1_to_2(_) ->
200+
S = <<"stream">>,
201+
LeaderPid = spawn(fun() -> ok end),
202+
ListPid = spawn(fun() -> ok end),
203+
State0 = #?STATE{streams = #{S =>
204+
#stream{listeners = #{ListPid => LeaderPid}}},
205+
monitors = #{ListPid => {S, listener}}},
206+
207+
{State1, ok, []} = apply_cmd(#{index => 42}, {machine_version, 1, 2}, State0),
208+
209+
Stream1 = maps:get(S, State1#?STATE.streams),
210+
?assertEqual(
211+
#{{ListPid, leader} => LeaderPid},
212+
Stream1#stream.listeners
213+
),
214+
?assertEqual(
215+
#{ListPid => {#{S => ok}, listener}},
216+
State1#?STATE.monitors
217+
),
218+
ok.
219+
198220
new_stream(_) ->
199221
[N1, N2, N3] = Nodes = [r@n1, r@n2, r@n3],
200222
StreamId = atom_to_list(?FUNCTION_NAME),

0 commit comments

Comments
 (0)