Skip to content

Commit c56a38a

Browse files
committed
Added a version of msgsend that uses pipes and select. Here, select is way too slow to be useful, but this can be optimized.
1 parent deb6476 commit c56a38a

File tree

1 file changed

+165
-0
lines changed

1 file changed

+165
-0
lines changed

src/test/bench/msgsend-pipes.rs

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
// A port of the simplistic benchmark from
2+
//
3+
// http://github.com/PaulKeeble/ScalaVErlangAgents
4+
//
5+
// I *think* it's the same, more or less.
6+
7+
use std;
8+
import io::writer;
9+
import io::writer_util;
10+
11+
import pipes::{port, chan};
12+
13+
macro_rules! move {
14+
{ $x:expr } => { unsafe { let y <- *ptr::addr_of($x); y } }
15+
}
16+
17+
enum request {
18+
get_count,
19+
bytes(uint),
20+
stop
21+
}
22+
23+
fn server(requests: port_set<request>, responses: pipes::chan<uint>) {
24+
let mut count = 0u;
25+
let mut done = false;
26+
while !done {
27+
alt requests.try_recv() {
28+
some(get_count) { responses.send(copy count); }
29+
some(bytes(b)) {
30+
//#error("server: received %? bytes", b);
31+
count += b;
32+
}
33+
none { done = true; }
34+
_ { }
35+
}
36+
}
37+
responses.send(count);
38+
//#error("server exiting");
39+
}
40+
41+
fn run(args: &[str]) {
42+
let (to_parent, from_child) = pipes::stream();
43+
let (to_child, from_parent_) = pipes::stream();
44+
let from_parent = port_set();
45+
from_parent.add(from_parent_);
46+
47+
let size = option::get(uint::from_str(args[1]));
48+
let workers = option::get(uint::from_str(args[2]));
49+
let num_bytes = 100;
50+
let start = std::time::precise_time_s();
51+
let mut worker_results = ~[];
52+
for uint::range(0u, workers) |i| {
53+
let builder = task::builder();
54+
vec::push(worker_results, task::future_result(builder));
55+
let (to_child, from_parent_) = pipes::stream();
56+
from_parent.add(from_parent_);
57+
do task::run(builder) {
58+
for uint::range(0u, size / workers) |_i| {
59+
//#error("worker %?: sending %? bytes", i, num_bytes);
60+
to_child.send(bytes(num_bytes));
61+
}
62+
//#error("worker %? exiting", i);
63+
};
64+
}
65+
do task::spawn {
66+
server(from_parent, to_parent);
67+
}
68+
69+
vec::iter(worker_results, |r| { future::get(r); } );
70+
//#error("sending stop message");
71+
to_child.send(stop);
72+
move!{to_child};
73+
let result = from_child.recv();
74+
let end = std::time::precise_time_s();
75+
let elapsed = end - start;
76+
io::stdout().write_str(#fmt("Count is %?\n", result));
77+
io::stdout().write_str(#fmt("Test took %? seconds\n", elapsed));
78+
let thruput = ((size / workers * workers) as float) / (elapsed as float);
79+
io::stdout().write_str(#fmt("Throughput=%f per sec\n", thruput));
80+
assert result == num_bytes * size;
81+
}
82+
83+
fn main(args: ~[str]) {
84+
let args = if os::getenv("RUST_BENCH").is_some() {
85+
~["", "1000000", "10000"]
86+
} else if args.len() <= 1u {
87+
~["", "10000", "4"]
88+
} else {
89+
copy args
90+
};
91+
92+
#debug("%?", args);
93+
run(args);
94+
}
95+
96+
// Treat a whole bunch of ports as one.
97+
class box<T> {
98+
let mut contents: option<T>;
99+
new(+x: T) { self.contents = some(x); }
100+
101+
fn swap(f: fn(+T) -> T) {
102+
let mut tmp = none;
103+
self.contents <-> tmp;
104+
self.contents = some(f(option::unwrap(tmp)));
105+
}
106+
107+
fn unwrap() -> T {
108+
let mut tmp = none;
109+
self.contents <-> tmp;
110+
option::unwrap(tmp)
111+
}
112+
}
113+
114+
class port_set<T: send> {
115+
let ports: box<~[pipes::streamp::server::open<T>]>;
116+
117+
new() { self.ports = box(~[]); }
118+
119+
fn add(+port: pipes::port<T>) {
120+
let pipes::port_(port) <- port;
121+
let mut p = none;
122+
port.endp <-> p;
123+
do self.ports.swap |ports| {
124+
let mut p_ = none;
125+
p <-> p_;
126+
vec::append_one(ports, option::unwrap(p_))
127+
}
128+
}
129+
130+
fn try_recv() -> option<T> {
131+
let mut result = none;
132+
let mut done = false;
133+
while !done {
134+
do self.ports.swap |ports| {
135+
if ports.len() > 0 {
136+
let old_len = ports.len();
137+
let (_, m, ports) = pipes::select(ports);
138+
alt m {
139+
some(pipes::streamp::data(x, next)) {
140+
result = some(move!{x});
141+
done = true;
142+
assert ports.len() == old_len - 1;
143+
vec::append_one(ports, move!{next})
144+
}
145+
none {
146+
//#error("pipe closed");
147+
assert ports.len() == old_len - 1;
148+
ports
149+
}
150+
}
151+
}
152+
else {
153+
//#error("no more pipes");
154+
done = true;
155+
~[]
156+
}
157+
}
158+
}
159+
result
160+
}
161+
162+
fn recv() -> T {
163+
option::unwrap(self.try_recv())
164+
}
165+
}

0 commit comments

Comments
 (0)