@@ -486,7 +486,7 @@ apply(_, {down, ConsumerPid, noconnection},
486
486
end , Enqs0 ),
487
487
% mark waiting consumers as suspected if necessary
488
488
WaitingConsumers = update_waiting_consumer_status (Node , State0 ,
489
- suspected_down ),
489
+ suspected_down ),
490
490
491
491
Effects2 = case maps :size (Cons ) of
492
492
0 ->
@@ -537,16 +537,17 @@ apply(Meta, {nodeup, Node}, #state{consumers = Cons0,
537
537
end , Enqs0 ),
538
538
ConsumerUpdateActiveFun = consumer_active_flag_update_function (State0 ),
539
539
{Cons1 , SQ , Effects } =
540
- maps :fold (fun ({_ , P } = ConsumerId , C , {CAcc , SQAcc , EAcc })
541
- when (node (P ) =:= Node ) and
542
- (C # consumer .status =/= cancelled ) ->
543
- EAcc1 = ConsumerUpdateActiveFun (State0 , ConsumerId , C , true , up , EAcc ),
544
- update_or_remove_sub (
545
- ConsumerId , C # consumer {status = up },
546
- CAcc , SQAcc , EAcc1 );
547
- (_ , _ , Acc ) ->
548
- Acc
549
- end , {Cons0 , SQ0 , Monitors }, Cons0 ),
540
+ maps :fold (fun ({_ , P } = ConsumerId , C , {CAcc , SQAcc , EAcc })
541
+ when (node (P ) =:= Node ) and
542
+ (C # consumer .status =/= cancelled ) ->
543
+ EAcc1 = ConsumerUpdateActiveFun (State0 , ConsumerId , C ,
544
+ true , up , EAcc ),
545
+ update_or_remove_sub (
546
+ ConsumerId , C # consumer {status = up },
547
+ CAcc , SQAcc , EAcc1 );
548
+ (_ , _ , Acc ) ->
549
+ Acc
550
+ end , {Cons0 , SQ0 , Monitors }, Cons0 ),
550
551
551
552
checkout (Meta , State0 # state {consumers = Cons1 , enqueuers = Enqs1 ,
552
553
service_queue = SQ ,
@@ -558,7 +559,8 @@ apply(Meta, #update_config{config = Conf}, State) ->
558
559
559
560
consumer_active_flag_update_function (# state {consumer_strategy = default }) ->
560
561
fun (State , ConsumerId , Consumer , Active , ActivityStatus , Effects ) ->
561
- consumer_update_active_effects (State , ConsumerId , Consumer , Active , ActivityStatus , Effects )
562
+ consumer_update_active_effects (State , ConsumerId , Consumer , Active ,
563
+ ActivityStatus , Effects )
562
564
end ;
563
565
consumer_active_flag_update_function (# state {consumer_strategy = single_active }) ->
564
566
fun (_ , _ , _ , _ , _ , Effects ) ->
@@ -739,47 +741,51 @@ query_consumer_count(#state{consumers = Consumers,
739
741
query_consumers (# state {consumers = Consumers ,
740
742
waiting_consumers = WaitingConsumers ,
741
743
consumer_strategy = ConsumerStrategy } = State ) ->
742
- ActiveActivityStatusFun = case ConsumerStrategy of
743
- default ->
744
- fun (_ConsumerId ,
745
- # consumer {status = Status }) ->
746
- case Status of
747
- suspected_down ->
748
- {false , Status };
749
- _ ->
750
- {true , Status }
751
- end
752
- end ;
753
- single_active ->
754
- SingleActiveConsumer = query_single_active_consumer (State ),
755
- fun ({Tag , Pid } = _Consumer , _ ) ->
756
- case SingleActiveConsumer of
757
- {value , {Tag , Pid }} ->
758
- {true , single_active };
759
- _ ->
760
- {false , waiting }
761
- end
762
- end
763
- end ,
764
- FromConsumers = maps :fold (fun (_ , # consumer {status = cancelled }, Acc ) ->
765
- Acc ;
766
- ({Tag , Pid }, # consumer {meta = Meta } = Consumer , Acc ) ->
767
- {Active , ActivityStatus } = ActiveActivityStatusFun ({Tag , Pid }, Consumer ),
768
- maps :put ({Tag , Pid },
769
- {Pid , Tag ,
770
- maps :get (ack , Meta , undefined ),
771
- maps :get (prefetch , Meta , undefined ),
772
- Active ,
773
- ActivityStatus ,
774
- maps :get (args , Meta , []),
775
- maps :get (username , Meta , undefined )},
776
- Acc )
777
- end , #{}, Consumers ),
744
+ ActiveActivityStatusFun =
745
+ case ConsumerStrategy of
746
+ default ->
747
+ fun (_ConsumerId ,
748
+ # consumer {status = Status }) ->
749
+ case Status of
750
+ suspected_down ->
751
+ {false , Status };
752
+ _ ->
753
+ {true , Status }
754
+ end
755
+ end ;
756
+ single_active ->
757
+ SingleActiveConsumer = query_single_active_consumer (State ),
758
+ fun ({Tag , Pid } = _Consumer , _ ) ->
759
+ case SingleActiveConsumer of
760
+ {value , {Tag , Pid }} ->
761
+ {true , single_active };
762
+ _ ->
763
+ {false , waiting }
764
+ end
765
+ end
766
+ end ,
767
+ FromConsumers =
768
+ maps :fold (fun (_ , # consumer {status = cancelled }, Acc ) ->
769
+ Acc ;
770
+ ({Tag , Pid }, # consumer {meta = Meta } = Consumer , Acc ) ->
771
+ {Active , ActivityStatus } =
772
+ ActiveActivityStatusFun ({Tag , Pid }, Consumer ),
773
+ maps :put ({Tag , Pid },
774
+ {Pid , Tag ,
775
+ maps :get (ack , Meta , undefined ),
776
+ maps :get (prefetch , Meta , undefined ),
777
+ Active ,
778
+ ActivityStatus ,
779
+ maps :get (args , Meta , []),
780
+ maps :get (username , Meta , undefined )},
781
+ Acc )
782
+ end , #{}, Consumers ),
778
783
FromWaitingConsumers =
779
784
lists :foldl (fun ({_ , # consumer {status = cancelled }}, Acc ) ->
780
785
Acc ;
781
786
({{Tag , Pid }, # consumer {meta = Meta } = Consumer }, Acc ) ->
782
- {Active , ActivityStatus } = ActiveActivityStatusFun ({Tag , Pid }, Consumer ),
787
+ {Active , ActivityStatus } =
788
+ ActiveActivityStatusFun ({Tag , Pid }, Consumer ),
783
789
maps :put ({Tag , Pid },
784
790
{Pid , Tag ,
785
791
maps :get (ack , Meta , undefined ),
@@ -882,7 +888,8 @@ cancel_consumer(ConsumerId,
882
888
{Consumer , Cons1 } ->
883
889
% The active consumer is to be removed
884
890
% Cancel it
885
- {State1 , Effects1 } = maybe_return_all (ConsumerId , Consumer , Cons1 , State0 , Effects0 , Reason ),
891
+ {State1 , Effects1 } = maybe_return_all (ConsumerId , Consumer , Cons1 ,
892
+ State0 , Effects0 , Reason ),
886
893
Effects2 = cancel_consumer_effects (ConsumerId , State1 , Effects1 ),
887
894
% Take another one from the waiting consumers and put it in consumers
888
895
[{NewActiveConsumerId , NewActiveConsumer }
@@ -892,7 +899,8 @@ cancel_consumer(ConsumerId,
892
899
NewActiveConsumer ,
893
900
ServiceQueue ),
894
901
State = State1 # state {consumers = maps :put (NewActiveConsumerId ,
895
- NewActiveConsumer , State1 # state .consumers ),
902
+ NewActiveConsumer ,
903
+ State1 # state .consumers ),
896
904
service_queue = ServiceQueue1 ,
897
905
waiting_consumers = RemainingWaitingConsumers },
898
906
Effects = consumer_update_active_effects (State , NewActiveConsumerId ,
@@ -926,7 +934,8 @@ consumer_update_active_effects(#state{queue_resource = QName },
926
934
cancel_consumer0 (ConsumerId , # state {consumers = C0 } = S0 , Effects0 , Reason ) ->
927
935
case maps :take (ConsumerId , C0 ) of
928
936
{Consumer , Cons1 } ->
929
- {S , Effects2 } = maybe_return_all (ConsumerId , Consumer , Cons1 , S0 , Effects0 , Reason ),
937
+ {S , Effects2 } = maybe_return_all (ConsumerId , Consumer , Cons1 , S0 ,
938
+ Effects0 , Reason ),
930
939
Effects = cancel_consumer_effects (ConsumerId , S , Effects2 ),
931
940
case maps :size (S # state .consumers ) of
932
941
0 ->
0 commit comments