Skip to content

Commit d45fbc3

Browse files
Loïc Hoguin (lhoguin)
authored and committed
CQ: Write large messages into their own files
1 parent 18acc01 commit d45fbc3

File tree

1 file changed

+125
-41
lines changed

1 file changed

+125
-41
lines changed

deps/rabbit/src/rabbit_msg_store.erl

Lines changed: 125 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -899,7 +899,7 @@ handle_cast({write, CRef, MsgRef, MsgId, Flow},
899899
%% or the non-current files. If the message *is* in the
900900
%% current file then the cache entry will be removed by
901901
%% the normal logic for that in write_message/4 and
902-
%% maybe_roll_to_new_file/2.
902+
%% flush_or_roll_to_new_file/2.
903903
case index_lookup(MsgId, State) of
904904
#msg_location { file = File }
905905
when File == State #msstate.current_file ->
@@ -1208,26 +1208,123 @@ gc_candidate(File, State = #msstate{ gc_candidates = Candidates,
12081208
gc_candidate(File, State = #msstate{ gc_candidates = Candidates }) ->
12091209
State#msstate{ gc_candidates = Candidates#{ File => true }}.
12101210

1211-
write_message(MsgId, Msg,
1212-
State0 = #msstate { current_file_handle = CurHdl,
1213-
current_file = CurFile,
1214-
current_file_offset = CurOffset,
1215-
file_summary_ets = FileSummaryEts }) ->
1216-
{MaybeFlush, TotalSize} = writer_append(CurHdl, MsgId, Msg),
1217-
State = case MaybeFlush of
1218-
flush -> internal_sync(State0);
1219-
ok -> State0
1220-
end,
1211+
%% This value must be sufficiently smaller than ?SCAN_BLOCK_SIZE
1212+
%% to ensure we only ever need 2 reads when scanning files.
1213+
%% Hence the choice of 4MB here and 4MiB there, the difference
1214+
%% in size being more than enough to ensure that property.
1215+
-define(LARGE_MESSAGE_THRESHOLD, 4000000). %% 4MB.
1216+
1217+
write_message(MsgId, MsgBody, State) ->
1218+
MsgBodyBin = term_to_binary(MsgBody),
1219+
%% Large messages get written to their own files.
1220+
if
1221+
byte_size(MsgBodyBin) >= ?LARGE_MESSAGE_THRESHOLD ->
1222+
write_large_message(MsgId, MsgBodyBin, State);
1223+
true ->
1224+
write_small_message(MsgId, MsgBodyBin, State)
1225+
end.
1226+
1227+
write_small_message(MsgId, MsgBodyBin,
1228+
State = #msstate { current_file_handle = CurHdl,
1229+
current_file = CurFile,
1230+
current_file_offset = CurOffset,
1231+
file_summary_ets = FileSummaryEts }) ->
1232+
{MaybeFlush, TotalSize} = writer_append(CurHdl, MsgId, MsgBodyBin),
12211233
ok = index_insert(
12221234
#msg_location { msg_id = MsgId, ref_count = 1, file = CurFile,
12231235
offset = CurOffset, total_size = TotalSize }, State),
12241236
[_,_] = ets:update_counter(FileSummaryEts, CurFile,
12251237
[{#file_summary.valid_total_size, TotalSize},
12261238
{#file_summary.file_size, TotalSize}]),
1227-
maybe_roll_to_new_file(CurOffset + TotalSize,
1239+
flush_or_roll_to_new_file(CurOffset + TotalSize, MaybeFlush,
12281240
State #msstate {
12291241
current_file_offset = CurOffset + TotalSize }).
12301242

1243+
flush_or_roll_to_new_file(
1244+
Offset, _MaybeFlush,
1245+
State = #msstate { dir = Dir,
1246+
current_file_handle = CurHdl,
1247+
current_file = CurFile,
1248+
file_summary_ets = FileSummaryEts,
1249+
cur_file_cache_ets = CurFileCacheEts,
1250+
file_size_limit = FileSizeLimit })
1251+
when Offset >= FileSizeLimit ->
1252+
State1 = internal_sync(State),
1253+
ok = writer_close(CurHdl),
1254+
NextFile = CurFile + 1,
1255+
{ok, NextHdl} = writer_open(Dir, NextFile),
1256+
true = ets:insert_new(FileSummaryEts, #file_summary {
1257+
file = NextFile,
1258+
valid_total_size = 0,
1259+
file_size = 0,
1260+
locked = false }),
1261+
%% Delete messages from the cache that were written to disk.
1262+
true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
1263+
State1 #msstate { current_file_handle = NextHdl,
1264+
current_file = NextFile,
1265+
current_file_offset = 0 };
1266+
%% If we need to flush, do so here.
1267+
flush_or_roll_to_new_file(_, flush, State) ->
1268+
internal_sync(State);
1269+
flush_or_roll_to_new_file(_, _, State) ->
1270+
State.
1271+
1272+
write_large_message(MsgId, MsgBodyBin,
1273+
State0 = #msstate { dir = Dir,
1274+
current_file_handle = CurHdl,
1275+
current_file = CurFile,
1276+
current_file_offset = CurOffset,
1277+
file_summary_ets = FileSummaryEts,
1278+
cur_file_cache_ets = CurFileCacheEts }) ->
1279+
{LargeMsgFile, LargeMsgHdl} = case CurOffset of
1280+
%% We haven't written in the file yet. Use it.
1281+
0 ->
1282+
{CurFile, CurHdl};
1283+
%% Flush the current file and close it. Open a new file.
1284+
_ ->
1285+
ok = writer_flush(CurHdl),
1286+
ok = writer_close(CurHdl),
1287+
LargeMsgFile0 = CurFile + 1,
1288+
{ok, LargeMsgHdl0} = writer_open(Dir, LargeMsgFile0),
1289+
{LargeMsgFile0, LargeMsgHdl0}
1290+
end,
1291+
%% Write the message directly and close the file.
1292+
TotalSize = writer_direct_write(LargeMsgHdl, MsgId, MsgBodyBin),
1293+
ok = writer_close(LargeMsgHdl),
1294+
%% Update ets with the new information.
1295+
ok = index_insert(
1296+
#msg_location { msg_id = MsgId, ref_count = 1, file = LargeMsgFile,
1297+
offset = 0, total_size = TotalSize }, State0),
1298+
_ = case CurFile of
1299+
%% We didn't open a new file. We must update the existing value.
1300+
LargeMsgFile ->
1301+
[_,_] = ets:update_counter(FileSummaryEts, LargeMsgFile,
1302+
[{#file_summary.valid_total_size, TotalSize},
1303+
{#file_summary.file_size, TotalSize}]);
1304+
%% We opened a new file. We can insert it all at once.
1305+
_ ->
1306+
true = ets:insert_new(FileSummaryEts, #file_summary {
1307+
file = LargeMsgFile,
1308+
valid_total_size = TotalSize,
1309+
file_size = TotalSize,
1310+
locked = false })
1311+
end,
1312+
%% Roll over to the next file.
1313+
NextFile = LargeMsgFile + 1,
1314+
{ok, NextHdl} = writer_open(Dir, NextFile),
1315+
true = ets:insert_new(FileSummaryEts, #file_summary {
1316+
file = NextFile,
1317+
valid_total_size = 0,
1318+
file_size = 0,
1319+
locked = false }),
1320+
%% Delete messages from the cache that were written to disk.
1321+
true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
1322+
%% Process confirms (this won't flush; we already did) and continue.
1323+
State = internal_sync(State0),
1324+
State #msstate { current_file_handle = NextHdl,
1325+
current_file = NextFile,
1326+
current_file_offset = 0 }.
1327+
12311328
contains_message(MsgId, From, State) ->
12321329
MsgLocation = index_lookup_positive_ref_count(MsgId, State),
12331330
gen_server2:reply(From, MsgLocation =/= not_found),
@@ -1325,8 +1422,7 @@ writer_recover(Dir, Num, Offset) ->
13251422
ok = file:truncate(Fd),
13261423
{ok, #writer{fd = Fd, buffer = prim_buffer:new()}}.
13271424

1328-
writer_append(#writer{buffer = Buffer}, MsgId, MsgBody) ->
1329-
MsgBodyBin = term_to_binary(MsgBody),
1425+
writer_append(#writer{buffer = Buffer}, MsgId, MsgBodyBin) ->
13301426
MsgBodyBinSize = byte_size(MsgBodyBin),
13311427
EntrySize = MsgBodyBinSize + 16, %% Size of MsgId + MsgBodyBin.
13321428
%% We send an iovec to the buffer instead of building a binary.
@@ -1354,6 +1450,21 @@ writer_flush(#writer{fd = Fd, buffer = Buffer}) ->
13541450
file:write(Fd, prim_buffer:read_iovec(Buffer, Size))
13551451
end.
13561452

1453+
%% For large messages we don't buffer anything. Large messages
1454+
%% are kept within their own files.
1455+
%%
1456+
%% This is basically the same as writer_append except no buffering.
1457+
writer_direct_write(#writer{fd = Fd}, MsgId, MsgBodyBin) ->
1458+
MsgBodyBinSize = byte_size(MsgBodyBin),
1459+
EntrySize = MsgBodyBinSize + 16, %% Size of MsgId + MsgBodyBin.
1460+
ok = file:write(Fd, [
1461+
<<EntrySize:64>>,
1462+
MsgId,
1463+
MsgBodyBin,
1464+
<<255>> %% OK marker.
1465+
]),
1466+
EntrySize + 9.
1467+
13571468
writer_close(#writer{fd = Fd}) ->
13581469
file:close(Fd).
13591470

@@ -1700,33 +1811,6 @@ rebuild_index(Gatherer, Files, State) ->
17001811
%% garbage collection / compaction / aggregation -- internal
17011812
%%----------------------------------------------------------------------------
17021813

1703-
maybe_roll_to_new_file(
1704-
Offset,
1705-
State = #msstate { dir = Dir,
1706-
current_file_handle = CurHdl,
1707-
current_file = CurFile,
1708-
file_summary_ets = FileSummaryEts,
1709-
cur_file_cache_ets = CurFileCacheEts,
1710-
file_size_limit = FileSizeLimit })
1711-
when Offset >= FileSizeLimit ->
1712-
State1 = internal_sync(State),
1713-
ok = writer_close(CurHdl),
1714-
NextFile = CurFile + 1,
1715-
{ok, NextHdl} = writer_open(Dir, NextFile),
1716-
true = ets:insert_new(FileSummaryEts, #file_summary {
1717-
file = NextFile,
1718-
valid_total_size = 0,
1719-
file_size = 0,
1720-
locked = false }),
1721-
%% We only delete messages from the cache that were written to disk
1722-
%% in the previous file.
1723-
true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
1724-
State1 #msstate { current_file_handle = NextHdl,
1725-
current_file = NextFile,
1726-
current_file_offset = 0 };
1727-
maybe_roll_to_new_file(_, State) ->
1728-
State.
1729-
17301814
%% We keep track of files that have seen removes and
17311815
%% check those periodically for compaction. We only
17321816
%% compact files that have less than half valid data.

0 commit comments

Comments (0)