Skip to content

Commit ed518c7

Browse files
committed
Add ParallelIterator::panic_fuse()
This wraps an iterator with a fuse in case of panics, to halt all threads as soon as possible. While we always propagate panics anyway, that doesn't usually synchronize across threads, so iterators may end up processing many more items before the panic is realized. With `panic_fuse()`, a shared `AtomicBool` is checked before processing each item, and set for any panic unwinding in its path.
1 parent 047ea91 commit ed518c7

File tree

7 files changed

+422
-5
lines changed

7 files changed

+422
-5
lines changed

src/compile_fail/must_use.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ must_use! {
4949
map /** v.par_iter().map(|x| x); */
5050
map_with /** v.par_iter().map_with(0, |_, x| x); */
5151
map_init /** v.par_iter().map_init(|| 0, |_, x| x); */
52+
panic_fuse /** v.par_iter().panic_fuse(); */
5253
rev /** v.par_iter().rev(); */
5354
skip /** v.par_iter().skip(1); */
5455
take /** v.par_iter().take(1); */

src/iter/mod.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ mod sum;
155155
pub use self::cloned::Cloned;
156156
mod inspect;
157157
pub use self::inspect::Inspect;
158+
mod panic_fuse;
159+
pub use self::panic_fuse::PanicFuse;
158160
mod while_some;
159161
pub use self::while_some::WhileSome;
160162
mod extend;
@@ -1730,6 +1732,40 @@ pub trait ParallelIterator: Sized + Send {
17301732
while_some::new(self)
17311733
}
17321734

1735+
/// Wraps an iterator with a fuse in case of panics, to halt all threads
1736+
/// as soon as possible.
1737+
///
1738+
/// Panics within parallel iterators are always propagated to the caller,
1739+
/// but they don't always halt the rest of the iterator right away, due to
1740+
/// the internal semantics of [`join`]. This adaptor makes a greater effort
1741+
/// to stop processing other items sooner, with the cost of additional
1742+
/// synchronization overhead, which may also inhibit some optimizations.
1743+
///
1744+
/// [`join`]: ../fn.join.html#panics
1745+
///
1746+
/// # Examples
1747+
///
1748+
/// If this code didn't use `panic_fuse()`, it would continue processing
1749+
/// many more items in other threads (with long sleep delays) before the
1750+
/// panic is finally propagated.
1751+
///
1752+
/// ```should_panic
1753+
/// use rayon::prelude::*;
1754+
/// use std::{thread, time};
1755+
///
1756+
/// (0..1_000_000)
1757+
/// .into_par_iter()
1758+
/// .panic_fuse()
1759+
/// .for_each(|i| {
1760+
/// // simulate some work
1761+
/// thread::sleep(time::Duration::from_secs(1));
1762+
/// assert!(i > 0); // oops!
1763+
/// });
1764+
/// ```
1765+
fn panic_fuse(self) -> PanicFuse<Self> {
1766+
panic_fuse::new(self)
1767+
}
1768+
17331769
/// Create a fresh collection containing all the element produced
17341770
/// by this parallel iterator.
17351771
///

src/iter/panic_fuse.rs

Lines changed: 338 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,338 @@
1+
use super::plumbing::*;
2+
use super::*;
3+
use std::sync::atomic::{AtomicBool, Ordering};
4+
use std::thread;
5+
6+
/// `PanicFuse` is an adaptor that wraps an iterator with a fuse in case
7+
/// of panics, to halt all threads as soon as possible.
8+
///
9+
/// This struct is created by the [`panic_fuse()`] method on [`ParallelIterator`]
10+
///
11+
/// [`panic_fuse()`]: trait.ParallelIterator.html#method.panic_fuse
12+
/// [`ParallelIterator`]: trait.ParallelIterator.html
13+
#[must_use = "iterator adaptors are lazy and do nothing unless consumed"]
14+
#[derive(Debug, Clone)]
15+
pub struct PanicFuse<I: ParallelIterator> {
16+
base: I,
17+
}
18+
19+
/// Helper that sets a bool to `true` if dropped while unwinding.
20+
#[derive(Clone)]
21+
struct Fuse<'a>(&'a AtomicBool);
22+
23+
impl<'a> Drop for Fuse<'a> {
24+
#[inline]
25+
fn drop(&mut self) {
26+
if thread::panicking() {
27+
self.0.store(true, Ordering::Relaxed);
28+
}
29+
}
30+
}
31+
32+
impl<'a> Fuse<'a> {
33+
#[inline]
34+
fn panicked(&self) -> bool {
35+
self.0.load(Ordering::Relaxed)
36+
}
37+
}
38+
39+
/// Create a new `PanicFuse` iterator.
40+
///
41+
/// NB: a free fn because it is NOT part of the end-user API.
42+
pub fn new<I>(base: I) -> PanicFuse<I>
43+
where
44+
I: ParallelIterator,
45+
{
46+
PanicFuse { base: base }
47+
}
48+
49+
impl<I> ParallelIterator for PanicFuse<I>
50+
where
51+
I: ParallelIterator,
52+
{
53+
type Item = I::Item;
54+
55+
fn drive_unindexed<C>(self, consumer: C) -> C::Result
56+
where
57+
C: UnindexedConsumer<Self::Item>,
58+
{
59+
let panicked = AtomicBool::new(false);
60+
let consumer1 = PanicFuseConsumer {
61+
base: consumer,
62+
fuse: Fuse(&panicked),
63+
};
64+
self.base.drive_unindexed(consumer1)
65+
}
66+
67+
fn opt_len(&self) -> Option<usize> {
68+
self.base.opt_len()
69+
}
70+
}
71+
72+
impl<I> IndexedParallelIterator for PanicFuse<I>
73+
where
74+
I: IndexedParallelIterator,
75+
{
76+
fn drive<C>(self, consumer: C) -> C::Result
77+
where
78+
C: Consumer<Self::Item>,
79+
{
80+
let panicked = AtomicBool::new(false);
81+
let consumer1 = PanicFuseConsumer {
82+
base: consumer,
83+
fuse: Fuse(&panicked),
84+
};
85+
self.base.drive(consumer1)
86+
}
87+
88+
fn len(&self) -> usize {
89+
self.base.len()
90+
}
91+
92+
fn with_producer<CB>(self, callback: CB) -> CB::Output
93+
where
94+
CB: ProducerCallback<Self::Item>,
95+
{
96+
return self.base.with_producer(Callback { callback: callback });
97+
98+
struct Callback<CB> {
99+
callback: CB,
100+
}
101+
102+
impl<T, CB> ProducerCallback<T> for Callback<CB>
103+
where
104+
CB: ProducerCallback<T>,
105+
{
106+
type Output = CB::Output;
107+
108+
fn callback<P>(self, base: P) -> CB::Output
109+
where
110+
P: Producer<Item = T>,
111+
{
112+
let panicked = AtomicBool::new(false);
113+
let producer = PanicFuseProducer {
114+
base: base,
115+
fuse: Fuse(&panicked),
116+
};
117+
self.callback.callback(producer)
118+
}
119+
}
120+
}
121+
}
122+
123+
/// ////////////////////////////////////////////////////////////////////////
124+
/// Producer implementation
125+
126+
struct PanicFuseProducer<'a, P> {
127+
base: P,
128+
fuse: Fuse<'a>,
129+
}
130+
131+
impl<'a, P> Producer for PanicFuseProducer<'a, P>
132+
where
133+
P: Producer,
134+
{
135+
type Item = P::Item;
136+
type IntoIter = PanicFuseIter<'a, P::IntoIter>;
137+
138+
fn into_iter(self) -> Self::IntoIter {
139+
PanicFuseIter {
140+
base: self.base.into_iter(),
141+
fuse: self.fuse,
142+
}
143+
}
144+
145+
fn min_len(&self) -> usize {
146+
self.base.min_len()
147+
}
148+
fn max_len(&self) -> usize {
149+
self.base.max_len()
150+
}
151+
152+
fn split_at(self, index: usize) -> (Self, Self) {
153+
let (left, right) = self.base.split_at(index);
154+
(
155+
PanicFuseProducer {
156+
base: left,
157+
fuse: self.fuse.clone(),
158+
},
159+
PanicFuseProducer {
160+
base: right,
161+
fuse: self.fuse,
162+
},
163+
)
164+
}
165+
166+
fn fold_with<G>(self, folder: G) -> G
167+
where
168+
G: Folder<Self::Item>,
169+
{
170+
let folder1 = PanicFuseFolder {
171+
base: folder,
172+
fuse: self.fuse,
173+
};
174+
self.base.fold_with(folder1).base
175+
}
176+
}
177+
178+
struct PanicFuseIter<'a, I> {
179+
base: I,
180+
fuse: Fuse<'a>,
181+
}
182+
183+
impl<'a, I> Iterator for PanicFuseIter<'a, I>
184+
where
185+
I: Iterator,
186+
{
187+
type Item = I::Item;
188+
189+
fn next(&mut self) -> Option<Self::Item> {
190+
if self.fuse.panicked() {
191+
None
192+
} else {
193+
self.base.next()
194+
}
195+
}
196+
197+
fn size_hint(&self) -> (usize, Option<usize>) {
198+
self.base.size_hint()
199+
}
200+
}
201+
202+
impl<'a, I> DoubleEndedIterator for PanicFuseIter<'a, I>
203+
where
204+
I: DoubleEndedIterator,
205+
{
206+
fn next_back(&mut self) -> Option<Self::Item> {
207+
if self.fuse.panicked() {
208+
None
209+
} else {
210+
self.base.next_back()
211+
}
212+
}
213+
}
214+
215+
impl<'a, I> ExactSizeIterator for PanicFuseIter<'a, I>
216+
where
217+
I: ExactSizeIterator,
218+
{
219+
fn len(&self) -> usize {
220+
self.base.len()
221+
}
222+
}
223+
224+
/// ////////////////////////////////////////////////////////////////////////
225+
/// Consumer implementation
226+
227+
struct PanicFuseConsumer<'a, C> {
228+
base: C,
229+
fuse: Fuse<'a>,
230+
}
231+
232+
impl<'a, T, C> Consumer<T> for PanicFuseConsumer<'a, C>
233+
where
234+
C: Consumer<T>,
235+
{
236+
type Folder = PanicFuseFolder<'a, C::Folder>;
237+
type Reducer = PanicFuseReducer<'a, C::Reducer>;
238+
type Result = C::Result;
239+
240+
fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
241+
let (left, right, reducer) = self.base.split_at(index);
242+
(
243+
PanicFuseConsumer {
244+
base: left,
245+
fuse: self.fuse.clone(),
246+
},
247+
PanicFuseConsumer {
248+
base: right,
249+
fuse: self.fuse.clone(),
250+
},
251+
PanicFuseReducer {
252+
base: reducer,
253+
_fuse: self.fuse,
254+
},
255+
)
256+
}
257+
258+
fn into_folder(self) -> Self::Folder {
259+
PanicFuseFolder {
260+
base: self.base.into_folder(),
261+
fuse: self.fuse,
262+
}
263+
}
264+
265+
fn full(&self) -> bool {
266+
self.fuse.panicked() || self.base.full()
267+
}
268+
}
269+
270+
impl<'a, T, C> UnindexedConsumer<T> for PanicFuseConsumer<'a, C>
271+
where
272+
C: UnindexedConsumer<T>,
273+
{
274+
fn split_off_left(&self) -> Self {
275+
PanicFuseConsumer {
276+
base: self.base.split_off_left(),
277+
fuse: self.fuse.clone(),
278+
}
279+
}
280+
281+
fn to_reducer(&self) -> Self::Reducer {
282+
PanicFuseReducer {
283+
base: self.base.to_reducer(),
284+
_fuse: self.fuse.clone(),
285+
}
286+
}
287+
}
288+
289+
struct PanicFuseFolder<'a, C> {
290+
base: C,
291+
fuse: Fuse<'a>,
292+
}
293+
294+
impl<'a, T, C> Folder<T> for PanicFuseFolder<'a, C>
295+
where
296+
C: Folder<T>,
297+
{
298+
type Result = C::Result;
299+
300+
fn consume(mut self, item: T) -> Self {
301+
self.base = self.base.consume(item);
302+
self
303+
}
304+
305+
fn consume_iter<I>(mut self, iter: I) -> Self
306+
where
307+
I: IntoIterator<Item = T>,
308+
{
309+
self.base = {
310+
let fuse = &self.fuse;
311+
let iter = iter.into_iter().take_while(move |_| !fuse.panicked());
312+
self.base.consume_iter(iter)
313+
};
314+
self
315+
}
316+
317+
fn complete(self) -> C::Result {
318+
self.base.complete()
319+
}
320+
321+
fn full(&self) -> bool {
322+
self.fuse.panicked() || self.base.full()
323+
}
324+
}
325+
326+
struct PanicFuseReducer<'a, C> {
327+
base: C,
328+
_fuse: Fuse<'a>,
329+
}
330+
331+
impl<'a, T, C> Reducer<T> for PanicFuseReducer<'a, C>
332+
where
333+
C: Reducer<T>,
334+
{
335+
fn reduce(self, left: T, right: T) -> T {
336+
self.base.reduce(left, right)
337+
}
338+
}

0 commit comments

Comments
 (0)