@@ -980,23 +980,30 @@ delete_transient_in_khepri(FilterFun) ->
980
980
[? KHEPRI_WILDCARD_STAR ,
981
981
# if_data_matches {
982
982
pattern = amqqueue :pattern_match_on_durable (false )}],
983
- Ret = rabbit_khepri :fold (
984
- PathPattern ,
985
- fun (Path , #{data := Queue }, Acc ) ->
986
- case FilterFun (Queue ) of
987
- true ->
988
- QueueName = khepri_queue_path_to_name (Path ),
989
- case delete_in_khepri (QueueName , false ) of
990
- ok -> Acc ;
991
- Deletions -> [{QueueName , Deletions } | Acc ]
992
- end ;
993
- false ->
994
- Acc
995
- end
996
- end , [],
997
- #{copy_tree_and_run_from_caller => true }),
998
- case Ret of
999
- {ok , Items } ->
983
+ % % The `FilterFun' might try to determine if the queue's process is alive.
984
+ % % This can cause a `calling_self' exception if we use the `FilterFun'
985
+ % % within the function passed to `khepri:fold/5' since the Khepri server
986
+ % % process might call itself. Instead we can fetch all of the transient
987
+ % % queues with `get_many' and then filter and fold the results outside of
988
+ % % Khepri's Ra server process.
989
+ case rabbit_khepri :get_many (PathPattern ) of
990
+ {ok , Qs } ->
991
+ Items = maps :fold (
992
+ fun (Path , #{data := Queue }, Acc ) ->
993
+ case FilterFun (Queue ) of
994
+ true ->
995
+ QueueName = khepri_queue_path_to_name (
996
+ Path ),
997
+ case delete_in_khepri (QueueName , false ) of
998
+ ok ->
999
+ Acc ;
1000
+ Deletions ->
1001
+ [{QueueName , Deletions } | Acc ]
1002
+ end ;
1003
+ false ->
1004
+ Acc
1005
+ end
1006
+ end , [], Qs ),
1000
1007
{QueueNames , Deletions } = lists :unzip (Items ),
1001
1008
{QueueNames , lists :flatten (Deletions )};
1002
1009
{error , _ } = Error ->
0 commit comments