Skip to content

Commit 68dd199

Browse files
committed
---
yaml --- r: 31400 b: refs/heads/dist-snap c: 4f29814 h: refs/heads/master v: v3
1 parent 7631c33 commit 68dd199

File tree

3 files changed

+177
-8
lines changed

3 files changed

+177
-8
lines changed

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,6 @@ refs/tags/release-0.1: 1f5c5126e96c79d22cb7862f75304136e204f105
77
refs/heads/ndm: f3868061cd7988080c30d6d5bf352a5a5fe2460b
88
refs/heads/try2: d0c6ce338884ee21843f4b40bf6bf18d222ce5df
99
refs/heads/incoming: d9317a174e434d4c99fc1a37fd7dc0d2f5328d37
10-
refs/heads/dist-snap: d74fb9875bc01a96359127837c5f2b71de7af93a
10+
refs/heads/dist-snap: 4f29814f2a2503f9abb2ccb1e3c286dbf8e8a0d0
1111
refs/tags/release-0.2: c870d2dffb391e14efb05aa27898f1f6333a9596
1212
refs/tags/release-0.3: b5f0d0f648d9a6153664837026ba1be43d3e2503

branches/dist-snap/src/libcore/pipes.rs

Lines changed: 70 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@ import arc::methods;
77

88
// Things used by code generated by the pipe compiler.
99
export entangle, get_buffer, drop_buffer;
10+
export send_packet_buffered, recv_packet_buffered;
11+
export mk_packet;
12+
13+
// export these so we can find them in the buffer_resource
14+
// destructor. This is probably another metadata bug.
15+
export atomic_add_acq, atomic_sub_rel;
1016

1117
// User-level things
1218
export send_packet, recv_packet, send, recv, try_recv, peek;
@@ -71,7 +77,7 @@ higher level buffer structure. Packets can maintain a pointer to their
7177
buffer, and this is the part that gets freed.
7278
7379
It might be helpful to have some idea of a semi-unique pointer (like
74-
being partially pregnant, also like an ARC).
80+
being partially pregnant, also like an ARC).
7581
7682
*/
7783

@@ -87,7 +93,7 @@ class buffer_header {
8793
// get away with restricting it to 0 or 1, if we're careful.
8894
let mut ref_count: int;
8995

90-
new() { self.ref_count = 1; }
96+
new() { self.ref_count = 0; }
9197

9298
// We may want a drop, and to be careful about stringing this
9399
// thing along.
@@ -134,13 +140,24 @@ class packet_header {
134140
assert self.buffer.is_not_null();
135141
reinterpret_cast(self.buffer)
136142
}
143+
144+
fn set_buffer<T: send>(b: ~buffer<T>) unsafe {
145+
self.buffer = reinterpret_cast(b);
146+
}
137147
}
138148

139149
type packet<T: send> = {
140150
header: packet_header,
141151
mut payload: option<T>,
142152
};
143153

154+
fn mk_packet<T: send>() -> packet<T> {
155+
{
156+
header: packet_header(),
157+
mut payload: none
158+
}
159+
}
160+
144161
fn unibuffer<T: send>() -> ~buffer<packet<T>> {
145162
let b = ~{
146163
header: buffer_header(),
@@ -170,12 +187,25 @@ extern mod rusti {
170187
fn atomic_xchng(&dst: int, src: int) -> int;
171188
fn atomic_xchng_acq(&dst: int, src: int) -> int;
172189
fn atomic_xchng_rel(&dst: int, src: int) -> int;
190+
191+
fn atomic_add_acq(&dst: int, src: int) -> int;
192+
fn atomic_sub_rel(&dst: int, src: int) -> int;
173193
}
174194

195+
// If I call the rusti versions directly from a polymorphic function,
196+
// I get link errors. This is a bug that needs investigated more.
175197
fn atomic_xchng_rel(&dst: int, src: int) -> int {
176198
rusti::atomic_xchng_rel(dst, src)
177199
}
178200

201+
fn atomic_add_acq(&dst: int, src: int) -> int {
202+
rusti::atomic_add_acq(dst, src)
203+
}
204+
205+
fn atomic_sub_rel(&dst: int, src: int) -> int {
206+
rusti::atomic_sub_rel(dst, src)
207+
}
208+
179209
type rust_task = libc::c_void;
180210

181211
extern mod rustrt {
@@ -222,13 +252,21 @@ unsafe fn get_buffer<T: send>(p: *packet_header) -> ~buffer<T> {
222252
class buffer_resource<T: send> {
223253
let buffer: ~buffer<T>;
224254
new(+b: ~buffer<T>) {
255+
let p = ptr::addr_of(*b);
256+
#error("take %?", p);
257+
atomic_add_acq(b.header.ref_count, 1);
225258
self.buffer = b;
226259
}
227260

228261
drop unsafe {
229262
let b = move!{self.buffer};
230-
let old_count = atomic_xchng_rel(b.header.ref_count, 0);
231-
if old_count == 0 {
263+
let p = ptr::addr_of(*b);
264+
#error("drop %?", p);
265+
let old_count = atomic_sub_rel(b.header.ref_count, 1);
266+
//let old_count = atomic_xchng_rel(b.header.ref_count, 0);
267+
if old_count == 1 {
268+
// The new count is 0.
269+
232270
// go go gadget drop glue
233271
}
234272
else {
@@ -237,7 +275,8 @@ class buffer_resource<T: send> {
237275
}
238276
}
239277

240-
fn send<T: send>(-p: send_packet<T>, -payload: T) {
278+
fn send<T: send, Tbuffer: send>(-p: send_packet_buffered<T, Tbuffer>,
279+
-payload: T) {
241280
let header = p.header();
242281
let p_ = p.unwrap();
243282
let p = unsafe { &*p_ };
@@ -273,11 +312,13 @@ fn send<T: send>(-p: send_packet<T>, -payload: T) {
273312
}
274313
}
275314

276-
fn recv<T: send>(-p: recv_packet<T>) -> T {
315+
fn recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>) -> T {
277316
option::unwrap(try_recv(p))
278317
}
279318

280-
fn try_recv<T: send>(-p: recv_packet<T>) -> option<T> {
319+
fn try_recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>)
320+
-> option<T>
321+
{
281322
let p_ = p.unwrap();
282323
let p = unsafe { &*p_ };
283324
let this = rustrt::rust_get_task();
@@ -498,6 +539,10 @@ class send_packet_buffered<T: send, Tbuffer: send> {
498539
p <-> self.p;
499540
sender_terminate(option::unwrap(p))
500541
}
542+
unsafe { #error("send_drop: %?",
543+
if self.buffer == none {
544+
"none"
545+
} else { "some" }); }
501546
}
502547
fn unwrap() -> *packet<T> {
503548
let mut p = none;
@@ -518,6 +563,13 @@ class send_packet_buffered<T: send, Tbuffer: send> {
518563
none { fail ~"packet already consumed" }
519564
}
520565
}
566+
567+
fn reuse_buffer() -> buffer_resource<Tbuffer> {
568+
#error("send reuse_buffer");
569+
let mut tmp = none;
570+
tmp <-> self.buffer;
571+
option::unwrap(tmp)
572+
}
521573
}
522574

523575
type recv_packet<T: send> = recv_packet_buffered<T, packet<T>>;
@@ -547,6 +599,10 @@ class recv_packet_buffered<T: send, Tbuffer: send> : selectable {
547599
p <-> self.p;
548600
receiver_terminate(option::unwrap(p))
549601
}
602+
unsafe { #error("recv_drop: %?",
603+
if self.buffer == none {
604+
"none"
605+
} else { "some" }); }
550606
}
551607
fn unwrap() -> *packet<T> {
552608
let mut p = none;
@@ -567,6 +623,13 @@ class recv_packet_buffered<T: send, Tbuffer: send> : selectable {
567623
none { fail ~"packet already consumed" }
568624
}
569625
}
626+
627+
fn reuse_buffer() -> buffer_resource<Tbuffer> {
628+
#error("recv reuse_buffer");
629+
let mut tmp = none;
630+
tmp <-> self.buffer;
631+
option::unwrap(tmp)
632+
}
570633
}
571634

572635
fn entangle<T: send>() -> (send_packet<T>, recv_packet<T>) {
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
// Ping-pong is a bounded protocol. This is place where I can
2+
// experiment with what code the compiler should generate for bounded
3+
// protocols.
4+
5+
// xfail-pretty
6+
7+
// This was generated initially by the pipe compiler, but it's been
8+
// modified in hopefully straightforward ways.
9+
mod pingpong {
10+
import pipes::*;
11+
12+
type packets = {
13+
ping: packet<ping>,
14+
pong: packet<pong>,
15+
};
16+
17+
fn init() -> (client::ping, server::ping) {
18+
let buffer = ~{
19+
header: buffer_header(),
20+
data: {
21+
ping: mk_packet::<ping>(),
22+
pong: mk_packet::<pong>()
23+
}
24+
};
25+
unsafe {
26+
buffer.data.ping.header.set_buffer(buffer);
27+
buffer.data.pong.header.set_buffer(buffer);
28+
}
29+
let client = send_packet_buffered(ptr::addr_of(buffer.data.ping));
30+
let server = recv_packet_buffered(ptr::addr_of(buffer.data.ping));
31+
(client, server)
32+
}
33+
enum ping = server::pong;
34+
enum pong = client::ping;
35+
mod client {
36+
fn ping(+pipe: ping) -> pong {
37+
{
38+
let b = pipe.reuse_buffer();
39+
let s = send_packet_buffered(ptr::addr_of(b.buffer.data.pong));
40+
let c = recv_packet_buffered(ptr::addr_of(b.buffer.data.pong));
41+
let message = pingpong::ping(s);
42+
pipes::send(pipe, message);
43+
c
44+
}
45+
}
46+
type ping = pipes::send_packet_buffered<pingpong::ping,
47+
pingpong::packets>;
48+
type pong = pipes::recv_packet_buffered<pingpong::pong,
49+
pingpong::packets>;
50+
}
51+
mod server {
52+
type ping = pipes::recv_packet_buffered<pingpong::ping,
53+
pingpong::packets>;
54+
fn pong(+pipe: pong) -> ping {
55+
{
56+
let b = pipe.reuse_buffer();
57+
let s = send_packet_buffered(ptr::addr_of(b.buffer.data.ping));
58+
let c = recv_packet_buffered(ptr::addr_of(b.buffer.data.ping));
59+
let message = pingpong::pong(s);
60+
pipes::send(pipe, message);
61+
c
62+
}
63+
}
64+
type pong = pipes::send_packet_buffered<pingpong::pong,
65+
pingpong::packets>;
66+
}
67+
}
68+
69+
mod test {
70+
import pipes::recv;
71+
import pingpong::{ping, pong};
72+
73+
fn client(-chan: pingpong::client::ping) {
74+
import pingpong::client;
75+
76+
let chan = client::ping(chan); ret;
77+
log(error, "Sent ping");
78+
let pong(_chan) = recv(chan);
79+
log(error, "Received pong");
80+
}
81+
82+
fn server(-chan: pingpong::server::ping) {
83+
import pingpong::server;
84+
85+
let ping(chan) = recv(chan); ret;
86+
log(error, "Received ping");
87+
let _chan = server::pong(chan);
88+
log(error, "Sent pong");
89+
}
90+
}
91+
92+
fn main() {
93+
let (client_, server_) = pingpong::init();
94+
let client_ = ~mut some(client_);
95+
let server_ = ~mut some(server_);
96+
do task::spawn |move client_| {
97+
let mut client__ = none;
98+
*client_ <-> client__;
99+
test::client(option::unwrap(client__));
100+
};
101+
do task::spawn |move server_| {
102+
let mut server_ˊ = none;
103+
*server_ <-> server_ˊ;
104+
test::server(option::unwrap(server_ˊ));
105+
};
106+
}

0 commit comments

Comments
 (0)