94
94
# credit {} |
95
95
# purge {} |
96
96
# update_config {}.
97
+
97
98
- type command () :: protocol () | ra_machine :builtin_command ().
98
99
% % all the command types supported by ra fifo
99
100
@@ -134,7 +135,7 @@ update_config(Conf, State) ->
134
135
true ->
135
136
single_active ;
136
137
false ->
137
- default
138
+ competing
138
139
end ,
139
140
Cfg = State #? MODULE .cfg ,
140
141
State #? MODULE {cfg = Cfg # cfg {release_cursor_interval = SHI ,
@@ -248,7 +249,8 @@ apply(Meta, #checkout{spec = {dequeue, Settlement},
248
249
{State0 , {dequeue , empty }};
249
250
Ready ->
250
251
State1 = update_consumer (ConsumerId , ConsumerMeta ,
251
- {once , 1 , simple_prefetch }, State0 ),
252
+ {once , 1 , simple_prefetch },
253
+ State0 ),
252
254
{success , _ , MsgId , Msg , State2 } = checkout_one (State1 ),
253
255
case Settlement of
254
256
unsettled ->
@@ -257,10 +259,9 @@ apply(Meta, #checkout{spec = {dequeue, Settlement},
257
259
[{monitor , process , Pid }]};
258
260
settled ->
259
261
% % immediately settle the checkout
260
- {State , _ , Effects } = apply (Meta ,
261
- make_settle (ConsumerId ,
262
- [MsgId ]),
263
- State2 ),
262
+ {State , _ , Effects } =
263
+ apply (Meta , make_settle (ConsumerId , [MsgId ]),
264
+ State2 ),
264
265
{State , {dequeue , {MsgId , Msg }, Ready - 1 }, Effects }
265
266
end
266
267
end ;
@@ -294,27 +295,64 @@ apply(#{index := RaftIdx}, #purge{},
294
295
% % reverse the effects ourselves
295
296
{State , {purge , Total },
296
297
lists :reverse ([garbage_collection | Effects ])};
297
- apply (_ , {down , ConsumerPid , noconnection },
298
+ apply (_ , {down , Pid , noconnection },
299
+ #? MODULE {consumers = Cons0 ,
300
+ cfg = # cfg {consumer_strategy = single_active },
301
+ waiting_consumers = Waiting0 ,
302
+ enqueuers = Enqs0 } = State0 ) ->
303
+ Node = node (Pid ),
304
+ % % if the pid refers to the active consumer, mark it as suspected and return
305
+ % % it to the waiting queue
306
+ {State1 , Effects0 } = case maps :to_list (Cons0 ) of
307
+ [{{_ , P } = Cid , C }] when node (P ) =:= Node ->
308
+ % % the consumer should be returned to waiting
309
+ % %
310
+ Effs = consumer_update_active_effects (
311
+ State0 , Cid , C , false , suspected_down , []),
312
+ {State0 #? MODULE {consumers = #{},
313
+ waiting_consumers = Waiting0 ++ [{Cid , C }]},
314
+ Effs };
315
+ _ -> {State0 , []}
316
+ end ,
317
+ WaitingConsumers = update_waiting_consumer_status (Node , State1 ,
318
+ suspected_down ),
319
+
320
+ % % select a new consumer from the waiting queue and run a checkout
321
+ State2 = State1 #? MODULE {waiting_consumers = WaitingConsumers },
322
+ {State , Effects1 } = activate_next_consumer (State2 , Effects0 ),
323
+
324
+ % % mark any enquers as suspected
325
+ Enqs = maps :map (fun (P , E ) when node (P ) =:= Node ->
326
+ E # enqueuer {status = suspected_down };
327
+ (_ , E ) -> E
328
+ end , Enqs0 ),
329
+ Effects = [{monitor , node , Node } | Effects1 ],
330
+ {State #? MODULE {enqueuers = Enqs }, ok , Effects };
331
+ apply (_ , {down , Pid , noconnection },
298
332
#? MODULE {consumers = Cons0 ,
299
333
enqueuers = Enqs0 } = State0 ) ->
300
- Node = node (ConsumerPid ),
334
+ % % A node has been disconnected. This doesn't necessarily mean that
335
+ % % any processes on this node are down, they _may_ come back so here
336
+ % % we just mark them as suspected (effectively deactivated)
337
+ % % and return all checked out messages to the main queue for delivery to any
338
+ % % live consumers
339
+ % %
340
+ % % all pids for the disconnected node will be marked as suspected not just
341
+ % % the one we got the `down' command for
342
+ Node = node (Pid ),
301
343
ConsumerUpdateActiveFun = consumer_active_flag_update_function (State0 ),
302
- % mark all consumers and enqueuers as suspected down
303
- % and monitor the node so that we can find out the final state of the
304
- % process at some later point
344
+
305
345
{Cons , State , Effects1 } =
306
- maps :fold (fun ({_ , P } = K ,
307
- # consumer {checked_out = Checked0 } = C ,
308
- {Co , St0 , Eff }) when (node (P ) =:= Node ) and
309
- (C # consumer .status =/= cancelled )->
346
+ maps :fold (fun ({_ , P } = K , # consumer {checked_out = Checked0 ,
347
+ status = up } = C ,
348
+ {Co , St0 , Eff }) when node (P ) =:= Node ->
310
349
{St , Eff0 } = return_all (St0 , Checked0 , Eff , K , C ),
311
350
Credit = increase_credit (C , maps :size (Checked0 )),
312
351
Eff1 = ConsumerUpdateActiveFun (St , K , C , false ,
313
352
suspected_down , Eff0 ),
314
- {maps :put (K ,
315
- C # consumer {status = suspected_down ,
316
- credit = Credit ,
317
- checked_out = #{}}, Co ),
353
+ {maps :put (K , C # consumer {status = suspected_down ,
354
+ credit = Credit ,
355
+ checked_out = #{}}, Co ),
318
356
St , Eff1 };
319
357
(K , C , {Co , St , Eff }) ->
320
358
{maps :put (K , C , Co ), St , Eff }
@@ -323,19 +361,18 @@ apply(_, {down, ConsumerPid, noconnection},
323
361
E # enqueuer {status = suspected_down };
324
362
(_ , E ) -> E
325
363
end , Enqs0 ),
326
- % mark waiting consumers as suspected if necessary
327
- WaitingConsumers = update_waiting_consumer_status (Node , State0 ,
328
- suspected_down ),
329
364
365
+ % Monitor the node so that we can "unsuspect" these processes when the node
366
+ % comes back, then re-issue all monitors and discover the final fate of
367
+ % these processes
330
368
Effects2 = case maps :size (Cons ) of
331
369
0 ->
332
370
[{aux , inactive }, {monitor , node , Node }];
333
371
_ ->
334
372
[{monitor , node , Node }]
335
373
end ++ Effects1 ,
336
374
% % TODO: should we run a checkout here?
337
- {State #? MODULE {consumers = Cons , enqueuers = Enqs ,
338
- waiting_consumers = WaitingConsumers }, ok , Effects2 };
375
+ {State #? MODULE {consumers = Cons , enqueuers = Enqs }, ok , Effects2 };
339
376
apply (Meta , {down , Pid , _Info }, #? MODULE {consumers = Cons0 ,
340
377
enqueuers = Enqs0 } = State0 ) ->
341
378
% Remove any enqueuer for the same pid and enqueue any pending messages
@@ -367,36 +404,36 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
367
404
Monitors = [{monitor , process , P }
368
405
|| P <- suspected_pids_for (Node , State0 )],
369
406
370
- % un-suspect waiting consumers when necessary
371
- WaitingConsumers = update_waiting_consumer_status (Node , State0 , up ),
372
-
373
407
Enqs1 = maps :map (fun (P , E ) when node (P ) =:= Node ->
374
408
E # enqueuer {status = up };
375
409
(_ , E ) -> E
376
410
end , Enqs0 ),
377
411
ConsumerUpdateActiveFun = consumer_active_flag_update_function (State0 ),
378
- {Cons1 , SQ , Effects } =
379
- maps :fold (fun ({_ , P } = ConsumerId , C , {CAcc , SQAcc , EAcc })
380
- when (node (P ) =:= Node ) and
381
- (C # consumer .status =/= cancelled ) ->
382
- EAcc1 = ConsumerUpdateActiveFun (State0 , ConsumerId , C ,
383
- true , up , EAcc ),
384
- update_or_remove_sub (
385
- ConsumerId , C # consumer {status = up },
386
- CAcc , SQAcc , EAcc1 );
387
- (_ , _ , Acc ) ->
388
- Acc
389
- end , {Cons0 , SQ0 , Monitors }, Cons0 ),
390
-
391
- checkout (Meta , State0 #? MODULE {consumers = Cons1 , enqueuers = Enqs1 ,
392
- service_queue = SQ ,
393
- waiting_consumers = WaitingConsumers }, Effects );
412
+ % % mark all consumers as up
413
+ {Cons1 , SQ , Effects1 } =
414
+ maps :fold (fun ({_ , P } = ConsumerId , C , {CAcc , SQAcc , EAcc })
415
+ when (node (P ) =:= Node ) and
416
+ (C # consumer .status =/= cancelled ) ->
417
+ EAcc1 = ConsumerUpdateActiveFun (State0 , ConsumerId ,
418
+ C , true , up , EAcc ),
419
+ update_or_remove_sub (ConsumerId ,
420
+ C # consumer {status = up }, CAcc ,
421
+ SQAcc , EAcc1 );
422
+ (_ , _ , Acc ) ->
423
+ Acc
424
+ end , {Cons0 , SQ0 , Monitors }, Cons0 ),
425
+ Waiting = update_waiting_consumer_status (Node , State0 , up ),
426
+ State1 = State0 #? MODULE {consumers = Cons1 , enqueuers = Enqs1 ,
427
+ service_queue = SQ ,
428
+ waiting_consumers = Waiting },
429
+ {State , Effects } = activate_next_consumer (State1 , Effects1 ),
430
+ checkout (Meta , State , Effects );
394
431
apply (_ , {nodedown , _Node }, State ) ->
395
432
{State , ok };
396
433
apply (Meta , # update_config {config = Conf }, State ) ->
397
434
checkout (Meta , update_config (Conf , State ), []).
398
435
399
- consumer_active_flag_update_function (#? MODULE {cfg = # cfg {consumer_strategy = default }}) ->
436
+ consumer_active_flag_update_function (#? MODULE {cfg = # cfg {consumer_strategy = competing }}) ->
400
437
fun (State , ConsumerId , Consumer , Active , ActivityStatus , Effects ) ->
401
438
consumer_update_active_effects (State , ConsumerId , Consumer , Active ,
402
439
ActivityStatus , Effects )
@@ -407,7 +444,7 @@ consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = sin
407
444
end .
408
445
409
446
handle_waiting_consumer_down (_Pid ,
410
- #? MODULE {cfg = # cfg {consumer_strategy = default }} = State ) ->
447
+ #? MODULE {cfg = # cfg {consumer_strategy = competing }} = State ) ->
411
448
{[], State };
412
449
handle_waiting_consumer_down (_Pid ,
413
450
#? MODULE {cfg = # cfg {consumer_strategy = single_active },
@@ -429,27 +466,18 @@ handle_waiting_consumer_down(Pid,
429
466
State = State0 #? MODULE {waiting_consumers = StillUp },
430
467
{Effects , State }.
431
468
432
- update_waiting_consumer_status (_Node , #? MODULE {cfg = # cfg {consumer_strategy = default }},
433
- _Status ) ->
434
- [];
435
- update_waiting_consumer_status (_Node ,
436
- #? MODULE {cfg = # cfg {consumer_strategy = single_active },
437
- waiting_consumers = []},
438
- _Status ) ->
439
- [];
440
469
update_waiting_consumer_status (Node ,
441
- #? MODULE {cfg = # cfg {consumer_strategy = single_active },
442
- waiting_consumers = WaitingConsumers },
470
+ #? MODULE {waiting_consumers = WaitingConsumers },
443
471
Status ) ->
444
472
[begin
445
- case node (P ) of
473
+ case node (Pid ) of
446
474
Node ->
447
475
{ConsumerId , Consumer # consumer {status = Status }};
448
476
_ ->
449
477
{ConsumerId , Consumer }
450
478
end
451
- end || {{_ , P } = ConsumerId , Consumer } <- WaitingConsumers ,
452
- Consumer # consumer .status =/= cancelled ].
479
+ end || {{_ , Pid } = ConsumerId , Consumer } <- WaitingConsumers ,
480
+ Consumer # consumer .status =/= cancelled ].
453
481
454
482
- spec state_enter (ra_server :ra_state (), state ()) -> ra_machine :effects ().
455
483
state_enter (leader , #? MODULE {consumers = Cons ,
@@ -583,7 +611,7 @@ query_consumers(#?MODULE{consumers = Consumers,
583
611
cfg = # cfg {consumer_strategy = ConsumerStrategy }} = State ) ->
584
612
ActiveActivityStatusFun =
585
613
case ConsumerStrategy of
586
- default ->
614
+ competing ->
587
615
fun (_ConsumerId ,
588
616
# consumer {status = Status }) ->
589
617
case Status of
@@ -709,8 +737,8 @@ num_checked_out(#?MODULE{consumers = Cons}) ->
709
737
end , 0 , maps :values (Cons )).
710
738
711
739
cancel_consumer (ConsumerId ,
712
- #? MODULE {cfg = # cfg {consumer_strategy = default }} = State , Effects , Reason ) ->
713
- % % general case, single active consumer off
740
+ #? MODULE {cfg = # cfg {consumer_strategy = competing }} = State ,
741
+ Effects , Reason ) ->
714
742
cancel_consumer0 (ConsumerId , State , Effects , Reason );
715
743
cancel_consumer (ConsumerId ,
716
744
#? MODULE {cfg = # cfg {consumer_strategy = single_active },
@@ -721,41 +749,23 @@ cancel_consumer(ConsumerId,
721
749
cancel_consumer (ConsumerId ,
722
750
#? MODULE {consumers = Cons0 ,
723
751
cfg = # cfg {consumer_strategy = single_active },
724
- waiting_consumers = WaitingConsumers0 } = State0 ,
752
+ waiting_consumers = Waiting0 } = State0 ,
725
753
Effects0 , Reason ) ->
726
754
% % single active consumer on, consumers are waiting
727
- case maps :take (ConsumerId , Cons0 ) of
728
- { Consumer , Cons1 } ->
755
+ case maps :is_key (ConsumerId , Cons0 ) of
756
+ true ->
729
757
% The active consumer is to be removed
730
- % Cancel it
731
- {State1 , Effects1 } = maybe_return_all (ConsumerId , Consumer , Cons1 ,
732
- State0 , Effects0 , Reason ),
733
- Effects2 = cancel_consumer_effects (ConsumerId , State1 , Effects1 ),
734
- % Take another one from the waiting consumers and put it in consumers
735
- [{NewActiveConsumerId , NewActiveConsumer }
736
- | RemainingWaitingConsumers ] = WaitingConsumers0 ,
737
- #? MODULE {service_queue = ServiceQueue } = State1 ,
738
- ServiceQueue1 = maybe_queue_consumer (NewActiveConsumerId ,
739
- NewActiveConsumer ,
740
- ServiceQueue ),
741
- State = State1 #? MODULE {consumers = maps :put (NewActiveConsumerId ,
742
- NewActiveConsumer ,
743
- State1 #? MODULE .consumers ),
744
- service_queue = ServiceQueue1 ,
745
- waiting_consumers = RemainingWaitingConsumers },
746
- Effects = consumer_update_active_effects (State , NewActiveConsumerId ,
747
- NewActiveConsumer , true ,
748
- single_active , Effects2 ),
749
- {State , Effects };
750
- error ->
758
+ {State1 , Effects1 } = cancel_consumer0 (ConsumerId , State0 ,
759
+ Effects0 , Reason ),
760
+ activate_next_consumer (State1 , Effects1 );
761
+ false ->
751
762
% The cancelled consumer is not the active one
752
763
% Just remove it from idle_consumers
753
- WaitingConsumers = lists :keydelete (ConsumerId , 1 ,
754
- WaitingConsumers0 ),
764
+ Waiting = lists :keydelete (ConsumerId , 1 , Waiting0 ),
755
765
Effects = cancel_consumer_effects (ConsumerId , State0 , Effects0 ),
756
766
% A waiting consumer isn't supposed to have any checked out messages,
757
767
% so nothing special to do here
758
- {State0 #? MODULE {waiting_consumers = WaitingConsumers }, Effects }
768
+ {State0 #? MODULE {waiting_consumers = Waiting }, Effects }
759
769
end .
760
770
761
771
consumer_update_active_effects (#? MODULE {cfg = # cfg {resource = QName }},
@@ -765,9 +775,7 @@ consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}},
765
775
Ack = maps :get (ack , Meta , undefined ),
766
776
Prefetch = maps :get (prefetch , Meta , undefined ),
767
777
Args = maps :get (args , Meta , []),
768
- [{mod_call ,
769
- rabbit_quorum_queue ,
770
- update_consumer_handler ,
778
+ [{mod_call , rabbit_quorum_queue , update_consumer_handler ,
771
779
[QName , ConsumerId , false , Ack , Prefetch , Active , ActivityStatus , Args ]}
772
780
| Effects ].
773
781
@@ -788,6 +796,32 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) ->
788
796
{S0 , Effects0 }
789
797
end .
790
798
799
+ activate_next_consumer (#? MODULE {consumers = Cons } = State0 , Effects )
800
+ when map_size (Cons ) == 1 ->
801
+ {State0 , Effects };
802
+ activate_next_consumer (#? MODULE {waiting_consumers = Waiting0 } = State0 ,
803
+ Effects0 ) ->
804
+ case lists :filter (fun ({_ , # consumer {status = Status }}) ->
805
+ Status == up
806
+ end , Waiting0 ) of
807
+ [{NextConsumerId , NextConsumer } | _ ] ->
808
+ Remaining = lists :keydelete (NextConsumerId , 1 , Waiting0 ),
809
+ #? MODULE {service_queue = ServiceQueue } = State0 ,
810
+ ServiceQueue1 = maybe_queue_consumer (NextConsumerId ,
811
+ NextConsumer ,
812
+ ServiceQueue ),
813
+ State = State0 #? MODULE {consumers = #{NextConsumerId => NextConsumer },
814
+ service_queue = ServiceQueue1 ,
815
+ waiting_consumers = Remaining },
816
+ Effects = consumer_update_active_effects (State , NextConsumerId ,
817
+ NextConsumer , true ,
818
+ single_active , Effects0 ),
819
+ {State , Effects };
820
+ [] ->
821
+ {State0 , Effects0 }
822
+ end .
823
+
824
+
791
825
maybe_return_all (ConsumerId , # consumer {checked_out = Checked0 } = Consumer ,
792
826
Cons1 , #? MODULE {consumers = C0 ,
793
827
service_queue = SQ0 } = S0 ,
@@ -1296,7 +1330,7 @@ uniq_queue_in(Key, Queue) ->
1296
1330
end .
1297
1331
1298
1332
update_consumer (ConsumerId , Meta , Spec ,
1299
- #? MODULE {cfg = # cfg {consumer_strategy = default }} = State0 ) ->
1333
+ #? MODULE {cfg = # cfg {consumer_strategy = competing }} = State0 ) ->
1300
1334
% % general case, single active consumer off
1301
1335
update_consumer0 (ConsumerId , Meta , Spec , State0 );
1302
1336
update_consumer (ConsumerId , Meta , Spec ,
@@ -1331,7 +1365,6 @@ update_consumer0(ConsumerId, Meta, {Life, Credit, Mode},
1331
1365
end , Init , Cons0 ),
1332
1366
ServiceQueue = maybe_queue_consumer (ConsumerId , maps :get (ConsumerId , Cons ),
1333
1367
ServiceQueue0 ),
1334
-
1335
1368
State0 #? MODULE {consumers = Cons , service_queue = ServiceQueue }.
1336
1369
1337
1370
maybe_queue_consumer (ConsumerId , # consumer {credit = Credit },
0 commit comments