@@ -392,7 +392,7 @@ apply(#{machine_version := MachineVersion} = Meta, {down, Pid, Reason} = Cmd,
392
392
[]
393
393
end ,
394
394
case maps :take (Pid , Monitors0 ) of
395
- {{StreamId , listener }, Monitors } when MachineVersion =< 1 ->
395
+ {{StreamId , listener }, Monitors } when MachineVersion < 2 ->
396
396
Listeners = case maps :take (StreamId , StateListeners0 ) of
397
397
error ->
398
398
StateListeners0 ;
@@ -407,20 +407,20 @@ apply(#{machine_version := MachineVersion} = Meta, {down, Pid, Reason} = Cmd,
407
407
return (Meta , State #? MODULE {listeners = Listeners ,
408
408
monitors = Monitors }, ok , Effects0 );
409
409
{{PidStreams , listener }, Monitors } when MachineVersion >= 2 ->
410
- Streams = maps :fold (fun (StreamId , _ , Acc ) ->
410
+ Streams = maps :fold (
411
+ fun (StreamId , _ , Acc ) ->
411
412
case Acc of
412
413
#{StreamId := Stream = # stream {listeners = Listeners0 }} ->
413
414
Listeners = maps :fold (fun ({P , _ } = K , _ , A ) when P == Pid ->
414
415
maps :remove (K , A );
415
416
(K , V , A ) ->
416
417
A #{K => V }
417
- end , #{}, Listeners0
418
- ),
418
+ end , #{}, Listeners0 ),
419
419
Acc #{StreamId => Stream # stream {listeners = Listeners }};
420
420
_ ->
421
421
Acc
422
422
end
423
- end , Streams0 , PidStreams ),
423
+ end , Streams0 , PidStreams ),
424
424
return (Meta , State #? MODULE {streams = Streams ,
425
425
monitors = Monitors }, ok , Effects0 );
426
426
{{StreamId , member }, Monitors1 } ->
@@ -468,18 +468,8 @@ apply(#{machine_version := MachineVersion} = Meta,
468
468
stream_id := StreamId } = Args },
469
469
#? MODULE {streams = Streams ,
470
470
monitors = Monitors0 } = State0 ) when MachineVersion >= 2 ->
471
- Node = case Args of
472
- #{node := N } ->
473
- N ;
474
- _ ->
475
- node (Pid )
476
- end ,
477
- Type = case Args of
478
- #{type := T } ->
479
- T ;
480
- _ ->
481
- leader
482
- end ,
471
+ Node = maps :get (node , Args , node (Pid )),
472
+ Type = maps :get (type , Args , leader ),
483
473
484
474
case Streams of
485
475
#{StreamId := # stream {listeners = Listeners0 } = Stream0 } ->
@@ -1371,15 +1361,16 @@ inform_listeners_eol(MachineVersion, #stream{target = deleted,
1371
1361
{queue_event , QRef , eol },
1372
1362
cast }
1373
1363
end , maps :keys (Listeners ));
1374
- inform_listeners_eol (MachineVersion , # stream { target = deleted ,
1375
- listeners = Listeners ,
1376
- queue_ref = QRef
1377
- }) when MachineVersion >= 2 ->
1364
+ inform_listeners_eol (MachineVersion ,
1365
+ # stream { target = deleted ,
1366
+ listeners = Listeners ,
1367
+ queue_ref = QRef }) when MachineVersion >= 2 ->
1378
1368
LPidsMap = maps :fold (fun ({P , _ }, _V , Acc ) ->
1379
- Acc #{P => ok }
1369
+ Acc #{P => ok }
1380
1370
end , #{}, Listeners ),
1381
1371
lists :map (fun (Pid ) ->
1382
- {send_msg , Pid ,
1372
+ {send_msg ,
1373
+ Pid ,
1383
1374
{queue_event , QRef , eol },
1384
1375
cast }
1385
1376
end , maps :keys (LPidsMap ));
@@ -1418,7 +1409,9 @@ eval_listeners(MachineVersion, #stream{listeners = Listeners0,
1418
1409
maps :fold (fun ({P , leader }, ListLPid0 , {Lsts0 , Effs0 }) ->
1419
1410
% % iterating over member to find the leader
1420
1411
{ListLPid1 , Effs1 } =
1421
- maps :fold (fun (_N , # member {state = {running , _ , LeaderPid }, role = {writer , _ }, target = T }, A )
1412
+ maps :fold (fun (_N , # member {state = {running , _ , LeaderPid },
1413
+ role = {writer , _ },
1414
+ target = T }, A )
1422
1415
when ListLPid0 == LeaderPid , T /= deleted ->
1423
1416
% % it's the leader, same PID, nothing to do
1424
1417
A ;
0 commit comments