137
137
-record (client_msstate ,
138
138
{ server ,
139
139
client_ref ,
140
+ current_file ,
140
141
index_state ,
141
142
index_module ,
142
143
dir ,
182
183
-type client_msstate () :: # client_msstate {
183
184
server :: server (),
184
185
client_ref :: client_ref (),
186
+ current_file :: undefined | {non_neg_integer (), file :fd ()},
185
187
index_state :: any (),
186
188
index_module :: atom (),
187
189
% % Stored as binary() as opposed to file:filename() to save memory.
@@ -435,6 +437,7 @@ client_init(Server, Ref, MsgOnDiskFun) when is_pid(Server); is_atom(Server) ->
435
437
? CREDIT_DISC_BOUND ),
436
438
# client_msstate { server = Server ,
437
439
client_ref = Ref ,
440
+ current_file = undefined ,
438
441
index_state = IState ,
439
442
index_module = IModule ,
440
443
dir = rabbit_file :filename_to_binary (Dir ),
@@ -446,13 +449,21 @@ client_init(Server, Ref, MsgOnDiskFun) when is_pid(Server); is_atom(Server) ->
446
449
-spec client_terminate (client_msstate ()) -> 'ok' .
447
450
448
451
client_terminate (CState = # client_msstate { client_ref = Ref }) ->
449
- ok = server_call (CState , {client_terminate , Ref }).
452
+ ok = server_call (CState , {client_terminate , Ref }),
453
+ client_maybe_close_current_file (CState ).
450
454
451
455
-spec client_delete_and_terminate (client_msstate ()) -> 'ok' .
452
456
453
457
client_delete_and_terminate (CState = # client_msstate { client_ref = Ref }) ->
454
458
ok = server_cast (CState , {client_dying , Ref }),
455
- ok = server_cast (CState , {client_delete , Ref }).
459
+ ok = server_cast (CState , {client_delete , Ref }),
460
+ client_maybe_close_current_file (CState ).
461
+
462
+ client_maybe_close_current_file (# client_msstate { current_file = CurrentFile }) ->
463
+ case CurrentFile of
464
+ undefined -> ok ;
465
+ {_File , Fd } -> ok = file :close (Fd )
466
+ end .
456
467
457
468
-spec client_ref (client_msstate ()) -> client_ref ().
458
469
@@ -1348,9 +1359,21 @@ write_message(MsgId, Msg,
1348
1359
current_file_offset = CurOffset + TotalSize }).
1349
1360
1350
1361
read_from_disk (# msg_location { msg_id = MsgId , file = File , offset = Offset ,
1351
- total_size = TotalSize }, State = # client_msstate { dir = Dir }) ->
1352
- {ok , Hdl } = file :open (form_filename (Dir , filenum_to_name (File )),
1353
- [binary , read , raw ]),
1362
+ total_size = TotalSize }, State = # client_msstate { current_file = CurrentFile0 , dir = Dir }) ->
1363
+ {ok , Hdl } = case CurrentFile0 of
1364
+ {File , Fd } ->
1365
+ {ok , Fd };
1366
+ {_AnotherFile , Fd } ->
1367
+ ok = file :close (Fd ),
1368
+ file :open (form_filename (Dir , filenum_to_name (File )),
1369
+ [binary , read , raw ]);
1370
+ undefined ->
1371
+ file :open (form_filename (Dir , filenum_to_name (File )),
1372
+ [binary , read , raw ])
1373
+ end ,
1374
+
1375
+ % {ok, Hdl} = file:open(form_filename(Dir, filenum_to_name(File)),
1376
+ % [binary, read, raw]),
1354
1377
{ok , {MsgId , Msg }} =
1355
1378
case rabbit_msg_file :pread (Hdl , Offset , TotalSize ) of
1356
1379
{ok , {MsgId , _ }} = Obj ->
@@ -1364,8 +1387,8 @@ read_from_disk(#msg_location { msg_id = MsgId, file = File, offset = Offset,
1364
1387
{proc_dict , get ()}
1365
1388
]}}
1366
1389
end ,
1367
- ok = file :close (Hdl ),
1368
- {Msg , State }.
1390
+ % ok = file:close(Hdl),
1391
+ {Msg , State # client_msstate { current_file = { File , Hdl }} }.
1369
1392
1370
1393
contains_message (MsgId , From , State ) ->
1371
1394
MsgLocation = index_lookup_positive_ref_count (MsgId , State ),
0 commit comments