184
184
% % command: `{consumer_credit, ReceiverDeliveryCount, Credit}'
185
185
credit_mode = simple_prefetch :: credit_mode (), % part of snapshot data
186
186
lifetime = once :: once | auto ,
187
- suspected_down = false :: 'cancel' | boolean ()
187
+ status = up :: up | suspected_down | cancelled
188
188
}).
189
189
190
190
- type consumer () :: # consumer {}.
193
193
{next_seqno = 1 :: msg_seqno (),
194
194
% out of order enqueues - sorted list
195
195
pending = [] :: [{msg_seqno (), ra_index (), raw_msg ()}],
196
- suspected_down = false :: boolean ()
196
+ status = up :: up | suspected_down
197
197
}).
198
198
199
199
- record (state ,
@@ -463,27 +463,30 @@ apply(_, {down, ConsumerPid, noconnection},
463
463
% mark all consumers and enqueuers as suspected down
464
464
% and monitor the node so that we can find out the final state of the
465
465
% process at some later point
466
- {Cons , State , Effects1 } = maps :fold (
467
- fun ({_ , P } = K ,
468
- # consumer {checked_out = Checked0 } = C ,
469
- {Co , St0 , Eff }) when (node (P ) =:= Node ) and
470
- (C # consumer .suspected_down =/= cancel )->
471
- St = return_all (St0 , Checked0 ),
472
- Credit = increase_credit (C , maps :size (Checked0 )),
473
- Eff1 = ConsumerUpdateActiveFun (St , K , C , false , suspected_down , Eff ),
474
- {maps :put (K , C # consumer {suspected_down = true ,
475
- credit = Credit ,
476
- checked_out = #{}}, Co ),
477
- St , Eff1 };
478
- (K , C , {Co , St , Eff }) ->
479
- {maps :put (K , C , Co ), St , Eff }
480
- end , {#{}, State0 , []}, Cons0 ),
466
+ {Cons , State , Effects1 } =
467
+ maps :fold (fun ({_ , P } = K ,
468
+ # consumer {checked_out = Checked0 } = C ,
469
+ {Co , St0 , Eff }) when (node (P ) =:= Node ) and
470
+ (C # consumer .status =/= cancelled )->
471
+ St = return_all (St0 , Checked0 ),
472
+ Credit = increase_credit (C , maps :size (Checked0 )),
473
+ Eff1 = ConsumerUpdateActiveFun (St , K , C , false ,
474
+ suspected_down , Eff ),
475
+ {maps :put (K ,
476
+ C # consumer {status = suspected_down ,
477
+ credit = Credit ,
478
+ checked_out = #{}}, Co ),
479
+ St , Eff1 };
480
+ (K , C , {Co , St , Eff }) ->
481
+ {maps :put (K , C , Co ), St , Eff }
482
+ end , {#{}, State0 , []}, Cons0 ),
481
483
Enqs = maps :map (fun (P , E ) when node (P ) =:= Node ->
482
- E # enqueuer {suspected_down = true };
484
+ E # enqueuer {status = suspected_down };
483
485
(_ , E ) -> E
484
486
end , Enqs0 ),
485
487
% mark waiting consumers as suspected if necessary
486
- WaitingConsumers = maybe_mark_suspect_waiting_consumers (Node , State0 , true ),
488
+ WaitingConsumers = update_waiting_consumer_status (Node , State0 ,
489
+ suspected_down ),
487
490
488
491
Effects2 = case maps :size (Cons ) of
489
492
0 ->
@@ -516,8 +519,8 @@ apply(Meta, {down, Pid, _Info}, #state{consumers = Cons0,
516
519
end , {State2 , Effects1 }, DownConsumers ),
517
520
checkout (Meta , State , Effects );
518
521
apply (Meta , {nodeup , Node }, # state {consumers = Cons0 ,
519
- enqueuers = Enqs0 ,
520
- service_queue = SQ0 } = State0 ) ->
522
+ enqueuers = Enqs0 ,
523
+ service_queue = SQ0 } = State0 ) ->
521
524
% % A node we are monitoring has come back.
522
525
% % If we have suspected any processes of being
523
526
% % down we should now re-issue the monitors for them to detect if they're
@@ -526,20 +529,20 @@ apply(Meta, {nodeup, Node}, #state{consumers = Cons0,
526
529
|| P <- suspected_pids_for (Node , State0 )],
527
530
528
531
% un-suspect waiting consumers when necessary
529
- WaitingConsumers = maybe_mark_suspect_waiting_consumers (Node , State0 ,
530
- false ),
532
+ WaitingConsumers = update_waiting_consumer_status (Node , State0 , up ),
531
533
532
534
Enqs1 = maps :map (fun (P , E ) when node (P ) =:= Node ->
533
- E # enqueuer {suspected_down = false };
535
+ E # enqueuer {status = up };
534
536
(_ , E ) -> E
535
537
end , Enqs0 ),
536
538
ConsumerUpdateActiveFun = consumer_active_flag_update_function (State0 ),
537
539
{Cons1 , SQ , Effects } =
538
540
maps :fold (fun ({_ , P } = ConsumerId , C , {CAcc , SQAcc , EAcc })
539
- when (node (P ) =:= Node ) and (C # consumer .suspected_down =/= cancel ) ->
541
+ when (node (P ) =:= Node ) and
542
+ (C # consumer .status =/= cancelled ) ->
540
543
EAcc1 = ConsumerUpdateActiveFun (State0 , ConsumerId , C , true , up , EAcc ),
541
544
update_or_remove_sub (
542
- ConsumerId , C # consumer {suspected_down = false },
545
+ ConsumerId , C # consumer {status = up },
543
546
CAcc , SQAcc , EAcc1 );
544
547
(_ , _ , Acc ) ->
545
548
Acc
@@ -585,27 +588,27 @@ handle_waiting_consumer_down(Pid,
585
588
State = State0 # state {waiting_consumers = StillUp },
586
589
{Effects , State }.
587
590
588
- maybe_mark_suspect_waiting_consumers (_Node , # state {consumer_strategy = default },
589
- _Suspected ) ->
591
+ update_waiting_consumer_status (_Node , # state {consumer_strategy = default },
592
+ _Status ) ->
590
593
[];
591
- maybe_mark_suspect_waiting_consumers (_Node ,
592
- # state {consumer_strategy = single_active ,
593
- waiting_consumers = []},
594
- _Suspected ) ->
594
+ update_waiting_consumer_status (_Node ,
595
+ # state {consumer_strategy = single_active ,
596
+ waiting_consumers = []},
597
+ _Status ) ->
595
598
[];
596
- maybe_mark_suspect_waiting_consumers (Node ,
597
- # state {consumer_strategy = single_active ,
598
- waiting_consumers = WaitingConsumers },
599
- Suspected ) ->
599
+ update_waiting_consumer_status (Node ,
600
+ # state {consumer_strategy = single_active ,
601
+ waiting_consumers = WaitingConsumers },
602
+ Status ) ->
600
603
[begin
601
604
case node (P ) of
602
605
Node ->
603
- {ConsumerId , Consumer # consumer {suspected_down = Suspected }};
606
+ {ConsumerId , Consumer # consumer {status = Status }};
604
607
_ ->
605
608
{ConsumerId , Consumer }
606
609
end
607
610
end || {{_ , P } = ConsumerId , Consumer } <- WaitingConsumers ,
608
- Consumer # consumer .suspected_down =/= cancel ].
611
+ Consumer # consumer .status =/= cancelled ].
609
612
610
613
- spec state_enter (ra_server :ra_state (), state ()) -> ra_machine :effects ().
611
614
state_enter (leader , # state {consumers = Cons ,
@@ -738,12 +741,13 @@ query_consumers(#state{consumers = Consumers,
738
741
consumer_strategy = ConsumerStrategy } = State ) ->
739
742
ActiveActivityStatusFun = case ConsumerStrategy of
740
743
default ->
741
- fun (_ConsumerId , # consumer {suspected_down = SuspectedDown }) ->
742
- case SuspectedDown of
743
- true ->
744
- {false , suspected_down };
745
- false ->
746
- {true , up }
744
+ fun (_ConsumerId ,
745
+ # consumer {status = Status }) ->
746
+ case Status of
747
+ suspected_down ->
748
+ {false , Status };
749
+ _ ->
750
+ {true , Status }
747
751
end
748
752
end ;
749
753
single_active ->
@@ -757,7 +761,7 @@ query_consumers(#state{consumers = Consumers,
757
761
end
758
762
end
759
763
end ,
760
- FromConsumers = maps :fold (fun (_ , # consumer {suspected_down = cancel }, Acc ) ->
764
+ FromConsumers = maps :fold (fun (_ , # consumer {status = cancelled }, Acc ) ->
761
765
Acc ;
762
766
({Tag , Pid }, # consumer {meta = Meta } = Consumer , Acc ) ->
763
767
{Active , ActivityStatus } = ActiveActivityStatusFun ({Tag , Pid }, Consumer ),
@@ -772,7 +776,7 @@ query_consumers(#state{consumers = Consumers,
772
776
Acc )
773
777
end , #{}, Consumers ),
774
778
FromWaitingConsumers =
775
- lists :foldl (fun ({_ , # consumer {suspected_down = cancel }}, Acc ) ->
779
+ lists :foldl (fun ({_ , # consumer {status = cancelled }}, Acc ) ->
776
780
Acc ;
777
781
({{Tag , Pid }, # consumer {meta = Meta } = Consumer }, Acc ) ->
778
782
{Active , ActivityStatus } = ActiveActivityStatusFun ({Tag , Pid }, Consumer ),
@@ -944,7 +948,7 @@ maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1
944
948
update_or_remove_sub (ConsumerId ,
945
949
Consumer # consumer {lifetime = once ,
946
950
credit = 0 ,
947
- suspected_down = cancel },
951
+ status = cancelled },
948
952
C0 , SQ0 , Effects0 ),
949
953
{S0 # state {consumers = Cons , service_queue = SQ }, Effects1 };
950
954
down ->
@@ -1324,9 +1328,9 @@ checkout_one(#state{service_queue = SQ0,
1324
1328
% % can happen when draining
1325
1329
% % recurse without consumer on queue
1326
1330
checkout_one (InitState # state {service_queue = SQ1 });
1327
- {ok , # consumer {suspected_down = cancel }} ->
1331
+ {ok , # consumer {status = cancelled }} ->
1328
1332
checkout_one (InitState # state {service_queue = SQ1 });
1329
- {ok , # consumer {suspected_down = true }} ->
1333
+ {ok , # consumer {status = suspected_down }} ->
1330
1334
checkout_one (InitState # state {service_queue = SQ1 });
1331
1335
{ok , # consumer {checked_out = Checked0 ,
1332
1336
next_msg_id = Next ,
@@ -1381,8 +1385,9 @@ update_or_remove_sub(ConsumerId, #consumer{lifetime = once,
1381
1385
case maps :size (Checked ) of
1382
1386
0 ->
1383
1387
% we're done with this consumer
1384
- {maps :remove (ConsumerId , Cons ), ServiceQueue ,
1385
- [{demonitor , process , ConsumerId } | Effects ]};
1388
+ % TODO: demonitor consumer pid but _only_ if there are no other
1389
+ % monitors for this pid
1390
+ {maps :remove (ConsumerId , Cons ), ServiceQueue , Effects };
1386
1391
_ ->
1387
1392
% there are unsettled items so need to keep around
1388
1393
{maps :put (ConsumerId , Con , Cons ), ServiceQueue , Effects }
@@ -1402,7 +1407,6 @@ uniq_queue_in(Key, Queue) ->
1402
1407
queue :in (Key , Queue )
1403
1408
end .
1404
1409
1405
-
1406
1410
update_consumer (ConsumerId , Meta , Spec ,
1407
1411
# state {consumer_strategy = default } = State0 ) ->
1408
1412
% % general case, single active consumer off
@@ -1576,18 +1580,18 @@ message_size(Msg) ->
1576
1580
suspected_pids_for (Node , # state {consumers = Cons0 ,
1577
1581
enqueuers = Enqs0 ,
1578
1582
waiting_consumers = WaitingConsumers0 }) ->
1579
- Cons = maps :fold (fun ({_ , P }, # consumer {suspected_down = true }, Acc )
1583
+ Cons = maps :fold (fun ({_ , P }, # consumer {status = suspected_down }, Acc )
1580
1584
when node (P ) =:= Node ->
1581
1585
[P | Acc ];
1582
1586
(_ , _ , Acc ) -> Acc
1583
1587
end , [], Cons0 ),
1584
- Enqs = maps :fold (fun (P , # enqueuer {suspected_down = true }, Acc )
1588
+ Enqs = maps :fold (fun (P , # enqueuer {status = suspected_down }, Acc )
1585
1589
when node (P ) =:= Node ->
1586
1590
[P | Acc ];
1587
1591
(_ , _ , Acc ) -> Acc
1588
1592
end , Cons , Enqs0 ),
1589
1593
lists :foldl (fun ({{_ , P },
1590
- # consumer {suspected_down = true }}, Acc )
1594
+ # consumer {status = suspected_down }}, Acc )
1591
1595
when node (P ) =:= Node ->
1592
1596
[P | Acc ];
1593
1597
(_ , Acc ) -> Acc
@@ -1837,10 +1841,11 @@ return_checked_out_test() ->
1837
1841
{State0 , [_ , _ ]} = enq (1 , 1 , first , test_init (test )),
1838
1842
{State1 , [_Monitor ,
1839
1843
{send_msg , _ , {delivery , _ , [{MsgId , _ }]}, ra_event },
1840
- {aux , active } | _
1841
- ]} = check (Cid , 2 , State0 ),
1842
- % return
1843
- {_State2 , _ , [_ ]} = apply (meta (3 ), make_return (Cid , [MsgId ]), State1 ),
1844
+ {aux , active } | _ ]} = check_auto (Cid , 2 , State0 ),
1845
+ % returning immediately checks out the same message again
1846
+ {_ , ok , [{send_msg , _ , {delivery , _ , [{_ , _ }]}, ra_event },
1847
+ {aux , active }]} =
1848
+ apply (meta (3 ), make_return (Cid , [MsgId ]), State1 ),
1844
1849
ok .
1845
1850
1846
1851
return_auto_checked_out_test () ->
@@ -1867,15 +1872,19 @@ cancelled_checkout_out_test() ->
1867
1872
{State00 , [_ , _ ]} = enq (1 , 1 , first , test_init (test )),
1868
1873
{State0 , [_ ]} = enq (2 , 2 , second , State00 ),
1869
1874
{State1 , _ } = check_auto (Cid , 2 , State0 ),
1870
- % cancelled checkout should return all pending messages to queue
1875
+ % cancelled checkout should not return pending messages to queue
1871
1876
{State2 , _ , _ } = apply (meta (3 ), make_checkout (Cid , cancel , #{}), State1 ),
1872
1877
? assertEqual (1 , maps :size (State2 # state .messages )),
1873
- ? assertEqual (1 , lqueue :len (State2 # state .returns )),
1878
+ ? assertEqual (0 , lqueue :len (State2 # state .returns )),
1874
1879
1875
- {State3 , {dequeue , { 0 , { _ , first }}, _ }, _ } =
1880
+ {State3 , {dequeue , empty } } =
1876
1881
apply (meta (3 ), make_checkout (Cid , {dequeue , settled }, #{}), State2 ),
1882
+ % % settle
1883
+ {State4 , ok , _ } =
1884
+ apply (meta (4 ), make_settle (Cid , [0 ]), State3 ),
1885
+
1877
1886
{_State , {dequeue , {_ , {_ , second }}, _ }, _ } =
1878
- apply (meta (4 ), make_checkout (Cid , {dequeue , settled }, #{}), State3 ),
1887
+ apply (meta (5 ), make_checkout (Cid , {dequeue , settled }, #{}), State4 ),
1879
1888
ok .
1880
1889
1881
1890
down_with_noproc_consumer_returns_unsettled_test () ->
@@ -1937,16 +1946,6 @@ down_with_noproc_enqueuer_is_cleaned_up_test() ->
1937
1946
? assert (0 =:= maps :size (State1 # state .enqueuers )),
1938
1947
ok .
1939
1948
1940
- completed_consumer_yields_demonitor_effect_test () ->
1941
- Cid = {<<" completed_consumer_yields_demonitor_effect_test" >>, self ()},
1942
- {State0 , [_ , _ ]} = enq (1 , 1 , second , test_init (test )),
1943
- {State1 , [{monitor , process , _ } | _ ]} = check (Cid , 2 , State0 ),
1944
- {_ , Effects } = settle (Cid , 3 , 0 , State1 ),
1945
- ? ASSERT_EFF ({demonitor , _ , _ }, Effects ),
1946
- % release cursor for empty queue
1947
- ? ASSERT_EFF ({release_cursor , 3 , _ }, Effects ),
1948
- ok .
1949
-
1950
1949
discarded_message_without_dead_letter_handler_is_removed_test () ->
1951
1950
Cid = {<<" completed_consumer_yields_demonitor_effect_test" >>, self ()},
1952
1951
{State0 , [_ , _ ]} = enq (1 , 1 , first , test_init (test )),
@@ -2322,7 +2321,10 @@ single_active_consumer_test() ->
2322
2321
? assertEqual (1 , length (Effects1 )),
2323
2322
2324
2323
% cancelling the active consumer
2325
- {State3 , _ , Effects2 } = apply (meta (3 ), # checkout {spec = cancel , consumer_id = {<<" ctag1" >>, self ()}}, State2 ),
2324
+ {State3 , _ , Effects2 } = apply (meta (3 ),
2325
+ # checkout {spec = cancel ,
2326
+ consumer_id = {<<" ctag1" >>, self ()}},
2327
+ State2 ),
2326
2328
% the second registered consumer is now the active one
2327
2329
? assertEqual (1 , map_size (State3 # state .consumers )),
2328
2330
? assert (maps :is_key ({<<" ctag2" >>, self ()}, State3 # state .consumers )),
@@ -2425,24 +2427,25 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti
2425
2427
State ),
2426
2428
NewState
2427
2429
end ,
2428
- State1 = lists :foldl (AddConsumer , State0 , [<<" ctag1" >>, <<" ctag2" >>, <<" ctag3" >>, <<" ctag4" >>]),
2430
+ State1 = lists :foldl (AddConsumer , State0 ,
2431
+ [<<" ctag1" >>, <<" ctag2" >>, <<" ctag3" >>, <<" ctag4" >>]),
2429
2432
2430
2433
% simulate node goes down
2431
2434
{State2 , _ , _ } = apply (#{}, {down , self (), noconnection }, State1 ),
2432
2435
2433
2436
% all the waiting consumers should be suspected down
2434
2437
? assertEqual (3 , length (State2 # state .waiting_consumers )),
2435
- lists :foreach (fun ({_ , # consumer {suspected_down = SuspectedDown }}) ->
2436
- ? assert (SuspectedDown )
2438
+ lists :foreach (fun ({_ , # consumer {status = Status }}) ->
2439
+ ? assert (Status == suspected_down )
2437
2440
end , State2 # state .waiting_consumers ),
2438
2441
2439
2442
% simulate node goes back up
2440
2443
{State3 , _ , _ } = apply (#{index => 2 }, {nodeup , node (self ())}, State2 ),
2441
2444
2442
2445
% all the waiting consumers should be un-suspected
2443
2446
? assertEqual (3 , length (State3 # state .waiting_consumers )),
2444
- lists :foreach (fun ({_ , # consumer {suspected_down = SuspectedDown }}) ->
2445
- ? assertNot ( SuspectedDown )
2447
+ lists :foreach (fun ({_ , # consumer {status = Status }}) ->
2448
+ ? assert ( Status /= suspected_down )
2446
2449
end , State3 # state .waiting_consumers ),
2447
2450
2448
2451
ok .
@@ -2527,7 +2530,8 @@ query_consumers_test() ->
2527
2530
State1 = lists :foldl (AddConsumer , State0 , [<<" ctag1" >>, <<" ctag2" >>, <<" ctag3" >>, <<" ctag4" >>]),
2528
2531
Consumers0 = State1 # state .consumers ,
2529
2532
Consumer = maps :get ({<<" ctag2" >>, self ()}, Consumers0 ),
2530
- Consumers1 = maps :put ({<<" ctag2" >>, self ()}, Consumer # consumer {suspected_down = true }, Consumers0 ),
2533
+ Consumers1 = maps :put ({<<" ctag2" >>, self ()},
2534
+ Consumer # consumer {status = suspected_down }, Consumers0 ),
2531
2535
State2 = State1 # state {consumers = Consumers1 },
2532
2536
2533
2537
? assertEqual (4 , query_consumer_count (State2 )),
0 commit comments