|
179 | 179 | max_journal_entries, on_sync, on_sync_msg,
|
180 | 180 | unconfirmed, unconfirmed_msg}).
|
181 | 181 |
|
182 |
| --record(segment, {num, path, journal_entries, unacked}). |
| 182 | +-record(segment, {num, path, journal_entries, |
| 183 | + entries_to_segment, unacked}). |
183 | 184 |
|
184 | 185 | -include("rabbit.hrl").
|
185 | 186 |
|
|
194 | 195 |
|
195 | 196 | -type(hdl() :: ('undefined' | any())).
|
196 | 197 | -type(segment() :: ('undefined' |
|
197 |
| - #segment { num :: non_neg_integer(), |
198 |
| - path :: file:filename(), |
199 |
| - journal_entries :: array:array(), |
200 |
| - unacked :: non_neg_integer() |
| 198 | + #segment { num :: non_neg_integer(), |
| 199 | + path :: file:filename(), |
| 200 | + journal_entries :: array:array(), |
| 201 | + entries_to_segment :: array:array(), |
| 202 | + unacked :: non_neg_integer() |
201 | 203 | })).
|
202 | 204 | -type(seq_id() :: integer()).
|
203 | 205 | -type(seg_dict() :: {dict:dict(), [segment()]}).
|
@@ -650,30 +652,46 @@ add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount,
|
650 | 652 |
|
651 | 653 | add_to_journal(RelSeq, Action,
|
652 | 654 | Segment = #segment { journal_entries = JEntries,
|
| 655 | + entries_to_segment = EToSeg, |
653 | 656 | unacked = UnackedCount }) ->
|
| 657 | + |
| 658 | + {Fun, Entry} = action_to_entry(RelSeq, Action, JEntries), |
| 659 | + |
| 660 | + {JEntries1, EToSeg1} = |
| 661 | + case Fun of |
| 662 | + set -> |
| 663 | + {array:set(RelSeq, Entry, JEntries), |
| 664 | + array:set(RelSeq, entry_to_segment(RelSeq, Entry, []), |
| 665 | + EToSeg)}; |
| 666 | + reset -> |
| 667 | + {array:reset(RelSeq, JEntries), |
| 668 | + array:reset(RelSeq, EToSeg)} |
| 669 | + end, |
| 670 | + |
654 | 671 | Segment #segment {
|
655 |
| - journal_entries = add_to_journal(RelSeq, Action, JEntries), |
| 672 | + journal_entries = JEntries1, |
| 673 | + entries_to_segment = EToSeg1, |
656 | 674 | unacked = UnackedCount + case Action of
|
657 | 675 | ?PUB -> +1;
|
658 | 676 | del -> 0;
|
659 | 677 | ack -> -1
|
660 |
| - end}; |
| 678 | + end}. |
661 | 679 |
|
662 |
| -add_to_journal(RelSeq, Action, JEntries) -> |
| 680 | +action_to_entry(RelSeq, Action, JEntries) -> |
663 | 681 | case array:get(RelSeq, JEntries) of
|
664 | 682 | undefined ->
|
665 |
| - array:set(RelSeq, |
666 |
| - case Action of |
667 |
| - ?PUB -> {Action, no_del, no_ack}; |
668 |
| - del -> {no_pub, del, no_ack}; |
669 |
| - ack -> {no_pub, no_del, ack} |
670 |
| - end, JEntries); |
| 683 | + {set, |
| 684 | + case Action of |
| 685 | + ?PUB -> {Action, no_del, no_ack}; |
| 686 | + del -> {no_pub, del, no_ack}; |
| 687 | + ack -> {no_pub, no_del, ack} |
| 688 | + end}; |
671 | 689 | ({Pub, no_del, no_ack}) when Action == del ->
|
672 |
| - array:set(RelSeq, {Pub, del, no_ack}, JEntries); |
| 690 | + {set, {Pub, del, no_ack}}; |
673 | 691 | ({no_pub, del, no_ack}) when Action == ack ->
|
674 |
| - array:set(RelSeq, {no_pub, del, ack}, JEntries); |
| 692 | + {set, {no_pub, del, ack}}; |
675 | 693 | ({?PUB, del, no_ack}) when Action == ack ->
|
676 |
| - array:reset(RelSeq, JEntries) |
| 694 | + {reset, none} |
677 | 695 | end.
|
678 | 696 |
|
679 | 697 | maybe_flush_journal(State) ->
|
@@ -704,18 +722,23 @@ flush_journal(State = #qistate { segments = Segments }) ->
|
704 | 722 | notify_sync(State1 #qistate { dirty_count = 0 }).
|
705 | 723 |
|
706 | 724 | append_journal_to_segment(#segment { journal_entries = JEntries,
|
| 725 | + entries_to_segment = EToSeg, |
707 | 726 | path = Path } = Segment) ->
|
708 | 727 | case array:sparse_size(JEntries) of
|
709 | 728 | 0 -> Segment;
|
710 |
| - _ -> Seg = array:sparse_foldr( |
711 |
| - fun entry_to_segment/3, [], JEntries), |
712 |
| - file_handle_cache_stats:update(queue_index_write), |
713 |
| - |
714 |
| - {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE, |
715 |
| - [{write_buffer, infinity}]), |
716 |
| - file_handle_cache:append(Hdl, Seg), |
717 |
| - ok = file_handle_cache:close(Hdl), |
718 |
| - Segment #segment { journal_entries = array_new() } |
| 729 | + _ -> |
| 730 | + file_handle_cache_stats:update(queue_index_write), |
| 731 | + |
| 732 | + {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE, |
| 733 | + [{write_buffer, infinity}]), |
| 734 | + %% the file_handle_cache also does a list reverse, so this |
| 735 | + %% might not be required here, but before we were doing a |
| 736 | + %% sparse_foldr, a lists:reverse/1 seems to be the correct |
| 737 | + %% thing to do for now. |
| 738 | + file_handle_cache:append(Hdl, lists:reverse(array:to_list(EToSeg))), |
| 739 | + ok = file_handle_cache:close(Hdl), |
| 740 | + Segment #segment { journal_entries = array_new(), |
| 741 | + entries_to_segment = array_new([]) } |
719 | 742 | end.
|
720 | 743 |
|
721 | 744 | get_journal_handle(State = #qistate { journal_handle = undefined,
|
@@ -748,14 +771,16 @@ recover_journal(State) ->
|
748 | 771 | Segments1 =
|
749 | 772 | segment_map(
|
750 | 773 | fun (Segment = #segment { journal_entries = JEntries,
|
| 774 | + entries_to_segment = EToSeg, |
751 | 775 | unacked = UnackedCountInJournal }) ->
|
752 | 776 | %% We want to keep ack'd entries in so that we can
|
753 | 777 | %% remove them if duplicates are in the journal. The
|
754 | 778 | %% counts here are purely from the segment itself.
|
755 | 779 | {SegEntries, UnackedCountInSeg} = load_segment(true, Segment),
|
756 |
| - {JEntries1, UnackedCountDuplicates} = |
757 |
| - journal_minus_segment(JEntries, SegEntries), |
| 780 | + {JEntries1, EToSeg1, UnackedCountDuplicates} = |
| 781 | + journal_minus_segment(JEntries, EToSeg, SegEntries), |
758 | 782 | Segment #segment { journal_entries = JEntries1,
|
| 783 | + entries_to_segment = EToSeg1, |
759 | 784 | unacked = (UnackedCountInJournal +
|
760 | 785 | UnackedCountInSeg -
|
761 | 786 | UnackedCountDuplicates) }
|
@@ -842,10 +867,11 @@ segment_find_or_new(Seg, Dir, Segments) ->
|
842 | 867 | {ok, Segment} -> Segment;
|
843 | 868 | error -> SegName = integer_to_list(Seg) ++ ?SEGMENT_EXTENSION,
|
844 | 869 | Path = filename:join(Dir, SegName),
|
845 |
| - #segment { num = Seg, |
846 |
| - path = Path, |
847 |
| - journal_entries = array_new(), |
848 |
| - unacked = 0 } |
| 870 | + #segment { num = Seg, |
| 871 | + path = Path, |
| 872 | + journal_entries = array_new(), |
| 873 | + entries_to_segment = array_new([]), |
| 874 | + unacked = 0 } |
849 | 875 | end.
|
850 | 876 |
|
851 | 877 | segment_find(Seg, {_Segments, [Segment = #segment { num = Seg } |_]}) ->
|
@@ -885,20 +911,20 @@ segment_nums({Segments, CachedSegments}) ->
|
885 | 911 | segments_new() ->
|
886 | 912 | {dict:new(), []}.
|
887 | 913 |
|
888 |
| -entry_to_segment(_RelSeq, {?PUB, del, ack}, Buf) -> |
889 |
| - Buf; |
890 |
| -entry_to_segment(RelSeq, {Pub, Del, Ack}, Buf) -> |
| 914 | +entry_to_segment(_RelSeq, {?PUB, del, ack}, Initial) -> |
| 915 | + Initial; |
| 916 | +entry_to_segment(RelSeq, {Pub, Del, Ack}, Initial) -> |
891 | 917 | %% NB: we are assembling the segment in reverse order here, so
|
892 | 918 | %% del/ack comes first.
|
893 | 919 | Buf1 = case {Del, Ack} of
|
894 | 920 | {no_del, no_ack} ->
|
895 |
| - Buf; |
| 921 | + Initial; |
896 | 922 | _ ->
|
897 | 923 | Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
|
898 | 924 | RelSeq:?REL_SEQ_BITS>>,
|
899 | 925 | case {Del, Ack} of
|
900 |
| - {del, ack} -> [[Binary, Binary] | Buf]; |
901 |
| - _ -> [Binary | Buf] |
| 926 | + {del, ack} -> [[Binary, Binary] | Initial]; |
| 927 | + _ -> [Binary | Initial] |
902 | 928 | end
|
903 | 929 | end,
|
904 | 930 | case Pub of
|
@@ -987,7 +1013,10 @@ add_segment_relseq_entry(KeepAcked, RelSeq, {SegEntries, Unacked}) ->
|
987 | 1013 | end.
|
988 | 1014 |
|
989 | 1015 | array_new() ->
|
990 |
| - array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]). |
| 1016 | + array_new(undefined). |
| 1017 | + |
| 1018 | +array_new(Default) -> |
| 1019 | + array:new([{default, Default}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]). |
991 | 1020 |
|
992 | 1021 | bool_to_int(true ) -> 1;
|
993 | 1022 | bool_to_int(false) -> 0.
|
@@ -1033,19 +1062,29 @@ segment_plus_journal1({?PUB, del, no_ack}, {no_pub, no_del, ack}) ->
|
1033 | 1062 | %% Remove from the journal entries for a segment, items that are
|
1034 | 1063 | %% duplicates of entries found in the segment itself. Used on start up
|
1035 | 1064 | %% to clean up the journal.
|
1036 |
| -journal_minus_segment(JEntries, SegEntries) -> |
| 1065 | +%% |
| 1066 | +%% We need to update the entries_to_segment since they are just a |
| 1067 | +%% cache of what's on the journal. |
| 1068 | +journal_minus_segment(JEntries, EToSeg, SegEntries) -> |
1037 | 1069 | array:sparse_foldl(
|
1038 |
| - fun (RelSeq, JObj, {JEntriesOut, UnackedRemoved}) -> |
| 1070 | + fun (RelSeq, JObj, {JEntriesOut, EToSegOut, UnackedRemoved}) -> |
1039 | 1071 | SegEntry = array:get(RelSeq, SegEntries),
|
1040 | 1072 | {Obj, UnackedRemovedDelta} =
|
1041 | 1073 | journal_minus_segment1(JObj, SegEntry),
|
1042 |
| - {case Obj of |
1043 |
| - keep -> JEntriesOut; |
1044 |
| - undefined -> array:reset(RelSeq, JEntriesOut); |
1045 |
| - _ -> array:set(RelSeq, Obj, JEntriesOut) |
1046 |
| - end, |
1047 |
| - UnackedRemoved + UnackedRemovedDelta} |
1048 |
| - end, {JEntries, 0}, JEntries). |
| 1074 | + {JEntriesOut1, EToSegOut1} = |
| 1075 | + case Obj of |
| 1076 | + keep -> |
| 1077 | + {JEntriesOut, EToSegOut}; |
| 1078 | + undefined -> |
| 1079 | + {array:reset(RelSeq, JEntriesOut), |
| 1080 | + array:reset(RelSeq, EToSegOut)}; |
| 1081 | + _ -> |
| 1082 | + {array:set(RelSeq, Obj, JEntriesOut), |
| 1083 | + array:set(RelSeq, entry_to_segment(RelSeq, Obj, []), |
| 1084 | + EToSegOut)} |
| 1085 | + end, |
| 1086 | + {JEntriesOut1, EToSegOut1, UnackedRemoved + UnackedRemovedDelta} |
| 1087 | + end, {JEntries, EToSeg, 0}, JEntries). |
1049 | 1088 |
|
1050 | 1089 | %% Here, the result is a tuple with the first element containing the
|
1051 | 1090 | %% item we are adding to or modifying in the (initially fresh) journal
|
|
0 commit comments