@@ -392,7 +392,7 @@ apply(#{machine_version := MachineVersion} = Meta, {down, Pid, Reason} = Cmd,
392
392
[]
393
393
end ,
394
394
case maps :take (Pid , Monitors0 ) of
395
- {{StreamId , listener }, Monitors } when MachineVersion =< 1 ->
395
+ {{StreamId , listener }, Monitors } when MachineVersion < 2 ->
396
396
Listeners = case maps :take (StreamId , StateListeners0 ) of
397
397
error ->
398
398
StateListeners0 ;
@@ -407,20 +407,20 @@ apply(#{machine_version := MachineVersion} = Meta, {down, Pid, Reason} = Cmd,
407
407
return (Meta , State #? MODULE {listeners = Listeners ,
408
408
monitors = Monitors }, ok , Effects0 );
409
409
{{PidStreams , listener }, Monitors } when MachineVersion >= 2 ->
410
- Streams = maps :fold (fun (StreamId , _ , Acc ) ->
410
+ Streams = maps :fold (
411
+ fun (StreamId , _ , Acc ) ->
411
412
case Acc of
412
413
#{StreamId := Stream = # stream {listeners = Listeners0 }} ->
413
414
Listeners = maps :fold (fun ({P , _ } = K , _ , A ) when P == Pid ->
414
415
maps :remove (K , A );
415
416
(K , V , A ) ->
416
417
A #{K => V }
417
- end , #{}, Listeners0
418
- ),
418
+ end , #{}, Listeners0 ),
419
419
Acc #{StreamId => Stream # stream {listeners = Listeners }};
420
420
_ ->
421
421
Acc
422
422
end
423
- end , Streams0 , PidStreams ),
423
+ end , Streams0 , PidStreams ),
424
424
return (Meta , State #? MODULE {streams = Streams ,
425
425
monitors = Monitors }, ok , Effects0 );
426
426
{{StreamId , member }, Monitors1 } ->
@@ -468,18 +468,8 @@ apply(#{machine_version := MachineVersion} = Meta,
468
468
stream_id := StreamId } = Args },
469
469
#? MODULE {streams = Streams ,
470
470
monitors = Monitors0 } = State0 ) when MachineVersion >= 2 ->
471
- Node = case Args of
472
- #{node := N } ->
473
- N ;
474
- _ ->
475
- node (Pid )
476
- end ,
477
- Type = case Args of
478
- #{type := T } ->
479
- T ;
480
- _ ->
481
- leader
482
- end ,
471
+ Node = maps :get (node , Args , node (Pid )),
472
+ Type = maps :get (type , Args , leader ),
483
473
484
474
case Streams of
485
475
#{StreamId := # stream {listeners = Listeners0 } = Stream0 } ->
@@ -1369,15 +1359,16 @@ inform_listeners_eol(MachineVersion, #stream{target = deleted,
1369
1359
{queue_event , QRef , eol },
1370
1360
cast }
1371
1361
end , maps :keys (Listeners ));
1372
- inform_listeners_eol (MachineVersion , # stream { target = deleted ,
1373
- listeners = Listeners ,
1374
- queue_ref = QRef
1375
- }) when MachineVersion >= 2 ->
1362
+ inform_listeners_eol (MachineVersion ,
1363
+ # stream { target = deleted ,
1364
+ listeners = Listeners ,
1365
+ queue_ref = QRef }) when MachineVersion >= 2 ->
1376
1366
LPidsMap = maps :fold (fun ({P , _ }, _V , Acc ) ->
1377
- Acc #{P => ok }
1367
+ Acc #{P => ok }
1378
1368
end , #{}, Listeners ),
1379
1369
lists :map (fun (Pid ) ->
1380
- {send_msg , Pid ,
1370
+ {send_msg ,
1371
+ Pid ,
1381
1372
{queue_event , QRef , eol },
1382
1373
cast }
1383
1374
end , maps :keys (LPidsMap ));
@@ -1416,7 +1407,9 @@ eval_listeners(MachineVersion, #stream{listeners = Listeners0,
1416
1407
maps :fold (fun ({P , leader }, ListLPid0 , {Lsts0 , Effs0 }) ->
1417
1408
% % iterating over member to find the leader
1418
1409
{ListLPid1 , Effs1 } =
1419
- maps :fold (fun (_N , # member {state = {running , _ , LeaderPid }, role = {writer , _ }, target = T }, A )
1410
+ maps :fold (fun (_N , # member {state = {running , _ , LeaderPid },
1411
+ role = {writer , _ },
1412
+ target = T }, A )
1420
1413
when ListLPid0 == LeaderPid , T /= deleted ->
1421
1414
% % it's the leader, same PID, nothing to do
1422
1415
A ;
0 commit comments