@@ -646,8 +646,8 @@ phase_start_replica(StreamId, #{epoch := Epoch,
646
646
fun () ->
647
647
try osiris_replica :start (Node , Conf0 ) of
648
648
{ok , Pid } ->
649
- rabbit_log :debug (" ~s : ~s : replica started on ~s in ~b pid ~w " ,
650
- [? MODULE , StreamId , Node , Epoch , Pid ]),
649
+ rabbit_log :info (" ~s : ~s : replica started on ~s in ~b pid ~w " ,
650
+ [? MODULE , StreamId , Node , Epoch , Pid ]),
651
651
send_self_command ({member_started , StreamId ,
652
652
Args #{pid => Pid }});
653
653
{error , already_present } ->
@@ -662,13 +662,13 @@ phase_start_replica(StreamId, #{epoch := Epoch,
662
662
send_self_command ({member_started , StreamId ,
663
663
Args #{pid => Pid }});
664
664
{error , Reason } ->
665
+ rabbit_log :warning (" ~s : Error while starting replica for ~s on node ~s in ~b : ~W " ,
666
+ [? MODULE , maps :get (name , Conf0 ), Node , Epoch , Reason , 10 ]),
665
667
maybe_sleep (Reason ),
666
- rabbit_log :warning (" ~s : Error while starting replica for ~s : ~W " ,
667
- [? MODULE , maps :get (name , Conf0 ), Reason , 10 ]),
668
668
send_action_failed (StreamId , starting , Args )
669
669
catch _ :Error ->
670
- rabbit_log :warning (" ~s : Error while starting replica for ~s : ~W " ,
671
- [? MODULE , maps :get (name , Conf0 ), Error , 10 ]),
670
+ rabbit_log :warning (" ~s : Error while starting replica for ~s on node ~s in ~b : ~W " ,
671
+ [? MODULE , maps :get (name , Conf0 ), Node , Epoch , Error , 10 ]),
672
672
maybe_sleep (Error ),
673
673
send_action_failed (StreamId , starting , Args )
674
674
end
@@ -710,25 +710,25 @@ phase_stop_member(StreamId, #{node := Node,
710
710
[? MODULE , StreamId , Node , Epoch , Tail ]),
711
711
send_self_command ({member_stopped , StreamId , Arg });
712
712
Err ->
713
- rabbit_log :warning (" Stream coordinator failed to get tail
714
- of member ~s ~w Error: ~w " ,
715
- [ StreamId , Node , Err ] ),
713
+ rabbit_log :warning (" ~s : failed to get tail of member ~s on ~s in ~b Error: ~w " ,
714
+ [ ? MODULE , StreamId , Node , Epoch , Err ]) ,
715
+ maybe_sleep ( Err ),
716
716
send_action_failed (StreamId , stopping , Arg0 )
717
717
catch _ :Err ->
718
- rabbit_log :warning (" Stream coordinator failed to get
719
- tail of member ~s ~w Error: ~w " ,
720
- [ StreamId , Node , Err ] ),
718
+ rabbit_log :warning (" ~s : failed to get tail of member ~s on ~s in ~b Error: ~w " ,
719
+ [ ? MODULE , StreamId , Node , Epoch , Err ]) ,
720
+ maybe_sleep ( Err ),
721
721
send_action_failed (StreamId , stopping , Arg0 )
722
722
end ;
723
723
Err ->
724
- rabbit_log :warning (" Stream coordinator failed to stop
725
- member ~s ~w Error: ~w " ,
726
- [StreamId , Node , Err ]),
724
+ rabbit_log :warning (" ~s : failed to stop "
725
+ " member ~s ~w Error: ~w " ,
726
+ [? MODULE , StreamId , Node , Err ]),
727
+ maybe_sleep (Err ),
727
728
send_action_failed (StreamId , stopping , Arg0 )
728
729
catch _ :Err ->
729
- rabbit_log :warning (" Stream coordinator failed to stop
730
- member ~s ~w Error: ~w " ,
731
- [StreamId , Node , Err ]),
730
+ rabbit_log :warning (" ~s : failed to stop member ~s ~w Error: ~w " ,
731
+ [? MODULE , StreamId , Node , Err ]),
732
732
maybe_sleep (Err ),
733
733
send_action_failed (StreamId , stopping , Arg0 )
734
734
end
@@ -740,19 +740,18 @@ phase_start_writer(StreamId, #{epoch := Epoch,
740
740
try osiris_writer :start (Conf ) of
741
741
{ok , Pid } ->
742
742
Args = Args0 #{epoch => Epoch , pid => Pid },
743
- rabbit_log :warning (" ~s : started writer ~s on ~w in ~b " ,
744
- [? MODULE , StreamId , Node , Epoch ]),
743
+ rabbit_log :info (" ~s : started writer ~s on ~w in ~b " ,
744
+ [? MODULE , StreamId , Node , Epoch ]),
745
745
send_self_command ({member_started , StreamId , Args });
746
746
Err ->
747
- % % no sleep for writer failures
748
- rabbit_log : warning ( " ~s : failed to start
749
- writer ~s ~w Error: ~w " ,
750
- [? MODULE , StreamId , Node , Err ]),
747
+ % % no sleep for writer failures as we want to trigger a new
748
+ % % election asap
749
+ rabbit_log : warning ( " ~s : failed to start writer ~s on ~s in ~b Error: ~w " ,
750
+ [? MODULE , StreamId , Node , Epoch , Err ]),
751
751
send_action_failed (StreamId , starting , Args0 )
752
752
catch _ :Err ->
753
- rabbit_log :warning (" ~s : failed to start
754
- writer ~s ~w Error: ~w " ,
755
- [? MODULE , StreamId , Node , Err ]),
753
+ rabbit_log :warning (" ~s : failed to start writer ~s on ~s in ~b Error: ~w " ,
754
+ [? MODULE , StreamId , Node , Epoch , Err ]),
756
755
send_action_failed (StreamId , starting , Args0 )
757
756
end
758
757
end .
@@ -763,14 +762,13 @@ phase_update_retention(StreamId, #{pid := Pid,
763
762
try osiris :update_retention (Pid , Retention ) of
764
763
ok ->
765
764
send_self_command ({retention_updated , StreamId , Args });
766
- {error , Err } ->
767
- rabbit_log :warning (" ~s : failed to update
768
- retention for ~s ~w Error: ~w " ,
769
- [ ? MODULE , StreamId , node ( Pid ), Err ] ),
765
+ {error , Reason } = Err ->
766
+ rabbit_log :warning (" ~s : failed to update retention for ~s ~w Reason: ~w " ,
767
+ [ ? MODULE , StreamId , node ( Pid ), Reason ]) ,
768
+ maybe_sleep ( Err ),
770
769
send_action_failed (StreamId , update_retention , Args )
771
770
catch _ :Err ->
772
- rabbit_log :warning (" ~s : failed to update
773
- retention for ~s ~w Error: ~w " ,
771
+ rabbit_log :warning (" ~s : failed to update retention for ~s ~w Error: ~w " ,
774
772
[? MODULE , StreamId , node (Pid ), Err ]),
775
773
maybe_sleep (Err ),
776
774
send_action_failed (StreamId , update_retention , Args )
@@ -1512,9 +1510,13 @@ select_leader(Offsets) ->
1512
1510
Node .
1513
1511
1514
1512
maybe_sleep ({{nodedown , _ }, _ }) ->
1515
- timer :sleep (5000 );
1513
+ timer :sleep (10000 );
1516
1514
maybe_sleep ({noproc , _ }) ->
1517
1515
timer :sleep (5000 );
1516
+ maybe_sleep ({error , nodedown }) ->
1517
+ timer :sleep (5000 );
1518
+ maybe_sleep ({error , _ }) ->
1519
+ timer :sleep (5000 );
1518
1520
maybe_sleep (_ ) ->
1519
1521
ok .
1520
1522
0 commit comments