Skip to content

Commit e0c02a0

Browse files
committed
fix all tests
1 parent 997d8f1 commit e0c02a0

File tree

2 files changed

+69
-76
lines changed

2 files changed

+69
-76
lines changed

src/store/bao_file.rs

Lines changed: 18 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use std::{
1414
io,
1515
ops::{Deref, DerefMut},
1616
path::{Path, PathBuf},
17-
sync::{Arc, RwLock, Weak},
17+
sync::{Arc, Weak},
1818
};
1919

2020
use bao_tree::{
@@ -343,7 +343,7 @@ impl<T> BaoFileHandleWeak<T> {
343343
/// The inner part of a bao file handle.
344344
#[derive(Debug)]
345345
pub struct BaoFileHandleInner<T> {
346-
pub(crate) storage: RwLock<BaoFileStorage<T>>,
346+
pub(crate) storage: tokio::sync::RwLock<BaoFileStorage<T>>,
347347
config: Arc<BaoFileConfig>,
348348
hash: Hash,
349349
}
@@ -432,15 +432,9 @@ where
432432
return res;
433433
}
434434
};
435-
// otherwise, we have to spawn a task.
436-
let (handle, res) = tokio::task::spawn_blocking(move || {
437-
let storage = handle.storage.read().unwrap();
438-
let res = f(storage.deref());
439-
drop(storage);
440-
(handle, res)
441-
})
442-
.await
443-
.expect("spawn_blocking failed");
435+
let storage_guard = handle.storage.read().await;
436+
let res = f(storage_guard.deref());
437+
drop(storage_guard);
444438
*opt = Some(handle);
445439
res
446440
}
@@ -528,7 +522,7 @@ where
528522
pub fn incomplete_mem(config: Arc<BaoFileConfig>, hash: Hash) -> Self {
529523
let storage = BaoFileStorage::incomplete_mem();
530524
Self(Arc::new(BaoFileHandleInner {
531-
storage: RwLock::new(storage),
525+
storage: tokio::sync::RwLock::new(storage),
532526
config,
533527
hash,
534528
}))
@@ -543,7 +537,7 @@ where
543537
sizes: create_read_write(&paths.sizes)?,
544538
});
545539
Ok(Self(Arc::new(BaoFileHandleInner {
546-
storage: RwLock::new(storage),
540+
storage: tokio::sync::RwLock::new(storage),
547541
config,
548542
hash,
549543
})))
@@ -558,7 +552,7 @@ where
558552
) -> Self {
559553
let storage = BaoFileStorage::Complete(CompleteStorage { data, outboard });
560554
Self(Arc::new(BaoFileHandleInner {
561-
storage: RwLock::new(storage),
555+
storage: tokio::sync::RwLock::new(storage),
562556
config,
563557
hash,
564558
}))
@@ -567,20 +561,20 @@ where
567561
/// Transform the storage in place. If the transform fails, the storage will
568562
/// be an immutable empty storage.
569563
#[cfg(feature = "fs-store")]
570-
pub(crate) fn transform(
564+
pub(crate) async fn transform(
571565
&self,
572-
f: impl FnOnce(BaoFileStorage<T>) -> io::Result<BaoFileStorage<T>>,
566+
f: impl AsyncFnOnce(BaoFileStorage<T>) -> io::Result<BaoFileStorage<T>>,
573567
) -> io::Result<()> {
574-
let mut lock = self.storage.write().unwrap();
568+
let mut lock = self.storage.write().await;
575569
let storage = lock.take();
576-
*lock = f(storage)?;
570+
*lock = f(storage).await?;
577571
Ok(())
578572
}
579573

580574
/// True if the file is complete.
581575
pub fn is_complete(&self) -> bool {
582576
matches!(
583-
self.storage.read().unwrap().deref(),
577+
self.storage.try_read().unwrap().deref(),
584578
BaoFileStorage::Complete(_)
585579
)
586580
}
@@ -603,7 +597,7 @@ where
603597

604598
/// The most precise known total size of the data file.
605599
pub fn current_size(&self) -> io::Result<u64> {
606-
match self.storage.read().unwrap().deref() {
600+
match self.storage.try_read().unwrap().deref() {
607601
BaoFileStorage::Complete(mem) => Ok(mem.data_size()),
608602
BaoFileStorage::IncompleteMem(mem) => Ok(mem.current_size()),
609603
BaoFileStorage::IncompleteFile(file) => file.current_size(),
@@ -633,8 +627,8 @@ where
633627
}
634628

635629
/// This is the synchronous impl for writing a batch.
636-
fn write_batch(&self, size: u64, batch: &[BaoContentItem]) -> io::Result<HandleChange> {
637-
let mut storage = self.storage.write().unwrap();
630+
async fn write_batch(&self, size: u64, batch: &[BaoContentItem]) -> io::Result<HandleChange> {
631+
let mut storage = self.storage.write().await;
638632
match storage.deref_mut() {
639633
BaoFileStorage::IncompleteMem(mem) => {
640634
// check if we need to switch to file mode, otherwise write to memory
@@ -730,12 +724,7 @@ where
730724
let Some(handle) = self.0.take() else {
731725
return Err(io::Error::new(io::ErrorKind::Other, "deferred batch busy"));
732726
};
733-
let (handle, change) = tokio::task::spawn_blocking(move || {
734-
let change = handle.write_batch(size, &batch);
735-
(handle, change)
736-
})
737-
.await
738-
.expect("spawn_blocking failed");
727+
let change = handle.write_batch(size, &batch).await;
739728
match change? {
740729
HandleChange::None => {}
741730
HandleChange::MemToFile => {
@@ -752,12 +741,7 @@ where
752741
let Some(handle) = self.0.take() else {
753742
return Err(io::Error::new(io::ErrorKind::Other, "deferred batch busy"));
754743
};
755-
let (handle, res) = tokio::task::spawn_blocking(move || {
756-
let res = handle.storage.write().unwrap().sync_all();
757-
(handle, res)
758-
})
759-
.await
760-
.expect("spawn_blocking failed");
744+
let res = handle.storage.write().await.sync_all();
761745
self.0 = Some(handle);
762746
res
763747
}

src/store/fs.rs

Lines changed: 51 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1105,7 +1105,7 @@ where
11051105
.into(),
11061106
)
11071107
})?;
1108-
block_for(self.fs.create_dir_all(parent))?;
1108+
self.fs.create_dir_all(parent).await?;
11091109
let temp_tag = self.temp.temp_tag(HashAndFormat::raw(hash));
11101110
let (tx, rx) = oneshot::channel();
11111111
self.tx
@@ -2003,7 +2003,11 @@ where
20032003
Ok(())
20042004
}
20052005

2006-
fn import(&mut self, tables: &mut Tables, cmd: Import) -> ActorResult<(TempTag, u64)> {
2006+
async fn import(
2007+
&mut self,
2008+
tables: &mut Tables<'_>,
2009+
cmd: Import,
2010+
) -> ActorResult<(TempTag, u64)> {
20072011
let Import {
20082012
content_id,
20092013
source: file,
@@ -2028,7 +2032,9 @@ where
20282032
external_path.display()
20292033
);
20302034
let data = Bytes::from(
2031-
block_for(self.fs.read(&external_path))
2035+
self.fs
2036+
.read(&external_path)
2037+
.await
20322038
.map_err(|e| ActorError::Io(e.into()))?,
20332039
);
20342040
DataLocation::Inline(data)
@@ -2420,41 +2426,45 @@ where
24202426
Ok(())
24212427
}
24222428

2423-
fn on_complete(
2429+
async fn on_complete(
24242430
&mut self,
2425-
tables: &mut Tables,
2431+
tables: &mut Tables<'_>,
24262432
entry: BaoFileHandle<T::File>,
24272433
) -> ActorResult<()> {
24282434
let hash = entry.hash();
24292435
let mut info = None;
24302436
tracing::trace!("on_complete({})", hash.to_hex());
2431-
entry.transform(|state| {
2432-
tracing::trace!("on_complete transform {:?}", state);
2433-
let entry = match complete_storage(
2434-
state,
2435-
&hash,
2436-
&self.options.path,
2437-
&self.options.inline,
2438-
tables.delete_after_commit,
2439-
self.fs.clone(),
2440-
)? {
2441-
Ok(entry) => {
2442-
// store the info so we can insert it into the db later
2443-
info = Some((
2444-
entry.data_size(),
2445-
entry.data.mem().cloned(),
2446-
entry.outboard_size(),
2447-
entry.outboard.mem().cloned(),
2448-
));
2449-
entry
2450-
}
2451-
Err(entry) => {
2452-
// the entry was already complete, nothing to do
2453-
entry
2454-
}
2455-
};
2456-
Ok(BaoFileStorage::Complete(entry))
2457-
})?;
2437+
entry
2438+
.transform(async |state| {
2439+
tracing::trace!("on_complete transform {:?}", state);
2440+
let entry = match complete_storage(
2441+
state,
2442+
&hash,
2443+
&self.options.path,
2444+
&self.options.inline,
2445+
tables.delete_after_commit,
2446+
self.fs.clone(),
2447+
)
2448+
.await?
2449+
{
2450+
Ok(entry) => {
2451+
// store the info so we can insert it into the db later
2452+
info = Some((
2453+
entry.data_size(),
2454+
entry.data.mem().cloned(),
2455+
entry.outboard_size(),
2456+
entry.outboard.mem().cloned(),
2457+
));
2458+
entry
2459+
}
2460+
Err(entry) => {
2461+
// the entry was already complete, nothing to do
2462+
entry
2463+
}
2464+
};
2465+
Ok(BaoFileStorage::Complete(entry))
2466+
})
2467+
.await?;
24582468
if let Some((data_size, data, outboard_size, outboard)) = info {
24592469
let data_location = if data.is_some() {
24602470
DataLocation::Inline(())
@@ -2584,7 +2594,7 @@ where
25842594
) -> ActorResult<std::result::Result<(), ActorMessage<T::File>>> {
25852595
match msg {
25862596
ActorMessage::Import { cmd, tx } => {
2587-
let res = self.import(tables, cmd);
2597+
let res = self.import(tables, cmd).await;
25882598
tx.send(res).ok();
25892599
}
25902600
ActorMessage::SetTag { tag, value, tx } => {
@@ -2612,7 +2622,7 @@ where
26122622
tx.send(res).ok();
26132623
}
26142624
ActorMessage::OnComplete { handle } => {
2615-
let res = self.on_complete(tables, handle);
2625+
let res = self.on_complete(tables, handle).await;
26162626
res.ok();
26172627
}
26182628
ActorMessage::Export { cmd, tx } => {
@@ -2794,7 +2804,7 @@ async fn load_outboard<T: Persistence>(
27942804
}
27952805

27962806
/// Take a possibly incomplete storage and turn it into complete
2797-
fn complete_storage<T>(
2807+
async fn complete_storage<T>(
27982808
storage: BaoFileStorage<T::File>,
27992809
hash: &Hash,
28002810
path_options: &PathOptions,
@@ -2861,13 +2871,12 @@ where
28612871
)
28622872
}
28632873
MemOrFile::File(data) => MemOrFile::File(
2864-
block_for(
2865-
FileAndSize {
2866-
file: data,
2867-
size: data_size,
2868-
}
2869-
.map_async(move |f| fs_2.convert_std_file(f)),
2870-
)
2874+
FileAndSize {
2875+
file: data,
2876+
size: data_size,
2877+
}
2878+
.map_async(move |f| fs_2.convert_std_file(f))
2879+
.await
28712880
.transpose()
28722881
.map_err(|e| ActorError::Io(e.into()))?,
28732882
),

0 commit comments

Comments
 (0)