Skip to content

Commit 361f9de

Browse files
Merge pull request #4217 from rabbitmq/rabbitmq-server-4133-monitor-stream-local-member
Monitor stream local member in stream queue
2 parents 8dae5d3 + 46662ec commit 361f9de

File tree

6 files changed

+511
-54
lines changed

6 files changed

+511
-54
lines changed

deps/rabbit/src/rabbit_channel.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1481,7 +1481,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
14811481
[rabbit_misc:rs(QueueName)]);
14821482
{error, no_local_stream_replica_available} ->
14831483
rabbit_misc:protocol_error(
1484-
resource_error, "~s does not not have a running local replica",
1484+
resource_error, "~s does not have a running local replica",
14851485
[rabbit_misc:rs(QueueName)])
14861486
end;
14871487
{ok, _} ->

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 207 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
%% License, v. 2.0. If a copy of the MPL was not distributed with this
33
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
44
%%
5-
%% Copyright (c) 2007-2021 VMware, Inc. or its affiliates. All rights reserved.
5+
%% Copyright (c) 2007-2022 VMware, Inc. or its affiliates. All rights reserved.
66
%%
77

88
-module(rabbit_stream_coordinator).
@@ -23,7 +23,8 @@
2323
-export([recover/0,
2424
add_replica/2,
2525
delete_replica/2,
26-
register_listener/1]).
26+
register_listener/1,
27+
register_local_member_listener/1]).
2728

2829
-export([new_stream/2,
2930
delete_stream/2]).
@@ -39,6 +40,10 @@
3940
-export([log_overview/1]).
4041
-export([replay/1]).
4142

43+
%% for testing and debugging
44+
-export([eval_listeners/3,
45+
state/0]).
46+
4247
-rabbit_boot_step({?MODULE,
4348
[{description, "Restart stream coordinator"},
4449
{mfa, {?MODULE, recover, []}},
@@ -68,8 +73,9 @@
6873
{delete_replica, stream_id(), #{node := node()}} |
6974
{policy_changed, stream_id(), #{queue := amqqueue:amqqueue()}} |
7075
{register_listener, #{pid := pid(),
76+
node := node(),
7177
stream_id := stream_id(),
72-
queue_ref := queue_ref()}} |
78+
type := leader | local_member}} |
7379
{action_failed, stream_id(), #{index := ra:index(),
7480
node := node(),
7581
epoch := osiris:epoch(),
@@ -165,6 +171,15 @@ policy_changed(Q) when ?is_amqqueue(Q) ->
165171
StreamId = maps:get(name, amqqueue:get_type_state(Q)),
166172
process_command({policy_changed, StreamId, #{queue => Q}}).
167173

174+
%% for debugging
175+
state() ->
176+
case ra:local_query({?MODULE, node()}, fun(State) -> State end) of
177+
{ok, {_, Res}, _} ->
178+
Res;
179+
Any ->
180+
Any
181+
end.
182+
168183
local_pid(StreamId) when is_list(StreamId) ->
169184
MFA = {?MODULE, query_local_pid, [StreamId, node()]},
170185
case ra:local_query({?MODULE, node()}, MFA) of
@@ -248,6 +263,16 @@ register_listener(Q) when ?is_amqqueue(Q)->
248263
#{pid => self(),
249264
stream_id => StreamId}}).
250265

266+
-spec register_local_member_listener(amqqueue:amqqueue()) ->
267+
{error, term()} | {ok, ok | stream_not_found, atom() | {atom(), atom()}}.
268+
register_local_member_listener(Q) when ?is_amqqueue(Q) ->
269+
#{name := StreamId} = amqqueue:get_type_state(Q),
270+
process_command({register_listener,
271+
#{pid => self(),
272+
node => node(self()),
273+
stream_id => StreamId,
274+
type => local_member}}).
275+
251276
process_command(Cmd) ->
252277
Servers = ensure_coordinator_started(),
253278
process_command(Servers, Cmd).
@@ -313,7 +338,7 @@ all_coord_members() ->
313338
Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
314339
[{?MODULE, Node} || Node <- [node() | Nodes]].
315340

316-
version() -> 1.
341+
version() -> 2.
317342

318343
which_module(_) ->
319344
?MODULE.
@@ -323,7 +348,8 @@ init(_Conf) ->
323348

324349
-spec apply(map(), command(), state()) ->
325350
{state(), term(), ra_machine:effects()}.
326-
apply(#{index := _Idx} = Meta0, {_CmdTag, StreamId, #{}} = Cmd,
351+
apply(#{index := _Idx, machine_version := MachineVersion} = Meta0,
352+
{_CmdTag, StreamId, #{}} = Cmd,
327353
#?MODULE{streams = Streams0,
328354
monitors = Monitors0} = State0) ->
329355
Stream0 = maps:get(StreamId, Streams0, undefined),
@@ -341,10 +367,10 @@ apply(#{index := _Idx} = Meta0, {_CmdTag, StreamId, #{}} = Cmd,
341367
case Stream1 of
342368
undefined ->
343369
return(Meta, State0#?MODULE{streams = maps:remove(StreamId, Streams0)},
344-
Reply, inform_listeners_eol(Stream0));
370+
Reply, inform_listeners_eol(MachineVersion, Stream0));
345371
_ ->
346372
{Stream2, Effects0} = evaluate_stream(Meta, Stream1, []),
347-
{Stream3, Effects1} = eval_listeners(Stream2, Effects0),
373+
{Stream3, Effects1} = eval_listeners(MachineVersion, Stream2, Effects0),
348374
{Stream, Effects2} = eval_retention(Meta, Stream3, Effects1),
349375
{Monitors, Effects} = ensure_monitors(Stream, Monitors0, Effects2),
350376
return(Meta,
@@ -354,10 +380,10 @@ apply(#{index := _Idx} = Meta0, {_CmdTag, StreamId, #{}} = Cmd,
354380
Reply ->
355381
return(Meta, State0, Reply, [])
356382
end;
357-
apply(Meta, {down, Pid, Reason} = Cmd,
383+
apply(#{machine_version := MachineVersion} = Meta, {down, Pid, Reason} = Cmd,
358384
#?MODULE{streams = Streams0,
359-
listeners = Listeners0,
360-
monitors = Monitors0} = State) ->
385+
monitors = Monitors0,
386+
listeners = StateListeners0} = State) ->
361387

362388
Effects0 = case Reason of
363389
noconnection ->
@@ -366,10 +392,10 @@ apply(Meta, {down, Pid, Reason} = Cmd,
366392
[]
367393
end,
368394
case maps:take(Pid, Monitors0) of
369-
{{StreamId, listener}, Monitors} ->
370-
Listeners = case maps:take(StreamId, Listeners0) of
395+
{{StreamId, listener}, Monitors} when MachineVersion < 2 ->
396+
Listeners = case maps:take(StreamId, StateListeners0) of
371397
error ->
372-
Listeners0;
398+
StateListeners0;
373399
{Pids0, Listeners1} ->
374400
case maps:remove(Pid, Pids0) of
375401
Pids when map_size(Pids) == 0 ->
@@ -380,6 +406,23 @@ apply(Meta, {down, Pid, Reason} = Cmd,
380406
end,
381407
return(Meta, State#?MODULE{listeners = Listeners,
382408
monitors = Monitors}, ok, Effects0);
409+
{{PidStreams, listener}, Monitors} when MachineVersion >= 2 ->
410+
Streams = maps:fold(
411+
fun(StreamId, _, Acc) ->
412+
case Acc of
413+
#{StreamId := Stream = #stream{listeners = Listeners0}} ->
414+
Listeners = maps:fold(fun({P, _} = K, _, A) when P == Pid ->
415+
maps:remove(K, A);
416+
(K, V, A) ->
417+
A#{K => V}
418+
end, #{}, Listeners0),
419+
Acc#{StreamId => Stream#stream{listeners = Listeners}};
420+
_ ->
421+
Acc
422+
end
423+
end, Streams0, PidStreams),
424+
return(Meta, State#?MODULE{streams = Streams,
425+
monitors = Monitors}, ok, Effects0);
383426
{{StreamId, member}, Monitors1} ->
384427
case Streams0 of
385428
#{StreamId := Stream0} ->
@@ -398,19 +441,53 @@ apply(Meta, {down, Pid, Reason} = Cmd,
398441
error ->
399442
return(Meta, State, ok, Effects0)
400443
end;
401-
apply(Meta, {register_listener, #{pid := Pid,
402-
stream_id := StreamId}},
444+
apply(#{machine_version := MachineVersion} = Meta,
445+
{register_listener, #{pid := Pid,
446+
stream_id := StreamId} = Args},
403447
#?MODULE{streams = Streams,
404-
monitors = Monitors0} = State0) ->
405-
case Streams of
406-
#{StreamId := #stream{listeners = Listeners0} = Stream0} ->
448+
monitors = Monitors0} = State0) when MachineVersion =< 1 ->
449+
Type = maps:get(type, Args, leader),
450+
case {Streams, Type} of
451+
{#{StreamId := #stream{listeners = Listeners0} = Stream0}, leader} ->
407452
Stream1 = Stream0#stream{listeners = maps:put(Pid, undefined, Listeners0)},
408-
{Stream, Effects} = eval_listeners(Stream1, []),
453+
{Stream, Effects} = eval_listeners(MachineVersion, Stream1, []),
409454
Monitors = maps:put(Pid, {StreamId, listener}, Monitors0),
410455
return(Meta,
411456
State0#?MODULE{streams = maps:put(StreamId, Stream, Streams),
412457
monitors = Monitors}, ok,
413458
[{monitor, process, Pid} | Effects]);
459+
{#{StreamId := _Stream}, local_member} ->
460+
%% registering a local member listener does not change the state in v1
461+
return(Meta, State0, ok, []);
462+
_ ->
463+
return(Meta, State0, stream_not_found, [])
464+
end;
465+
466+
apply(#{machine_version := MachineVersion} = Meta,
467+
{register_listener, #{pid := Pid,
468+
stream_id := StreamId} = Args},
469+
#?MODULE{streams = Streams,
470+
monitors = Monitors0} = State0) when MachineVersion >= 2 ->
471+
Node = maps:get(node, Args, node(Pid)),
472+
Type = maps:get(type, Args, leader),
473+
474+
case Streams of
475+
#{StreamId := #stream{listeners = Listeners0} = Stream0} ->
476+
{LKey, LValue} =
477+
case Type of
478+
leader ->
479+
{{Pid, leader}, undefined};
480+
local_member ->
481+
{{Pid, member}, {Node, undefined}}
482+
end,
483+
Stream1 = Stream0#stream{listeners = maps:put(LKey, LValue, Listeners0)},
484+
{Stream, Effects} = eval_listeners(MachineVersion, Stream1, []),
485+
{PidStreams, listener} = maps:get(Pid, Monitors0, {#{}, listener}),
486+
Monitors = maps:put(Pid, {PidStreams#{StreamId => ok}, listener}, Monitors0),
487+
return(Meta,
488+
State0#?MODULE{streams = maps:put(StreamId, Stream, Streams),
489+
monitors = Monitors}, ok,
490+
[{monitor, process, Pid} | Effects]);
414491
_ ->
415492
return(Meta, State0, stream_not_found, [])
416493
end;
@@ -438,7 +515,43 @@ apply(Meta, {nodeup, Node} = Cmd,
438515
end, {Streams0, Effects0}, Streams0),
439516
return(Meta, State#?MODULE{monitors = Monitors,
440517
streams = Streams}, ok, Effects);
441-
apply(Meta, {machine_version, _From, _To}, State) ->
518+
apply(Meta, {machine_version, From = 1, To = 2}, State = #?MODULE{streams = Streams0,
519+
monitors = Monitors0}) ->
520+
rabbit_log:info("Stream coordinator machine version changes from ~p to ~p, updating state.",
521+
[From, To]),
522+
%% conversion from old state to new state
523+
%% additional operation: the stream listeners are never collected in the previous version
524+
%% so we'll emit monitors for all listener PIDs
525+
%% this way we'll get the DOWN event for dead listener PIDs and
526+
%% we'll clean the stream listeners the in the DOWN event callback
527+
528+
%% transform the listeners of each stream and accumulate listener PIDs
529+
{Streams1, Listeners} =
530+
maps:fold(fun(S, #stream{listeners = L0} = S0, {StreamAcc, GlobalListAcc}) ->
531+
{L1, GlobalListAcc1} = maps:fold(
532+
fun(ListPid, LeaderPid, {LAcc, GLAcc}) ->
533+
{LAcc#{{ListPid, leader} => LeaderPid},
534+
GLAcc#{ListPid => S}}
535+
end, {#{}, GlobalListAcc}, L0),
536+
{StreamAcc#{S => S0#stream{listeners = L1}}, GlobalListAcc1}
537+
end, {#{}, #{}}, Streams0),
538+
%% accumulate monitors for the map and create the effects to emit the monitors
539+
{ExtraMonitors, Effects} = maps:fold(fun(P, StreamId, {MAcc, EAcc}) ->
540+
{MAcc#{P => {StreamId, listener}},
541+
[{monitor, process, P} | EAcc]}
542+
end, {#{}, []}, Listeners),
543+
Monitors1 = maps:merge(Monitors0, ExtraMonitors),
544+
Monitors2 = maps:fold(fun(P, {StreamId, listener}, Acc) ->
545+
Acc#{P => {#{StreamId => ok}, listener}};
546+
(P, V, Acc) ->
547+
Acc#{P => V}
548+
end, #{}, Monitors1),
549+
return(Meta, State#?MODULE{streams = Streams1,
550+
monitors = Monitors2,
551+
listeners = undefined}, ok, Effects);
552+
apply(Meta, {machine_version, From, To}, State) ->
553+
rabbit_log:info("Stream coordinator machine version changes from ~p to ~p, no state changes required.",
554+
[From, To]),
442555
return(Meta, State, ok, []);
443556
apply(Meta, UnkCmd, State) ->
444557
rabbit_log:debug("~s: unknown command ~W",
@@ -1240,21 +1353,35 @@ update_stream0(#{system_time := _Ts},
12401353
update_stream0(_Meta, _Cmd, undefined) ->
12411354
undefined.
12421355

1243-
inform_listeners_eol(#stream{target = deleted,
1356+
inform_listeners_eol(MachineVersion, #stream{target = deleted,
12441357
listeners = Listeners,
12451358
queue_ref = QRef
1246-
}) ->
1359+
}) when MachineVersion =< 1 ->
12471360
lists:map(fun(Pid) ->
12481361
{send_msg, Pid,
12491362
{queue_event, QRef, eol},
12501363
cast}
12511364
end, maps:keys(Listeners));
1252-
inform_listeners_eol(_) ->
1365+
inform_listeners_eol(MachineVersion,
1366+
#stream{target = deleted,
1367+
listeners = Listeners,
1368+
queue_ref = QRef}) when MachineVersion >= 2 ->
1369+
LPidsMap = maps:fold(fun({P, _}, _V, Acc) ->
1370+
Acc#{P => ok}
1371+
end, #{}, Listeners),
1372+
lists:map(fun(Pid) ->
1373+
{send_msg,
1374+
Pid,
1375+
{queue_event, QRef, eol},
1376+
cast}
1377+
end, maps:keys(LPidsMap));
1378+
inform_listeners_eol(_, _) ->
12531379
[].
12541380

1255-
eval_listeners(#stream{listeners = Listeners0,
1256-
queue_ref = QRef,
1257-
members = Members} = Stream, Effects0) ->
1381+
eval_listeners(MachineVersion, #stream{listeners = Listeners0,
1382+
queue_ref = QRef,
1383+
members = Members} = Stream, Effects0)
1384+
when MachineVersion =< 1 ->
12581385
case find_leader(Members) of
12591386
{#member{state = {running, _, LeaderPid}}, _} ->
12601387
%% a leader is running, check all listeners to see if any of them
@@ -1273,7 +1400,60 @@ eval_listeners(#stream{listeners = Listeners0,
12731400
{Stream#stream{listeners = Listeners}, Effects};
12741401
_ ->
12751402
{Stream, Effects0}
1276-
end.
1403+
end;
1404+
eval_listeners(MachineVersion, #stream{listeners = Listeners0,
1405+
queue_ref = QRef,
1406+
members = Members} = Stream0, Effects0) when MachineVersion >= 2 ->
1407+
%% Iterating over stream listeners.
1408+
%% Returning the new map of listeners and the effects (notification of changes)
1409+
{Listeners1, Effects1} =
1410+
maps:fold(fun({P, leader}, ListLPid0, {Lsts0, Effs0}) ->
1411+
%% iterating over member to find the leader
1412+
{ListLPid1, Effs1} =
1413+
maps:fold(fun(_N, #member{state = {running, _, LeaderPid},
1414+
role = {writer, _},
1415+
target = T}, A)
1416+
when ListLPid0 == LeaderPid, T /= deleted ->
1417+
%% it's the leader, same PID, nothing to do
1418+
A;
1419+
(_N, #member{state = {running, _, LeaderPid},
1420+
role = {writer, _},
1421+
target = T}, {_, Efs})
1422+
when T /= deleted ->
1423+
%% it's the leader, not same PID, assign the new leader, add effect
1424+
{LeaderPid, [{send_msg, P,
1425+
{queue_event, QRef,
1426+
{stream_leader_change, LeaderPid}},
1427+
cast} | Efs]};
1428+
(_N, _M, Acc) ->
1429+
%% it's not the leader, nothing to do
1430+
Acc
1431+
end, {ListLPid0, Effs0}, Members),
1432+
{Lsts0#{{P, leader} => ListLPid1}, Effs1};
1433+
({P, member}, {ListNode, ListMPid0}, {Lsts0, Effs0}) ->
1434+
%% listening to a member on a given node
1435+
%% iterating over the members to find the member on this node
1436+
{ListMPid1, Effs1} =
1437+
maps:fold(fun(MNode, #member{state = {running, _, MemberPid}, target = T}, Acc)
1438+
when ListMPid0 == MemberPid, ListNode == MNode, T /= deleted ->
1439+
%% it's the local member of this listener
1440+
%% it has not changed, nothing to do
1441+
Acc;
1442+
(MNode, #member{state = {running, _, MemberPid}, target = T}, {_, Efs})
1443+
when ListNode == MNode, T /= deleted ->
1444+
%% it's the local member of this listener
1445+
%% the PID is not the same, updating it in the listener, add effect
1446+
{MemberPid, [{send_msg, P,
1447+
{queue_event, QRef,
1448+
{stream_local_member_change, MemberPid}},
1449+
cast} | Efs]};
1450+
(_N, _M, Acc) ->
1451+
%% not a replica, nothing to do
1452+
Acc
1453+
end, {ListMPid0, Effs0}, Members),
1454+
{Lsts0#{{P, member} => {ListNode, ListMPid1}}, Effs1}
1455+
end, {Listeners0, Effects0}, Listeners0),
1456+
{Stream0#stream{listeners = Listeners1}, Effects1}.
12771457

12781458
eval_retention(#{index := Idx} = Meta,
12791459
#stream{conf = #{retention := Ret} = Conf,

0 commit comments

Comments
 (0)