Skip to content

Commit a0c9dd6

Browse files
committed
Quorum queue: Run cancel_consumer handler async
If cancle_consumer handler needs to run on a remote node use rpc:cast instead of call to avoid potential blocking.
1 parent 7c1b567 commit a0c9dd6

File tree

3 files changed

+29
-28
lines changed

3 files changed

+29
-28
lines changed

src/rabbit_amqqueue.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1476,7 +1476,8 @@ deliver(Qs, Delivery = #delivery{flow = Flow,
14761476
lists:foldl(
14771477
fun({{Name, _} = Pid, QName}, QStates) ->
14781478
QState0 = get_quorum_state(Pid, QName, QStates),
1479-
case rabbit_quorum_queue:deliver(Confirm, Delivery, QState0) of
1479+
case rabbit_quorum_queue:deliver(Confirm, Delivery,
1480+
QState0) of
14801481
{ok, QState} ->
14811482
maps:put(Name, QState, QStates);
14821483
{slow, QState} ->

src/rabbit_fifo.erl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -392,8 +392,7 @@ apply(_, {down, ConsumerPid, noconnection},
392392
apply(_, {down, Pid, _Info}, Effects0,
393393
#state{consumers = Cons0,
394394
enqueuers = Enqs0} = State0) ->
395-
% remove any enqueuer for the same pid
396-
% TODO: if there are any pending enqueuers these should be enqueued
395+
% Remove any enqueuer for the same pid and enqueue any pending messages
397396
% This should be ok as we won't see any more enqueues from this pid
398397
State1 = case maps:take(Pid, Enqs0) of
399398
{#enqueuer{pending = Pend}, Enqs} ->

src/rabbit_quorum_queue.erl

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,8 @@ init_state({Name, _}, QName) ->
104104
fun() -> credit_flow:block(Name), ok end,
105105
fun() -> credit_flow:unblock(Name), ok end).
106106

107-
handle_event({ra_event, From, Evt}, FState) ->
108-
rabbit_fifo_client:handle_ra_event(From, Evt, FState).
107+
handle_event({ra_event, From, Evt}, QState) ->
108+
rabbit_fifo_client:handle_ra_event(From, Evt, QState).
109109

110110
declare(#amqqueue{name = QName,
111111
durable = Durable,
@@ -160,9 +160,10 @@ cancel_consumer_handler(QName, {ConsumerTag, ChPid}, _Name) ->
160160
% QName = queue_name(Name),
161161
case Node == node() of
162162
true -> cancel_consumer(QName, ChPid, ConsumerTag);
163-
false -> rabbit_misc:rpc_call(Node, rabbit_quorum_queue,
164-
cancel_consumer,
165-
[QName, ChPid, ConsumerTag])
163+
false ->
164+
rpc:cast(Node, rabbit_quorum_queue,
165+
cancel_consumer,
166+
[QName, ChPid, ConsumerTag])
166167
end.
167168

168169
cancel_consumer(QName, ChPid, ConsumerTag) ->
@@ -306,32 +307,32 @@ delete_immediately({Name, _} = QPid) ->
306307
rabbit_core_metrics:queue_deleted(QName),
307308
ok.
308309

309-
ack(CTag, MsgIds, FState) ->
310-
rabbit_fifo_client:settle(quorum_ctag(CTag), MsgIds, FState).
310+
ack(CTag, MsgIds, QState) ->
311+
rabbit_fifo_client:settle(quorum_ctag(CTag), MsgIds, QState).
311312

312-
reject(true, CTag, MsgIds, FState) ->
313-
rabbit_fifo_client:return(quorum_ctag(CTag), MsgIds, FState);
314-
reject(false, CTag, MsgIds, FState) ->
315-
rabbit_fifo_client:discard(quorum_ctag(CTag), MsgIds, FState).
313+
reject(true, CTag, MsgIds, QState) ->
314+
rabbit_fifo_client:return(quorum_ctag(CTag), MsgIds, QState);
315+
reject(false, CTag, MsgIds, QState) ->
316+
rabbit_fifo_client:discard(quorum_ctag(CTag), MsgIds, QState).
316317

317318
credit(CTag, Credit, Drain, QState) ->
318319
rabbit_fifo_client:credit(quorum_ctag(CTag), Credit, Drain, QState).
319320

320321
basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck,
321-
CTag0, FState0) ->
322+
CTag0, QState0) ->
322323
CTag = quorum_ctag(CTag0),
323324
Settlement = case NoAck of
324325
true ->
325326
settled;
326327
false ->
327328
unsettled
328329
end,
329-
case rabbit_fifo_client:dequeue(CTag, Settlement, FState0) of
330-
{ok, empty, FState} ->
331-
{ok, empty, FState};
332-
{ok, {MsgId, {MsgHeader, Msg}}, FState} ->
330+
case rabbit_fifo_client:dequeue(CTag, Settlement, QState0) of
331+
{ok, empty, QState} ->
332+
{ok, empty, QState};
333+
{ok, {MsgId, {MsgHeader, Msg}}, QState} ->
333334
IsDelivered = maps:is_key(delivery_count, MsgHeader),
334-
{ok, quorum_messages(Name), {QName, Id, MsgId, IsDelivered, Msg}, FState};
335+
{ok, quorum_messages(Name), {QName, Id, MsgId, IsDelivered, Msg}, QState};
335336
{timeout, _} ->
336337
{error, timeout}
337338
end.
@@ -352,19 +353,19 @@ basic_consume(#amqqueue{name = QName, type = quorum}, NoAck, ChPid,
352353
ConsumerPrefetchCount, Args),
353354
{ok, QState}.
354355

355-
basic_cancel(ConsumerTag, ChPid, OkMsg, FState0) ->
356+
basic_cancel(ConsumerTag, ChPid, OkMsg, QState0) ->
356357
maybe_send_reply(ChPid, OkMsg),
357-
rabbit_fifo_client:cancel_checkout(quorum_ctag(ConsumerTag), FState0).
358+
rabbit_fifo_client:cancel_checkout(quorum_ctag(ConsumerTag), QState0).
358359

359360
stateless_deliver(ServerId, Delivery) ->
360361
ok = rabbit_fifo_client:untracked_enqueue([ServerId],
361362
Delivery#delivery.message).
362363

363-
deliver(false, Delivery, FState0) ->
364-
rabbit_fifo_client:enqueue(Delivery#delivery.message, FState0);
365-
deliver(true, Delivery, FState0) ->
364+
deliver(false, Delivery, QState0) ->
365+
rabbit_fifo_client:enqueue(Delivery#delivery.message, QState0);
366+
deliver(true, Delivery, QState0) ->
366367
rabbit_fifo_client:enqueue(Delivery#delivery.msg_seq_no,
367-
Delivery#delivery.message, FState0).
368+
Delivery#delivery.message, QState0).
368369

369370
info(Q) ->
370371
info(Q, [name, durable, auto_delete, arguments, pid, state, messages,
@@ -387,8 +388,8 @@ stat(_Q) ->
387388
purge(Node) ->
388389
rabbit_fifo_client:purge(Node).
389390

390-
requeue(ConsumerTag, MsgIds, FState) ->
391-
rabbit_fifo_client:return(quorum_ctag(ConsumerTag), MsgIds, FState).
391+
requeue(ConsumerTag, MsgIds, QState) ->
392+
rabbit_fifo_client:return(quorum_ctag(ConsumerTag), MsgIds, QState).
392393

393394
cleanup_data_dir() ->
394395
Names = [Name || #amqqueue{pid = {Name, _}, quorum_nodes = Nodes}

0 commit comments

Comments
 (0)