Skip to content

Commit 4d2dd17

Browse files
committed
---
yaml --- r: 12933 b: refs/heads/master c: a785f3f h: refs/heads/master i: 12931: aef0743 v: v3
1 parent 03a0d03 commit 4d2dd17

File tree

4 files changed

+218
-5
lines changed

4 files changed

+218
-5
lines changed

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
refs/heads/master: d485f23a1a1879b6c066ae56a1ccfe1f092785fd
2+
refs/heads/master: a785f3fc95752dc2cfced5e7ccef710e189acb9c
33
refs/heads/snap-stage1: e33de59e47c5076a89eadeb38f4934f58a3618a6
44
refs/heads/snap-stage3: 4a81779abd786ff22d71434c6d9a5917ea4cdfff
55
refs/heads/try: 2898dcc5d97da9427ac367542382b6239d9c0bbf

trunk/src/libstd/par.rs

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
import comm::port;
2+
import comm::chan;
3+
import comm::send;
4+
import comm::recv;
5+
import task::spawn;
6+
7+
export future;
8+
export map;
9+
export alli;
10+
11+
iface future<T: send> {
12+
fn get() -> T;
13+
}
14+
15+
type future_<T: send> = {
16+
mut slot : option<T>,
17+
port : port<T>,
18+
};
19+
20+
impl<T: send> of future<T> for future_<T> {
21+
fn get() -> T {
22+
alt(self.slot) {
23+
some(x) { x }
24+
none {
25+
let x = recv(self.port);
26+
self.slot = some(x);
27+
x
28+
}
29+
}
30+
}
31+
}
32+
33+
34+
#[doc="Executes a bit of code asynchronously.
35+
36+
Returns a handle that can be used to retrieve the result at your
37+
leisure."]
38+
fn future<T: send>(thunk : fn~() -> T) -> future<T> {
39+
let p = port();
40+
let c = chan(p);
41+
42+
spawn() {||
43+
send(c, thunk());
44+
}
45+
46+
{mut slot: none::<T>, port : p} as future::<T>
47+
}
48+
49+
#[doc="The maximum number of tasks this module will spawn for a single
50+
operationg."]
51+
const max_tasks : uint = 32u;
52+
53+
#[doc="The minimum number of elements each task will process."]
54+
const min_granularity : uint = 1024u;
55+
56+
#[doc="An internal helper to map a function over a large vector and
57+
return the intermediate results.
58+
59+
This is used to build most of the other parallel vector functions,
60+
like map or alli."]
61+
fn map_slices<A: send, B: send>(xs: [A], f: fn~(uint, [A]) -> B) -> [B] {
62+
let len = xs.len();
63+
if len < min_granularity {
64+
// This is a small vector, fall back on the normal map.
65+
[f(0u, xs)]
66+
}
67+
else {
68+
let num_tasks = uint::min(max_tasks, len / min_granularity);
69+
70+
let items_per_task = len / num_tasks;
71+
72+
let mut futures = [];
73+
let mut base = 0u;
74+
while base < len {
75+
let slice = vec::slice(xs, base,
76+
uint::min(len, base + items_per_task));
77+
futures += [future() {|copy base|
78+
f(base, slice)
79+
}];
80+
base += items_per_task;
81+
}
82+
83+
futures.map() {|ys|
84+
ys.get()
85+
}
86+
}
87+
}
88+
89+
#[doc="A parallel version of map."]
90+
fn map<A: send, B: send>(xs: [A], f: fn~(A) -> B) -> [B] {
91+
vec::concat(map_slices(xs) {|_base, slice|
92+
map(slice, f)
93+
})
94+
}
95+
96+
#[doc="Returns true if the function holds for all elements in the vector."]
97+
fn alli<A: send>(xs: [A], f: fn~(uint, A) -> bool) -> bool {
98+
vec::all(map_slices(xs) {|base, slice|
99+
slice.alli() {|i, x|
100+
f(i + base, x)
101+
}
102+
}) {|x| x }
103+
}

trunk/src/libstd/std.rc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ export bitv, deque, fun_treemap, list, map, smallintmap, sort, treemap;
1919
export rope, arena;
2020
export ebml, dbg, getopts, json, rand, sha1, term, time, prettyprint;
2121
export test, tempfile, serialization;
22+
export par;
2223

2324
// General io and system-services modules
2425

@@ -58,6 +59,7 @@ mod getopts;
5859
mod json;
5960
mod sha1;
6061
mod md4;
62+
mod par;
6163
mod tempfile;
6264
mod term;
6365
mod time;

trunk/src/test/bench/graph500-bfs.rs

Lines changed: 112 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import std::map;
1010
import std::map::hashmap;
1111
import std::deque;
1212
import std::deque::t;
13+
//import std::par;
1314
import io::writer_util;
1415
import comm::*;
1516
import int::abs;
@@ -221,13 +222,11 @@ fn validate(edges: [(node_id, node_id)],
221222

222223
log(info, "Verifying tree and graph edges...");
223224

224-
let status = tree.alli() {|u, v|
225+
let status = par::alli(tree) {|u, v|
225226
if v == -1 || u as int == root {
226227
true
227228
}
228229
else {
229-
log(info, #fmt("Checking for %? or %?",
230-
(u, v), (v, u)));
231230
edges.contains((u as int, v)) || edges.contains((v, u as int))
232231
}
233232
};
@@ -269,9 +268,118 @@ fn main() {
269268
stop - start));
270269

271270
let start = time::precise_time_s();
272-
assert(validate(graph, edges, root, bfs_tree));
271+
assert(validate(edges, root, bfs_tree));
273272
let stop = time::precise_time_s();
274273

275274
io::stdout().write_line(#fmt("Validation completed in %? seconds.",
276275
stop - start));
277276
}
277+
278+
279+
// par stuff /////////////////////////////////////////////////////////
280+
281+
mod par {
282+
import comm::port;
283+
import comm::chan;
284+
import comm::send;
285+
import comm::recv;
286+
import task::spawn;
287+
288+
iface future<T: send> {
289+
fn get() -> T;
290+
}
291+
292+
type future_<T: send> = {
293+
mut slot : option<T>,
294+
port : port<T>,
295+
};
296+
297+
impl<T: send> of future<T> for future_<T> {
298+
fn get() -> T {
299+
get(self)
300+
}
301+
}
302+
303+
fn get<T: send>(f: future_<T>) -> T {
304+
alt(f.slot) {
305+
some(x) { x }
306+
none {
307+
let x = recv(f.port);
308+
f.slot = some(x);
309+
x
310+
}
311+
}
312+
}
313+
314+
315+
#[doc="Executes a bit of code asynchronously.
316+
317+
Returns a handle that can be used to retrieve the result at your
318+
leisure."]
319+
fn future<T: send>(thunk : fn~() -> T) -> future<T> {
320+
let p = port();
321+
let c = chan(p);
322+
323+
spawn() {||
324+
send(c, thunk());
325+
}
326+
327+
{mut slot: none::<T>, port : p} as future::<T>
328+
}
329+
330+
#[doc="The maximum number of tasks this module will spawn for a single
331+
operationg."]
332+
const max_tasks : uint = 32u;
333+
334+
#[doc="The minimum number of elements each task will process."]
335+
const min_granularity : uint = 1024u;
336+
337+
#[doc="An internal helper to map a function over a large vector and
338+
return the intermediate results.
339+
340+
This is used to build most of the other parallel vector functions,
341+
like map or alli."]
342+
fn map_slices<A: send, B: send>(xs: [A], f: fn~(uint, [A]) -> B) -> [B] {
343+
let len = xs.len();
344+
if len < min_granularity {
345+
// This is a small vector, fall back on the normal map.
346+
[f(0u, xs)]
347+
}
348+
else {
349+
let num_tasks = uint::min(max_tasks, len / min_granularity);
350+
351+
let items_per_task = len / num_tasks;
352+
353+
let mut futures = [];
354+
let mut base = 0u;
355+
while base < len {
356+
let slice = vec::slice(xs, base,
357+
uint::min(len, base + items_per_task));
358+
futures += [future() {|copy base|
359+
f(base, slice)
360+
}];
361+
base += items_per_task;
362+
}
363+
364+
futures.map() {|ys|
365+
ys.get()
366+
}
367+
}
368+
}
369+
370+
#[doc="A parallel version of map."]
371+
fn map<A: send, B: send>(xs: [A], f: fn~(A) -> B) -> [B] {
372+
vec::concat(map_slices(xs) {|_base, slice|
373+
map(slice, f)
374+
})
375+
}
376+
377+
#[doc="Returns true if the function holds for all elements in the vector."]
378+
fn alli<A: send>(xs: [A], f: fn~(uint, A) -> bool) -> bool {
379+
vec::all(map_slices(xs) {|base, slice|
380+
slice.alli() {|i, x|
381+
f(i + base, x)
382+
}
383+
}) {|x| x }
384+
}
385+
}

0 commit comments

Comments
 (0)