@@ -529,20 +529,35 @@ apply(Meta, {machine_version, From = 1, To = 2}, State = #?MODULE{streams = Stre
529
529
monitors = Monitors0 }) ->
530
530
rabbit_log :info (" Stream coordinator machine version changes from ~p to ~p , updating state." ,
531
531
[From , To ]),
532
- Streams1 = maps :fold (fun (S , # stream {listeners = L0 } = S0 , StreamAcc ) ->
533
- L1 = maps :fold (fun (ListPid , LeaderPid , LAcc ) ->
534
- LAcc #{{ListPid , leader } => LeaderPid }
535
- end , #{}, L0 ),
536
- StreamAcc #{S => S0 # stream {listeners = L1 }}
537
- end , #{}, Streams0 ),
538
- Monitors1 = maps :fold (fun (P , {StreamId , listener }, Acc ) ->
532
+ % % conversion from old state to new state
533
+ % % additional operation: the stream listeners are never collected in the previous version
534
+ % % so we'll emit monitors for all listener PIDs
535
+ % % this way we'll get the DOWN event for dead listener PIDs and
536
+ % % we'll clean the stream listeners the in the DOWN event callback
537
+
538
+ % % transform the listeners of each stream and accumulate listener PIDs
539
+ {Streams1 , Listeners } =
540
+ maps :fold (fun (S , # stream {listeners = L0 } = S0 , {StreamAcc , GlobalListAcc }) ->
541
+ {L1 , GlobalListAcc1 } = maps :fold (fun (ListPid , LeaderPid , {LAcc , GLAcc }) ->
542
+ {LAcc #{{ListPid , leader } => LeaderPid },
543
+ GLAcc #{ListPid => S }}
544
+ end , {#{}, GlobalListAcc }, L0 ),
545
+ {StreamAcc #{S => S0 # stream {listeners = L1 }}, GlobalListAcc1 }
546
+ end , {#{}, #{}}, Streams0 ),
547
+ % % accumulate monitors for the map and create the effects to emit the monitors
548
+ {ExtraMonitors , Effects } = maps :fold (fun (P , StreamId , {MAcc , EAcc }) ->
549
+ {MAcc #{P => {StreamId , listener }},
550
+ [{monitor , process , P } | EAcc ]}
551
+ end , {#{}, []}, Listeners ),
552
+ Monitors1 = maps :merge (Monitors0 , ExtraMonitors ),
553
+ Monitors2 = maps :fold (fun (P , {StreamId , listener }, Acc ) ->
539
554
Acc #{P => {#{StreamId => ok }, listener }};
540
555
(P , V , Acc ) ->
541
556
Acc #{P => V }
542
- end , #{}, Monitors0 ),
557
+ end , #{}, Monitors1 ),
543
558
return (Meta , State #? MODULE {streams = Streams1 ,
544
- monitors = Monitors1 ,
545
- listeners = undefined }, ok , [] );
559
+ monitors = Monitors2 ,
560
+ listeners = undefined }, ok , Effects );
546
561
apply (Meta , {machine_version , From , To }, State ) ->
547
562
rabbit_log :info (" Stream coordinator machine version changes from ~p to ~p , no state changes required." ,
548
563
[From , To ]),
0 commit comments