|
| 1 | +/// A thread pool abstraction. Useful for achieving predictable CPU |
| 2 | +/// parallelism. |
| 3 | +
|
| 4 | +use pipes::{Chan, Port}; |
| 5 | +use task::{SchedMode, SingleThreaded}; |
| 6 | + |
| 7 | +enum Msg<T> { |
| 8 | + Execute(~fn(&T)), |
| 9 | + Quit |
| 10 | +} |
| 11 | + |
| 12 | +pub struct ThreadPool<T> { |
| 13 | + channels: ~[Chan<Msg<T>>], |
| 14 | + mut next_index: uint, |
| 15 | + |
| 16 | + drop { |
| 17 | + for self.channels.each |channel| { |
| 18 | + channel.send(Quit); |
| 19 | + } |
| 20 | + } |
| 21 | +} |
| 22 | + |
| 23 | +pub impl<T> ThreadPool<T> { |
| 24 | + /// Spawns a new thread pool with `n_tasks` tasks. If the `sched_mode` |
| 25 | + /// is None, the tasks run on this scheduler; otherwise, they run on a |
| 26 | + /// new scheduler with the given mode. The provided `init_fn_factory` |
| 27 | + /// returns a function which, given the index of the task, should return |
| 28 | + /// local data to be kept around in that task. |
| 29 | + static fn new(n_tasks: uint, |
| 30 | + opt_sched_mode: Option<SchedMode>, |
| 31 | + init_fn_factory: ~fn() -> ~fn(uint) -> T) -> ThreadPool<T> { |
| 32 | + assert n_tasks >= 1; |
| 33 | + |
| 34 | + let channels = do vec::from_fn(n_tasks) |i| { |
| 35 | + let (chan, port) = pipes::stream::<Msg<T>>(); |
| 36 | + let init_fn = init_fn_factory(); |
| 37 | + |
| 38 | + let task_body: ~fn() = |move port, move init_fn| { |
| 39 | + let local_data = init_fn(i); |
| 40 | + loop { |
| 41 | + match port.recv() { |
| 42 | + Execute(move f) => f(&local_data), |
| 43 | + Quit => break |
| 44 | + } |
| 45 | + } |
| 46 | + }; |
| 47 | + |
| 48 | + // Start the task. |
| 49 | + match opt_sched_mode { |
| 50 | + None => { |
| 51 | + // Run on this scheduler. |
| 52 | + task::spawn(move task_body); |
| 53 | + } |
| 54 | + Some(sched_mode) => { |
| 55 | + task::task().sched_mode(sched_mode).spawn(move task_body); |
| 56 | + } |
| 57 | + } |
| 58 | + |
| 59 | + move chan |
| 60 | + }; |
| 61 | + |
| 62 | + return ThreadPool { channels: move channels, next_index: 0 }; |
| 63 | + } |
| 64 | + |
| 65 | + /// Executes the function `f` on a thread in the pool. The function |
| 66 | + /// receives a reference to the local data returned by the `init_fn`. |
| 67 | + fn execute(&self, f: ~fn(&T)) { |
| 68 | + self.channels[self.next_index].send(Execute(move f)); |
| 69 | + self.next_index += 1; |
| 70 | + if self.next_index == self.channels.len() { self.next_index = 0; } |
| 71 | + } |
| 72 | +} |
| 73 | + |
| 74 | +#[test] |
| 75 | +fn test_thread_pool() { |
| 76 | + let f: ~fn() -> ~fn(uint) -> uint = || { |
| 77 | + let g: ~fn(uint) -> uint = |i| i; |
| 78 | + move g |
| 79 | + }; |
| 80 | + let pool = ThreadPool::new(4, Some(SingleThreaded), move f); |
| 81 | + for 8.times { |
| 82 | + pool.execute(|i| io::println(fmt!("Hello from thread %u!", *i))); |
| 83 | + } |
| 84 | +} |
| 85 | + |
0 commit comments