Skip to content

Commit facddb3

Browse files
committed
Adopt new rabbit_backing_queue:discard implementation
Signed-off-by: Matteo Cafasso <[email protected]>
1 parent 1f7a27c commit facddb3

File tree

3 files changed

+9
-19
lines changed

3 files changed

+9
-19
lines changed

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -648,7 +648,7 @@ discard(#delivery{confirm = Confirm,
648648
true -> confirm_messages([MsgId], MTC, QName);
649649
false -> MTC
650650
end,
651-
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
651+
BQS1 = BQ:discard(Msg, SenderPid, BQS),
652652
{BQS1, MTC1}.
653653

654654
run_message_queue(ActiveConsumersChanged, State) ->
@@ -815,7 +815,7 @@ send_reject_publish(#delivery{confirm = true,
815815
amqqueue:get_name(Q), MsgSeqNo),
816816

817817
MTC1 = maps:remove(MsgId, MTC),
818-
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
818+
BQS1 = BQ:discard(Msg, SenderPid, BQS),
819819
State#q{ backing_queue_state = BQS1, msg_id_to_channel = MTC1 };
820820
send_reject_publish(#delivery{confirm = false}, State) ->
821821
State.

deps/rabbit/src/rabbit_priority_queue.erl

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -218,22 +218,12 @@ publish_delivered(Msg, MsgProps, ChPid,
218218
State = #passthrough{bq = BQ, bqs = BQS}) ->
219219
?passthrough2(publish_delivered(Msg, MsgProps, ChPid, BQS)).
220220

221-
%% TODO this is a hack. The BQ api does not give us enough information
222-
%% here - if we had the Msg we could look at its priority and forward
223-
%% to the appropriate sub-BQ. But we don't so we are stuck.
224-
%%
225-
%% But fortunately VQ ignores discard/4, so we can too, *assuming we
226-
%% are talking to VQ*. discard/4 is used by HA, but that's "above" us
227-
%% (if in use) so we don't break that either, just some hypothetical
228-
%% alternate BQ implementation.
229-
discard(_MsgId, _ChPid, State = #state{}) ->
230-
State;
231-
%% We should have something a bit like this here:
232-
%% pick1(fun (_P, BQSN) ->
233-
%% BQ:discard(MsgId, ChPid, BQSN)
234-
%% end, Msg, State);
235-
discard(MsgId, ChPid, State = #passthrough{bq = BQ, bqs = BQS}) ->
236-
?passthrough1(discard(MsgId, ChPid, BQS)).
221+
discard(Msg, ChPid, State = #state{bq = BQ}) ->
222+
pick1(fun (_P, BQSN) ->
223+
BQ:discard(Msg, ChPid, BQSN)
224+
end, Msg, State);
225+
discard(Msg, ChPid, State = #passthrough{bq = BQ, bqs = BQS}) ->
226+
?passthrough1(discard(Msg, ChPid, BQS)).
237227

238228
drain_confirmed(State = #state{bq = BQ}) ->
239229
fold_append2(fun (_P, BQSN) -> BQ:drain_confirmed(BQSN) end, State);

deps/rabbit/src/rabbit_variable_queue.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -544,7 +544,7 @@ publish_delivered(Msg, MsgProps, ChPid, State) ->
544544
State),
545545
{SeqId, a(maybe_update_rates(State1))}.
546546

547-
discard(_MsgId, _ChPid, State) -> State.
547+
discard(_Msg, _ChPid, State) -> State.
548548

549549
drain_confirmed(State = #vqstate { confirmed = C }) ->
550550
case sets:is_empty(C) of

0 commit comments

Comments
 (0)