Skip to content

Commit 021d15f

Browse files
bors[bot]cuviper
andcommitted
644: Add ParallelIterator::panic_fuse() r=nikomatsakis a=cuviper This wraps an iterator with a fuse in case of panics, to halt all threads as soon as possible. While we always propagate panics anyway, that doesn't usually synchronize across threads, so iterators may end up processing many more items before the panic is realized. With `panic_fuse()`, a shared `AtomicBool` is checked before processing each item, and set for any panic unwinding in its path. Co-authored-by: Josh Stone <[email protected]>
2 parents 6c9e02e + ded3266 commit 021d15f

File tree

9 files changed

+494
-14
lines changed

9 files changed

+494
-14
lines changed

src/compile_fail/must_use.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ must_use! {
4949
map /** v.par_iter().map(|x| x); */
5050
map_with /** v.par_iter().map_with(0, |_, x| x); */
5151
map_init /** v.par_iter().map_init(|| 0, |_, x| x); */
52+
panic_fuse /** v.par_iter().panic_fuse(); */
5253
rev /** v.par_iter().rev(); */
5354
skip /** v.par_iter().skip(1); */
5455
take /** v.par_iter().take(1); */

src/iter/collect/consumer.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@ use std::sync::atomic::{AtomicUsize, Ordering};
77
pub struct CollectConsumer<'c, T: Send + 'c> {
88
/// Tracks how many items we successfully wrote. Used to guarantee
99
/// safety in the face of panics or buggy parallel iterators.
10+
///
11+
/// In theory we could just produce this as a `CollectConsumer::Result`,
12+
/// folding local counts and reducing by addition, but that requires a
13+
/// certain amount of trust that the producer driving this will behave
14+
/// itself. Since this count is important to the safety of marking the
15+
/// memory initialized (`Vec::set_len`), we choose to keep it internal.
1016
writes: &'c AtomicUsize,
1117

1218
/// A slice covering the target memory, not yet initialized!
@@ -85,7 +91,8 @@ impl<'c, T: Send + 'c> Folder<T> for CollectFolder<'c, T> {
8591
}
8692

8793
fn complete(self) {
88-
assert!(self.target.len() == 0, "too few values pushed to consumer");
94+
// NB: We don't explicitly check that the local writes were complete,
95+
// but `Collect::complete()` will assert the global write count.
8996

9097
// track total values written
9198
self.global_writes

src/iter/collect/test.rs

Lines changed: 64 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
use super::Collect;
99
use iter::plumbing::*;
10+
use rayon_core::join;
1011

1112
/// Promises to produce 2 items, but then produces 3. Does not do any
1213
/// splits at all.
@@ -25,15 +26,18 @@ fn produce_too_many_items() {
2526
/// Produces fewer items than promised. Does not do any
2627
/// splits at all.
2728
#[test]
28-
#[should_panic(expected = "too few values")]
29+
#[should_panic(expected = "expected 5 total writes, but got 2")]
2930
fn produce_fewer_items() {
3031
let mut v = vec![];
3132
let mut collect = Collect::new(&mut v, 5);
32-
let consumer = collect.as_consumer();
33-
let mut folder = consumer.into_folder();
34-
folder = folder.consume(22);
35-
folder = folder.consume(23);
36-
folder.complete();
33+
{
34+
let consumer = collect.as_consumer();
35+
let mut folder = consumer.into_folder();
36+
folder = folder.consume(22);
37+
folder = folder.consume(23);
38+
folder.complete();
39+
}
40+
collect.complete();
3741
}
3842

3943
// Complete is not called by the consumer.Hence,the collection vector is not fully initialized.
@@ -129,7 +133,7 @@ fn right_produces_too_many_items() {
129133
// The left consumer produces fewer items while the right
130134
// consumer produces correct number.
131135
#[test]
132-
#[should_panic(expected = "too few values")]
136+
#[should_panic(expected = "expected 4 total writes, but got 3")]
133137
fn left_produces_fewer_items() {
134138
let mut v = vec![];
135139
let mut collect = Collect::new(&mut v, 4);
@@ -149,7 +153,7 @@ fn left_produces_fewer_items() {
149153
// The right consumer produces fewer items while the left
150154
// consumer produces correct number.
151155
#[test]
152-
#[should_panic(expected = "too few values")]
156+
#[should_panic(expected = "expected 4 total writes, but got 3")]
153157
fn right_produces_fewer_items() {
154158
let mut v = vec![];
155159
let mut collect = Collect::new(&mut v, 4);
@@ -165,3 +169,55 @@ fn right_produces_fewer_items() {
165169
}
166170
collect.complete();
167171
}
172+
173+
// The left consumer panics and the right stops short, like `panic_fuse()`.
174+
// We should get the left panic without ever reaching `Collect::complete()`.
175+
#[test]
176+
#[should_panic(expected = "left consumer panic")]
177+
fn left_panics() {
178+
let mut v = vec![];
179+
let mut collect = Collect::new(&mut v, 4);
180+
{
181+
let consumer = collect.as_consumer();
182+
let (left_consumer, right_consumer, _) = consumer.split_at(2);
183+
join(
184+
|| {
185+
let mut left_folder = left_consumer.into_folder();
186+
left_folder = left_folder.consume(0);
187+
panic!("left consumer panic");
188+
},
189+
|| {
190+
let mut right_folder = right_consumer.into_folder();
191+
right_folder = right_folder.consume(2);
192+
right_folder.complete() // early return
193+
},
194+
);
195+
}
196+
collect.complete();
197+
}
198+
199+
// The right consumer panics and the left stops short, like `panic_fuse()`.
200+
// We should get the right panic without ever reaching `Collect::complete()`.
201+
#[test]
202+
#[should_panic(expected = "right consumer panic")]
203+
fn right_panics() {
204+
let mut v = vec![];
205+
let mut collect = Collect::new(&mut v, 4);
206+
{
207+
let consumer = collect.as_consumer();
208+
let (left_consumer, right_consumer, _) = consumer.split_at(2);
209+
join(
210+
|| {
211+
let mut left_folder = left_consumer.into_folder();
212+
left_folder = left_folder.consume(0);
213+
left_folder.complete() // early return
214+
},
215+
|| {
216+
let mut right_folder = right_consumer.into_folder();
217+
right_folder = right_folder.consume(2);
218+
panic!("right consumer panic");
219+
},
220+
);
221+
}
222+
collect.complete();
223+
}

src/iter/mod.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ mod sum;
155155
pub use self::cloned::Cloned;
156156
mod inspect;
157157
pub use self::inspect::Inspect;
158+
mod panic_fuse;
159+
pub use self::panic_fuse::PanicFuse;
158160
mod while_some;
159161
pub use self::while_some::WhileSome;
160162
mod extend;
@@ -1730,6 +1732,40 @@ pub trait ParallelIterator: Sized + Send {
17301732
while_some::new(self)
17311733
}
17321734

1735+
/// Wraps an iterator with a fuse in case of panics, to halt all threads
1736+
/// as soon as possible.
1737+
///
1738+
/// Panics within parallel iterators are always propagated to the caller,
1739+
/// but they don't always halt the rest of the iterator right away, due to
1740+
/// the internal semantics of [`join`]. This adaptor makes a greater effort
1741+
/// to stop processing other items sooner, with the cost of additional
1742+
/// synchronization overhead, which may also inhibit some optimizations.
1743+
///
1744+
/// [`join`]: ../fn.join.html#panics
1745+
///
1746+
/// # Examples
1747+
///
1748+
/// If this code didn't use `panic_fuse()`, it would continue processing
1749+
/// many more items in other threads (with long sleep delays) before the
1750+
/// panic is finally propagated.
1751+
///
1752+
/// ```should_panic
1753+
/// use rayon::prelude::*;
1754+
/// use std::{thread, time};
1755+
///
1756+
/// (0..1_000_000)
1757+
/// .into_par_iter()
1758+
/// .panic_fuse()
1759+
/// .for_each(|i| {
1760+
/// // simulate some work
1761+
/// thread::sleep(time::Duration::from_secs(1));
1762+
/// assert!(i > 0); // oops!
1763+
/// });
1764+
/// ```
1765+
fn panic_fuse(self) -> PanicFuse<Self> {
1766+
panic_fuse::new(self)
1767+
}
1768+
17331769
/// Create a fresh collection containing all the element produced
17341770
/// by this parallel iterator.
17351771
///

0 commit comments

Comments
 (0)