Skip to content

Commit 1f84a14

Browse files
committed
Merge branch 'main' into ra_tick_only_one_pid
2 parents c443f82 + 557c23b commit 1f84a14

19 files changed

+202
-79
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ _APP_ENV = """[
7878
]},
7979
{halt_on_upgrade_failure, true},
8080
{ssl_apps, [asn1, crypto, public_key, ssl]},
81+
%% classic queue storage implementation version
82+
{classic_queue_default_version, 2},
8183
%% see rabbitmq-server#114
8284
{mirroring_flow_control, true},
8385
{mirroring_sync_batch_size, 4096},

deps/rabbit/Makefile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ define PROJECT_ENV
6565
]},
6666
{halt_on_upgrade_failure, true},
6767
{ssl_apps, [asn1, crypto, public_key, ssl]},
68+
%% classic queue storage implementation version
69+
{classic_queue_default_version, 2},
6870
%% see rabbitmq-server#114
6971
{mirroring_flow_control, true},
7072
{mirroring_sync_batch_size, 4096},

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2408,7 +2408,7 @@ end}.
24082408

24092409
{translation, "rabbit.classic_queue_default_version",
24102410
fun(Conf) ->
2411-
case cuttlefish:conf_get("classic_queue.default_version", Conf, 1) of
2411+
case cuttlefish:conf_get("classic_queue.default_version", Conf, 2) of
24122412
1 -> 1;
24132413
2 -> 2;
24142414
_ -> cuttlefish:unset()

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -133,14 +133,6 @@ recover(VHost) ->
133133
filter_pid_per_type(QPids) ->
134134
lists:partition(fun(QPid) -> ?IS_CLASSIC(QPid) end, QPids).
135135

136-
filter_resource_per_type(Resources) ->
137-
Queues = [begin
138-
{ok, Q} = lookup(Resource),
139-
QPid = amqqueue:get_pid(Q),
140-
{Resource, QPid}
141-
end || Resource <- Resources],
142-
lists:partition(fun({_Resource, QPid}) -> ?IS_CLASSIC(QPid) end, Queues).
143-
144136
-spec stop(rabbit_types:vhost()) -> 'ok'.
145137
stop(VHost) ->
146138
%% Classic queues
@@ -1506,11 +1498,17 @@ delete_immediately(QPids) ->
15061498
end.
15071499

15081500
delete_immediately_by_resource(Resources) ->
1509-
{Classic, Quorum} = filter_resource_per_type(Resources),
1510-
[gen_server2:cast(QPid, delete_immediately) || {_, QPid} <- Classic],
1511-
[rabbit_quorum_queue:delete_immediately(Resource, QPid)
1512-
|| {Resource, QPid} <- Quorum],
1513-
ok.
1501+
lists:foreach(
1502+
fun(Resource) ->
1503+
{ok, Q} = lookup(Resource),
1504+
QPid = amqqueue:get_pid(Q),
1505+
case ?IS_CLASSIC(QPid) of
1506+
true ->
1507+
gen_server2:cast(QPid, delete_immediately);
1508+
_ ->
1509+
rabbit_quorum_queue:delete_immediately(Q)
1510+
end
1511+
end, Resources).
15141512

15151513
-spec delete
15161514
(amqqueue:amqqueue(), 'false', 'false', rabbit_types:username()) ->
@@ -1676,12 +1674,13 @@ notify_sent_queue_down(QPid) ->
16761674
resume(QPid, ChPid) -> delegate:invoke_no_result(QPid, {gen_server2, cast,
16771675
[{resume, ChPid}]}).
16781676

1679-
-spec internal_delete(name(), rabbit_types:username()) -> 'ok'.
1677+
-spec internal_delete(amqqueue:amqqueue(), rabbit_types:username()) -> 'ok'.
16801678

1681-
internal_delete(QueueName, ActingUser) ->
1682-
internal_delete(QueueName, ActingUser, normal).
1679+
internal_delete(Queue, ActingUser) ->
1680+
internal_delete(Queue, ActingUser, normal).
16831681

1684-
internal_delete(QueueName, ActingUser, Reason) ->
1682+
internal_delete(Queue, ActingUser, Reason) ->
1683+
QueueName = amqqueue:get_name(Queue),
16851684
case rabbit_db_queue:delete(QueueName, Reason) of
16861685
ok ->
16871686
ok;
@@ -1691,6 +1690,7 @@ internal_delete(QueueName, ActingUser, Reason) ->
16911690
rabbit_core_metrics:queue_deleted(QueueName),
16921691
ok = rabbit_event:notify(queue_deleted,
16931692
[{name, QueueName},
1693+
{type, amqqueue:get_type(Queue)},
16941694
{user_who_performed_action, ActingUser}])
16951695
end.
16961696

@@ -1874,7 +1874,7 @@ on_node_down(Node) ->
18741874
end,
18751875
notify_queue_binding_deletions(Deletions),
18761876
rabbit_core_metrics:queues_deleted(QueueNames),
1877-
notify_queues_deleted(QueueNames),
1877+
notify_transient_queues_deleted(QueueNames),
18781878
ok
18791879
end.
18801880

@@ -1897,11 +1897,12 @@ notify_queue_binding_deletions(QueueDeletions) ->
18971897
Deletions = rabbit_binding:process_deletions(QueueDeletions),
18981898
rabbit_binding:notify_deletions(Deletions, ?INTERNAL_USER).
18991899

1900-
notify_queues_deleted(QueueDeletions) ->
1900+
notify_transient_queues_deleted(QueueDeletions) ->
19011901
lists:foreach(
19021902
fun(Queue) ->
19031903
ok = rabbit_event:notify(queue_deleted,
19041904
[{name, Queue},
1905+
{kind, rabbit_classic_queue},
19051906
{user, ?INTERNAL_USER}])
19061907
end,
19071908
QueueDeletions).

deps/rabbit/src/rabbit_amqqueue_process.erl

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,6 @@ terminate_delete(EmitStats, Reason0,
314314
State = #q{q = Q,
315315
backing_queue = BQ,
316316
status = Status}) ->
317-
QName = amqqueue:get_name(Q),
318317
ActingUser = terminated_by(Status),
319318
fun (BQS) ->
320319
Reason = case Reason0 of
@@ -330,7 +329,7 @@ terminate_delete(EmitStats, Reason0,
330329
%% logged.
331330
try
332331
%% don't care if the internal delete doesn't return 'ok'.
333-
rabbit_amqqueue:internal_delete(QName, ActingUser, Reason0)
332+
rabbit_amqqueue:internal_delete(Q, ActingUser, Reason0)
334333
catch
335334
{error, ReasonE} -> error(ReasonE)
336335
end,
@@ -471,12 +470,10 @@ init_queue_mode(Mode, State = #q {backing_queue = BQ,
471470

472471
init_queue_version(Version0, State = #q {backing_queue = BQ,
473472
backing_queue_state = BQS}) ->
474-
%% When the version is undefined we use the default version 1.
475-
%% We want to BQ:set_queue_version in all cases because a v2
476-
%% policy might have been deleted, for example, and we want
477-
%% the queue to go back to v1.
473+
%% When the version is undefined we use the default version 2 starting with
474+
%% RabbitMQ 3.12.0.
478475
Version = case Version0 of
479-
undefined -> rabbit_misc:get_env(rabbit, classic_queue_default_version, 1);
476+
undefined -> rabbit_misc:get_env(rabbit, classic_queue_default_version, 2);
480477
_ -> Version0
481478
end,
482479
BQS1 = BQ:set_queue_version(Version, BQS),

deps/rabbit/src/rabbit_classic_queue.erl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -466,10 +466,9 @@ delete_crashed(Q, ActingUser) ->
466466
[Q, ActingUser]).
467467

468468
delete_crashed_internal(Q, ActingUser) ->
469-
QName = amqqueue:get_name(Q),
470469
{ok, BQ} = application:get_env(rabbit, backing_queue_module),
471470
BQ:delete_crashed(Q),
472-
ok = rabbit_amqqueue:internal_delete(QName, ActingUser).
471+
ok = rabbit_amqqueue:internal_delete(Q, ActingUser).
473472

474473
recover_durable_queues(QueuesAndRecoveryTerms) ->
475474
{Results, Failures} =

deps/rabbit/src/rabbit_fifo_dlx.erl

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,20 @@ apply(_, {dlx, #checkout{consumer = Pid,
114114
apply(_, {dlx, #checkout{consumer = ConsumerPid,
115115
prefetch = Prefetch}},
116116
at_least_once,
117-
#?MODULE{consumer = #dlx_consumer{checked_out = CheckedOutOldConsumer},
117+
#?MODULE{consumer = #dlx_consumer{pid = OldConsumerPid,
118+
checked_out = CheckedOutOldConsumer},
118119
discards = Discards0,
119120
msg_bytes = Bytes,
120121
msg_bytes_checkout = BytesCheckout} = State0) ->
121122
%% Since we allow only a single consumer, the new consumer replaces the old consumer.
123+
case ConsumerPid of
124+
OldConsumerPid ->
125+
ok;
126+
_ ->
127+
rabbit_log:debug("Terminating ~p since ~p becomes active rabbit_fifo_dlx_worker",
128+
[OldConsumerPid, ConsumerPid]),
129+
ensure_worker_terminated(State0)
130+
end,
122131
%% All checked out messages to the old consumer need to be returned to the discards queue
123132
%% such that these messages will be re-delivered to the new consumer.
124133
%% When inserting back into the discards queue, we respect the original order in which messages

deps/rabbit/src/rabbit_fifo_dlx_client.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ settle(MsgIds, #state{leader = Leader} = State)
3232
{ok, State}.
3333

3434
-spec checkout(rabbit_amqqueue:name(), ra:server_id(), non_neg_integer()) ->
35-
{ok, state()} | {error, ra_command_failed}.
35+
{ok, state()} | {error, non_local_leader | ra_command_failed}.
3636
checkout(QResource, Leader, NumUnsettled) ->
3737
Cmd = rabbit_fifo_dlx:make_checkout(self(), NumUnsettled),
3838
State = #state{queue_resource = QResource,
@@ -46,10 +46,10 @@ process_command(Cmd, #state{leader = Leader} = State, Tries) ->
4646
case ra:process_command(Leader, Cmd, 60_000) of
4747
{ok, ok, Leader} ->
4848
{ok, State#state{leader = Leader}};
49-
{ok, ok, L} ->
49+
{ok, ok, NonLocalLeader} ->
5050
rabbit_log:warning("Failed to process command ~tp on quorum queue leader ~tp because actual leader is ~tp.",
51-
[Cmd, Leader, L]),
52-
{error, ra_command_failed};
51+
[Cmd, Leader, NonLocalLeader]),
52+
{error, non_local_leader};
5353
Err ->
5454
rabbit_log:warning("Failed to process command ~tp on quorum queue leader ~tp: ~tp~n"
5555
"Trying ~b more time(s)...",

deps/rabbit/src/rabbit_fifo_dlx_sup.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,12 @@ start_link() ->
2424

2525
init([]) ->
2626
SupFlags = #{strategy => simple_one_for_one,
27-
intensity => 1,
28-
period => 5},
27+
intensity => 100,
28+
period => 1},
2929
Worker = rabbit_fifo_dlx_worker,
3030
ChildSpec = #{id => Worker,
3131
start => {Worker, start_link, []},
3232
type => worker,
33+
restart => transient,
3334
modules => [Worker]},
3435
{ok, {SupFlags, [ChildSpec]}}.

deps/rabbit/src/rabbit_fifo_dlx_worker.erl

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -106,23 +106,27 @@ init(QRef) ->
106106
{ok, undefined, {continue, QRef}}.
107107

108108
-spec handle_continue(rabbit_amqqueue:name(), undefined) ->
109-
{noreply, state()}.
109+
{noreply, state()} | {stop, term(), undefined}.
110110
handle_continue(QRef, undefined) ->
111111
{ok, Prefetch} = application:get_env(rabbit,
112112
dead_letter_worker_consumer_prefetch),
113113
{ok, SettleTimeout} = application:get_env(rabbit,
114114
dead_letter_worker_publisher_confirm_timeout),
115115
{ok, Q} = rabbit_amqqueue:lookup(QRef),
116116
{ClusterName, _MaybeOldLeaderNode} = amqqueue:get_pid(Q),
117-
{ok, ConsumerState} = rabbit_fifo_dlx_client:checkout(QRef,
118-
{ClusterName, node()},
119-
Prefetch),
120-
{noreply, lookup_topology(#state{queue_ref = QRef,
121-
queue_type_state = rabbit_queue_type:init(),
122-
settle_timeout = SettleTimeout,
123-
dlx_client_state = ConsumerState,
124-
monitor_ref = erlang:monitor(process, ClusterName)
125-
})}.
117+
case rabbit_fifo_dlx_client:checkout(QRef, {ClusterName, node()}, Prefetch) of
118+
{ok, ConsumerState} ->
119+
{noreply, lookup_topology(#state{queue_ref = QRef,
120+
queue_type_state = rabbit_queue_type:init(),
121+
settle_timeout = SettleTimeout,
122+
dlx_client_state = ConsumerState,
123+
monitor_ref = erlang:monitor(process, ClusterName)
124+
})};
125+
{error, non_local_leader = Reason} ->
126+
{stop, {shutdown, Reason}, undefined};
127+
Error ->
128+
{stop, Error, undefined}
129+
end.
126130

127131
terminate(_Reason, State) ->
128132
cancel_timer(State).

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
restart_server/1,
2121
stop_server/1,
2222
delete/4,
23-
delete_immediately/2]).
23+
delete_immediately/1]).
2424
-export([state_info/1, info/2, stat/1, infos/1]).
2525
-export([settle/5, dequeue/5, consume/3, cancel/5]).
2626
-export([credit/5]).
@@ -223,20 +223,20 @@ start_cluster(Q) ->
223223
ActingUser}]),
224224
{new, NewQ};
225225
{error, Error} ->
226-
declare_queue_error(Error, QName, Leader, ActingUser)
226+
declare_queue_error(Error, NewQ, Leader, ActingUser)
227227
catch
228228
error:Error ->
229-
declare_queue_error(Error, QName, Leader, ActingUser)
229+
declare_queue_error(Error, NewQ, Leader, ActingUser)
230230
end;
231231
{existing, _} = Ex ->
232232
Ex
233233
end.
234234

235-
declare_queue_error(Error, QName, Leader, ActingUser) ->
236-
_ = rabbit_amqqueue:internal_delete(QName, ActingUser),
235+
declare_queue_error(Error, Queue, Leader, ActingUser) ->
236+
_ = rabbit_amqqueue:internal_delete(Queue, ActingUser),
237237
{protocol_error, internal_error,
238238
"Cannot declare quorum ~ts on node '~ts' with leader on node '~ts': ~255p",
239-
[rabbit_misc:rs(QName), node(), Leader, Error]}.
239+
[rabbit_misc:rs(amqqueue:get_name(Queue)), node(), Leader, Error]}.
240240

241241
ra_machine(Q) ->
242242
{module, rabbit_fifo, ra_machine_config(Q)}.
@@ -660,7 +660,7 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
660660
ok = force_delete_queue(Servers)
661661
end,
662662
notify_decorators(QName, shutdown),
663-
ok = delete_queue_data(QName, ActingUser),
663+
ok = delete_queue_data(Q, ActingUser),
664664
_ = erpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
665665
?RPC_TIMEOUT),
666666
{ok, ReadyMsgs};
@@ -671,7 +671,7 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
671671
true ->
672672
%% If all ra nodes were already down, the delete
673673
%% has succeed
674-
delete_queue_data(QName, ActingUser),
674+
delete_queue_data(Q, ActingUser),
675675
{ok, ReadyMsgs};
676676
false ->
677677
%% attempt forced deletion of all servers
@@ -682,7 +682,7 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
682682
[rabbit_misc:rs(QName), Errs]),
683683
ok = force_delete_queue(Servers),
684684
notify_decorators(QName, shutdown),
685-
delete_queue_data(QName, ActingUser),
685+
delete_queue_data(Q, ActingUser),
686686
{ok, ReadyMsgs}
687687
end
688688
end.
@@ -701,15 +701,15 @@ force_delete_queue(Servers) ->
701701
end || S <- Servers],
702702
ok.
703703

704-
delete_queue_data(QName, ActingUser) ->
705-
_ = rabbit_amqqueue:internal_delete(QName, ActingUser),
704+
delete_queue_data(Queue, ActingUser) ->
705+
_ = rabbit_amqqueue:internal_delete(Queue, ActingUser),
706706
ok.
707707

708708

709-
delete_immediately(Resource, {_Name, _} = QPid) ->
710-
_ = rabbit_amqqueue:internal_delete(Resource, ?INTERNAL_USER),
711-
{ok, _} = ra:delete_cluster([QPid]),
712-
rabbit_core_metrics:queue_deleted(Resource),
709+
delete_immediately(Queue) ->
710+
_ = rabbit_amqqueue:internal_delete(Queue, ?INTERNAL_USER),
711+
{ok, _} = ra:delete_cluster([amqqueue:get_pid(Queue)]),
712+
rabbit_core_metrics:queue_deleted(amqqueue:get_name(Queue)),
713713
ok.
714714

715715
settle(_QName, complete, CTag, MsgIds, QState) ->

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,7 @@ delete_stream(Q, ActingUser)
177177
#{name := StreamId} = amqqueue:get_type_state(Q),
178178
case process_command({delete_stream, StreamId, #{}}) of
179179
{ok, ok, _} ->
180-
QName = amqqueue:get_name(Q),
181-
_ = rabbit_amqqueue:internal_delete(QName, ActingUser),
180+
_ = rabbit_amqqueue:internal_delete(Q, ActingUser),
182181
{ok, {ok, 0}};
183182
Err ->
184183
Err

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,7 @@ create_stream(Q0) ->
160160
ActingUser}]),
161161
{new, Q};
162162
Error ->
163-
164-
_ = rabbit_amqqueue:internal_delete(QName, ActingUser),
163+
_ = rabbit_amqqueue:internal_delete(Q, ActingUser),
165164
{protocol_error, internal_error, "Cannot declare a queue '~ts' on node '~ts': ~255p",
166165
[rabbit_misc:rs(QName), node(), Error]}
167166
end;

deps/rabbit/src/rabbit_variable_queue.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -492,8 +492,9 @@ process_recovery_terms(Terms) ->
492492

493493
queue_version(Q) ->
494494
Resolve = fun(_, ArgVal) -> ArgVal end,
495+
%% If queue-version is undefined, we assume v2 starting with RabbitMQ 3.12.0.
495496
case rabbit_queue_type_util:args_policy_lookup(<<"queue-version">>, Resolve, Q) of
496-
undefined -> rabbit_misc:get_env(rabbit, classic_queue_default_version, 1);
497+
undefined -> rabbit_misc:get_env(rabbit, classic_queue_default_version, 2);
497498
Vsn when is_integer(Vsn) -> Vsn;
498499
Vsn -> binary_to_integer(Vsn)
499500
end.

deps/rabbit/src/rabbit_vhost.erl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -413,8 +413,7 @@ assert_benign({error, not_found}, _) -> ok;
413413
assert_benign({error, {absent, Q, _}}, ActingUser) ->
414414
%% Removing the database entries here is safe. If/when the down node
415415
%% restarts, it will clear out the on-disk storage of the queue.
416-
QName = amqqueue:get_name(Q),
417-
rabbit_amqqueue:internal_delete(QName, ActingUser).
416+
rabbit_amqqueue:internal_delete(Q, ActingUser).
418417

419418
-spec exists(vhost:name()) -> boolean().
420419

deps/rabbit/test/backing_queue_SUITE.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -911,7 +911,7 @@ bq_queue_recover1(Config) ->
911911
rabbit_variable_queue:fetch(true, VQ1),
912912
CountMinusOne = rabbit_variable_queue:len(VQ2),
913913
_VQ3 = rabbit_variable_queue:delete_and_terminate(shutdown, VQ2),
914-
ok = rabbit_amqqueue:internal_delete(QName, <<"acting-user">>)
914+
ok = rabbit_amqqueue:internal_delete(Q1, <<"acting-user">>)
915915
end),
916916
passed.
917917

0 commit comments

Comments
 (0)