Commit 7ba2571

Merge branch 'master' into rabbitmq-stomp-24
2 parents: 6a084b6 + 4527bea

File tree: 2 files changed, 143 additions and 16 deletions

  src/rabbit_queue_index.erl
  src/rabbit_variable_queue.erl

src/rabbit_queue_index.erl

Lines changed: 81 additions & 3 deletions
@@ -18,6 +18,7 @@

 -export([erase/1, init/3, recover/6,
          terminate/2, delete_and_terminate/1,
+         pre_publish/7, flush_pre_publish_cache/2,
          publish/6, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
          read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]).

@@ -177,7 +178,8 @@

 -record(qistate, {dir, segments, journal_handle, dirty_count,
                   max_journal_entries, on_sync, on_sync_msg,
-                  unconfirmed, unconfirmed_msg}).
+                  unconfirmed, unconfirmed_msg,
+                  pre_publish_cache, delivered_cache}).

 -record(segment, {num, path, journal_entries,
                   entries_to_segment, unacked}).
@@ -212,7 +214,9 @@
               on_sync           :: on_sync_fun(),
               on_sync_msg       :: on_sync_fun(),
               unconfirmed       :: gb_sets:set(),
-              unconfirmed_msg   :: gb_sets:set()
+              unconfirmed_msg   :: gb_sets:set(),
+              pre_publish_cache :: list(),
+              delivered_cache   :: list()
              }).
 -type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())).
 -type(walker(A) :: fun ((A) -> 'finished' |
@@ -291,6 +295,78 @@ delete_and_terminate(State) ->
     ok = rabbit_file:recursive_delete([Dir]),
     State1.

+pre_publish(MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered, JournalSizeHint,
+            State = #qistate{unconfirmed       = UC,
+                             unconfirmed_msg   = UCM,
+                             pre_publish_cache = PPC,
+                             delivered_cache   = DC}) ->
+    MsgId = case MsgOrId of
+                #basic_message{id = Id} -> Id;
+                Id when is_binary(Id)   -> Id
+            end,
+    ?MSG_ID_BYTES = size(MsgId),
+
+    State1 =
+        case {MsgProps#message_properties.needs_confirming, MsgOrId} of
+            {true, MsgId} -> UC1  = gb_sets:add_element(MsgId, UC),
+                             State#qistate{unconfirmed = UC1};
+            {true, _}     -> UCM1 = gb_sets:add_element(MsgId, UCM),
+                             State#qistate{unconfirmed_msg = UCM1};
+            {false, _}    -> State
+        end,
+
+    {Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps),
+
+    PPC1 =
+        [[<<(case IsPersistent of
+                 true  -> ?PUB_PERSIST_JPREFIX;
+                 false -> ?PUB_TRANS_JPREFIX
+             end):?JPREFIX_BITS,
+             SeqId:?SEQ_BITS, Bin/binary,
+             (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin], PPC],
+
+    DC1 =
+        case IsDelivered of
+            true ->
+                [SeqId | DC];
+            false ->
+                DC
+        end,
+
+    add_to_journal(SeqId, {IsPersistent, Bin, MsgBin},
+                   maybe_flush_pre_publish_cache(
+                     JournalSizeHint,
+                     State1#qistate{pre_publish_cache = PPC1,
+                                    delivered_cache   = DC1})).
+
+%% pre_publish_cache is the entry with most elements when compared to
+%% delivered_cache so we only check the former in the guard.
+maybe_flush_pre_publish_cache(JournalSizeHint,
+                              #qistate{pre_publish_cache = PPC} = State)
+  when length(PPC) >= ?SEGMENT_ENTRY_COUNT ->
+    flush_pre_publish_cache(JournalSizeHint, State);
+maybe_flush_pre_publish_cache(_JournalSizeHint, State) ->
+    State.
+
+flush_pre_publish_cache(JournalSizeHint, State) ->
+    State1 = flush_pre_publish_cache(State),
+    State2 = flush_delivered_cache(State1),
+    maybe_flush_journal(JournalSizeHint, State2).
+
+flush_pre_publish_cache(#qistate{pre_publish_cache = []} = State) ->
+    State;
+flush_pre_publish_cache(State = #qistate{pre_publish_cache = PPC}) ->
+    {JournalHdl, State1} = get_journal_handle(State),
+    file_handle_cache_stats:update(queue_index_journal_write),
+    ok = file_handle_cache:append(JournalHdl, lists:reverse(PPC)),
+    State1#qistate{pre_publish_cache = []}.
+
+flush_delivered_cache(#qistate{delivered_cache = []} = State) ->
+    State;
+flush_delivered_cache(State = #qistate{delivered_cache = DC}) ->
+    State1 = deliver(lists:reverse(DC), State),
+    State1#qistate{delivered_cache = []}.
+
 publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint,
         State = #qistate{unconfirmed     = UC,
                          unconfirmed_msg = UCM}) ->
@@ -446,7 +522,9 @@ blank_state_dir(Dir) ->
               on_sync           = fun (_) -> ok end,
               on_sync_msg       = fun (_) -> ok end,
               unconfirmed       = gb_sets:new(),
-              unconfirmed_msg   = gb_sets:new() }.
+              unconfirmed_msg   = gb_sets:new(),
+              pre_publish_cache = [],
+              delivered_cache   = [] }.

 init_clean(RecoveredCounts, State) ->
     %% Load the journal. Since this is a clean recovery this (almost)
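
Taken together, the additions above give the queue index a write-batching layer: pre_publish/7 serializes each publish, prepends it to pre_publish_cache (and the SeqId to delivered_cache when the message is already delivered), and maybe_flush_pre_publish_cache/2 writes the whole batch with a single file_handle_cache:append/2 once the cache reaches ?SEGMENT_ENTRY_COUNT entries. Below is a minimal standalone sketch of that accumulate-then-flush pattern; the batch_journal module, its record, and the 64-entry threshold are illustrative assumptions, not RabbitMQ's actual API.

%% Sketch only: a self-contained accumulate-then-flush journal,
%% loosely modelled on pre_publish_cache. Names and threshold are
%% hypothetical, not part of rabbit_queue_index.
-module(batch_journal).
-export([new/1, add/2, flush/1]).

-define(FLUSH_THRESHOLD, 64).

-record(state, {fd, cache = []}).

%% Open the journal file in append mode.
new(Path) ->
    {ok, Fd} = file:open(Path, [append, raw, binary]),
    #state{fd = Fd}.

%% Prepend the serialized entry; entries are reversed on flush so the
%% on-disk order matches publish order, as flush_pre_publish_cache/1 does.
add(EntryBin, State = #state{cache = Cache}) ->
    maybe_flush(State#state{cache = [EntryBin | Cache]}).

maybe_flush(State = #state{cache = Cache})
  when length(Cache) >= ?FLUSH_THRESHOLD ->
    flush(State);
maybe_flush(State) ->
    State.

%% Write the whole batch with one call, then reset the cache.
flush(State = #state{cache = []}) ->
    State;
flush(State = #state{fd = Fd, cache = Cache}) ->
    ok = file:write(Fd, lists:reverse(Cache)),
    State#state{cache = []}.

Prepending keeps add/2 constant-time; reversing on flush restores insertion order, mirroring lists:reverse(PPC) in flush_pre_publish_cache/1.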

src/rabbit_variable_queue.erl

Lines changed: 62 additions & 13 deletions
@@ -1440,6 +1440,43 @@ maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
 maybe_write_msg_to_disk(_Force, MsgStatus, State) ->
     {MsgStatus, State}.

+%% Due to certain optimizations made inside
+%% rabbit_queue_index:pre_publish/7 we need to have two separate
+%% functions for index persistence. This one is only used when paging
+%% during memory pressure. We didn't want to modify
+%% maybe_write_index_to_disk/3 because that function is used in other
+%% places.
+maybe_batch_write_index_to_disk(_Force,
+                                MsgStatus = #msg_status {
+                                  index_on_disk = true }, State) ->
+    {MsgStatus, State};
+maybe_batch_write_index_to_disk(Force,
+                                MsgStatus = #msg_status {
+                                  msg           = Msg,
+                                  msg_id        = MsgId,
+                                  seq_id        = SeqId,
+                                  is_persistent = IsPersistent,
+                                  is_delivered  = IsDelivered,
+                                  msg_props     = MsgProps},
+                                State = #vqstate {
+                                  target_ram_count = TargetRamCount,
+                                  disk_write_count = DiskWriteCount,
+                                  index_state      = IndexState})
+  when Force orelse IsPersistent ->
+    {MsgOrId, DiskWriteCount1} =
+        case persist_to(MsgStatus) of
+            msg_store   -> {MsgId, DiskWriteCount};
+            queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1}
+        end,
+    IndexState1 = rabbit_queue_index:pre_publish(
+                    MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered,
+                    TargetRamCount, IndexState),
+    {MsgStatus#msg_status{index_on_disk = true},
+     State#vqstate{index_state      = IndexState1,
+                   disk_write_count = DiskWriteCount1}};
+maybe_batch_write_index_to_disk(_Force, MsgStatus, State) ->
+    {MsgStatus, State}.
+
 maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
                             index_on_disk = true }, State) ->
     {MsgStatus, State};
@@ -1474,6 +1511,10 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->
     {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State),
     maybe_write_index_to_disk(ForceIndex, MsgStatus1, State1).

+maybe_prepare_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->
+    {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State),
+    maybe_batch_write_index_to_disk(ForceIndex, MsgStatus1, State1).
+
 determine_persist_to(#basic_message{
                         content = #content{properties     = Props,
                                            properties_bin = PropsBin}},
@@ -1861,16 +1902,16 @@ reduce_memory_use(State = #vqstate {
     end.

 limit_ram_acks(0, State) ->
-    {0, State};
+    {0, ui(State)};
 limit_ram_acks(Quota, State = #vqstate { ram_pending_ack  = RPA,
                                          disk_pending_ack = DPA }) ->
     case gb_trees:is_empty(RPA) of
         true ->
-            {Quota, State};
+            {Quota, ui(State)};
         false ->
             {SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA),
             {MsgStatus1, State1} =
-                maybe_write_to_disk(true, false, MsgStatus, State),
+                maybe_prepare_write_to_disk(true, false, MsgStatus, State),
             MsgStatus2 = m(trim_msg_status(MsgStatus1)),
             DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA),
             limit_ram_acks(Quota - 1,
@@ -2008,16 +2049,17 @@ push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
   when Quota =:= 0 orelse
        TargetRamCount =:= infinity orelse
        TargetRamCount >= RamMsgCount ->
-    {Quota, State};
+    {Quota, ui(State)};
 push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
     case credit_flow:blocked() of
-        true  -> {Quota, State};
+        true  -> {Quota, ui(State)};
         false -> case Generator(Q) of
                      {empty, _Q} ->
-                         {Quota, State};
+                         {Quota, ui(State)};
                      {{value, MsgStatus}, Qa} ->
                          {MsgStatus1, State1} =
-                             maybe_write_to_disk(true, false, MsgStatus, State),
+                             maybe_prepare_write_to_disk(true, false, MsgStatus,
+                                                         State),
                          MsgStatus2 = m(trim_msg_status(MsgStatus1)),
                          State2 = stats(
                                     ready0, {MsgStatus, MsgStatus2}, State1),
@@ -2058,24 +2100,31 @@ push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
         end
     end.

-push_betas_to_deltas1(_Generator, _Limit, Q, {0, _Delta, _State} = PushState) ->
-    {Q, PushState};
-push_betas_to_deltas1(Generator, Limit, Q, {Quota, Delta, State} = PushState) ->
+push_betas_to_deltas1(_Generator, _Limit, Q, {0, Delta, State}) ->
+    {Q, {0, Delta, ui(State)}};
+push_betas_to_deltas1(Generator, Limit, Q, {Quota, Delta, State}) ->
     case Generator(Q) of
         {empty, _Q} ->
-            {Q, PushState};
+            {Q, {Quota, Delta, ui(State)}};
         {{value, #msg_status { seq_id = SeqId }}, _Qa}
           when SeqId < Limit ->
-            {Q, PushState};
+            {Q, {Quota, Delta, ui(State)}};
         {{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} ->
            {#msg_status { index_on_disk = true }, State1} =
-                maybe_write_index_to_disk(true, MsgStatus, State),
+                maybe_batch_write_index_to_disk(true, MsgStatus, State),
            State2 = stats(ready0, {MsgStatus, none}, State1),
            Delta1 = expand_delta(SeqId, Delta),
            push_betas_to_deltas1(Generator, Limit, Qa,
                                  {Quota - 1, Delta1, State2})
    end.

+%% Flushes queue index batch caches and updates queue index state.
+ui(#vqstate{index_state      = IndexState,
+            target_ram_count = TargetRamCount} = State) ->
+    IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
+                    TargetRamCount, IndexState),
+    State#vqstate{index_state = IndexState1}.
+
 %%----------------------------------------------------------------------------
 %% Upgrading
 %%----------------------------------------------------------------------------
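
On the variable-queue side, the new maybe_prepare_write_to_disk/4 routes paging writes through maybe_batch_write_index_to_disk/3 (and thus rabbit_queue_index:pre_publish/7), while every exit point of limit_ram_acks/2, push_alphas_to_betas/5 and push_betas_to_deltas1/4 now returns ui(State), so whatever remains in the batch caches is flushed before the paging loop hands the state back. A small sketch of that "flush at every exit" shape, reusing the hypothetical batch_journal module above (drain/3 and its names are illustrative, not the RabbitMQ API):

%% Sketch only: batch items into the journal while quota lasts; every
%% clause that returns first flushes the partially filled batch,
%% mirroring how the paging loops wrap their results in ui(State).
-module(drain_example).
-export([drain/3]).

drain(0, _Items, Journal) ->
    {0, batch_journal:flush(Journal)};
drain(Quota, [], Journal) ->
    {Quota, batch_journal:flush(Journal)};
drain(Quota, [Item | Rest], Journal) ->
    Journal1 = batch_journal:add(term_to_binary(Item), Journal),
    drain(Quota - 1, Rest, Journal1).

Flushing only at the exit points, rather than after every item, is what lets the index turn many small journal writes into a few larger appends while paging under memory pressure.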
