@@ -715,9 +715,10 @@ flush_buffer(State0 = #qi { write_buffer = WriteBuffer0,
715
715
AcksAcc }
716
716
end , {#{}, #{}, #{}}, WriteBuffer0 ),
717
717
% % Then we do the writes for each segment.
718
- State = maps :fold (fun (Segment , LocBytes , FoldState0 ) ->
718
+ State = maps :fold (fun (Segment , LocBytes0 , FoldState0 ) ->
719
719
FoldState1 = reduce_fd_usage (Segment , FoldState0 ),
720
720
{Fd , FoldState } = get_fd_for_segment (Segment , FoldState1 ),
721
+ LocBytes = flush_buffer_consolidate (lists :sort (LocBytes0 ), 1 ),
721
722
ok = file :pwrite (Fd , LocBytes ),
722
723
file_handle_cache_stats :update (queue_index_write ),
723
724
FoldState
@@ -744,7 +745,7 @@ write_ack(SeqId, SegmentEntryCount, WritesAcc) ->
744
745
Segment = SeqId div SegmentEntryCount ,
745
746
Offset = ? HEADER_SIZE + (SeqId rem SegmentEntryCount ) * ? ENTRY_SIZE ,
746
747
LocBytesAcc = maps :get (Segment , WritesAcc , []),
747
- WritesAcc #{Segment => [{Offset , <<0 >>}|LocBytesAcc ]}.
748
+ WritesAcc #{Segment => [{Offset , <<0 : ? ENTRY_SIZE / unit : 8 >>}|LocBytesAcc ]}.
748
749
749
750
write_entry (SeqId , SegmentEntryCount , Entry , WritesAcc ) ->
750
751
Segment = SeqId div SegmentEntryCount ,
@@ -793,6 +794,14 @@ get_fd_for_segment(Segment, State = #qi{ fds = OpenFds }) ->
793
794
{Fd , State # qi { fds = OpenFds #{ Segment => Fd }}}
794
795
end .
795
796
797
+ flush_buffer_consolidate ([{Offset , Data }, {NextOffset , NextData }|Tail ], Num )
798
+ when Offset + ? ENTRY_SIZE * Num =:= NextOffset ->
799
+ flush_buffer_consolidate ([{Offset , [Data , NextData ]}|Tail ], Num + 1 );
800
+ flush_buffer_consolidate ([Entry , NextEntry |Tail ], _ ) ->
801
+ [Entry |flush_buffer_consolidate ([NextEntry |Tail ], 1 )];
802
+ flush_buffer_consolidate (Tail , _ ) ->
803
+ Tail .
804
+
796
805
% % When marking acks we need to update the file(s) on disk.
797
806
% % When a file has been fully acked we may also close its
798
807
% % open FD if any and delete it.
0 commit comments