@@ -30,6 +30,7 @@ all_tests() ->
30
30
delete_stream ,
31
31
delete_replica_leader ,
32
32
delete_replica ,
33
+ delete_two_replicas ,
33
34
delete_replica_2 ,
34
35
leader_start_failed
35
36
].
@@ -907,6 +908,79 @@ delete_replica(_) ->
907
908
{S4 , []} = evaluate_stream (meta (? LINE ), S4 , []),
908
909
ok .
909
910
911
+ delete_two_replicas (_ ) ->
912
+ % % There was a race condition on the rabbit_stream_queue_SUITE testcases delete_replica
913
+ % % and delete_last_replica. A replica can sometimes restart after deletion as it transitions
914
+ % % again to running state. This test reproduces it. See `rabbit_stream_coordinator.erl`
915
+ % % line 1039, the processing of `member_stopped` command. The new function `update_target`
916
+ % % ensures this transition never happens.
917
+ % % This test reproduces the trace that leads to that error.
918
+ E = 1 ,
919
+ StreamId = atom_to_list (? FUNCTION_NAME ),
920
+ LeaderPid = fake_pid (n1 ),
921
+ [Replica1 , Replica2 ] = [fake_pid (n2 ), fake_pid (n3 )],
922
+ N1 = node (LeaderPid ),
923
+ N2 = node (Replica1 ),
924
+ % % this is to be added
925
+ N3 = node (Replica2 ),
926
+
927
+ S0 = started_stream (StreamId , LeaderPid , [Replica1 , Replica2 ]),
928
+ From = {self (), make_ref ()},
929
+ Idx1 = ? LINE ,
930
+ Meta1 = (meta (Idx1 ))#{from => From },
931
+ S1 = update_stream (Meta1 , {delete_replica , StreamId , #{node => N3 }}, S0 ),
932
+ ? assertMatch (# stream {target = running ,
933
+ nodes = [N1 , N2 ],
934
+ members = #{N1 := # member {target = stopped ,
935
+ current = undefined ,
936
+ state = {running , _ , _ }},
937
+ N2 := # member {target = stopped ,
938
+ current = undefined ,
939
+ state = {running , _ , _ }},
940
+ N3 := # member {target = deleted ,
941
+ current = undefined ,
942
+ state = {running , _ , _ }}
943
+ }},
944
+ S1 ),
945
+ {S2 , Actions1 } = evaluate_stream (Meta1 , S1 , []),
946
+ ? assertMatch ([{aux , {delete_member , StreamId , #{node := N3 }, _ }},
947
+ {aux , {stop , StreamId , #{node := N1 , epoch := E }, _ }},
948
+ {aux , {stop , StreamId , #{node := N2 , epoch := E }, _ }}],
949
+ lists :sort (Actions1 )),
950
+
951
+ Idx2 = ? LINE ,
952
+ Meta2 = (meta (Idx2 ))#{from => From },
953
+ S3 = update_stream (Meta2 , {delete_replica , StreamId , #{node => N2 }}, S2 ),
954
+ ? assertMatch (# stream {target = running ,
955
+ nodes = [N1 ],
956
+ members = #{N1 := # member {target = stopped ,
957
+ current = {stopping , _ },
958
+ state = {running , _ , _ }},
959
+ N2 := # member {target = deleted ,
960
+ current = {stopping , _ },
961
+ state = {running , _ , _ }},
962
+ N3 := # member {target = deleted ,
963
+ current = {deleting , _ },
964
+ state = {running , _ , _ }}
965
+ }},
966
+ S3 ),
967
+ {S4 , []} = evaluate_stream (Meta2 , S3 , []),
968
+
969
+
970
+ Idx3 = ? LINE ,
971
+ S5 = update_stream (meta (Idx3 ),
972
+ {member_stopped , StreamId , #{node => N2 ,
973
+ index => Idx1 ,
974
+ epoch => E ,
975
+ tail => {E , 101 }}},
976
+ S4 ),
977
+ % % A deleted member can never transition to another target.
978
+ ? assertMatch (# stream {members = #{N2 := # member {target = deleted ,
979
+ current = undefined ,
980
+ state = {stopped , _ , _ }}}},
981
+ S5 ),
982
+ ok .
983
+
910
984
delete_replica_2 (_ ) ->
911
985
% % replica is deleted before it has been fully started
912
986
E = 1 ,
0 commit comments