Skip to content

Commit c149e96

Browse files
committed
Merge branch 'master' into rust-1.26
2 parents f1e5933 + 021d15f commit c149e96

File tree

14 files changed

+505
-18
lines changed

14 files changed

+505
-18
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,4 @@ rand = "0.6"
3333
rand_xorshift = "0.1"
3434
serde = "1"
3535
serde_derive = "1"
36+
doc-comment = "0.3"

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,14 +104,14 @@ command to get a visualization of an nbody simulation. To see the
104104
effect of using Rayon, press `s` to run sequentially and `p` to run in
105105
parallel.
106106

107-
```
107+
```text
108108
> cd rayon-demo
109109
> cargo run --release -- nbody visualize
110110
```
111111

112112
For more information on demos, try:
113113

114-
```
114+
```text
115115
> cd rayon-demo
116116
> cargo run --release -- --help
117117
```

rayon-core/src/compile_fail/scope_join_bad.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/*! ```compile_fail,E0597
1+
/*! ```compile_fail,E0373
22
33
fn bad_scope<F>(f: F)
44
where F: FnOnce(&i32) + Send,

rayon-futures/src/compile_fail/future_escape.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/*! ```compile_fail,E0501,E0382,E0597
1+
/*! ```compile_fail,E0382,E0501,E0503,E0716
22
33
extern crate futures;
44
extern crate rayon_core;

src/compile_fail/must_use.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ must_use! {
4949
map /** v.par_iter().map(|x| x); */
5050
map_with /** v.par_iter().map_with(0, |_, x| x); */
5151
map_init /** v.par_iter().map_init(|| 0, |_, x| x); */
52+
panic_fuse /** v.par_iter().panic_fuse(); */
5253
rev /** v.par_iter().rev(); */
5354
skip /** v.par_iter().skip(1); */
5455
take /** v.par_iter().take(1); */

src/iter/collect/consumer.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@ use std::sync::atomic::{AtomicUsize, Ordering};
77
pub(super) struct CollectConsumer<'c, T: Send + 'c> {
88
/// Tracks how many items we successfully wrote. Used to guarantee
99
/// safety in the face of panics or buggy parallel iterators.
10+
///
11+
/// In theory we could just produce this as a `CollectConsumer::Result`,
12+
/// folding local counts and reducing by addition, but that requires a
13+
/// certain amount of trust that the producer driving this will behave
14+
/// itself. Since this count is important to the safety of marking the
15+
/// memory initialized (`Vec::set_len`), we choose to keep it internal.
1016
writes: &'c AtomicUsize,
1117

1218
/// A slice covering the target memory, not yet initialized!
@@ -82,7 +88,8 @@ impl<'c, T: Send + 'c> Folder<T> for CollectFolder<'c, T> {
8288
}
8389

8490
fn complete(self) {
85-
assert!(self.target.len() == 0, "too few values pushed to consumer");
91+
// NB: We don't explicitly check that the local writes were complete,
92+
// but `Collect::complete()` will assert the global write count.
8693

8794
// track total values written
8895
self.global_writes

src/iter/collect/test.rs

Lines changed: 64 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
use super::Collect;
99
use iter::plumbing::*;
10+
use rayon_core::join;
1011

1112
/// Promises to produce 2 items, but then produces 3. Does not do any
1213
/// splits at all.
@@ -25,15 +26,18 @@ fn produce_too_many_items() {
2526
/// Produces fewer items than promised. Does not do any
2627
/// splits at all.
2728
#[test]
28-
#[should_panic(expected = "too few values")]
29+
#[should_panic(expected = "expected 5 total writes, but got 2")]
2930
fn produce_fewer_items() {
3031
let mut v = vec![];
3132
let mut collect = Collect::new(&mut v, 5);
32-
let consumer = collect.as_consumer();
33-
let mut folder = consumer.into_folder();
34-
folder = folder.consume(22);
35-
folder = folder.consume(23);
36-
folder.complete();
33+
{
34+
let consumer = collect.as_consumer();
35+
let mut folder = consumer.into_folder();
36+
folder = folder.consume(22);
37+
folder = folder.consume(23);
38+
folder.complete();
39+
}
40+
collect.complete();
3741
}
3842

3943
// Complete is not called by the consumer.Hence,the collection vector is not fully initialized.
@@ -129,7 +133,7 @@ fn right_produces_too_many_items() {
129133
// The left consumer produces fewer items while the right
130134
// consumer produces correct number.
131135
#[test]
132-
#[should_panic(expected = "too few values")]
136+
#[should_panic(expected = "expected 4 total writes, but got 3")]
133137
fn left_produces_fewer_items() {
134138
let mut v = vec![];
135139
let mut collect = Collect::new(&mut v, 4);
@@ -149,7 +153,7 @@ fn left_produces_fewer_items() {
149153
// The right consumer produces fewer items while the left
150154
// consumer produces correct number.
151155
#[test]
152-
#[should_panic(expected = "too few values")]
156+
#[should_panic(expected = "expected 4 total writes, but got 3")]
153157
fn right_produces_fewer_items() {
154158
let mut v = vec![];
155159
let mut collect = Collect::new(&mut v, 4);
@@ -165,3 +169,55 @@ fn right_produces_fewer_items() {
165169
}
166170
collect.complete();
167171
}
172+
173+
// The left consumer panics and the right stops short, like `panic_fuse()`.
174+
// We should get the left panic without ever reaching `Collect::complete()`.
175+
#[test]
176+
#[should_panic(expected = "left consumer panic")]
177+
fn left_panics() {
178+
let mut v = vec![];
179+
let mut collect = Collect::new(&mut v, 4);
180+
{
181+
let consumer = collect.as_consumer();
182+
let (left_consumer, right_consumer, _) = consumer.split_at(2);
183+
join(
184+
|| {
185+
let mut left_folder = left_consumer.into_folder();
186+
left_folder = left_folder.consume(0);
187+
panic!("left consumer panic");
188+
},
189+
|| {
190+
let mut right_folder = right_consumer.into_folder();
191+
right_folder = right_folder.consume(2);
192+
right_folder.complete() // early return
193+
},
194+
);
195+
}
196+
collect.complete();
197+
}
198+
199+
// The right consumer panics and the left stops short, like `panic_fuse()`.
200+
// We should get the right panic without ever reaching `Collect::complete()`.
201+
#[test]
202+
#[should_panic(expected = "right consumer panic")]
203+
fn right_panics() {
204+
let mut v = vec![];
205+
let mut collect = Collect::new(&mut v, 4);
206+
{
207+
let consumer = collect.as_consumer();
208+
let (left_consumer, right_consumer, _) = consumer.split_at(2);
209+
join(
210+
|| {
211+
let mut left_folder = left_consumer.into_folder();
212+
left_folder = left_folder.consume(0);
213+
left_folder.complete() // early return
214+
},
215+
|| {
216+
let mut right_folder = right_consumer.into_folder();
217+
right_folder = right_folder.consume(2);
218+
panic!("right consumer panic");
219+
},
220+
);
221+
}
222+
collect.complete();
223+
}

src/iter/mod.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ mod sum;
155155
pub use self::cloned::Cloned;
156156
mod inspect;
157157
pub use self::inspect::Inspect;
158+
mod panic_fuse;
159+
pub use self::panic_fuse::PanicFuse;
158160
mod while_some;
159161
pub use self::while_some::WhileSome;
160162
mod extend;
@@ -1728,6 +1730,40 @@ pub trait ParallelIterator: Sized + Send {
17281730
WhileSome::new(self)
17291731
}
17301732

1733+
/// Wraps an iterator with a fuse in case of panics, to halt all threads
1734+
/// as soon as possible.
1735+
///
1736+
/// Panics within parallel iterators are always propagated to the caller,
1737+
/// but they don't always halt the rest of the iterator right away, due to
1738+
/// the internal semantics of [`join`]. This adaptor makes a greater effort
1739+
/// to stop processing other items sooner, with the cost of additional
1740+
/// synchronization overhead, which may also inhibit some optimizations.
1741+
///
1742+
/// [`join`]: ../fn.join.html#panics
1743+
///
1744+
/// # Examples
1745+
///
1746+
/// If this code didn't use `panic_fuse()`, it would continue processing
1747+
/// many more items in other threads (with long sleep delays) before the
1748+
/// panic is finally propagated.
1749+
///
1750+
/// ```should_panic
1751+
/// use rayon::prelude::*;
1752+
/// use std::{thread, time};
1753+
///
1754+
/// (0..1_000_000)
1755+
/// .into_par_iter()
1756+
/// .panic_fuse()
1757+
/// .for_each(|i| {
1758+
/// // simulate some work
1759+
/// thread::sleep(time::Duration::from_secs(1));
1760+
/// assert!(i > 0); // oops!
1761+
/// });
1762+
/// ```
1763+
fn panic_fuse(self) -> PanicFuse<Self> {
1764+
panic_fuse::new(self)
1765+
}
1766+
17311767
/// Create a fresh collection containing all the element produced
17321768
/// by this parallel iterator.
17331769
///

0 commit comments

Comments
 (0)