@@ -414,7 +414,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
414
414
}
415
415
416
416
async fn advance_timestamp (
417
- bounds_writer : & Writer < SnapshotBounds > ,
417
+ bounds_writer : & mut Writer < SnapshotBounds > ,
418
418
persistence : & dyn Persistence ,
419
419
snapshot_reader : & Reader < SnapshotManager > ,
420
420
checkpoint_reader : & Reader < Checkpoint > ,
@@ -497,7 +497,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
497
497
}
498
498
499
499
async fn go_advance_min_snapshot (
500
- bounds_writer : Writer < SnapshotBounds > ,
500
+ mut bounds_writer : Writer < SnapshotBounds > ,
501
501
checkpoint_reader : Reader < Checkpoint > ,
502
502
rt : RT ,
503
503
persistence : Arc < dyn Persistence > ,
@@ -511,7 +511,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
511
511
let _timer = retention_advance_timestamp_timer ( ) ;
512
512
513
513
let index_ts = Self :: advance_timestamp (
514
- & bounds_writer,
514
+ & mut bounds_writer,
515
515
persistence. as_ref ( ) ,
516
516
& snapshot_reader,
517
517
& checkpoint_reader,
@@ -522,7 +522,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
522
522
Self :: emit_timestamp ( & min_snapshot_sender, index_ts) . await ;
523
523
524
524
let document_ts = Self :: advance_timestamp (
525
- & bounds_writer,
525
+ & mut bounds_writer,
526
526
persistence. as_ref ( ) ,
527
527
& snapshot_reader,
528
528
& checkpoint_reader,
@@ -1103,7 +1103,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
1103
1103
mut index_cursor : RepeatableTimestamp ,
1104
1104
retention_validator : Arc < dyn RetentionValidator > ,
1105
1105
mut min_snapshot_rx : Receiver < RepeatableTimestamp > ,
1106
- checkpoint_writer : Writer < Checkpoint > ,
1106
+ mut checkpoint_writer : Writer < Checkpoint > ,
1107
1107
snapshot_reader : Reader < SnapshotManager > ,
1108
1108
) {
1109
1109
let reader = persistence. reader ( ) ;
@@ -1178,7 +1178,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
1178
1178
Self :: checkpoint (
1179
1179
persistence. as_ref ( ) ,
1180
1180
new_cursor,
1181
- & checkpoint_writer,
1181
+ & mut checkpoint_writer,
1182
1182
RetentionType :: Index ,
1183
1183
bounds_reader. clone ( ) ,
1184
1184
snapshot_reader. clone ( ) ,
@@ -1218,7 +1218,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
1218
1218
persistence : Arc < dyn Persistence > ,
1219
1219
retention_validator : Arc < dyn RetentionValidator > ,
1220
1220
mut min_document_snapshot_rx : Receiver < RepeatableTimestamp > ,
1221
- checkpoint_writer : Writer < Checkpoint > ,
1221
+ mut checkpoint_writer : Writer < Checkpoint > ,
1222
1222
snapshot_reader : Reader < SnapshotManager > ,
1223
1223
) {
1224
1224
// Wait with jitter on startup to avoid thundering herd
@@ -1284,7 +1284,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
1284
1284
Self :: checkpoint (
1285
1285
persistence. as_ref ( ) ,
1286
1286
new_cursor,
1287
- & checkpoint_writer,
1287
+ & mut checkpoint_writer,
1288
1288
RetentionType :: Document ,
1289
1289
bounds_reader. clone ( ) ,
1290
1290
snapshot_reader. clone ( ) ,
@@ -1314,7 +1314,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
1314
1314
async fn checkpoint (
1315
1315
persistence : & dyn Persistence ,
1316
1316
cursor : RepeatableTimestamp ,
1317
- checkpoint_writer : & Writer < Checkpoint > ,
1317
+ checkpoint_writer : & mut Writer < Checkpoint > ,
1318
1318
retention_type : RetentionType ,
1319
1319
bounds_reader : Reader < SnapshotBounds > ,
1320
1320
snapshot_reader : Reader < SnapshotManager > ,
0 commit comments