11
11
% % The Original Code is RabbitMQ.
12
12
% %
13
13
% % The Initial Developer of the Original Code is GoPivotal, Inc.
14
- % % Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
14
+ % % Copyright (c) 2007-2018 Pivotal Software, Inc. All rights reserved.
15
15
% %
16
16
17
17
-module (rabbit_amqqueue_process ).
36
36
-record (q , {
37
37
% % an #amqqueue record
38
38
q ,
39
- % % none | {exclusive consumer channel PID, consumer tag}
40
- exclusive_consumer ,
39
+ % % none | {exclusive consumer channel PID, consumer tag} | {single active consumer channel PID, consumer}
40
+ active_consumer ,
41
41
% % Set to true if a queue has ever had a consumer.
42
42
% % This is used to determine when to delete auto-delete queues.
43
43
has_had_consumers ,
94
94
% % example.
95
95
mirroring_policy_version = 0 ,
96
96
% % running | flow | idle
97
- status
97
+ status ,
98
+ % % true | false
99
+ single_active_consumer_on
98
100
}).
99
101
100
102
% %----------------------------------------------------------------------------
@@ -155,15 +157,20 @@ init(Q) ->
155
157
? MODULE }.
156
158
157
159
init_state (Q ) ->
158
- State = # q {q = Q ,
159
- exclusive_consumer = none ,
160
- has_had_consumers = false ,
161
- consumers = rabbit_queue_consumers :new (),
162
- senders = pmon :new (delegate ),
163
- msg_id_to_channel = gb_trees :empty (),
164
- status = running ,
165
- args_policy_version = 0 ,
166
- overflow = 'drop-head' },
160
+ SingleActiveConsumerOn = case rabbit_misc :table_lookup (Q # amqqueue .arguments , <<" x-single-active-consumer" >>) of
161
+ {bool , true } -> true ;
162
+ _ -> false
163
+ end ,
164
+ State = # q {q = Q ,
165
+ active_consumer = none ,
166
+ has_had_consumers = false ,
167
+ consumers = rabbit_queue_consumers :new (),
168
+ senders = pmon :new (delegate ),
169
+ msg_id_to_channel = gb_trees :empty (),
170
+ status = running ,
171
+ args_policy_version = 0 ,
172
+ overflow = 'drop-head' ,
173
+ single_active_consumer_on = SingleActiveConsumerOn },
167
174
rabbit_event :init_stats_timer (State , # q .stats_timer ).
168
175
169
176
init_it (Recover , From , State = # q {q = # amqqueue {exclusive_owner = none }}) ->
@@ -545,7 +552,10 @@ stop_ttl_timer(State) -> rabbit_misc:stop_timer(State, #q.ttl_timer_ref).
545
552
ensure_stats_timer (State ) ->
546
553
rabbit_event :ensure_stats_timer (State , # q .stats_timer , emit_stats ).
547
554
548
- assert_invariant (State = # q {consumers = Consumers }) ->
555
+ assert_invariant (# q {single_active_consumer_on = true }) ->
556
+ % % queue may contain messages and have available consumers with exclusive consumer
557
+ ok ;
558
+ assert_invariant (State = # q {consumers = Consumers , single_active_consumer_on = false }) ->
549
559
true = (rabbit_queue_consumers :inactive (Consumers ) orelse is_empty (State )).
550
560
551
561
is_empty (# q {backing_queue = BQ , backing_queue_state = BQS }) -> BQ :is_empty (BQS ).
@@ -619,7 +629,8 @@ run_message_queue(ActiveConsumersChanged, State) ->
619
629
true -> maybe_notify_decorators (ActiveConsumersChanged , State );
620
630
false -> case rabbit_queue_consumers :deliver (
621
631
fun (AckRequired ) -> fetch (AckRequired , State ) end ,
622
- qname (State ), State # q .consumers ) of
632
+ qname (State ), State # q .consumers ,
633
+ State # q .single_active_consumer_on , State # q .active_consumer ) of
623
634
{delivered , ActiveConsumersChanged1 , State1 , Consumers } ->
624
635
run_message_queue (
625
636
ActiveConsumersChanged or ActiveConsumersChanged1 ,
@@ -645,7 +656,7 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid,
645
656
{{Message , Delivered , AckTag }, {BQS1 , MTC }};
646
657
(false ) -> {{Message , Delivered , undefined },
647
658
discard (Delivery , BQ , BQS , MTC )}
648
- end , qname (State ), State # q .consumers ) of
659
+ end , qname (State ), State # q .consumers , State # q . single_active_consumer_on , State # q . active_consumer ) of
649
660
{delivered , ActiveConsumersChanged , {BQS1 , MTC1 }, Consumers } ->
650
661
{delivered , maybe_notify_decorators (
651
662
ActiveConsumersChanged ,
@@ -814,9 +825,10 @@ should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
814
825
should_auto_delete (# q {has_had_consumers = false }) -> false ;
815
826
should_auto_delete (State ) -> is_unused (State ).
816
827
817
- handle_ch_down (DownPid , State = # q {consumers = Consumers ,
818
- exclusive_consumer = Holder ,
819
- senders = Senders }) ->
828
+ handle_ch_down (DownPid , State = # q {consumers = Consumers ,
829
+ active_consumer = Holder ,
830
+ single_active_consumer_on = SingleActiveConsumerOn ,
831
+ senders = Senders }) ->
820
832
State1 = State # q {senders = case pmon :is_monitored (DownPid , Senders ) of
821
833
false ->
822
834
Senders ;
@@ -840,12 +852,9 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
840
852
{ChAckTags , ChCTags , Consumers1 } ->
841
853
QName = qname (State1 ),
842
854
[emit_consumer_deleted (DownPid , CTag , QName , ? INTERNAL_USER ) || CTag <- ChCTags ],
843
- Holder1 = case Holder of
844
- {DownPid , _ } -> none ;
845
- Other -> Other
846
- end ,
855
+ Holder1 = new_single_active_consumer_after_channel_down (DownPid , Holder , SingleActiveConsumerOn , Consumers1 ),
847
856
State2 = State1 # q {consumers = Consumers1 ,
848
- exclusive_consumer = Holder1 },
857
+ active_consumer = Holder1 },
849
858
notify_decorators (State2 ),
850
859
case should_auto_delete (State2 ) of
851
860
true ->
@@ -860,6 +869,22 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
860
869
end
861
870
end .
862
871
872
+ new_single_active_consumer_after_channel_down (DownChPid , CurrentSingleActiveConsumer , _SingleActiveConsumerIsOn = true , Consumers ) ->
873
+ case CurrentSingleActiveConsumer of
874
+ {DownChPid , _ } ->
875
+ case rabbit_queue_consumers :get_consumer (Consumers ) of
876
+ undefined -> none ;
877
+ Consumer -> Consumer
878
+ end ;
879
+ false ->
880
+ CurrentSingleActiveConsumer
881
+ end ;
882
+ new_single_active_consumer_after_channel_down (DownChPid , CurrentSingleActiveConsumer , _SingleActiveConsumerIsOn = false , _Consumers ) ->
883
+ case CurrentSingleActiveConsumer of
884
+ {DownChPid , _ } -> none ;
885
+ Other -> Other
886
+ end .
887
+
863
888
check_exclusive_access ({_ChPid , _ConsumerTag }, _ExclusiveConsume , _State ) ->
864
889
in_use ;
865
890
check_exclusive_access (none , false , _State ) ->
@@ -1007,14 +1032,14 @@ i(effective_policy_definition, #q{q = Q}) ->
1007
1032
undefined -> [];
1008
1033
Def -> Def
1009
1034
end ;
1010
- i (exclusive_consumer_pid , # q {exclusive_consumer = none }) ->
1011
- '' ;
1012
- i (exclusive_consumer_pid , # q {exclusive_consumer = {ChPid , _ConsumerTag }}) ->
1035
+ i (exclusive_consumer_pid , # q {active_consumer = {ChPid , _ConsumerTag }, single_active_consumer_on = false }) ->
1013
1036
ChPid ;
1014
- i (exclusive_consumer_tag , # q { exclusive_consumer = none } ) ->
1037
+ i (exclusive_consumer_pid , _ ) ->
1015
1038
'' ;
1016
- i (exclusive_consumer_tag , # q {exclusive_consumer = {_ChPid , ConsumerTag }}) ->
1039
+ i (exclusive_consumer_tag , # q {active_consumer = {_ChPid , ConsumerTag }, single_active_consumer_on = false }) ->
1017
1040
ConsumerTag ;
1041
+ i (exclusive_consumer_tag , _ ) ->
1042
+ '' ;
1018
1043
i (messages_ready , # q {backing_queue_state = BQS , backing_queue = BQ }) ->
1019
1044
BQ :len (BQS );
1020
1045
i (messages_unacknowledged , _ ) ->
@@ -1213,49 +1238,81 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
1213
1238
1214
1239
handle_call ({basic_consume , NoAck , ChPid , LimiterPid , LimiterActive ,
1215
1240
PrefetchCount , ConsumerTag , ExclusiveConsume , Args , OkMsg , ActingUser },
1216
- _From , State = # q {consumers = Consumers ,
1217
- exclusive_consumer = Holder }) ->
1218
- case check_exclusive_access (Holder , ExclusiveConsume , State ) of
1219
- in_use -> reply ({error , exclusive_consume_unavailable }, State );
1220
- ok -> Consumers1 = rabbit_queue_consumers :add (
1221
- ChPid , ConsumerTag , NoAck ,
1222
- LimiterPid , LimiterActive ,
1223
- PrefetchCount , Args , is_empty (State ),
1224
- ActingUser , Consumers ),
1225
- ExclusiveConsumer =
1226
- if ExclusiveConsume -> {ChPid , ConsumerTag };
1227
- true -> Holder
1228
- end ,
1229
- State1 = State # q {consumers = Consumers1 ,
1230
- has_had_consumers = true ,
1231
- exclusive_consumer = ExclusiveConsumer },
1232
- ok = maybe_send_reply (ChPid , OkMsg ),
1233
- QName = qname (State1 ),
1234
- AckRequired = not NoAck ,
1235
- rabbit_core_metrics :consumer_created (
1236
- ChPid , ConsumerTag , ExclusiveConsume , AckRequired , QName ,
1237
- PrefetchCount , Args ),
1238
- emit_consumer_created (ChPid , ConsumerTag , ExclusiveConsume ,
1239
- AckRequired , QName , PrefetchCount ,
1240
- Args , none , ActingUser ),
1241
- notify_decorators (State1 ),
1242
- reply (ok , run_message_queue (State1 ))
1241
+ _From , State = # q {consumers = Consumers ,
1242
+ active_consumer = Holder ,
1243
+ single_active_consumer_on = SingleActiveConsumerOn }) ->
1244
+ ConsumerRegistration = case SingleActiveConsumerOn of
1245
+ true ->
1246
+ case ExclusiveConsume of
1247
+ true ->
1248
+ {error , reply ({error , exclusive_consume_unavailable }, State )};
1249
+ false ->
1250
+ Consumers1 = rabbit_queue_consumers :add (
1251
+ ChPid , ConsumerTag , NoAck ,
1252
+ LimiterPid , LimiterActive ,
1253
+ PrefetchCount , Args , is_empty (State ),
1254
+ ActingUser , Consumers ),
1255
+
1256
+ case Holder of
1257
+ none ->
1258
+ NewConsumer = rabbit_queue_consumers :get (ChPid , ConsumerTag , Consumers1 ),
1259
+ {state , State # q {consumers = Consumers1 ,
1260
+ has_had_consumers = true ,
1261
+ active_consumer = NewConsumer }};
1262
+ _ ->
1263
+ {state , State # q {consumers = Consumers1 ,
1264
+ has_had_consumers = true }}
1265
+ end
1266
+ end ;
1267
+ false ->
1268
+ case check_exclusive_access (Holder , ExclusiveConsume , State ) of
1269
+ in_use -> {error , reply ({error , exclusive_consume_unavailable }, State )};
1270
+ ok ->
1271
+ Consumers1 = rabbit_queue_consumers :add (
1272
+ ChPid , ConsumerTag , NoAck ,
1273
+ LimiterPid , LimiterActive ,
1274
+ PrefetchCount , Args , is_empty (State ),
1275
+ ActingUser , Consumers ),
1276
+ ExclusiveConsumer =
1277
+ if ExclusiveConsume -> {ChPid , ConsumerTag };
1278
+ true -> Holder
1279
+ end ,
1280
+ {state , State # q {consumers = Consumers1 ,
1281
+ has_had_consumers = true ,
1282
+ active_consumer = ExclusiveConsumer }}
1283
+ end
1284
+ end ,
1285
+ case ConsumerRegistration of
1286
+ {error , Reply } ->
1287
+ Reply ;
1288
+ {state , State1 } ->
1289
+ ok = maybe_send_reply (ChPid , OkMsg ),
1290
+ QName = qname (State1 ),
1291
+ AckRequired = not NoAck ,
1292
+ rabbit_core_metrics :consumer_created (
1293
+ ChPid , ConsumerTag , ExclusiveConsume , AckRequired , QName ,
1294
+ PrefetchCount , Args ),
1295
+ emit_consumer_created (ChPid , ConsumerTag , ExclusiveConsume ,
1296
+ AckRequired , QName , PrefetchCount ,
1297
+ Args , none , ActingUser ),
1298
+ notify_decorators (State1 ),
1299
+ reply (ok , run_message_queue (State1 ))
1243
1300
end ;
1244
1301
1245
1302
handle_call ({basic_cancel , ChPid , ConsumerTag , OkMsg , ActingUser }, _From ,
1246
- State = # q {consumers = Consumers ,
1247
- exclusive_consumer = Holder }) ->
1303
+ State = # q {consumers = Consumers ,
1304
+ active_consumer = Holder ,
1305
+ single_active_consumer_on = SingleActiveConsumerOn }) ->
1248
1306
ok = maybe_send_reply (ChPid , OkMsg ),
1249
1307
case rabbit_queue_consumers :remove (ChPid , ConsumerTag , Consumers ) of
1250
1308
not_found ->
1251
1309
reply (ok , State );
1252
1310
Consumers1 ->
1253
- Holder1 = case Holder of
1254
- {ChPid , ConsumerTag } -> none ;
1255
- _ -> Holder
1256
- end ,
1311
+ Holder1 = new_single_active_consumer_after_basic_cancel (ChPid , ConsumerTag ,
1312
+ Holder , SingleActiveConsumerOn , Consumers1
1313
+ ),
1257
1314
State1 = State # q {consumers = Consumers1 ,
1258
- exclusive_consumer = Holder1 },
1315
+ active_consumer = Holder1 },
1259
1316
emit_consumer_deleted (ChPid , ConsumerTag , qname (State1 ), ActingUser ),
1260
1317
notify_decorators (State1 ),
1261
1318
case should_auto_delete (State1 ) of
@@ -1325,6 +1382,24 @@ handle_call(sync_mirrors, _From, State) ->
1325
1382
handle_call (cancel_sync_mirrors , _From , State ) ->
1326
1383
reply ({ok , not_syncing }, State ).
1327
1384
1385
+ new_single_active_consumer_after_basic_cancel (ChPid , ConsumerTag , CurrentSingleActiveConsumer ,
1386
+ _SingleActiveConsumerIsOn = true , Consumers ) ->
1387
+ case rabbit_queue_consumers :is_same (ChPid , ConsumerTag , CurrentSingleActiveConsumer ) of
1388
+ true ->
1389
+ case rabbit_queue_consumers :get_consumer (Consumers ) of
1390
+ undefined -> none ;
1391
+ Consumer -> Consumer
1392
+ end ;
1393
+ false ->
1394
+ CurrentSingleActiveConsumer
1395
+ end ;
1396
+ new_single_active_consumer_after_basic_cancel (ChPid , ConsumerTag , CurrentSingleActiveConsumer ,
1397
+ _SingleActiveConsumerIsOn = false , _Consumers ) ->
1398
+ case CurrentSingleActiveConsumer of
1399
+ {ChPid , ConsumerTag } -> none ;
1400
+ _ -> CurrentSingleActiveConsumer
1401
+ end .
1402
+
1328
1403
handle_cast (init , State ) ->
1329
1404
try
1330
1405
init_it ({no_barrier , non_clean_shutdown }, none , State )
@@ -1432,7 +1507,6 @@ handle_cast({credit, ChPid, CTag, Credit, Drain},
1432
1507
{unblocked , Consumers1 } -> State1 = State # q {consumers = Consumers1 },
1433
1508
run_message_queue (true , State1 )
1434
1509
end );
1435
-
1436
1510
handle_cast (notify_decorators , State ) ->
1437
1511
notify_decorators (State ),
1438
1512
noreply (State );
0 commit comments