Skip to content

Commit 170862c

Browse files
Merge pull request #7659 from rabbitmq/rin/queue-deleted-events-include-queue-type
Include the queue type in the queue_deleted rabbit_event
2 parents 25c15bf + fd23782 commit 170862c

9 files changed

+43
-47
lines changed

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -133,14 +133,6 @@ recover(VHost) ->
133133
filter_pid_per_type(QPids) ->
134134
lists:partition(fun(QPid) -> ?IS_CLASSIC(QPid) end, QPids).
135135

136-
filter_resource_per_type(Resources) ->
137-
Queues = [begin
138-
{ok, Q} = lookup(Resource),
139-
QPid = amqqueue:get_pid(Q),
140-
{Resource, QPid}
141-
end || Resource <- Resources],
142-
lists:partition(fun({_Resource, QPid}) -> ?IS_CLASSIC(QPid) end, Queues).
143-
144136
-spec stop(rabbit_types:vhost()) -> 'ok'.
145137
stop(VHost) ->
146138
%% Classic queues
@@ -1506,11 +1498,17 @@ delete_immediately(QPids) ->
15061498
end.
15071499

15081500
delete_immediately_by_resource(Resources) ->
1509-
{Classic, Quorum} = filter_resource_per_type(Resources),
1510-
[gen_server2:cast(QPid, delete_immediately) || {_, QPid} <- Classic],
1511-
[rabbit_quorum_queue:delete_immediately(Resource, QPid)
1512-
|| {Resource, QPid} <- Quorum],
1513-
ok.
1501+
lists:foreach(
1502+
fun(Resource) ->
1503+
{ok, Q} = lookup(Resource),
1504+
QPid = amqqueue:get_pid(Q),
1505+
case ?IS_CLASSIC(QPid) of
1506+
true ->
1507+
gen_server2:cast(QPid, delete_immediately);
1508+
_ ->
1509+
rabbit_quorum_queue:delete_immediately(Q)
1510+
end
1511+
end, Resources).
15141512

15151513
-spec delete
15161514
(amqqueue:amqqueue(), 'false', 'false', rabbit_types:username()) ->
@@ -1676,12 +1674,13 @@ notify_sent_queue_down(QPid) ->
16761674
resume(QPid, ChPid) -> delegate:invoke_no_result(QPid, {gen_server2, cast,
16771675
[{resume, ChPid}]}).
16781676

1679-
-spec internal_delete(name(), rabbit_types:username()) -> 'ok'.
1677+
-spec internal_delete(amqqueue:amqqueue(), rabbit_types:username()) -> 'ok'.
16801678

1681-
internal_delete(QueueName, ActingUser) ->
1682-
internal_delete(QueueName, ActingUser, normal).
1679+
internal_delete(Queue, ActingUser) ->
1680+
internal_delete(Queue, ActingUser, normal).
16831681

1684-
internal_delete(QueueName, ActingUser, Reason) ->
1682+
internal_delete(Queue, ActingUser, Reason) ->
1683+
QueueName = amqqueue:get_name(Queue),
16851684
case rabbit_db_queue:delete(QueueName, Reason) of
16861685
ok ->
16871686
ok;
@@ -1691,6 +1690,7 @@ internal_delete(QueueName, ActingUser, Reason) ->
16911690
rabbit_core_metrics:queue_deleted(QueueName),
16921691
ok = rabbit_event:notify(queue_deleted,
16931692
[{name, QueueName},
1693+
{type, amqqueue:get_type(Queue)},
16941694
{user_who_performed_action, ActingUser}])
16951695
end.
16961696

@@ -1874,7 +1874,7 @@ on_node_down(Node) ->
18741874
end,
18751875
notify_queue_binding_deletions(Deletions),
18761876
rabbit_core_metrics:queues_deleted(QueueNames),
1877-
notify_queues_deleted(QueueNames),
1877+
notify_transient_queues_deleted(QueueNames),
18781878
ok
18791879
end.
18801880

@@ -1897,11 +1897,12 @@ notify_queue_binding_deletions(QueueDeletions) ->
18971897
Deletions = rabbit_binding:process_deletions(QueueDeletions),
18981898
rabbit_binding:notify_deletions(Deletions, ?INTERNAL_USER).
18991899

1900-
notify_queues_deleted(QueueDeletions) ->
1900+
notify_transient_queues_deleted(QueueDeletions) ->
19011901
lists:foreach(
19021902
fun(Queue) ->
19031903
ok = rabbit_event:notify(queue_deleted,
19041904
[{name, Queue},
1905+
{kind, rabbit_classic_queue},
19051906
{user, ?INTERNAL_USER}])
19061907
end,
19071908
QueueDeletions).

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,6 @@ terminate_delete(EmitStats, Reason0,
314314
State = #q{q = Q,
315315
backing_queue = BQ,
316316
status = Status}) ->
317-
QName = amqqueue:get_name(Q),
318317
ActingUser = terminated_by(Status),
319318
fun (BQS) ->
320319
Reason = case Reason0 of
@@ -330,7 +329,7 @@ terminate_delete(EmitStats, Reason0,
330329
%% logged.
331330
try
332331
%% don't care if the internal delete doesn't return 'ok'.
333-
rabbit_amqqueue:internal_delete(QName, ActingUser, Reason0)
332+
rabbit_amqqueue:internal_delete(Q, ActingUser, Reason0)
334333
catch
335334
{error, ReasonE} -> error(ReasonE)
336335
end,

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -466,10 +466,9 @@ delete_crashed(Q, ActingUser) ->
466466
[Q, ActingUser]).
467467

468468
delete_crashed_internal(Q, ActingUser) ->
469-
QName = amqqueue:get_name(Q),
470469
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
471470
BQ:delete_crashed(Q),
472-
ok = rabbit_amqqueue:internal_delete(QName, ActingUser).
471+
ok = rabbit_amqqueue:internal_delete(Q, ActingUser).
473472

474473
recover_durable_queues(QueuesAndRecoveryTerms) ->
475474
{Results, Failures} =

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
restart_server/1,
2121
stop_server/1,
2222
delete/4,
23-
delete_immediately/2]).
23+
delete_immediately/1]).
2424
-export([state_info/1, info/2, stat/1, infos/1]).
2525
-export([settle/5, dequeue/5, consume/3, cancel/5]).
2626
-export([credit/5]).
@@ -223,20 +223,20 @@ start_cluster(Q) ->
223223
ActingUser}]),
224224
{new, NewQ};
225225
{error, Error} ->
226-
declare_queue_error(Error, QName, Leader, ActingUser)
226+
declare_queue_error(Error, NewQ, Leader, ActingUser)
227227
catch
228228
error:Error ->
229-
declare_queue_error(Error, QName, Leader, ActingUser)
229+
declare_queue_error(Error, NewQ, Leader, ActingUser)
230230
end;
231231
{existing, _} = Ex ->
232232
Ex
233233
end.
234234

235-
declare_queue_error(Error, QName, Leader, ActingUser) ->
236-
_ = rabbit_amqqueue:internal_delete(QName, ActingUser),
235+
declare_queue_error(Error, Queue, Leader, ActingUser) ->
236+
_ = rabbit_amqqueue:internal_delete(Queue, ActingUser),
237237
{protocol_error, internal_error,
238238
"Cannot declare quorum ~ts on node '~ts' with leader on node '~ts': ~255p",
239-
[rabbit_misc:rs(QName), node(), Leader, Error]}.
239+
[rabbit_misc:rs(amqqueue:get_name(Queue)), node(), Leader, Error]}.
240240

241241
ra_machine(Q) ->
242242
{module, rabbit_fifo, ra_machine_config(Q)}.
@@ -660,7 +660,7 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
660660
ok = force_delete_queue(Servers)
661661
end,
662662
notify_decorators(QName, shutdown),
663-
ok = delete_queue_data(QName, ActingUser),
663+
ok = delete_queue_data(Q, ActingUser),
664664
_ = erpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
665665
?RPC_TIMEOUT),
666666
{ok, ReadyMsgs};
@@ -671,7 +671,7 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
671671
true ->
672672
%% If all ra nodes were already down, the delete
673673
%% has succeed
674-
delete_queue_data(QName, ActingUser),
674+
delete_queue_data(Q, ActingUser),
675675
{ok, ReadyMsgs};
676676
false ->
677677
%% attempt forced deletion of all servers
@@ -682,7 +682,7 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
682682
[rabbit_misc:rs(QName), Errs]),
683683
ok = force_delete_queue(Servers),
684684
notify_decorators(QName, shutdown),
685-
delete_queue_data(QName, ActingUser),
685+
delete_queue_data(Q, ActingUser),
686686
{ok, ReadyMsgs}
687687
end
688688
end.
@@ -701,15 +701,15 @@ force_delete_queue(Servers) ->
701701
end || S <- Servers],
702702
ok.
703703

704-
delete_queue_data(QName, ActingUser) ->
705-
_ = rabbit_amqqueue:internal_delete(QName, ActingUser),
704+
delete_queue_data(Queue, ActingUser) ->
705+
_ = rabbit_amqqueue:internal_delete(Queue, ActingUser),
706706
ok.
707707

708708

709-
delete_immediately(Resource, {_Name, _} = QPid) ->
710-
_ = rabbit_amqqueue:internal_delete(Resource, ?INTERNAL_USER),
711-
{ok, _} = ra:delete_cluster([QPid]),
712-
rabbit_core_metrics:queue_deleted(Resource),
709+
delete_immediately(Queue) ->
710+
_ = rabbit_amqqueue:internal_delete(Queue, ?INTERNAL_USER),
711+
{ok, _} = ra:delete_cluster([amqqueue:get_pid(Queue)]),
712+
rabbit_core_metrics:queue_deleted(amqqueue:get_name(Queue)),
713713
ok.
714714

715715
settle(_QName, complete, CTag, MsgIds, QState) ->

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,7 @@ delete_stream(Q, ActingUser)
177177
#{name := StreamId} = amqqueue:get_type_state(Q),
178178
case process_command({delete_stream, StreamId, #{}}) of
179179
{ok, ok, _} ->
180-
QName = amqqueue:get_name(Q),
181-
_ = rabbit_amqqueue:internal_delete(QName, ActingUser),
180+
_ = rabbit_amqqueue:internal_delete(Q, ActingUser),
182181
{ok, {ok, 0}};
183182
Err ->
184183
Err

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,7 @@ create_stream(Q0) ->
160160
ActingUser}]),
161161
{new, Q};
162162
Error ->
163-
164-
_ = rabbit_amqqueue:internal_delete(QName, ActingUser),
163+
_ = rabbit_amqqueue:internal_delete(Q, ActingUser),
165164
{protocol_error, internal_error, "Cannot declare a queue '~ts' on node '~ts': ~255p",
166165
[rabbit_misc:rs(QName), node(), Error]}
167166
end;

deps/rabbit/src/rabbit_vhost.erl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -413,8 +413,7 @@ assert_benign({error, not_found}, _) -> ok;
413413
assert_benign({error, {absent, Q, _}}, ActingUser) ->
414414
%% Removing the database entries here is safe. If/when the down node
415415
%% restarts, it will clear out the on-disk storage of the queue.
416-
QName = amqqueue:get_name(Q),
417-
rabbit_amqqueue:internal_delete(QName, ActingUser).
416+
rabbit_amqqueue:internal_delete(Q, ActingUser).
418417

419418
-spec exists(vhost:name()) -> boolean().
420419

deps/rabbit/test/backing_queue_SUITE.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -911,7 +911,7 @@ bq_queue_recover1(Config) ->
911911
rabbit_variable_queue:fetch(true, VQ1),
912912
CountMinusOne = rabbit_variable_queue:len(VQ2),
913913
_VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2),
914-
ok = rabbit_amqqueue:internal_delete(QName, <<"acting-user">>)
914+
ok = rabbit_amqqueue:internal_delete(Q1, <<"acting-user">>)
915915
end),
916916
passed.
917917

deps/rabbitmq_mqtt/src/rabbit_mqtt_qos0_queue.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ declare(Q0, _Node) ->
9797
delete(Q, _IfUnused, _IfEmpty, ActingUser) ->
9898
QName = amqqueue:get_name(Q),
9999
log_delete(QName, amqqueue:get_exclusive_owner(Q)),
100-
ok = rabbit_amqqueue:internal_delete(QName, ActingUser),
100+
ok = rabbit_amqqueue:internal_delete(Q, ActingUser),
101101
{ok, 0}.
102102

103103
-spec deliver([{amqqueue:amqqueue(), stateless}], Delivery :: term()) ->
@@ -164,7 +164,7 @@ recover(_VHost, Queues) ->
164164
true = is_recoverable(Q),
165165
QName = amqqueue:get_name(Q),
166166
log_delete(QName, amqqueue:get_exclusive_owner(Q)),
167-
rabbit_amqqueue:internal_delete(QName, ?INTERNAL_USER)
167+
rabbit_amqqueue:internal_delete(Q, ?INTERNAL_USER)
168168
end, Queues),
169169
%% We mark the queue recovery as failed because these queues are not really
170170
%% recovered, but deleted.

0 commit comments

Comments
 (0)