Skip to content

Commit 10059bc

Browse files
Merge branch 'rabbitmq-server-289' into stable
2 parents 5b71eb3 + 8c153b7 commit 10059bc

File tree

2 files changed

+143
-16
lines changed

2 files changed

+143
-16
lines changed

src/rabbit_queue_index.erl

Lines changed: 81 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
-export([erase/1, init/3, recover/6,
2020
terminate/2, delete_and_terminate/1,
21+
pre_publish/7, flush_pre_publish_cache/2,
2122
publish/6, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
2223
read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]).
2324

@@ -177,7 +178,8 @@
177178

178179
-record(qistate, {dir, segments, journal_handle, dirty_count,
179180
max_journal_entries, on_sync, on_sync_msg,
180-
unconfirmed, unconfirmed_msg}).
181+
unconfirmed, unconfirmed_msg,
182+
pre_publish_cache, delivered_cache}).
181183

182184
-record(segment, {num, path, journal_entries,
183185
entries_to_segment, unacked}).
@@ -212,7 +214,9 @@
212214
on_sync :: on_sync_fun(),
213215
on_sync_msg :: on_sync_fun(),
214216
unconfirmed :: gb_sets:set(),
215-
unconfirmed_msg :: gb_sets:set()
217+
unconfirmed_msg :: gb_sets:set(),
218+
pre_publish_cache :: list(),
219+
delivered_cache :: list()
216220
}).
217221
-type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())).
218222
-type(walker(A) :: fun ((A) -> 'finished' |
@@ -291,6 +295,78 @@ delete_and_terminate(State) ->
291295
ok = rabbit_file:recursive_delete([Dir]),
292296
State1.
293297

298+
pre_publish(MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered, JournalSizeHint,
299+
State = #qistate{unconfirmed = UC,
300+
unconfirmed_msg = UCM,
301+
pre_publish_cache = PPC,
302+
delivered_cache = DC}) ->
303+
MsgId = case MsgOrId of
304+
#basic_message{id = Id} -> Id;
305+
Id when is_binary(Id) -> Id
306+
end,
307+
?MSG_ID_BYTES = size(MsgId),
308+
309+
State1 =
310+
case {MsgProps#message_properties.needs_confirming, MsgOrId} of
311+
{true, MsgId} -> UC1 = gb_sets:add_element(MsgId, UC),
312+
State#qistate{unconfirmed = UC1};
313+
{true, _} -> UCM1 = gb_sets:add_element(MsgId, UCM),
314+
State#qistate{unconfirmed_msg = UCM1};
315+
{false, _} -> State
316+
end,
317+
318+
{Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps),
319+
320+
PPC1 =
321+
[[<<(case IsPersistent of
322+
true -> ?PUB_PERSIST_JPREFIX;
323+
false -> ?PUB_TRANS_JPREFIX
324+
end):?JPREFIX_BITS,
325+
SeqId:?SEQ_BITS, Bin/binary,
326+
(size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin], PPC],
327+
328+
DC1 =
329+
case IsDelivered of
330+
true ->
331+
[SeqId | DC];
332+
false ->
333+
DC
334+
end,
335+
336+
add_to_journal(SeqId, {IsPersistent, Bin, MsgBin},
337+
maybe_flush_pre_publish_cache(
338+
JournalSizeHint,
339+
State1#qistate{pre_publish_cache = PPC1,
340+
delivered_cache = DC1})).
341+
342+
%% pre_publish_cache is the entry with most elements when comapred to
343+
%% delivered_cache so we only check the former in the guard.
344+
maybe_flush_pre_publish_cache(JournalSizeHint,
345+
#qistate{pre_publish_cache = PPC} = State)
346+
when length(PPC) >= ?SEGMENT_ENTRY_COUNT ->
347+
flush_pre_publish_cache(JournalSizeHint, State);
348+
maybe_flush_pre_publish_cache(_JournalSizeHint, State) ->
349+
State.
350+
351+
flush_pre_publish_cache(JournalSizeHint, State) ->
352+
State1 = flush_pre_publish_cache(State),
353+
State2 = flush_delivered_cache(State1),
354+
maybe_flush_journal(JournalSizeHint, State2).
355+
356+
flush_pre_publish_cache(#qistate{pre_publish_cache = []} = State) ->
357+
State;
358+
flush_pre_publish_cache(State = #qistate{pre_publish_cache = PPC}) ->
359+
{JournalHdl, State1} = get_journal_handle(State),
360+
file_handle_cache_stats:update(queue_index_journal_write),
361+
ok = file_handle_cache:append(JournalHdl, lists:reverse(PPC)),
362+
State1#qistate{pre_publish_cache = []}.
363+
364+
flush_delivered_cache(#qistate{delivered_cache = []} = State) ->
365+
State;
366+
flush_delivered_cache(State = #qistate{delivered_cache = DC}) ->
367+
State1 = deliver(lists:reverse(DC), State),
368+
State1#qistate{delivered_cache = []}.
369+
294370
publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint,
295371
State = #qistate{unconfirmed = UC,
296372
unconfirmed_msg = UCM}) ->
@@ -446,7 +522,9 @@ blank_state_dir(Dir) ->
446522
on_sync = fun (_) -> ok end,
447523
on_sync_msg = fun (_) -> ok end,
448524
unconfirmed = gb_sets:new(),
449-
unconfirmed_msg = gb_sets:new() }.
525+
unconfirmed_msg = gb_sets:new(),
526+
pre_publish_cache = [],
527+
delivered_cache = [] }.
450528

451529
init_clean(RecoveredCounts, State) ->
452530
%% Load the journal. Since this is a clean recovery this (almost)

src/rabbit_variable_queue.erl

Lines changed: 62 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1379,6 +1379,43 @@ maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
13791379
maybe_write_msg_to_disk(_Force, MsgStatus, State) ->
13801380
{MsgStatus, State}.
13811381

1382+
%% Due to certain optimizations made inside
1383+
%% rabbit_queue_index:pre_publish/7 we need to have two separate
1384+
%% functions for index persistence. This one is only used when paging
1385+
%% during memory pressure. We didn't want to modify
1386+
%% maybe_write_index_to_disk/3 because that function is used in other
1387+
%% places.
1388+
maybe_batch_write_index_to_disk(_Force,
1389+
MsgStatus = #msg_status {
1390+
index_on_disk = true }, State) ->
1391+
{MsgStatus, State};
1392+
maybe_batch_write_index_to_disk(Force,
1393+
MsgStatus = #msg_status {
1394+
msg = Msg,
1395+
msg_id = MsgId,
1396+
seq_id = SeqId,
1397+
is_persistent = IsPersistent,
1398+
is_delivered = IsDelivered,
1399+
msg_props = MsgProps},
1400+
State = #vqstate {
1401+
target_ram_count = TargetRamCount,
1402+
disk_write_count = DiskWriteCount,
1403+
index_state = IndexState})
1404+
when Force orelse IsPersistent ->
1405+
{MsgOrId, DiskWriteCount1} =
1406+
case persist_to(MsgStatus) of
1407+
msg_store -> {MsgId, DiskWriteCount};
1408+
queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1}
1409+
end,
1410+
IndexState1 = rabbit_queue_index:pre_publish(
1411+
MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered,
1412+
TargetRamCount, IndexState),
1413+
{MsgStatus#msg_status{index_on_disk = true},
1414+
State#vqstate{index_state = IndexState1,
1415+
disk_write_count = DiskWriteCount1}};
1416+
maybe_batch_write_index_to_disk(_Force, MsgStatus, State) ->
1417+
{MsgStatus, State}.
1418+
13821419
maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
13831420
index_on_disk = true }, State) ->
13841421
{MsgStatus, State};
@@ -1413,6 +1450,10 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->
14131450
{MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State),
14141451
maybe_write_index_to_disk(ForceIndex, MsgStatus1, State1).
14151452

1453+
maybe_prepare_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->
1454+
{MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State),
1455+
maybe_batch_write_index_to_disk(ForceIndex, MsgStatus1, State1).
1456+
14161457
determine_persist_to(#basic_message{
14171458
content = #content{properties = Props,
14181459
properties_bin = PropsBin}},
@@ -1800,16 +1841,16 @@ reduce_memory_use(State = #vqstate {
18001841
end.
18011842

18021843
limit_ram_acks(0, State) ->
1803-
{0, State};
1844+
{0, ui(State)};
18041845
limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,
18051846
disk_pending_ack = DPA }) ->
18061847
case gb_trees:is_empty(RPA) of
18071848
true ->
1808-
{Quota, State};
1849+
{Quota, ui(State)};
18091850
false ->
18101851
{SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA),
18111852
{MsgStatus1, State1} =
1812-
maybe_write_to_disk(true, false, MsgStatus, State),
1853+
maybe_prepare_write_to_disk(true, false, MsgStatus, State),
18131854
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
18141855
DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA),
18151856
limit_ram_acks(Quota - 1,
@@ -1947,16 +1988,17 @@ push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
19471988
when Quota =:= 0 orelse
19481989
TargetRamCount =:= infinity orelse
19491990
TargetRamCount >= RamMsgCount ->
1950-
{Quota, State};
1991+
{Quota, ui(State)};
19511992
push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
19521993
case credit_flow:blocked() of
1953-
true -> {Quota, State};
1994+
true -> {Quota, ui(State)};
19541995
false -> case Generator(Q) of
19551996
{empty, _Q} ->
1956-
{Quota, State};
1997+
{Quota, ui(State)};
19571998
{{value, MsgStatus}, Qa} ->
19581999
{MsgStatus1, State1} =
1959-
maybe_write_to_disk(true, false, MsgStatus, State),
2000+
maybe_prepare_write_to_disk(true, false, MsgStatus,
2001+
State),
19602002
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
19612003
State2 = stats(
19622004
ready0, {MsgStatus, MsgStatus2}, State1),
@@ -1997,24 +2039,31 @@ push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
19972039
end
19982040
end.
19992041

2000-
push_betas_to_deltas1(_Generator, _Limit, Q, {0, _Delta, _State} = PushState) ->
2001-
{Q, PushState};
2002-
push_betas_to_deltas1(Generator, Limit, Q, {Quota, Delta, State} = PushState) ->
2042+
push_betas_to_deltas1(_Generator, _Limit, Q, {0, Delta, State}) ->
2043+
{Q, {0, Delta, ui(State)}};
2044+
push_betas_to_deltas1(Generator, Limit, Q, {Quota, Delta, State}) ->
20032045
case Generator(Q) of
20042046
{empty, _Q} ->
2005-
{Q, PushState};
2047+
{Q, {Quota, Delta, ui(State)}};
20062048
{{value, #msg_status { seq_id = SeqId }}, _Qa}
20072049
when SeqId < Limit ->
2008-
{Q, PushState};
2050+
{Q, {Quota, Delta, ui(State)}};
20092051
{{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} ->
20102052
{#msg_status { index_on_disk = true }, State1} =
2011-
maybe_write_index_to_disk(true, MsgStatus, State),
2053+
maybe_batch_write_index_to_disk(true, MsgStatus, State),
20122054
State2 = stats(ready0, {MsgStatus, none}, State1),
20132055
Delta1 = expand_delta(SeqId, Delta),
20142056
push_betas_to_deltas1(Generator, Limit, Qa,
20152057
{Quota - 1, Delta1, State2})
20162058
end.
20172059

2060+
%% Flushes queue index batch caches and updates queue index state.
2061+
ui(#vqstate{index_state = IndexState,
2062+
target_ram_count = TargetRamCount} = State) ->
2063+
IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
2064+
TargetRamCount, IndexState),
2065+
State#vqstate{index_state = IndexState1}.
2066+
20182067
%%----------------------------------------------------------------------------
20192068
%% Upgrading
20202069
%%----------------------------------------------------------------------------

0 commit comments

Comments
 (0)