@@ -215,7 +215,8 @@ recover() ->
215
215
% % Clear out remnants of old incarnation, in case we restarted
216
216
% % faster than other nodes handled DOWN messages from us.
217
217
on_node_down (node ()),
218
- DurableQueues = find_durable_queues (),
218
+ DurableQueues = queues_to_recover (),
219
+
219
220
L = length (DurableQueues ),
220
221
221
222
% % if there are not enough file handles, the server might hang
@@ -257,6 +258,31 @@ start(Qs) ->
257
258
[Pid ! {self (), go } || # amqqueue {pid = Pid } <- Qs ],
258
259
ok .
259
260
261
+ queues_to_recover () ->
262
+ DurableQueues = find_durable_queues (),
263
+ VHosts = rabbit_vhost :list (),
264
+
265
+ {QueuesWithVhost , QueuesWithoutVhost } = lists :partition (
266
+ fun (# amqqueue {name = # resource {virtual_host = VHost }}) ->
267
+ lists :member (VHost , VHosts )
268
+ end ,
269
+ DurableQueues ),
270
+
271
+ {LocalQueuesWithoutVhost , _RemoteQueuesWithoutVhost } = lists :partition (
272
+ fun (# amqqueue {pid = QPid }) -> node (QPid ) == node () end ,
273
+ QueuesWithoutVhost ),
274
+
275
+ {atomic , ok } =
276
+ mnesia :sync_transaction (
277
+ fun () ->
278
+ rabbit_log :error (" Deleting ~p~n " , [LocalQueuesWithoutVhost ]),
279
+ [ internal_delete1 (Name , false )
280
+ || # amqqueue {name = Name } <- LocalQueuesWithoutVhost ],
281
+ ok
282
+ end ),
283
+
284
+ QueuesWithVhost .
285
+
260
286
find_durable_queues () ->
261
287
Node = node (),
262
288
mnesia :async_dirty (
@@ -576,7 +602,8 @@ list_local_names() ->
576
602
State =/= crashed ,
577
603
node () =:= node (QPid ) ].
578
604
579
- list (VHostPath ) -> list (VHostPath , rabbit_queue ).
605
+ list (VHostPath ) ->
606
+ list (VHostPath , rabbit_queue ).
580
607
581
608
% % Not dirty_match_object since that would not be transactional when used in a
582
609
% % tx context
@@ -590,12 +617,16 @@ list(VHostPath, TableName) ->
590
617
end ).
591
618
592
619
list_down (VHostPath ) ->
593
- Present = list (VHostPath ),
594
- Durable = list (VHostPath , rabbit_durable_queue ),
595
- PresentS = sets :from_list ([N || # amqqueue {name = N } <- Present ]),
596
- sets :to_list (sets :filter (fun (# amqqueue {name = N }) ->
597
- not sets :is_element (N , PresentS )
598
- end , sets :from_list (Durable ))).
620
+ case rabbit_vhost :exists (VHostPath ) of
621
+ false -> [];
622
+ true ->
623
+ Present = list (VHostPath ),
624
+ Durable = list (VHostPath , rabbit_durable_queue ),
625
+ PresentS = sets :from_list ([N || # amqqueue {name = N } <- Present ]),
626
+ sets :to_list (sets :filter (fun (# amqqueue {name = N }) ->
627
+ not sets :is_element (N , PresentS )
628
+ end , sets :from_list (Durable )))
629
+ end .
599
630
600
631
info_keys () -> rabbit_amqqueue_process :info_keys ().
601
632
0 commit comments