Skip to content

Commit 623f1de

Browse files
committed
Merge branch 'stable' into rabbitmq-server-289
2 parents 7507179 + 1085e24 commit 623f1de

File tree

8 files changed

+263
-66
lines changed

8 files changed

+263
-66
lines changed

ebin/rabbit_app.in

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,5 +83,8 @@
8383
gen_fsm, ssl]},
8484
{ssl_apps, [asn1, crypto, public_key, ssl]},
8585
%% see rabbitmq-server#114
86-
{mirroring_flow_control, true}
86+
{mirroring_flow_control, true},
87+
%% see rabbitmq-server#227 and related tickets
88+
{msg_store_credit_disc_bound, {2000, 500}},
89+
{msg_store_io_batch_size, 2048}
8790
]}]}.

include/rabbit.hrl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,10 @@
122122
-define(HIBERNATE_AFTER_MIN, 1000).
123123
-define(DESIRED_HIBERNATE, 10000).
124124
-define(CREDIT_DISC_BOUND, {2000, 500}).
125+
%% When we discover that we should write some indices to disk for some
126+
%% betas, the IO_BATCH_SIZE sets the number of betas that we must be
127+
%% due to write indices for before we do any work at all.
128+
-define(IO_BATCH_SIZE, 2048). %% next power-of-2 after ?CREDIT_DISC_BOUND
125129

126130
-define(INVALID_HEADERS_KEY, <<"x-invalid-headers">>).
127131
-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).

src/rabbit.erl

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@
3030
%% Boot steps.
3131
-export([maybe_insert_default_data/0, boot_delegate/0, recover/0]).
3232

33+
%% for tests
34+
-export([validate_msg_store_io_batch_size_and_credit_disc_bound/2]).
35+
3336
-rabbit_boot_step({pre_boot, [{description, "rabbit boot start"}]}).
3437

3538
-rabbit_boot_step({codec_correctness_check,
@@ -520,6 +523,7 @@ start(normal, []) ->
520523
print_banner(),
521524
log_banner(),
522525
warn_if_kernel_config_dubious(),
526+
warn_if_disc_io_options_dubious(),
523527
run_boot_steps(),
524528
{ok, SupPid};
525529
Error ->
@@ -848,6 +852,86 @@ warn_if_kernel_config_dubious() ->
848852
true -> ok
849853
end.
850854

855+
warn_if_disc_io_options_dubious() ->
856+
%% if these values are not set, it doesn't matter since
857+
%% rabbit_variable_queue will pick up the values defined in the
858+
%% IO_BATCH_SIZE and CREDIT_DISC_BOUND constants.
859+
CreditDiscBound = rabbit_misc:get_env(rabbit, msg_store_credit_disc_bound,
860+
undefined),
861+
IoBatchSize = rabbit_misc:get_env(rabbit, msg_store_io_batch_size,
862+
undefined),
863+
case catch validate_msg_store_io_batch_size_and_credit_disc_bound(
864+
CreditDiscBound, IoBatchSize) of
865+
ok -> ok;
866+
{error, {Reason, Vars}} ->
867+
rabbit_log:warning(Reason, Vars)
868+
end.
869+
870+
validate_msg_store_io_batch_size_and_credit_disc_bound(CreditDiscBound,
871+
IoBatchSize) ->
872+
case IoBatchSize of
873+
undefined ->
874+
ok;
875+
IoBatchSize when is_integer(IoBatchSize) ->
876+
if IoBatchSize < ?IO_BATCH_SIZE ->
877+
throw({error,
878+
{"io_batch_size of ~b lower than recommended value ~b, "
879+
"paging performance may worsen~n",
880+
[IoBatchSize, ?IO_BATCH_SIZE]}});
881+
true ->
882+
ok
883+
end;
884+
IoBatchSize ->
885+
throw({error,
886+
{"io_batch_size should be an integer, but ~b given",
887+
[IoBatchSize]}})
888+
end,
889+
890+
%% CreditDiscBound = {InitialCredit, MoreCreditAfter}
891+
{RIC, RMCA} = ?CREDIT_DISC_BOUND,
892+
case CreditDiscBound of
893+
undefined ->
894+
ok;
895+
{IC, MCA} when is_integer(IC), is_integer(MCA) ->
896+
if IC < RIC; MCA < RMCA ->
897+
throw({error,
898+
{"msg_store_credit_disc_bound {~b, ~b} lower than"
899+
"recommended value {~b, ~b},"
900+
" paging performance may worsen~n",
901+
[IC, MCA, RIC, RMCA]}});
902+
true ->
903+
ok
904+
end;
905+
{IC, MCA} ->
906+
throw({error,
907+
{"both msg_store_credit_disc_bound values should be integers, but ~p given",
908+
[{IC, MCA}]}});
909+
CreditDiscBound ->
910+
throw({error,
911+
{"invalid msg_store_credit_disc_bound value given: ~p",
912+
[CreditDiscBound]}})
913+
end,
914+
915+
case {CreditDiscBound, IoBatchSize} of
916+
{undefined, undefined} ->
917+
ok;
918+
{_CDB, undefined} ->
919+
ok;
920+
{undefined, _IBS} ->
921+
ok;
922+
{{InitialCredit, _MCA}, IoBatchSize} ->
923+
if IoBatchSize < InitialCredit ->
924+
throw(
925+
{error,
926+
{"msg_store_io_batch_size ~b should be bigger than the initial "
927+
"credit value from msg_store_credit_disc_bound ~b,"
928+
" paging performance may worsen~n",
929+
[IoBatchSize, InitialCredit]}});
930+
true ->
931+
ok
932+
end
933+
end.
934+
851935
home_dir() ->
852936
case init:get_argument(home) of
853937
{ok, [[Home]]} -> Home;

src/rabbit_msg_store.erl

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@
7777
%% to callbacks
7878
successfully_recovered, %% boolean: did we recover state?
7979
file_size_limit, %% how big are our files allowed to get?
80-
cref_to_msg_ids %% client ref to synced messages mapping
80+
cref_to_msg_ids, %% client ref to synced messages mapping
81+
credit_disc_bound %% See rabbit.hrl CREDIT_DISC_BOUND
8182
}).
8283

8384
-record(client_msstate,
@@ -91,7 +92,8 @@
9192
file_handles_ets,
9293
file_summary_ets,
9394
cur_file_cache_ets,
94-
flying_ets
95+
flying_ets,
96+
credit_disc_bound
9597
}).
9698

9799
-record(file_summary,
@@ -134,7 +136,8 @@
134136
file_handles_ets :: ets:tid(),
135137
file_summary_ets :: ets:tid(),
136138
cur_file_cache_ets :: ets:tid(),
137-
flying_ets :: ets:tid()}).
139+
flying_ets :: ets:tid(),
140+
credit_disc_bound :: {pos_integer(), pos_integer()}}).
138141
-type(msg_ref_delta_gen(A) ::
139142
fun ((A) -> 'finished' |
140143
{rabbit_types:msg_id(), non_neg_integer(), A})).
@@ -442,6 +445,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
442445
gen_server2:call(
443446
Server, {new_client_state, Ref, self(), MsgOnDiskFun, CloseFDsFun},
444447
infinity),
448+
CreditDiscBound = rabbit_misc:get_env(rabbit, msg_store_credit_disc_bound,
449+
?CREDIT_DISC_BOUND),
445450
#client_msstate { server = Server,
446451
client_ref = Ref,
447452
file_handle_cache = dict:new(),
@@ -452,7 +457,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
452457
file_handles_ets = FileHandlesEts,
453458
file_summary_ets = FileSummaryEts,
454459
cur_file_cache_ets = CurFileCacheEts,
455-
flying_ets = FlyingEts }.
460+
flying_ets = FlyingEts,
461+
credit_disc_bound = CreditDiscBound }.
456462

457463
client_terminate(CState = #client_msstate { client_ref = Ref }) ->
458464
close_all_handles(CState),
@@ -465,8 +471,11 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
465471

466472
client_ref(#client_msstate { client_ref = Ref }) -> Ref.
467473

468-
write_flow(MsgId, Msg, CState = #client_msstate { server = Server }) ->
469-
credit_flow:send(whereis(Server), ?CREDIT_DISC_BOUND),
474+
write_flow(MsgId, Msg,
475+
CState = #client_msstate {
476+
server = Server,
477+
credit_disc_bound = CreditDiscBound }) ->
478+
credit_flow:send(whereis(Server), CreditDiscBound),
470479
client_write(MsgId, Msg, flow, CState).
471480

472481
write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState).
@@ -709,6 +718,9 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
709718
msg_store = self()
710719
}),
711720

721+
CreditDiscBound = rabbit_misc:get_env(rabbit, msg_store_credit_disc_bound,
722+
?CREDIT_DISC_BOUND),
723+
712724
State = #msstate { dir = Dir,
713725
index_module = IndexModule,
714726
index_state = IndexState,
@@ -728,7 +740,8 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
728740
clients = Clients,
729741
successfully_recovered = CleanShutdown,
730742
file_size_limit = FileSizeLimit,
731-
cref_to_msg_ids = dict:new()
743+
cref_to_msg_ids = dict:new(),
744+
credit_disc_bound = CreditDiscBound
732745
},
733746

734747
%% If we didn't recover the msg location index then we need to
@@ -812,10 +825,11 @@ handle_cast({client_delete, CRef},
812825

813826
handle_cast({write, CRef, MsgId, Flow},
814827
State = #msstate { cur_file_cache_ets = CurFileCacheEts,
815-
clients = Clients }) ->
828+
clients = Clients,
829+
credit_disc_bound = CreditDiscBound }) ->
816830
case Flow of
817831
flow -> {CPid, _, _} = dict:fetch(CRef, Clients),
818-
credit_flow:ack(CPid, ?CREDIT_DISC_BOUND);
832+
credit_flow:ack(CPid, CreditDiscBound);
819833
noflow -> ok
820834
end,
821835
true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),

0 commit comments

Comments
 (0)