Skip to content

Commit 7d2dafb

Browse files
acogoluegnesmergify-bot
authored andcommitted
Do not change state for local member listener in v1
In stream coordinator. A v2 node, even if running v1, can call the function to register a local member when an AMQP listener is registered, so this commit makes such a call a no-op. References #4133 (cherry picked from commit 6133416) (cherry picked from commit aed576f)
1 parent e798ebc commit 7d2dafb

File tree

1 file changed

+7
-3
lines changed

1 file changed

+7
-3
lines changed

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -442,18 +442,22 @@ apply(#{machine_version := MachineVersion} = Meta, {down, Pid, Reason} = Cmd,
442442
end;
443443
apply(#{machine_version := MachineVersion} = Meta,
444444
{register_listener, #{pid := Pid,
445-
stream_id := StreamId}},
445+
stream_id := StreamId} = Args},
446446
#?MODULE{streams = Streams,
447447
monitors = Monitors0} = State0) when MachineVersion =< 1 ->
448-
case Streams of
449-
#{StreamId := #stream{listeners = Listeners0} = Stream0} ->
448+
Type = maps:get(type, Args, leader),
449+
case {Streams, Type} of
450+
{#{StreamId := #stream{listeners = Listeners0} = Stream0}, leader} ->
450451
Stream1 = Stream0#stream{listeners = maps:put(Pid, undefined, Listeners0)},
451452
{Stream, Effects} = eval_listeners(MachineVersion, Stream1, []),
452453
Monitors = maps:put(Pid, {StreamId, listener}, Monitors0),
453454
return(Meta,
454455
State0#?MODULE{streams = maps:put(StreamId, Stream, Streams),
455456
monitors = Monitors}, ok,
456457
[{monitor, process, Pid} | Effects]);
458+
{#{StreamId := _Stream}, local_member} ->
459+
%% registering a local member listener does not change the state in v1
460+
return(Meta, State0, ok, []);
457461
_ ->
458462
return(Meta, State0, stream_not_found, [])
459463
end;

0 commit comments

Comments
 (0)