Skip to content

Commit 38e2d3b

Browse files
committed
---
yaml --- r: 35033 b: refs/heads/master c: 70886d3 h: refs/heads/master i: 35031: 89099d2 v: v3
1 parent c95500e commit 38e2d3b

File tree

3 files changed

+87
-1
lines changed

3 files changed

+87
-1
lines changed

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
---
2-
refs/heads/master: c88c969b3e20e5aa2c36aee2e5154afd9a383517
2+
refs/heads/master: 70886d314d0e49fe652f022139b03c2ddaca7b14
33
refs/heads/snap-stage1: e33de59e47c5076a89eadeb38f4934f58a3618a6
44
refs/heads/snap-stage3: eb8fd119c65c67f3b1b8268cc7341c22d39b7b61
55
refs/heads/try: d324a424d8f84b1eb049b12cf34182bda91b0024

trunk/src/libstd/std.rc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ pub mod sync;
5454
pub mod arc;
5555
pub mod comm;
5656
pub mod future;
57+
pub mod thread_pool;
5758

5859
// Collections
5960

trunk/src/libstd/thread_pool.rs

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/// A thread pool abstraction. Useful for achieving predictable CPU
2+
/// parallelism.
3+
4+
use pipes::{Chan, Port};
5+
use task::{SchedMode, SingleThreaded};
6+
7+
enum Msg<T> {
8+
Execute(~fn(&T)),
9+
Quit
10+
}
11+
12+
pub struct ThreadPool<T> {
13+
channels: ~[Chan<Msg<T>>],
14+
mut next_index: uint,
15+
16+
drop {
17+
for self.channels.each |channel| {
18+
channel.send(Quit);
19+
}
20+
}
21+
}
22+
23+
pub impl<T> ThreadPool<T> {
24+
/// Spawns a new thread pool with `n_tasks` tasks. If the `sched_mode`
25+
/// is None, the tasks run on this scheduler; otherwise, they run on a
26+
/// new scheduler with the given mode. The provided `init_fn_factory`
27+
/// returns a function which, given the index of the task, should return
28+
/// local data to be kept around in that task.
29+
static fn new(n_tasks: uint,
30+
opt_sched_mode: Option<SchedMode>,
31+
init_fn_factory: ~fn() -> ~fn(uint) -> T) -> ThreadPool<T> {
32+
assert n_tasks >= 1;
33+
34+
let channels = do vec::from_fn(n_tasks) |i| {
35+
let (chan, port) = pipes::stream::<Msg<T>>();
36+
let init_fn = init_fn_factory();
37+
38+
let task_body: ~fn() = |move port, move init_fn| {
39+
let local_data = init_fn(i);
40+
loop {
41+
match port.recv() {
42+
Execute(move f) => f(&local_data),
43+
Quit => break
44+
}
45+
}
46+
};
47+
48+
// Start the task.
49+
match opt_sched_mode {
50+
None => {
51+
// Run on this scheduler.
52+
task::spawn(move task_body);
53+
}
54+
Some(sched_mode) => {
55+
task::task().sched_mode(sched_mode).spawn(move task_body);
56+
}
57+
}
58+
59+
move chan
60+
};
61+
62+
return ThreadPool { channels: move channels, next_index: 0 };
63+
}
64+
65+
/// Executes the function `f` on a thread in the pool. The function
66+
/// receives a reference to the local data returned by the `init_fn`.
67+
fn execute(&self, f: ~fn(&T)) {
68+
self.channels[self.next_index].send(Execute(move f));
69+
self.next_index += 1;
70+
if self.next_index == self.channels.len() { self.next_index = 0; }
71+
}
72+
}
73+
74+
#[test]
75+
fn test_thread_pool() {
76+
let f: ~fn() -> ~fn(uint) -> uint = || {
77+
let g: ~fn(uint) -> uint = |i| i;
78+
move g
79+
};
80+
let pool = ThreadPool::new(4, Some(SingleThreaded), move f);
81+
for 8.times {
82+
pool.execute(|i| io::println(fmt!("Hello from thread %u!", *i)));
83+
}
84+
}
85+

0 commit comments

Comments
 (0)