Skip to content

Commit e027e1f

Browse files
committed
provide a non-parallel version of the status iteration
1 parent 974af7b commit e027e1f

File tree

7 files changed

+172
-69
lines changed

7 files changed

+172
-69
lines changed

gix/src/commit.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ pub mod describe {
5252
/// performed so that the `suffix` is appended to the output. If it is `None`, no check will be performed and
5353
/// there will be no suffix.
5454
/// Note that obtaining the dirty-state of the repository can be expensive.
55-
#[cfg(all(feature = "status", feature = "parallel"))]
55+
#[cfg(feature = "status")]
5656
pub fn format_with_dirty_suffix(
5757
self,
5858
dirty_suffix: impl Into<Option<String>>,
@@ -81,7 +81,7 @@ pub mod describe {
8181
#[error(transparent)]
8282
RefIterInit(#[from] crate::reference::iter::init::Error),
8383
#[error(transparent)]
84-
#[cfg(all(feature = "status", feature = "parallel"))]
84+
#[cfg(feature = "status")]
8585
DetermineIsDirty(#[from] crate::status::is_dirty::Error),
8686
}
8787

gix/src/status/index_worktree.rs

Lines changed: 128 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -182,30 +182,40 @@ impl Repository {
182182
///
183183
/// Changes to the index are collected and it's possible to write the index back using [iter::Outcome::write_changes()].
184184
/// Note that these changes are not observable, they will always be kept.
185-
#[cfg(feature = "parallel")]
185+
///
186+
/// ### Parallel Operation
187+
///
188+
/// Note that without the `parallel` feature, the iterator becomes 'serial', which means all status will be computed in advance
189+
/// and it's non-interruptable, yielding worse performance for is-dirty checks for instance as interruptions won't happen.
190+
/// It's a crutch that is just there to make single-threaded applications possible at all, as it's not really an iterator
191+
/// anymore. If this matters, better run [Repository::index_worktree_status()] by hand as it provides all control one would need,
192+
/// just not as an iterator.
186193
pub struct Iter {
194+
#[cfg(feature = "parallel")]
187195
#[allow(clippy::type_complexity)]
188196
rx_and_join: Option<(
189197
std::sync::mpsc::Receiver<iter::Item>,
190198
std::thread::JoinHandle<Result<iter::Outcome, crate::status::index_worktree::Error>>,
191199
)>,
200+
#[cfg(feature = "parallel")]
192201
should_interrupt: std::sync::Arc<AtomicBool>,
202+
/// Without parallelization, the iterator has to buffer all changes in advance.
203+
#[cfg(not(feature = "parallel"))]
204+
items: std::vec::IntoIter<iter::Item>,
193205
/// The outcome of the operation, only available once the operation has ended.
194206
out: Option<iter::Outcome>,
195207
/// The set of `(entry_index, change)` we extracted in order to potentially write back the index with the changes applied.
196208
changes: Vec<(usize, iter::ApplyChange)>,
197209
}
198210

199211
///
200-
#[cfg(feature = "parallel")]
201212
pub mod iter {
202213
use crate::bstr::BString;
203214
use crate::config::cache::util::ApplyLeniencyDefault;
204215
use crate::status::index_worktree::iter;
205216
use crate::status::{index_worktree, Platform};
206217
use crate::worktree::IndexPersistedOrInMemory;
207-
use crate::ThreadSafeRepository;
208-
use std::sync::atomic::{AtomicBool, Ordering};
218+
use std::sync::atomic::AtomicBool;
209219
use std::sync::Arc;
210220

211221
pub(super) enum ApplyChange {
@@ -434,8 +444,12 @@ pub mod iter {
434444
#[error(transparent)]
435445
Index(#[from] crate::worktree::open_index::Error),
436446
#[error("Failed to spawn producer thread")]
447+
#[cfg(feature = "parallel")]
437448
SpawnThread(#[source] std::io::Error),
438449
#[error(transparent)]
450+
#[cfg(not(feature = "parallel"))]
451+
IndexWorktreeStatus(#[from] crate::status::index_worktree::Error),
452+
#[error(transparent)]
439453
ConfigSkipHash(#[from] crate::config::boolean::Error),
440454
#[error(transparent)]
441455
PrepareSubmodules(#[from] crate::submodule::modules::Error),
@@ -457,8 +471,6 @@ pub mod iter {
457471
};
458472

459473
let should_interrupt = Arc::new(AtomicBool::default());
460-
let (tx, rx) = std::sync::mpsc::channel();
461-
let mut collect = Collect { tx };
462474
let skip_hash = self
463475
.repo
464476
.config
@@ -469,48 +481,93 @@ pub mod iter {
469481
.with_lenient_default(self.repo.config.lenient_config)?
470482
.unwrap_or_default();
471483
let submodule = ComputeSubmoduleStatus::new(self.repo.clone().into_sync(), self.submodules)?;
472-
let join = std::thread::Builder::new()
473-
.name("gix::status::index_worktree::iter::producer".into())
474-
.spawn({
475-
let repo = self.repo.clone().into_sync();
476-
let options = self.index_worktree_options;
477-
let should_interrupt = should_interrupt.clone();
478-
let mut progress = self.progress;
479-
move || -> Result<_, crate::status::index_worktree::Error> {
480-
let repo = repo.to_thread_local();
481-
let out = repo.index_worktree_status(
482-
&index,
483-
patterns,
484-
&mut collect,
485-
gix_status::index_as_worktree::traits::FastEq,
486-
submodule,
487-
&mut progress,
488-
&should_interrupt,
489-
options,
490-
)?;
491-
Ok(Outcome {
492-
index_worktree: out,
493-
index,
494-
changes: None,
495-
skip_hash,
496-
})
497-
}
484+
#[cfg(feature = "parallel")]
485+
{
486+
let (tx, rx) = std::sync::mpsc::channel();
487+
let mut collect = Collect { tx };
488+
let join = std::thread::Builder::new()
489+
.name("gix::status::index_worktree::iter::producer".into())
490+
.spawn({
491+
let repo = self.repo.clone().into_sync();
492+
let options = self.index_worktree_options;
493+
let should_interrupt = should_interrupt.clone();
494+
let mut progress = self.progress;
495+
move || -> Result<_, crate::status::index_worktree::Error> {
496+
let repo = repo.to_thread_local();
497+
let out = repo.index_worktree_status(
498+
&index,
499+
patterns,
500+
&mut collect,
501+
gix_status::index_as_worktree::traits::FastEq,
502+
submodule,
503+
&mut progress,
504+
&should_interrupt,
505+
options,
506+
)?;
507+
Ok(Outcome {
508+
index_worktree: out,
509+
index,
510+
changes: None,
511+
skip_hash,
512+
})
513+
}
514+
})
515+
.map_err(Error::SpawnThread)?;
516+
517+
Ok(super::Iter {
518+
rx_and_join: Some((rx, join)),
519+
should_interrupt,
520+
changes: Vec::new(),
521+
out: None,
498522
})
499-
.map_err(Error::SpawnThread)?;
500-
501-
Ok(super::Iter {
502-
rx_and_join: Some((rx, join)),
503-
should_interrupt,
504-
changes: Vec::new(),
505-
out: None,
506-
})
523+
}
524+
#[cfg(not(feature = "parallel"))]
525+
{
526+
let mut collect = Collect { items: Vec::new() };
527+
528+
let repo = self.repo.clone().into_sync();
529+
let options = self.index_worktree_options;
530+
let mut progress = self.progress;
531+
let repo = repo.to_thread_local();
532+
let out = repo.index_worktree_status(
533+
&index,
534+
patterns,
535+
&mut collect,
536+
gix_status::index_as_worktree::traits::FastEq,
537+
submodule,
538+
&mut progress,
539+
&should_interrupt,
540+
options,
541+
)?;
542+
let mut out = Outcome {
543+
index_worktree: out,
544+
index,
545+
changes: None,
546+
skip_hash,
547+
};
548+
let mut iter = super::Iter {
549+
items: Vec::new().into_iter(),
550+
changes: Vec::new(),
551+
out: None,
552+
};
553+
let items = collect
554+
.items
555+
.into_iter()
556+
.filter_map(|item| iter.maybe_keep_index_change(item))
557+
.collect::<Vec<_>>();
558+
out.changes = (!iter.changes.is_empty()).then(|| std::mem::take(&mut iter.changes));
559+
iter.items = items.into_iter();
560+
iter.out = Some(out);
561+
Ok(iter)
562+
}
507563
}
508564
}
509565

510566
impl Iterator for super::Iter {
511567
type Item = Result<Item, crate::status::index_worktree::Error>;
512568

513569
fn next(&mut self) -> Option<Self::Item> {
570+
#[cfg(feature = "parallel")]
514571
loop {
515572
let (rx, _join) = self.rx_and_join.as_ref()?;
516573
match rx.recv().ok() {
@@ -523,7 +580,8 @@ pub mod iter {
523580
None => {
524581
let (_rx, handle) = self.rx_and_join.take()?;
525582
break match handle.join().expect("no panic") {
526-
Ok(out) => {
583+
Ok(mut out) => {
584+
out.changes = Some(std::mem::take(&mut self.changes));
527585
self.out = Some(out);
528586
None
529587
}
@@ -532,6 +590,15 @@ pub mod iter {
532590
}
533591
}
534592
}
593+
#[cfg(not(feature = "parallel"))]
594+
self.items.next().map(Ok)
595+
}
596+
}
597+
598+
impl super::Iter {
599+
/// Return the outcome of the iteration, or `None` if the iterator isn't fully consumed.
600+
pub fn outcome_mut(&mut self) -> Option<&mut Outcome> {
601+
self.out.as_mut()
535602
}
536603
}
537604

@@ -562,9 +629,10 @@ pub mod iter {
562629
}
563630
}
564631

632+
#[cfg(feature = "parallel")]
565633
impl Drop for super::Iter {
566634
fn drop(&mut self) {
567-
self.should_interrupt.store(true, Ordering::Relaxed);
635+
self.should_interrupt.store(true, std::sync::atomic::Ordering::Relaxed);
568636
// Allow to temporarily 'leak' the producer to not block on drop, nobody
569637
// is interested in the result of the thread anymore.
570638
drop(self.rx_and_join.take());
@@ -574,7 +642,10 @@ pub mod iter {
574642
#[derive(Clone)]
575643
struct ComputeSubmoduleStatus {
576644
mode: crate::status::Submodule,
577-
repo: ThreadSafeRepository,
645+
#[cfg(feature = "parallel")]
646+
repo: crate::ThreadSafeRepository,
647+
#[cfg(not(feature = "parallel"))]
648+
git_dir: std::path::PathBuf,
578649
submodule_paths: Vec<BString>,
579650
}
580651

@@ -605,7 +676,10 @@ pub mod iter {
605676
};
606677
Ok(Self {
607678
mode,
679+
#[cfg(feature = "parallel")]
608680
repo,
681+
#[cfg(not(feature = "parallel"))]
682+
git_dir: local_repo.git_dir().to_owned(),
609683
submodule_paths,
610684
})
611685
}
@@ -638,7 +712,12 @@ pub mod iter {
638712
{
639713
return Ok(None);
640714
}
715+
#[cfg(feature = "parallel")]
641716
let repo = self.repo.to_thread_local();
717+
#[cfg(not(feature = "parallel"))]
718+
let Ok(repo) = crate::open(&self.git_dir) else {
719+
return Ok(None);
720+
};
642721
let Ok(Some(mut submodules)) = repo.submodules() else {
643722
return Ok(None);
644723
};
@@ -656,7 +735,10 @@ pub mod iter {
656735
}
657736

658737
struct Collect {
738+
#[cfg(feature = "parallel")]
659739
tx: std::sync::mpsc::Sender<Item>,
740+
#[cfg(not(feature = "parallel"))]
741+
items: Vec<Item>,
660742
}
661743

662744
impl<'index> gix_status::index_as_worktree_with_renames::VisitEntry<'index> for Collect {
@@ -673,7 +755,10 @@ pub mod iter {
673755
>,
674756
) {
675757
// NOTE: we assume that the receiver triggers interruption so the operation will stop if the receiver is down.
758+
#[cfg(feature = "parallel")]
676759
self.tx.send(entry.into()).ok();
760+
#[cfg(not(feature = "parallel"))]
761+
self.items.push(entry.into());
677762
}
678763
}
679764
}

gix/src/status/mod.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@ pub struct Platform<'repo, Progress>
66
where
77
Progress: gix_features::progress::Progress + 'static,
88
{
9-
#[cfg_attr(not(feature = "parallel"), allow(dead_code))]
109
repo: &'repo Repository,
11-
#[cfg_attr(not(feature = "parallel"), allow(dead_code))]
1210
progress: Progress,
1311
index: Option<crate::worktree::IndexPersistedOrInMemory>,
1412
submodules: Submodule,
@@ -84,7 +82,6 @@ impl Repository {
8482
}
8583

8684
///
87-
#[cfg(feature = "parallel")]
8885
pub mod is_dirty {
8986
use crate::Repository;
9087

gix/src/submodule/mod.rs

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ impl<'repo> Submodule<'repo> {
276276
}
277277

278278
///
279-
#[cfg(all(feature = "status", feature = "parallel"))]
279+
#[cfg(feature = "status")]
280280
pub mod status {
281281
use super::{head_id, index_id, open, Status};
282282
use crate::Submodule;
@@ -300,6 +300,8 @@ pub mod status {
300300
StatusPlatform(#[from] crate::config::boolean::Error),
301301
#[error(transparent)]
302302
Status(#[from] crate::status::index_worktree::iter::Error),
303+
#[error(transparent)]
304+
IndexWorktreeStatus(#[from] crate::status::index_worktree::Error),
303305
}
304306

305307
impl<'repo> Submodule<'repo> {
@@ -379,19 +381,19 @@ pub mod status {
379381
return Ok(status);
380382
}
381383

382-
status.changes = Some(
383-
adjust_options(sm_repo.status(gix_features::progress::Discard)?)
384-
.index_worktree_options_mut(|opts| {
385-
assert!(opts.dirwalk_options.is_some(), "BUG: it's supposed to be the default");
386-
if ignore == config::Ignore::Untracked {
387-
opts.dirwalk_options = None;
388-
}
389-
})
390-
.into_index_worktree_iter(Vec::new())?
391-
.filter_map(Result::ok)
392-
.collect(),
393-
);
394-
384+
let statusses = adjust_options(sm_repo.status(gix_features::progress::Discard)?)
385+
.index_worktree_options_mut(|opts| {
386+
assert!(opts.dirwalk_options.is_some(), "BUG: it's supposed to be the default");
387+
if ignore == config::Ignore::Untracked {
388+
opts.dirwalk_options = None;
389+
}
390+
})
391+
.into_index_worktree_iter(Vec::new())?;
392+
let mut changes = Vec::new();
393+
for change in statusses {
394+
changes.push(change?);
395+
}
396+
status.changes = Some(changes);
395397
Ok(status)
396398
}
397399
}
@@ -444,7 +446,7 @@ pub mod status {
444446
}
445447
}
446448
}
447-
#[cfg(all(feature = "status", feature = "parallel"))]
449+
#[cfg(feature = "status")]
448450
pub use status::types::Status;
449451

450452
/// A summary of the state of all parts forming a submodule, which allows to answer various questions about it.

gix/tests/commit/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ mod describe {
44

55
use crate::named_repo;
66

7-
#[cfg(all(feature = "status", feature = "parallel"))]
7+
#[cfg(feature = "status")]
88
mod with_dirty_suffix {
99
use crate::util::named_subrepo_opts;
1010
use gix::commit::describe::SelectRef;

0 commit comments

Comments
 (0)