@@ -137,7 +137,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
     %% read is only needed so that we can seek
     {ok, FileHdl} = file:open(Path, [read, write, raw, binary, delayed_write]),
     {ok, Offset} = file:position(FileHdl, {bof, Offset}),
-    {ok, State1 #dqstate { current_file_handle = FileHdl }}.
+    {ok, State1 #dqstate { current_file_handle = FileHdl }}.
 
 handle_call({deliver, Q}, _From, State) ->
     {ok, Result, State1} = internal_deliver(Q, State),
@@ -156,8 +156,8 @@ handle_call(clean_stop, _From, State) ->
     true = ets:delete(Sequences),
     lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))),
     {stop, normal, ok,
-     State1 #dqstate { current_file_handle = undefined,
-                       read_file_handles = {dict:new(), gb_trees:empty()}}}.
+     State1 #dqstate { current_file_handle = undefined,
+                       read_file_handles = {dict:new(), gb_trees:empty()}}}.
 %% gen_server now calls terminate, which then calls shutdown
 
 handle_cast({publish, Q, MsgId, MsgBody}, State) ->
@@ -194,8 +194,8 @@ shutdown(State = #dqstate { msg_location = MsgLocation,
     dict:fold(fun (_File, Hdl, _Acc) ->
                       file:close(Hdl)
               end, ok, ReadHdls),
-    State #dqstate { current_file_handle = undefined,
-                     read_file_handles = {dict:new(), gb_trees:empty()}}.
+    State #dqstate { current_file_handle = undefined,
+                     read_file_handles = {dict:new(), gb_trees:empty()}}.
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
@@ -232,26 +232,24 @@ internal_deliver(Q, State =
                 {ok, Hdl} = file:open(form_filename(File),
                                       [read, raw, binary,
                                        read_ahead]),
-                {ReadHdls2, ReadHdlsAge2} =
-                    case dict:size(ReadHdls) < ReadFileHandlesLimit of
-                        true ->
-                            {ReadHdls, ReadHdlsAge};
-                        _False ->
-                            {_Then, OldFile, ReadHdlsAge3} =
-                                gb_trees:take_smallest(ReadHdlsAge),
-                            {ok, {OldHdl, _Then}} =
-                                dict:find(OldFile, ReadHdls),
-                            ok = file:close(OldHdl),
-                            {dict:erase(OldFile, ReadHdls),
-                             ReadHdlsAge3}
-                    end,
-                {Hdl, dict:store(File, {Hdl, Now}, ReadHdls2),
-                 gb_trees:enter(Now, File, ReadHdlsAge2)};
+                case dict:size(ReadHdls) < ReadFileHandlesLimit of
+                    true ->
+                        {Hdl, ReadHdls, ReadHdlsAge};
+                    _False ->
+                        {Then, OldFile, ReadHdlsAge3} =
+                            gb_trees:take_smallest(ReadHdlsAge),
+                        {ok, {OldHdl, Then}} =
+                            dict:find(OldFile, ReadHdls),
+                        ok = file:close(OldHdl),
+                        {Hdl, dict:erase(OldFile, ReadHdls),
+                         ReadHdlsAge3}
+                end;
             {ok, {Hdl, Then}} ->
-                {Hdl, dict:store(File, {Hdl, Now}, ReadHdls),
-                 gb_trees:enter(Now, File,
-                                gb_trees:delete(Then, ReadHdlsAge))}
+                {Hdl, ReadHdls,
+                 gb_trees:delete(Then, ReadHdlsAge)}
         end,
+    ReadHdls2 = dict:store(File, {FileHdl, Now}, ReadHdls1),
+    ReadHdlsAge2 = gb_trees:enter(Now, File, ReadHdlsAge1),
     %% read the message
     {ok, {MsgBody, BodySize}} =
         read_message_at_offset(FileHdl, Offset, TotalSize),
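The hunk above restructures the LRU bookkeeping for cached read file handles: the dict:store of {Hdl, Now} and the gb_trees:enter of the age key are hoisted out of the case branches and done once afterwards. For reference, a minimal standalone sketch of the same dict-plus-gb_trees LRU pattern, with function and variable names invented for illustration and erlang:monotonic_time/0 standing in for whatever timestamp the module actually uses:

%% Sketch only, not the module's code: an LRU cache of read file handles.
%% The dict maps FileName -> {Hdl, LastUsed}; the gb_tree maps LastUsed ->
%% FileName and serves as the age index, so the least recently used handle
%% is always one gb_trees:take_smallest/1 away.
get_read_handle(File, Limit, Hdls, Ages) ->
    Now = erlang:monotonic_time(),   %% assumed unique enough to key the tree
    {Hdl, Hdls1, Ages1} =
        case dict:find(File, Hdls) of
            {ok, {H, Then}} ->
                %% cache hit: keep the handle, drop its stale age entry
                {H, Hdls, gb_trees:delete(Then, Ages)};
            error ->
                {ok, H} = file:open(File, [read, raw, binary, read_ahead]),
                case dict:size(Hdls) < Limit of
                    true ->
                        {H, Hdls, Ages};
                    false ->
                        %% at the limit: evict the least recently used handle
                        {_Oldest, OldFile, Ages2} = gb_trees:take_smallest(Ages),
                        {ok, {OldHdl, _}} = dict:find(OldFile, Hdls),
                        ok = file:close(OldHdl),
                        {H, dict:erase(OldFile, Hdls), Ages2}
                end
        end,
    %% record the access once, mirroring the refactored hunk
    {Hdl, dict:store(File, {Hdl, Now}, Hdls1), gb_trees:enter(Now, File, Ages1)}.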
@@ -261,7 +259,7 @@ internal_deliver(Q, State =
                 end,
             true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}),
             {ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}},
-             State #dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }}
+             State #dqstate { read_file_handles = {ReadHdls2, ReadHdlsAge2} }}
     end
     end.
 
@@ -272,10 +270,10 @@ internal_ack(Q, MsgIds, State) ->
 %% called from tx_cancel with MnesiaDelete = false
 %% called from ack with MnesiaDelete = true
 remove_messages(Q, MsgSeqIds, MnesiaDelete,
-                State = #dqstate { msg_location = MsgLocation,
-                                   file_summary = FileSummary,
-                                   current_file_name = CurName
-                                 }) ->
+                State = #dqstate { msg_location = MsgLocation,
+                                   file_summary = FileSummary,
+                                   current_file_name = CurName
+                                 }) ->
     Files =
         lists:foldl(
           fun ({MsgId, SeqId}, Files2) ->
@@ -334,7 +332,7 @@ internal_tx_publish(MsgId, MsgBody,
                                        ContiguousTop1, Left, undefined}),
             NextOffset = CurOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT,
             maybe_roll_to_new_file(NextOffset,
-                                   State #dqstate {current_offset = NextOffset});
+                                   State #dqstate {current_offset = NextOffset});
         [{MsgId, RefCount, File, Offset, TotalSize}] ->
             %% We already know about it, just update counter
             ok = dets:insert(MsgLocation, {MsgId, RefCount + 1, File,
@@ -417,11 +415,11 @@ maybe_roll_to_new_file(Offset,
                               [write, raw, binary, delayed_write]),
         true = ets:update_element(FileSummary, CurName, {5, NextName}), %% 5 is Right
         true = ets:insert_new(FileSummary, {NextName, 0, 0, CurName, undefined}),
-        {ok, State #dqstate { current_file_name = NextName,
-                              current_file_handle = NextHdl,
-                              current_file_num = NextNum,
-                              current_offset = 0
-                            }};
+        {ok, State #dqstate { current_file_name = NextName,
+                              current_file_handle = NextHdl,
+                              current_file_num = NextNum,
+                              current_offset = 0
+                            }};
 maybe_roll_to_new_file(_, State) ->
     {ok, State}.
 
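The context in this hunk shows how rolling to a new file is wired up: the old file's FileSummary entry gets its Right pointer (element 5, per the comment) set to the new name, a fresh {NextName, 0, 0, CurName, undefined} entry is linked in, and the state's current offset resets to 0. A minimal sketch of just the rollover decision, with the function name, arity and ".dat" naming scheme invented for this example:

%% Sketch only: roll to the next numbered file once the write offset has
%% reached the per-file size limit; otherwise keep writing where we are.
roll_if_needed(Offset, FileSizeLimit, CurNum, CurHdl)
  when Offset >= FileSizeLimit ->
    ok = file:sync(CurHdl),                    %% flush buffered (delayed) writes
    ok = file:close(CurHdl),
    NextNum  = CurNum + 1,
    NextName = integer_to_list(NextNum) ++ ".dat",
    {ok, NextHdl} = file:open(NextName,
                              [write, raw, binary, delayed_write]),
    {NextNum, NextHdl, 0};                     %% new file, offset back to zero
roll_if_needed(Offset, _FileSizeLimit, CurNum, CurHdl) ->
    {CurNum, CurHdl, Offset}.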
@@ -745,8 +743,8 @@ load_messages(Left, [], State = #dqstate { msg_location = MsgLocation }) ->
                 sortMsgLocationsByOffset(false, L),
             MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT
     end,
-    State #dqstate { current_file_num = Num, current_file_name = Left,
-                     current_offset = Offset };
+    State #dqstate { current_file_num = Num, current_file_name = Left,
+                     current_offset = Offset };
 load_messages(Left, [File|Files],
               State = #dqstate { msg_location = MsgLocation,
                                  file_summary = FileSummary