Skip to content

Commit 8c153b7

Browse files
committed
implements maybe_flush_pre_publish_cache
1 parent 623f1de commit 8c153b7

File tree

2 files changed

+24
-11
lines changed

2 files changed

+24
-11
lines changed

src/rabbit_queue_index.erl

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
-export([erase/1, init/3, recover/6,
2020
terminate/2, delete_and_terminate/1,
21-
pre_publish/6, flush_pre_publish_cache/2,
21+
pre_publish/7, flush_pre_publish_cache/2,
2222
publish/6, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
2323
read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]).
2424

@@ -295,11 +295,11 @@ delete_and_terminate(State) ->
295295
ok = rabbit_file:recursive_delete([Dir]),
296296
State1.
297297

298-
pre_publish(MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered,
299-
State = #qistate{unconfirmed = UC,
300-
unconfirmed_msg = UCM,
301-
pre_publish_cache = PPC,
302-
delivered_cache = DC}) ->
298+
pre_publish(MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered, JournalSizeHint,
299+
State = #qistate{unconfirmed = UC,
300+
unconfirmed_msg = UCM,
301+
pre_publish_cache = PPC,
302+
delivered_cache = DC}) ->
303303
MsgId = case MsgOrId of
304304
#basic_message{id = Id} -> Id;
305305
Id when is_binary(Id) -> Id
@@ -334,7 +334,19 @@ pre_publish(MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered,
334334
end,
335335

336336
add_to_journal(SeqId, {IsPersistent, Bin, MsgBin},
337-
State1#qistate{pre_publish_cache = PPC1, delivered_cache = DC1}).
337+
maybe_flush_pre_publish_cache(
338+
JournalSizeHint,
339+
State1#qistate{pre_publish_cache = PPC1,
340+
delivered_cache = DC1})).
341+
342+
%% pre_publish_cache is the entry with most elements when comapred to
343+
%% delivered_cache so we only check the former in the guard.
344+
maybe_flush_pre_publish_cache(JournalSizeHint,
345+
#qistate{pre_publish_cache = PPC} = State)
346+
when length(PPC) >= ?SEGMENT_ENTRY_COUNT ->
347+
flush_pre_publish_cache(JournalSizeHint, State);
348+
maybe_flush_pre_publish_cache(_JournalSizeHint, State) ->
349+
State.
338350

339351
flush_pre_publish_cache(JournalSizeHint, State) ->
340352
State1 = flush_pre_publish_cache(State),

src/rabbit_variable_queue.erl

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1372,7 +1372,7 @@ maybe_write_msg_to_disk(_Force, MsgStatus, State) ->
13721372
{MsgStatus, State}.
13731373

13741374
%% Due to certain optimizations made inside
1375-
%% rabbit_queue_index:pre_publish/6 we need to have two separate
1375+
%% rabbit_queue_index:pre_publish/7 we need to have two separate
13761376
%% functions for index persistence. This one is only used when paging
13771377
%% during memory pressure. We didn't want to modify
13781378
%% maybe_write_index_to_disk/3 because that function is used in other
@@ -1390,8 +1390,9 @@ maybe_batch_write_index_to_disk(Force,
13901390
is_delivered = IsDelivered,
13911391
msg_props = MsgProps},
13921392
State = #vqstate {
1393-
disk_write_count = DiskWriteCount,
1394-
index_state = IndexState })
1393+
target_ram_count = TargetRamCount,
1394+
disk_write_count = DiskWriteCount,
1395+
index_state = IndexState})
13951396
when Force orelse IsPersistent ->
13961397
{MsgOrId, DiskWriteCount1} =
13971398
case persist_to(MsgStatus) of
@@ -1400,7 +1401,7 @@ maybe_batch_write_index_to_disk(Force,
14001401
end,
14011402
IndexState1 = rabbit_queue_index:pre_publish(
14021403
MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered,
1403-
IndexState),
1404+
TargetRamCount, IndexState),
14041405
{MsgStatus#msg_status{index_on_disk = true},
14051406
State#vqstate{index_state = IndexState1,
14061407
disk_write_count = DiskWriteCount1}};

0 commit comments

Comments
 (0)