Skip to content

Commit 32c48cd

Browse files
author
Daniil Fedotov
committed
Merge branch 'stable'
2 parents 21879a9 + 7da27f3 commit 32c48cd

File tree

3 files changed

+23
-9
lines changed

3 files changed

+23
-9
lines changed

src/rabbit.erl

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
status/0, is_running/0, alarms/0,
2424
is_running/1, environment/0, rotate_logs/0, force_event_refresh/1,
2525
start_fhc/0]).
26+
2627
-export([start/2, stop/1, prep_stop/1]).
2728
-export([start_apps/1, start_apps/2, stop_apps/1]).
2829
-export([log_locations/0, config_files/0, decrypt_config/2]). %% for testing and mgmt-agent
@@ -230,6 +231,7 @@
230231

231232
%%----------------------------------------------------------------------------
232233

234+
-type restart_type() :: 'permanent' | 'transient' | 'temporary'.
233235
%% this really should be an abstract type
234236
-type log_location() :: string().
235237
-type param() :: atom().
@@ -267,7 +269,7 @@
267269
-spec recover() -> 'ok'.
268270
-spec start_apps([app_name()]) -> 'ok'.
269271
-spec start_apps([app_name()],
270-
#{app_name() => permanent|transient|temporary}) -> 'ok'.
272+
#{app_name() => restart_type()}) -> 'ok'.
271273
-spec stop_apps([app_name()]) -> 'ok'.
272274

273275
%%----------------------------------------------------------------------------
@@ -506,7 +508,7 @@ stop_and_halt() ->
506508
start_apps(Apps) ->
507509
start_apps(Apps, #{}).
508510

509-
start_apps(Apps, AppModes) ->
511+
start_apps(Apps, RestartTypes) ->
510512
app_utils:load_applications(Apps),
511513

512514
ConfigEntryDecoder = case application:get_env(rabbit, config_entry_decoder) of
@@ -547,7 +549,7 @@ start_apps(Apps, AppModes) ->
547549
end,
548550
ok = app_utils:start_applications(OrderedApps,
549551
handle_app_error(could_not_start),
550-
AppModes).
552+
RestartTypes).
551553

552554
%% This function retrieves the correct IoDevice for requesting
553555
%% input. The problem with using the default IoDevice is that

src/rabbit_amqqueue_process.erl

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
-define(SYNC_INTERVAL, 200). %% milliseconds
2424
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
25+
-define(CONSUMER_BIAS_RATIO, 2.0). %% i.e. consume 100% faster
2526

2627
-export([info_keys/0]).
2728

@@ -1013,18 +1014,18 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName, ActingUser) ->
10131014

10141015
%%----------------------------------------------------------------------------
10151016

1016-
prioritise_call(Msg, _From, _Len, _State) ->
1017+
prioritise_call(Msg, _From, _Len, State) ->
10171018
case Msg of
10181019
info -> 9;
10191020
{info, _Items} -> 9;
10201021
consumers -> 9;
10211022
stat -> 7;
1022-
{basic_consume, _, _, _, _, _, _, _, _, _, _} -> 1;
1023-
{basic_cancel, _, _, _} -> 1;
1023+
{basic_consume, _, _, _, _, _, _, _, _, _} -> consumer_bias(State, 0, 2);
1024+
{basic_cancel, _, _, _} -> consumer_bias(State, 0, 2);
10241025
_ -> 0
10251026
end.
10261027

1027-
prioritise_cast(Msg, _Len, _State) ->
1028+
prioritise_cast(Msg, _Len, State) ->
10281029
case Msg of
10291030
delete_immediately -> 8;
10301031
{delete_exclusive, _Pid} -> 8;
@@ -1033,7 +1034,7 @@ prioritise_cast(Msg, _Len, _State) ->
10331034
{run_backing_queue, _Mod, _Fun} -> 6;
10341035
{ack, _AckTags, _ChPid} -> 4; %% [1]
10351036
{resume, _ChPid} -> 3;
1036-
{notify_sent, _ChPid, _Credit} -> 2;
1037+
{notify_sent, _ChPid, _Credit} -> consumer_bias(State, 0, 2);
10371038
_ -> 0
10381039
end.
10391040

@@ -1049,6 +1050,13 @@ prioritise_cast(Msg, _Len, _State) ->
10491050
%% credit to self is hard to reason about. Consumers can continue while
10501051
%% reduce_memory_use is in progress.
10511052

1053+
consumer_bias(#q{backing_queue = BQ, backing_queue_state = BQS}, Low, High) ->
1054+
case BQ:msg_rates(BQS) of
1055+
{0.0, _} -> Low;
1056+
{Ingress, Egress} when Egress / Ingress < ?CONSUMER_BIAS_RATIO -> High;
1057+
{_, _} -> Low
1058+
end.
1059+
10521060
prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
10531061
case Msg of
10541062
{'DOWN', _, process, DownPid, _} -> 8;

test/clustering_management_SUITE.erl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -532,7 +532,9 @@ erlang_config(Config) ->
532532
ok = start_app(Hare),
533533
assert_clustered([Rabbit, Hare]),
534534

535-
%% If we use an invalid node name, the node fails to start.
535+
%% If we use an invalid node type, the node fails to start.
536+
%% The Erlang VM has stopped after previous rabbit app failure
537+
ok = rabbit_ct_broker_helpers:start_node(Config, Hare),
536538
ok = stop_app(Hare),
537539
ok = reset(Hare),
538540
ok = rpc:call(Hare, application, set_env,
@@ -703,6 +705,8 @@ assert_failure(Fun) ->
703705
{error, Reason} -> Reason;
704706
{error_string, Reason} -> Reason;
705707
{badrpc, {'EXIT', Reason}} -> Reason;
708+
%% Failure to start an app result in node shutdown
709+
{badrpc, nodedown} -> nodedown;
706710
{badrpc_multi, Reason, _Nodes} -> Reason;
707711
Other -> exit({expected_failure, Other})
708712
end.

0 commit comments

Comments
 (0)