|
75 | 75 | -export([queue/1, queue_names/1]).
|
76 | 76 |
|
77 | 77 | -export([kill_queue/2, kill_queue/3, kill_queue_hard/2, kill_queue_hard/3]).
|
| 78 | +-export([delete_transient_queues_on_node/1]). |
78 | 79 |
|
79 | 80 | %% internal
|
80 | 81 | -export([internal_declare/2, internal_delete/2, run_backing_queue/3,
|
@@ -2055,13 +2056,39 @@ maybe_clear_recoverable_node(Node) ->
|
%% Attempts to delete the transient queues of `Node' and always returns
%% `ok'. A timeout from the deletion is logged as a warning rather than
%% propagated to the caller.
-spec on_node_down(node()) -> 'ok'.

on_node_down(Node) ->
    Outcome = delete_transient_queues_on_node(Node),
    case Outcome of
        {error, timeout} ->
            %% Reachable when running Khepri: losing this node may put the
            %% cluster in a minority, in which case the command deleting
            %% the transient queue records fails. The same deletion is
            %% attempted again from `rabbit_khepri:init/0' when the node
            %% restarts, a point at which the cluster most likely has a
            %% majority, so the records still end up being removed.
            rabbit_log:warning("transient queues for node '~ts' could not be "
                               "deleted because of a timeout. These queues "
                               "will be removed when node '~ts' restarts or "
                               "is removed from the cluster.", [Node, Node]),
            ok;
        ok ->
            ok
    end.
| 2075 | + |
| 2076 | +-spec delete_transient_queues_on_node(Node) -> Ret when |
| 2077 | + Node :: node(), |
| 2078 | + Ret :: ok | rabbit_khepri:timeout_error(). |
| 2079 | + |
| 2080 | +delete_transient_queues_on_node(Node) -> |
2058 | 2081 | {Time, Ret} = timer:tc(fun() -> rabbit_db_queue:delete_transient(filter_transient_queues_to_delete(Node)) end),
|
2059 | 2082 | case Ret of
|
2060 |
| - ok -> ok; |
2061 |
| - {QueueNames, Deletions} -> |
| 2083 | + ok -> |
| 2084 | + ok; |
| 2085 | + {error, timeout} = Err -> |
| 2086 | + Err; |
| 2087 | + {QueueNames, Deletions} when is_list(QueueNames) -> |
2062 | 2088 | case length(QueueNames) of
|
2063 | 2089 | 0 -> ok;
|
2064 |
| - N -> rabbit_log:info("~b transient queues from an old incarnation of node ~tp deleted in ~fs", |
| 2090 | + N -> rabbit_log:info("~b transient queues from node '~ts' " |
| 2091 | + "deleted in ~fs", |
2065 | 2092 | [N, Node, Time / 1_000_000])
|
2066 | 2093 | end,
|
2067 | 2094 | notify_queue_binding_deletions(Deletions),
|
|
0 commit comments