|
70 | 70 | -export([queue/1, queue_names/1]).
|
71 | 71 |
|
72 | 72 | -export([kill_queue/2, kill_queue/3, kill_queue_hard/2, kill_queue_hard/3]).
|
| 73 | +-export([delete_transient_queues_on_node/1]). |
73 | 74 |
|
74 | 75 | %% internal
|
75 | 76 | -export([internal_declare/2, internal_delete/2, run_backing_queue/3,
|
@@ -1839,13 +1840,39 @@ on_node_up(_Node) ->
|
1839 | 1840 | -spec on_node_down(node()) -> 'ok'.
|
1840 | 1841 |
|
1841 | 1842 | on_node_down(Node) ->
|
| 1843 | + case delete_transient_queues_on_node(Node) of |
| 1844 | + ok -> |
| 1845 | + ok; |
| 1846 | + {error, timeout} -> |
| 1847 | + %% This case is possible when running Khepri. The node going down |
| 1848 | + %% could leave the cluster in a minority so the command to delete |
| 1849 | + %% the transient queue records would fail. Also see |
| 1850 | + %% `rabbit_khepri:init/0': we also try this deletion when the node |
| 1851 | + %% restarts - a time that the cluster is very likely to have a |
| 1852 | + %% majority - to ensure these records are deleted. |
| 1853 | + rabbit_log:warning("transient queues for node '~ts' could not be " |
| 1854 | + "deleted because of a timeout. These queues " |
| 1855 | + "will be removed when node '~ts' restarts or " |
| 1856 | + "is removed from the cluster.", [Node, Node]), |
| 1857 | + ok |
| 1858 | + end. |
| 1859 | + |
| 1860 | +-spec delete_transient_queues_on_node(Node) -> Ret when |
| 1861 | + Node :: node(), |
| 1862 | + Ret :: ok | rabbit_khepri:timeout_error(). |
| 1863 | + |
| 1864 | +delete_transient_queues_on_node(Node) -> |
1842 | 1865 | {Time, Ret} = timer:tc(fun() -> rabbit_db_queue:delete_transient(filter_transient_queues_to_delete(Node)) end),
|
1843 | 1866 | case Ret of
|
1844 |
| - ok -> ok; |
1845 |
| - {QueueNames, Deletions} -> |
| 1867 | + ok -> |
| 1868 | + ok; |
| 1869 | + {error, timeout} = Err -> |
| 1870 | + Err; |
| 1871 | + {QueueNames, Deletions} when is_list(QueueNames) -> |
1846 | 1872 | case length(QueueNames) of
|
1847 | 1873 | 0 -> ok;
|
1848 |
| - N -> rabbit_log:info("~b transient queues from an old incarnation of node ~tp deleted in ~fs", |
| 1874 | + N -> rabbit_log:info("~b transient queues from node '~ts' " |
| 1875 | + "deleted in ~fs", |
1849 | 1876 | [N, Node, Time / 1_000_000])
|
1850 | 1877 | end,
|
1851 | 1878 | notify_queue_binding_deletions(Deletions),
|
|
0 commit comments