Skip to content

Commit c1b5c4d

Browse files
committed
Start migrating stream I/O away from ~fn()
1 parent 6690bcb commit c1b5c4d

File tree

7 files changed

+501
-319
lines changed

7 files changed

+501
-319
lines changed

src/librustuv/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,9 @@ pub use self::idle::IdleWatcher;
6666
pub use self::timer::TimerWatcher;
6767
pub use self::async::AsyncWatcher;
6868
pub use self::process::Process;
69-
pub use self::pipe::Pipe;
69+
pub use self::pipe::PipeWatcher;
7070
pub use self::signal::SignalWatcher;
71+
pub use self::tty::TtyWatcher;
7172

7273
mod macros;
7374

@@ -87,6 +88,7 @@ pub mod process;
8788
pub mod pipe;
8889
pub mod tty;
8990
pub mod signal;
91+
pub mod stream;
9092

9193
/// XXX: Loop(*handle) is buggy with destructors. Normal structs
9294
/// with dtors may not be destructured, but tuple structs can,
@@ -218,7 +220,6 @@ pub type ReadCallback = ~fn(StreamWatcher, int, Buf, Option<UvError>);
218220
pub type NullCallback = ~fn();
219221
pub type ConnectionCallback = ~fn(StreamWatcher, Option<UvError>);
220222
pub type FsCallback = ~fn(&mut FsRequest, Option<UvError>);
221-
pub type AsyncCallback = ~fn(AsyncWatcher, Option<UvError>);
222223
pub type UdpReceiveCallback = ~fn(UdpWatcher, int, Buf, SocketAddr, uint, Option<UvError>);
223224
pub type UdpSendCallback = ~fn(UdpWatcher, Option<UvError>);
224225

@@ -231,7 +232,6 @@ struct WatcherData {
231232
connect_cb: Option<ConnectionCallback>,
232233
close_cb: Option<NullCallback>,
233234
alloc_cb: Option<AllocCallback>,
234-
async_cb: Option<AsyncCallback>,
235235
udp_recv_cb: Option<UdpReceiveCallback>,
236236
udp_send_cb: Option<UdpSendCallback>,
237237
}

src/librustuv/pipe.rs

Lines changed: 196 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -8,91 +8,234 @@
88
// option. This file may not be copied, modified, or distributed
99
// except according to those terms.
1010

11-
use std::libc;
1211
use std::c_str::CString;
12+
use std::cast;
13+
use std::libc;
14+
use std::rt::BlockedTask;
15+
use std::rt::io::IoError;
16+
use std::rt::local::Local;
17+
use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor};
18+
use std::rt::sched::{Scheduler, SchedHandle};
19+
use std::rt::tube::Tube;
1320

14-
use super::{Loop, UvError, Watcher, NativeHandle, status_to_maybe_uv_error};
15-
use super::ConnectionCallback;
16-
use net;
21+
use stream::StreamWatcher;
22+
use super::{Loop, UvError, NativeHandle, uv_error_to_io_error, UvHandle};
23+
use uvio::HomingIO;
1724
use uvll;
1825

19-
pub struct Pipe(*uvll::uv_pipe_t);
26+
pub struct PipeWatcher {
27+
stream: StreamWatcher,
28+
home: SchedHandle,
29+
}
30+
31+
pub struct PipeListener {
32+
home: SchedHandle,
33+
pipe: *uvll::uv_pipe_t,
34+
priv closing_task: Option<BlockedTask>,
35+
priv outgoing: Tube<Result<~RtioPipe, IoError>>,
36+
}
37+
38+
pub struct PipeAcceptor {
39+
listener: ~PipeListener,
40+
priv incoming: Tube<Result<~RtioPipe, IoError>>,
41+
}
2042

21-
impl Watcher for Pipe {}
43+
// PipeWatcher implementation and traits
2244

23-
impl Pipe {
24-
pub fn new(loop_: &Loop, ipc: bool) -> Pipe {
45+
impl PipeWatcher {
46+
pub fn new(pipe: *uvll::uv_pipe_t) -> PipeWatcher {
47+
PipeWatcher {
48+
stream: StreamWatcher::new(pipe),
49+
home: get_handle_to_current_scheduler!(),
50+
}
51+
}
52+
53+
pub fn alloc(loop_: &Loop, ipc: bool) -> *uvll::uv_pipe_t {
2554
unsafe {
2655
let handle = uvll::malloc_handle(uvll::UV_NAMED_PIPE);
27-
assert!(handle.is_not_null());
56+
assert!(!handle.is_null());
2857
let ipc = ipc as libc::c_int;
2958
assert_eq!(uvll::uv_pipe_init(loop_.native_handle(), handle, ipc), 0);
30-
let mut ret: Pipe =
31-
NativeHandle::from_native_handle(handle);
32-
ret.install_watcher_data();
33-
ret
59+
handle
3460
}
3561
}
3662

37-
pub fn as_stream(&self) -> net::StreamWatcher {
38-
net::StreamWatcher(**self as *uvll::uv_stream_t)
63+
pub fn open(loop_: &Loop, file: libc::c_int) -> Result<PipeWatcher, UvError>
64+
{
65+
let handle = PipeWatcher::alloc(loop_, false);
66+
match unsafe { uvll::uv_pipe_open(handle, file) } {
67+
0 => Ok(PipeWatcher::new(handle)),
68+
n => {
69+
unsafe { uvll::uv_close(handle, pipe_close_cb) }
70+
Err(UvError(n))
71+
}
72+
}
3973
}
4074

41-
#[fixed_stack_segment] #[inline(never)]
42-
pub fn open(&mut self, file: libc::c_int) -> Result<(), UvError> {
43-
match unsafe { uvll::uv_pipe_open(self.native_handle(), file) } {
44-
0 => Ok(()),
45-
n => Err(UvError(n))
75+
pub fn connect(loop_: &Loop, name: &CString) -> Result<PipeWatcher, UvError>
76+
{
77+
struct Ctx {
78+
task: Option<BlockedTask>,
79+
result: Option<Result<PipeWatcher, UvError>>,
4680
}
81+
let mut cx = Ctx { task: None, result: None };
82+
let req = unsafe { uvll::malloc_req(uvll::UV_CONNECT) };
83+
unsafe { uvll::set_data_for_req(req, &cx as *Ctx) }
84+
85+
let sched: ~Scheduler = Local::take();
86+
do sched.deschedule_running_task_and_then |_, task| {
87+
cx.task = Some(task);
88+
unsafe {
89+
uvll::uv_pipe_connect(req,
90+
PipeWatcher::alloc(loop_, false),
91+
name.with_ref(|p| p),
92+
connect_cb)
93+
}
94+
}
95+
assert!(cx.task.is_none());
96+
return cx.result.take().expect("pipe connect needs a result");
97+
98+
extern fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) {
99+
unsafe {
100+
let cx: &mut Ctx = cast::transmute(uvll::get_data_for_req(req));
101+
let stream = uvll::get_stream_handle_from_connect_req(req);
102+
cx.result = Some(match status {
103+
0 => Ok(PipeWatcher::new(stream)),
104+
n => {
105+
uvll::free_handle(stream);
106+
Err(UvError(n))
107+
}
108+
});
109+
uvll::free_req(req);
110+
111+
let sched: ~Scheduler = Local::take();
112+
sched.resume_blocked_task_immediately(cx.task.take_unwrap());
113+
}
114+
}
115+
}
116+
}
117+
118+
impl RtioPipe for PipeWatcher {
119+
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError> {
120+
let _m = self.fire_missiles();
121+
self.stream.read(buf).map_err(uv_error_to_io_error)
122+
}
123+
124+
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
125+
let _m = self.fire_missiles();
126+
self.stream.write(buf).map_err(uv_error_to_io_error)
127+
}
128+
}
129+
130+
impl HomingIO for PipeWatcher {
131+
fn home<'a>(&'a mut self) -> &'a mut SchedHandle { &mut self.home }
132+
}
133+
134+
impl Drop for PipeWatcher {
135+
fn drop(&mut self) {
136+
let _m = self.fire_missiles();
137+
self.stream.close(true); // close synchronously
47138
}
139+
}
140+
141+
extern fn pipe_close_cb(handle: *uvll::uv_handle_t) {
142+
unsafe { uvll::free_handle(handle) }
143+
}
48144

49-
#[fixed_stack_segment] #[inline(never)]
50-
pub fn bind(&mut self, name: &CString) -> Result<(), UvError> {
51-
do name.with_ref |name| {
52-
match unsafe { uvll::uv_pipe_bind(self.native_handle(), name) } {
53-
0 => Ok(()),
54-
n => Err(UvError(n))
145+
// PipeListener implementation and traits
146+
147+
impl PipeListener {
148+
pub fn bind(loop_: &Loop, name: &CString) -> Result<~PipeListener, UvError> {
149+
let pipe = PipeWatcher::alloc(loop_, false);
150+
match unsafe { uvll::uv_pipe_bind(pipe, name.with_ref(|p| p)) } {
151+
0 => {
152+
let p = ~PipeListener {
153+
home: get_handle_to_current_scheduler!(),
154+
pipe: pipe,
155+
closing_task: None,
156+
outgoing: Tube::new(),
157+
};
158+
Ok(p.install())
159+
}
160+
n => {
161+
unsafe { uvll::free_handle(pipe) }
162+
Err(UvError(n))
55163
}
56164
}
57165
}
166+
}
58167

59-
#[fixed_stack_segment] #[inline(never)]
60-
pub fn connect(&mut self, name: &CString, cb: ConnectionCallback) {
61-
{
62-
let data = self.get_watcher_data();
63-
assert!(data.connect_cb.is_none());
64-
data.connect_cb = Some(cb);
168+
impl RtioUnixListener for PipeListener {
169+
fn listen(mut ~self) -> Result<~RtioUnixAcceptor, IoError> {
170+
// create the acceptor object from ourselves
171+
let incoming = self.outgoing.clone();
172+
let mut acceptor = ~PipeAcceptor {
173+
listener: self,
174+
incoming: incoming,
175+
};
176+
177+
let _m = acceptor.fire_missiles();
178+
// XXX: the 128 backlog should be configurable
179+
match unsafe { uvll::uv_listen(acceptor.listener.pipe, 128, listen_cb) } {
180+
0 => Ok(acceptor as ~RtioUnixAcceptor),
181+
n => Err(uv_error_to_io_error(UvError(n))),
65182
}
183+
}
184+
}
66185

67-
let connect = net::ConnectRequest::new();
68-
let name = do name.with_ref |p| { p };
186+
impl HomingIO for PipeListener {
187+
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { &mut self.home }
188+
}
69189

70-
unsafe {
71-
uvll::uv_pipe_connect(connect.native_handle(),
72-
self.native_handle(),
73-
name,
74-
connect_cb)
190+
impl UvHandle<uvll::uv_pipe_t> for PipeListener {
191+
fn uv_handle(&self) -> *uvll::uv_pipe_t { self.pipe }
192+
}
193+
194+
extern fn listen_cb(server: *uvll::uv_stream_t, status: libc::c_int) {
195+
let msg = match status {
196+
0 => {
197+
let loop_ = NativeHandle::from_native_handle(unsafe {
198+
uvll::get_loop_for_uv_handle(server)
199+
});
200+
let client = PipeWatcher::alloc(&loop_, false);
201+
assert_eq!(unsafe { uvll::uv_accept(server, client) }, 0);
202+
Ok(~PipeWatcher::new(client) as ~RtioPipe)
75203
}
204+
n => Err(uv_error_to_io_error(UvError(n)))
205+
};
206+
207+
let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&server) };
208+
pipe.outgoing.send(msg);
209+
}
76210

77-
extern "C" fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) {
78-
let connect_request: net::ConnectRequest =
79-
NativeHandle::from_native_handle(req);
80-
let mut stream_watcher = connect_request.stream();
81-
connect_request.delete();
211+
impl Drop for PipeListener {
212+
fn drop(&mut self) {
213+
let (_m, sched) = self.fire_missiles_sched();
82214

83-
let cb = stream_watcher.get_watcher_data().connect_cb.take_unwrap();
84-
let status = status_to_maybe_uv_error(status);
85-
cb(stream_watcher, status);
215+
do sched.deschedule_running_task_and_then |_, task| {
216+
self.closing_task = Some(task);
217+
unsafe { uvll::uv_close(self.pipe, listener_close_cb) }
86218
}
87219
}
220+
}
221+
222+
extern fn listener_close_cb(handle: *uvll::uv_handle_t) {
223+
let pipe: &mut PipeListener = unsafe { UvHandle::from_uv_handle(&handle) };
224+
unsafe { uvll::free_handle(handle) }
88225

226+
let sched: ~Scheduler = Local::take();
227+
sched.resume_blocked_task_immediately(pipe.closing_task.take_unwrap());
89228
}
90229

91-
impl NativeHandle<*uvll::uv_pipe_t> for Pipe {
92-
fn from_native_handle(handle: *uvll::uv_pipe_t) -> Pipe {
93-
Pipe(handle)
94-
}
95-
fn native_handle(&self) -> *uvll::uv_pipe_t {
96-
match self { &Pipe(ptr) => ptr }
230+
// PipeAcceptor implementation and traits
231+
232+
impl RtioUnixAcceptor for PipeAcceptor {
233+
fn accept(&mut self) -> Result<~RtioPipe, IoError> {
234+
let _m = self.fire_missiles();
235+
self.incoming.recv()
97236
}
98237
}
238+
239+
impl HomingIO for PipeAcceptor {
240+
fn home<'r>(&'r mut self) -> &'r mut SchedHandle { self.listener.home() }
241+
}

src/librustuv/process.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ use std::rt::sched::{Scheduler, SchedHandle};
2121
use std::vec;
2222

2323
use super::{Loop, NativeHandle, UvHandle, UvError, uv_error_to_io_error};
24-
use uvio::{HomingIO, UvPipeStream, UvUnboundPipe};
24+
use uvio::HomingIO;
2525
use uvll;
26+
use pipe::PipeWatcher;
2627

2728
pub struct Process {
2829
handle: *uvll::uv_process_t,
@@ -42,7 +43,7 @@ impl Process {
4243
/// Returns either the corresponding process object or an error which
4344
/// occurred.
4445
pub fn spawn(loop_: &Loop, config: ProcessConfig)
45-
-> Result<(~Process, ~[Option<~UvPipeStream>]), UvError>
46+
-> Result<(~Process, ~[Option<PipeWatcher>]), UvError>
4647
{
4748
let cwd = config.cwd.map(|s| s.to_c_str());
4849
let io = config.io;
@@ -121,7 +122,7 @@ extern fn on_exit(handle: *uvll::uv_process_t,
121122

122123
unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t,
123124
io: &StdioContainer,
124-
loop_: &Loop) -> Option<~UvPipeStream> {
125+
loop_: &Loop) -> Option<PipeWatcher> {
125126
match *io {
126127
Ignored => {
127128
uvll::set_stdio_container_flags(dst, uvll::STDIO_IGNORE);
@@ -140,11 +141,10 @@ unsafe fn set_stdio(dst: *uvll::uv_stdio_container_t,
140141
if writable {
141142
flags |= uvll::STDIO_WRITABLE_PIPE as libc::c_int;
142143
}
143-
let pipe = UvUnboundPipe::new(loop_);
144-
let handle = pipe.pipe.as_stream().native_handle();
144+
let pipe_handle = PipeWatcher::alloc(loop_, false);
145145
uvll::set_stdio_container_flags(dst, flags);
146-
uvll::set_stdio_container_stream(dst, handle);
147-
Some(~UvPipeStream::new(pipe))
146+
uvll::set_stdio_container_stream(dst, pipe_handle);
147+
Some(PipeWatcher::new(pipe_handle))
148148
}
149149
}
150150
}

0 commit comments

Comments
 (0)