Skip to content

Commit 41dfc4d

Browse files
committed
Bump machine version from 1 to 2 in stream coordinator
Version bump necessary because of the state changes to handle local member listeners. References #4133
1 parent 8614c5d commit 41dfc4d

File tree

3 files changed

+101
-26
lines changed

3 files changed

+101
-26
lines changed

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 93 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
-export([replay/1]).
4242

4343
%% for testing and debugging
44-
-export([eval_listeners/2,
44+
-export([eval_listeners/3,
4545
state/0]).
4646

4747
-rabbit_boot_step({?MODULE,
@@ -339,7 +339,7 @@ all_coord_members() ->
339339
Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
340340
[{?MODULE, Node} || Node <- [node() | Nodes]].
341341

342-
version() -> 1.
342+
version() -> 2.
343343

344344
which_module(_) ->
345345
?MODULE.
@@ -349,7 +349,8 @@ init(_Conf) ->
349349

350350
-spec apply(map(), command(), state()) ->
351351
{state(), term(), ra_machine:effects()}.
352-
apply(#{index := _Idx} = Meta0, {_CmdTag, StreamId, #{}} = Cmd,
352+
apply(#{index := _Idx, machine_version := MachineVersion} = Meta0,
353+
{_CmdTag, StreamId, #{}} = Cmd,
353354
#?MODULE{streams = Streams0,
354355
monitors = Monitors0} = State0) ->
355356
Stream0 = maps:get(StreamId, Streams0, undefined),
@@ -367,10 +368,10 @@ apply(#{index := _Idx} = Meta0, {_CmdTag, StreamId, #{}} = Cmd,
367368
case Stream1 of
368369
undefined ->
369370
return(Meta, State0#?MODULE{streams = maps:remove(StreamId, Streams0)},
370-
Reply, inform_listeners_eol(Stream0));
371+
Reply, inform_listeners_eol(MachineVersion, Stream0));
371372
_ ->
372373
{Stream2, Effects0} = evaluate_stream(Meta, Stream1, []),
373-
{Stream3, Effects1} = eval_listeners(Stream2, Effects0),
374+
{Stream3, Effects1} = eval_listeners(MachineVersion, Stream2, Effects0),
374375
{Stream, Effects2} = eval_retention(Meta, Stream3, Effects1),
375376
{Monitors, Effects} = ensure_monitors(Stream, Monitors0, Effects2),
376377
return(Meta,
@@ -380,9 +381,10 @@ apply(#{index := _Idx} = Meta0, {_CmdTag, StreamId, #{}} = Cmd,
380381
Reply ->
381382
return(Meta, State0, Reply, [])
382383
end;
383-
apply(Meta, {down, Pid, Reason} = Cmd,
384+
apply(#{machine_version := MachineVersion} = Meta, {down, Pid, Reason} = Cmd,
384385
#?MODULE{streams = Streams0,
385-
monitors = Monitors0} = State) ->
386+
monitors = Monitors0,
387+
listeners = StateListeners0} = State) ->
386388

387389
Effects0 = case Reason of
388390
noconnection ->
@@ -391,7 +393,21 @@ apply(Meta, {down, Pid, Reason} = Cmd,
391393
[]
392394
end,
393395
case maps:take(Pid, Monitors0) of
394-
{{PidStreams, listener}, Monitors} ->
396+
{{StreamId, listener}, Monitors} when MachineVersion =< 1 ->
397+
Listeners = case maps:take(StreamId, StateListeners0) of
398+
error ->
399+
StateListeners0;
400+
{Pids0, Listeners1} ->
401+
case maps:remove(Pid, Pids0) of
402+
Pids when map_size(Pids) == 0 ->
403+
Listeners1;
404+
Pids ->
405+
Listeners1#{StreamId => Pids}
406+
end
407+
end,
408+
return(Meta, State#?MODULE{listeners = Listeners,
409+
monitors = Monitors}, ok, Effects0);
410+
{{PidStreams, listener}, Monitors} when MachineVersion >= 2 ->
395411
Streams = maps:fold(fun(StreamId, _, Acc) ->
396412
case Acc of
397413
#{StreamId := Stream = #stream{listeners = Listeners0}} ->
@@ -426,12 +442,31 @@ apply(Meta, {down, Pid, Reason} = Cmd,
426442
error ->
427443
return(Meta, State, ok, Effects0)
428444
end;
429-
apply(Meta, {register_listener, #{pid := Pid,
430-
node := Node,
431-
stream_id := StreamId,
432-
type := Type}},
445+
apply(#{machine_version := MachineVersion} = Meta,
446+
{register_listener, #{pid := Pid,
447+
stream_id := StreamId}},
433448
#?MODULE{streams = Streams,
434-
monitors = Monitors0} = State0) ->
449+
monitors = Monitors0} = State0) when MachineVersion =< 1 ->
450+
case Streams of
451+
#{StreamId := #stream{listeners = Listeners0} = Stream0} ->
452+
Stream1 = Stream0#stream{listeners = maps:put(Pid, undefined, Listeners0)},
453+
{Stream, Effects} = eval_listeners(MachineVersion, Stream1, []),
454+
Monitors = maps:put(Pid, {StreamId, listener}, Monitors0),
455+
return(Meta,
456+
State0#?MODULE{streams = maps:put(StreamId, Stream, Streams),
457+
monitors = Monitors}, ok,
458+
[{monitor, process, Pid} | Effects]);
459+
_ ->
460+
return(Meta, State0, stream_not_found, [])
461+
end;
462+
463+
apply(#{machine_version := MachineVersion} = Meta,
464+
{register_listener, #{pid := Pid,
465+
node := Node,
466+
stream_id := StreamId,
467+
type := Type}},
468+
#?MODULE{streams = Streams,
469+
monitors = Monitors0} = State0) when MachineVersion >= 2 ->
435470
case Streams of
436471
#{StreamId := #stream{listeners = Listeners0} = Stream0} ->
437472
{LKey, LValue} =
@@ -442,7 +477,7 @@ apply(Meta, {register_listener, #{pid := Pid,
442477
{{Pid, member}, {Node, undefined}}
443478
end,
444479
Stream1 = Stream0#stream{listeners = maps:put(LKey, LValue, Listeners0)},
445-
{Stream, Effects} = eval_listeners(Stream1, []),
480+
{Stream, Effects} = eval_listeners(MachineVersion, Stream1, []),
446481
{PidStreams, listener} = maps:get(Pid, Monitors0, {#{}, listener}),
447482
Monitors = maps:put(Pid, {PidStreams#{StreamId => ok}, listener}, Monitors0),
448483
return(Meta,
@@ -476,6 +511,12 @@ apply(Meta, {nodeup, Node} = Cmd,
476511
end, {Streams0, Effects0}, Streams0),
477512
return(Meta, State#?MODULE{monitors = Monitors,
478513
streams = Streams}, ok, Effects);
514+
apply(Meta, {machine_version, 1, 2}, State = #?MODULE{streams = Streams0}) ->
515+
Streams1 = maps:fold(fun(ListPid, LeaderPid, Acc) ->
516+
Acc#{{ListPid, leader} => LeaderPid}
517+
end, #{}, Streams0),
518+
return(Meta, State#?MODULE{streams = Streams1,
519+
listeners = undefined}, ok, []);
479520
apply(Meta, {machine_version, _From, _To}, State) ->
480521
return(Meta, State, ok, []);
481522
apply(Meta, UnkCmd, State) ->
@@ -1278,10 +1319,19 @@ update_stream0(#{system_time := _Ts},
12781319
update_stream0(_Meta, _Cmd, undefined) ->
12791320
undefined.
12801321

1281-
inform_listeners_eol(#stream{target = deleted,
1322+
inform_listeners_eol(MachineVersion, #stream{target = deleted,
12821323
listeners = Listeners,
12831324
queue_ref = QRef
1284-
}) ->
1325+
}) when MachineVersion =< 1 ->
1326+
lists:map(fun(Pid) ->
1327+
{send_msg, Pid,
1328+
{queue_event, QRef, eol},
1329+
cast}
1330+
end, maps:keys(Listeners));
1331+
inform_listeners_eol(MachineVersion, #stream{target = deleted,
1332+
listeners = Listeners,
1333+
queue_ref = QRef
1334+
}) when MachineVersion >= 2 ->
12851335
LPidsMap = maps:fold(fun({P, _}, _V, Acc) ->
12861336
Acc#{P => ok}
12871337
end, #{}, Listeners),
@@ -1290,12 +1340,35 @@ inform_listeners_eol(#stream{target = deleted,
12901340
{queue_event, QRef, eol},
12911341
cast}
12921342
end, maps:keys(LPidsMap));
1293-
inform_listeners_eol(_) ->
1343+
inform_listeners_eol(_, _) ->
12941344
[].
12951345

1296-
eval_listeners(#stream{listeners = Listeners0,
1297-
queue_ref = QRef,
1298-
members = Members} = Stream0, Effects0) ->
1346+
eval_listeners(MachineVersion, #stream{listeners = Listeners0,
1347+
queue_ref = QRef,
1348+
members = Members} = Stream, Effects0)
1349+
when MachineVersion =< 1 ->
1350+
case find_leader(Members) of
1351+
{#member{state = {running, _, LeaderPid}}, _} ->
1352+
%% a leader is running, check all listeners to see if any of them
1353+
%% has not been notified of the current leader pid
1354+
{Listeners, Effects} =
1355+
maps:fold(
1356+
fun(_, P, Acc) when P == LeaderPid ->
1357+
Acc;
1358+
(LPid, _, {L, Acc}) ->
1359+
{L#{LPid => LeaderPid},
1360+
[{send_msg, LPid,
1361+
{queue_event, QRef,
1362+
{stream_leader_change, LeaderPid}},
1363+
cast} | Acc]}
1364+
end, {Listeners0, Effects0}, Listeners0),
1365+
{Stream#stream{listeners = Listeners}, Effects};
1366+
_ ->
1367+
{Stream, Effects0}
1368+
end;
1369+
eval_listeners(MachineVersion, #stream{listeners = Listeners0,
1370+
queue_ref = QRef,
1371+
members = Members} = Stream0, Effects0) when MachineVersion >= 2 ->
12991372
%% Iterating over stream listeners.
13001373
%% Returning the new map of listeners and the effects (notification of changes)
13011374
{Listeners1, Effects1} =

deps/rabbit/src/rabbit_stream_coordinator.hrl

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@
4848
conf :: osiris:config(),
4949
nodes :: [node()],
5050
members = #{} :: #{node() := #member{}},
51-
listeners = #{} :: #{{pid(), leader | member} := LeaderPid :: pid()} | {node(), LocalPid :: pid()},
51+
listeners = #{} :: #{pid() | %% v0 & v1
52+
{pid(), leader | member} %% v2
53+
:= LeaderPid :: pid()} | {node(), LocalPid :: pid()},
5254
reply_to :: undefined | from(),
5355
mnesia = {updated, 0} :: {updated | updating, osiris:epoch()},
5456
target = running :: running | deleted
@@ -57,8 +59,8 @@
5759
-record(rabbit_stream_coordinator, {streams = #{} :: #{stream_id() => #stream{}},
5860
monitors = #{} :: #{pid() => {stream_id(), monitor_role()}},
5961
%% not used as of v2
60-
listeners = #{} :: #{stream_id() =>
61-
#{pid() := queue_ref()}},
62+
listeners = #{} :: undefined | #{stream_id() =>
63+
#{pid() := queue_ref()}},
6264
%% future extensibility
6365
reserved_1,
6466
reserved_2}).

deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,13 @@ apply_cmd(M, C, S) ->
7575
rabbit_stream_coordinator:apply(M, C, S).
7676

7777
register_listener(Args, S) ->
78-
apply_cmd(#{index => 42}, {register_listener, Args}, S).
78+
apply_cmd(#{index => 42, machine_version => 2}, {register_listener, Args}, S).
7979

8080
eval_listeners(Stream) ->
81-
rabbit_stream_coordinator:eval_listeners(Stream, []).
81+
rabbit_stream_coordinator:eval_listeners(2, Stream, []).
8282

8383
down(Pid, S) ->
84-
apply_cmd(#{index => 42}, {down, Pid, reason}, S).
84+
apply_cmd(#{index => 42, machine_version => 2}, {down, Pid, reason}, S).
8585

8686

8787
listeners(_) ->

0 commit comments

Comments
 (0)