Skip to content

Commit 557c23b

Browse files
committed
Do not restart DLX worker if leader is non-local
The rabbit_fifo_dlx_worker should be co-located with the quorum queue leader. If a new leader on a different node gets elected before the rabbit_fifo_dlx_worker initialises (i.e. registers itself as a consumer), it should stop itself normally, such that it is not restarted by rabbit_fifo_dlx_sup. Another rabbit_fifo_dlx_worker should be created on the new quorum queue leader node.
1 parent 76db0b5 commit 557c23b

File tree

3 files changed

+19
-14
lines changed

3 files changed

+19
-14
lines changed

deps/rabbit/src/rabbit_fifo_dlx_client.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ settle(MsgIds, #state{leader = Leader} = State)
3232
{ok, State}.
3333

3434
-spec checkout(rabbit_amqqueue:name(), ra:server_id(), non_neg_integer()) ->
35-
{ok, state()} | {error, ra_command_failed}.
35+
{ok, state()} | {error, non_local_leader | ra_command_failed}.
3636
checkout(QResource, Leader, NumUnsettled) ->
3737
Cmd = rabbit_fifo_dlx:make_checkout(self(), NumUnsettled),
3838
State = #state{queue_resource = QResource,
@@ -46,10 +46,10 @@ process_command(Cmd, #state{leader = Leader} = State, Tries) ->
4646
case ra:process_command(Leader, Cmd, 60_000) of
4747
{ok, ok, Leader} ->
4848
{ok, State#state{leader = Leader}};
49-
{ok, ok, L} ->
49+
{ok, ok, NonLocalLeader} ->
5050
rabbit_log:warning("Failed to process command ~tp on quorum queue leader ~tp because actual leader is ~tp.",
51-
[Cmd, Leader, L]),
52-
{error, ra_command_failed};
51+
[Cmd, Leader, NonLocalLeader]),
52+
{error, non_local_leader};
5353
Err ->
5454
rabbit_log:warning("Failed to process command ~tp on quorum queue leader ~tp: ~tp~n"
5555
"Trying ~b more time(s)...",

deps/rabbit/src/rabbit_fifo_dlx_sup.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,5 +30,6 @@ init([]) ->
3030
ChildSpec = #{id => Worker,
3131
start => {Worker, start_link, []},
3232
type => worker,
33+
restart => transient,
3334
modules => [Worker]},
3435
{ok, {SupFlags, [ChildSpec]}}.

deps/rabbit/src/rabbit_fifo_dlx_worker.erl

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -106,23 +106,27 @@ init(QRef) ->
106106
{ok, undefined, {continue, QRef}}.
107107

108108
-spec handle_continue(rabbit_amqqueue:name(), undefined) ->
109-
{noreply, state()}.
109+
{noreply, state()} | {stop, term(), undefined}.
110110
handle_continue(QRef, undefined) ->
111111
{ok, Prefetch} = application:get_env(rabbit,
112112
dead_letter_worker_consumer_prefetch),
113113
{ok, SettleTimeout} = application:get_env(rabbit,
114114
dead_letter_worker_publisher_confirm_timeout),
115115
{ok, Q} = rabbit_amqqueue:lookup(QRef),
116116
{ClusterName, _MaybeOldLeaderNode} = amqqueue:get_pid(Q),
117-
{ok, ConsumerState} = rabbit_fifo_dlx_client:checkout(QRef,
118-
{ClusterName, node()},
119-
Prefetch),
120-
{noreply, lookup_topology(#state{queue_ref = QRef,
121-
queue_type_state = rabbit_queue_type:init(),
122-
settle_timeout = SettleTimeout,
123-
dlx_client_state = ConsumerState,
124-
monitor_ref = erlang:monitor(process, ClusterName)
125-
})}.
117+
case rabbit_fifo_dlx_client:checkout(QRef, {ClusterName, node()}, Prefetch) of
118+
{ok, ConsumerState} ->
119+
{noreply, lookup_topology(#state{queue_ref = QRef,
120+
queue_type_state = rabbit_queue_type:init(),
121+
settle_timeout = SettleTimeout,
122+
dlx_client_state = ConsumerState,
123+
monitor_ref = erlang:monitor(process, ClusterName)
124+
})};
125+
{error, non_local_leader = Reason} ->
126+
{stop, {shutdown, Reason}, undefined};
127+
Error ->
128+
{stop, Error, undefined}
129+
end.
126130

127131
terminate(_Reason, State) ->
128132
cancel_timer(State).

0 commit comments

Comments
 (0)