Skip to content

Commit 0f07b77

Browse files
committed
---
yaml --- r: 95614 b: refs/heads/dist-snap c: bac9681 h: refs/heads/master v: v3
1 parent c91636a commit 0f07b77

File tree

12 files changed

+603
-87
lines changed

12 files changed

+603
-87
lines changed

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ refs/heads/try: c274a6888410ce3e357e014568b43310ed787d36
66
refs/tags/release-0.1: 1f5c5126e96c79d22cb7862f75304136e204f105
77
refs/heads/ndm: f3868061cd7988080c30d6d5bf352a5a5fe2460b
88
refs/heads/try2: 147ecfdd8221e4a4d4e090486829a06da1e0ca3c
9-
refs/heads/dist-snap: 61f8c059c4c6082683d78b2ee3d963f65fa1eb98
9+
refs/heads/dist-snap: bac96818580a97c049532e50702c2a8204e11754
1010
refs/tags/release-0.2: c870d2dffb391e14efb05aa27898f1f6333a9596
1111
refs/tags/release-0.3: b5f0d0f648d9a6153664837026ba1be43d3e2503
1212
refs/heads/try3: 9387340aab40a73e8424c48fd42f0c521a4875c0

branches/dist-snap/src/libstd/rt/io/net/unix.rs

Lines changed: 268 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,44 +8,296 @@
88
// option. This file may not be copied, modified, or distributed
99
// except according to those terms.
1010

11+
/*!
12+
13+
Named pipes
14+
15+
This module contains the ability to communicate over named pipes with
16+
synchronous I/O. On windows, this corresponds to talking over a Named Pipe,
17+
while on Unix it corresponds to UNIX domain sockets.
18+
19+
These pipes are similar to TCP in the sense that you can have both a stream to a
20+
server and a server itself. The server provided accepts other `UnixStream`
21+
instances as clients.
22+
23+
*/
24+
1125
use prelude::*;
12-
use super::super::*;
26+
1327
use super::super::support::PathLike;
28+
use rt::rtio::{IoFactory, IoFactoryObject, RtioUnixListenerObject};
29+
use rt::rtio::{RtioUnixAcceptorObject, RtioPipeObject, RtioUnixListener};
30+
use rt::rtio::RtioUnixAcceptor;
31+
use rt::io::pipe::PipeStream;
32+
use rt::io::{io_error, Listener, Acceptor, Reader, Writer};
33+
use rt::local::Local;
1434

15-
pub struct UnixStream;
35+
/// A stream which communicates over a named pipe.
36+
pub struct UnixStream {
37+
priv obj: PipeStream,
38+
}
1639

1740
impl UnixStream {
18-
pub fn connect<P: PathLike>(_path: &P) -> Option<UnixStream> {
19-
fail!()
41+
fn new(obj: ~RtioPipeObject) -> UnixStream {
42+
UnixStream { obj: PipeStream::new_bound(obj) }
43+
}
44+
45+
/// Connect to a pipe named by `path`. This will attempt to open a
46+
/// connection to the underlying socket.
47+
///
48+
/// The returned stream will be closed when the object falls out of scope.
49+
///
50+
/// # Failure
51+
///
52+
/// This function will raise on the `io_error` condition if the connection
53+
/// could not be made.
54+
///
55+
/// # Example
56+
///
57+
/// use std::rt::io::net::unix::UnixStream;
58+
///
59+
/// let server = Path("path/to/my/socket");
60+
/// let mut stream = UnixStream::connect(&server);
61+
/// stream.write([1, 2, 3]);
62+
///
63+
pub fn connect<P: PathLike>(path: &P) -> Option<UnixStream> {
64+
let pipe = unsafe {
65+
let io: *mut IoFactoryObject = Local::unsafe_borrow();
66+
(*io).unix_connect(path)
67+
};
68+
69+
match pipe {
70+
Ok(s) => Some(UnixStream::new(s)),
71+
Err(ioerr) => {
72+
io_error::cond.raise(ioerr);
73+
None
74+
}
75+
}
2076
}
2177
}
2278

2379
impl Reader for UnixStream {
24-
fn read(&mut self, _buf: &mut [u8]) -> Option<uint> { fail!() }
25-
26-
fn eof(&mut self) -> bool { fail!() }
80+
fn read(&mut self, buf: &mut [u8]) -> Option<uint> { self.obj.read(buf) }
81+
fn eof(&mut self) -> bool { self.obj.eof() }
2782
}
2883

2984
impl Writer for UnixStream {
30-
fn write(&mut self, _v: &[u8]) { fail!() }
31-
32-
fn flush(&mut self) { fail!() }
85+
fn write(&mut self, buf: &[u8]) { self.obj.write(buf) }
86+
fn flush(&mut self) { self.obj.flush() }
3387
}
3488

35-
pub struct UnixListener;
89+
pub struct UnixListener {
90+
priv obj: ~RtioUnixListenerObject,
91+
}
3692

3793
impl UnixListener {
38-
pub fn bind<P: PathLike>(_path: &P) -> Option<UnixListener> {
39-
fail!()
94+
95+
/// Creates a new listener, ready to receive incoming connections on the
96+
/// specified socket. The server will be named by `path`.
97+
///
98+
/// This listener will be closed when it falls out of scope.
99+
///
100+
/// # Failure
101+
///
102+
/// This function will raise on the `io_error` condition if the specified
103+
/// path could not be bound.
104+
///
105+
/// # Example
106+
///
107+
/// use std::rt::io::net::unix::UnixListener;
108+
///
109+
/// let server = Path("path/to/my/socket");
110+
/// let mut stream = UnixListener::bind(&server);
111+
/// for client in stream.incoming() {
112+
/// let mut client = client;
113+
/// client.write([1, 2, 3, 4]);
114+
/// }
115+
///
116+
pub fn bind<P: PathLike>(path: &P) -> Option<UnixListener> {
117+
let listener = unsafe {
118+
let io: *mut IoFactoryObject = Local::unsafe_borrow();
119+
(*io).unix_bind(path)
120+
};
121+
match listener {
122+
Ok(s) => Some(UnixListener{ obj: s }),
123+
Err(ioerr) => {
124+
io_error::cond.raise(ioerr);
125+
None
126+
}
127+
}
40128
}
41129
}
42130

43131
impl Listener<UnixStream, UnixAcceptor> for UnixListener {
44-
fn listen(self) -> Option<UnixAcceptor> { fail!() }
132+
fn listen(self) -> Option<UnixAcceptor> {
133+
match self.obj.listen() {
134+
Ok(acceptor) => Some(UnixAcceptor { obj: acceptor }),
135+
Err(ioerr) => {
136+
io_error::cond.raise(ioerr);
137+
None
138+
}
139+
}
140+
}
45141
}
46142

47-
pub struct UnixAcceptor;
143+
pub struct UnixAcceptor {
144+
priv obj: ~RtioUnixAcceptorObject,
145+
}
48146

49147
impl Acceptor<UnixStream> for UnixAcceptor {
50-
fn accept(&mut self) -> Option<UnixStream> { fail!() }
148+
fn accept(&mut self) -> Option<UnixStream> {
149+
match self.obj.accept() {
150+
Ok(s) => Some(UnixStream::new(s)),
151+
Err(ioerr) => {
152+
io_error::cond.raise(ioerr);
153+
None
154+
}
155+
}
156+
}
157+
}
158+
159+
#[cfg(test)]
160+
mod tests {
161+
use prelude::*;
162+
use super::*;
163+
use cell::Cell;
164+
use rt::test::*;
165+
use rt::io::*;
166+
use rt::comm::oneshot;
167+
use os;
168+
169+
fn smalltest(server: ~fn(UnixStream), client: ~fn(UnixStream)) {
170+
let server = Cell::new(server);
171+
let client = Cell::new(client);
172+
do run_in_mt_newsched_task {
173+
let server = Cell::new(server.take());
174+
let client = Cell::new(client.take());
175+
let path1 = next_test_unix();
176+
let path2 = path1.clone();
177+
let (port, chan) = oneshot();
178+
let port = Cell::new(port);
179+
let chan = Cell::new(chan);
180+
181+
do spawntask {
182+
let mut acceptor = UnixListener::bind(&path1).listen();
183+
chan.take().send(());
184+
server.take()(acceptor.accept().unwrap());
185+
}
186+
187+
do spawntask {
188+
port.take().recv();
189+
client.take()(UnixStream::connect(&path2).unwrap());
190+
}
191+
}
192+
}
193+
194+
#[test]
195+
fn bind_error() {
196+
do run_in_mt_newsched_task {
197+
let mut called = false;
198+
do io_error::cond.trap(|e| {
199+
assert!(e.kind == PermissionDenied);
200+
called = true;
201+
}).inside {
202+
let listener = UnixListener::bind(&("path/to/nowhere"));
203+
assert!(listener.is_none());
204+
}
205+
assert!(called);
206+
}
207+
}
208+
209+
#[test]
210+
fn connect_error() {
211+
do run_in_mt_newsched_task {
212+
let mut called = false;
213+
do io_error::cond.trap(|e| {
214+
assert_eq!(e.kind, OtherIoError);
215+
called = true;
216+
}).inside {
217+
let stream = UnixStream::connect(&("path/to/nowhere"));
218+
assert!(stream.is_none());
219+
}
220+
assert!(called);
221+
}
222+
}
223+
224+
#[test]
225+
fn smoke() {
226+
smalltest(|mut server| {
227+
let mut buf = [0];
228+
server.read(buf);
229+
assert!(buf[0] == 99);
230+
}, |mut client| {
231+
client.write([99]);
232+
})
233+
}
234+
235+
#[test]
236+
fn read_eof() {
237+
smalltest(|mut server| {
238+
let mut buf = [0];
239+
assert!(server.read(buf).is_none());
240+
assert!(server.read(buf).is_none());
241+
}, |_client| {
242+
// drop the client
243+
})
244+
}
245+
246+
#[test]
247+
fn write_begone() {
248+
smalltest(|mut server| {
249+
let buf = [0];
250+
let mut stop = false;
251+
while !stop{
252+
do io_error::cond.trap(|e| {
253+
assert_eq!(e.kind, BrokenPipe);
254+
stop = true;
255+
}).inside {
256+
server.write(buf);
257+
}
258+
}
259+
}, |_client| {
260+
// drop the client
261+
})
262+
}
263+
264+
#[test]
265+
fn accept_lots() {
266+
do run_in_mt_newsched_task {
267+
let times = 10;
268+
let path1 = next_test_unix();
269+
let path2 = path1.clone();
270+
let (port, chan) = oneshot();
271+
let port = Cell::new(port);
272+
let chan = Cell::new(chan);
273+
274+
do spawntask {
275+
let mut acceptor = UnixListener::bind(&path1).listen();
276+
chan.take().send(());
277+
do times.times {
278+
let mut client = acceptor.accept();
279+
let mut buf = [0];
280+
client.read(buf);
281+
assert_eq!(buf[0], 100);
282+
}
283+
}
284+
285+
do spawntask {
286+
port.take().recv();
287+
do times.times {
288+
let mut stream = UnixStream::connect(&path2);
289+
stream.write([100]);
290+
}
291+
}
292+
}
293+
}
294+
295+
#[test]
296+
fn path_exists() {
297+
do run_in_mt_newsched_task {
298+
let path = next_test_unix();
299+
let _acceptor = UnixListener::bind(&path).listen();
300+
assert!(os::path_exists(&path));
301+
}
302+
}
51303
}

branches/dist-snap/src/libstd/rt/io/pipe.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use rt::rtio::{RtioPipe, RtioPipeObject, IoFactoryObject, IoFactory};
2121
use rt::rtio::RtioUnboundPipeObject;
2222

2323
pub struct PipeStream {
24-
priv obj: RtioPipeObject
24+
priv obj: ~RtioPipeObject
2525
}
2626

2727
// This should not be a newtype, but rt::uv::process::set_stdio needs to reach
@@ -45,7 +45,7 @@ impl PipeStream {
4545
}
4646
}
4747

48-
pub fn bind(inner: RtioPipeObject) -> PipeStream {
48+
pub fn new_bound(inner: ~RtioPipeObject) -> PipeStream {
4949
PipeStream { obj: inner }
5050
}
5151
}

branches/dist-snap/src/libstd/rt/io/process.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ impl Process {
100100
Ok((p, io)) => Some(Process{
101101
handle: p,
102102
io: io.move_iter().map(|p|
103-
p.map(|p| io::PipeStream::bind(p))
103+
p.map(|p| io::PipeStream::new_bound(p))
104104
).collect()
105105
}),
106106
Err(ioerr) => {

branches/dist-snap/src/libstd/rt/rtio.rs

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ pub type PausibleIdleCallback = uvio::UvPausibleIdleCallback;
3636
pub type RtioPipeObject = uvio::UvPipeStream;
3737
pub type RtioUnboundPipeObject = uvio::UvUnboundPipe;
3838
pub type RtioProcessObject = uvio::UvProcess;
39+
pub type RtioUnixListenerObject = uvio::UvUnixListener;
40+
pub type RtioUnixAcceptorObject = uvio::UvUnixAcceptor;
3941

4042
pub trait EventLoop {
4143
fn run(&mut self);
@@ -86,7 +88,12 @@ pub trait IoFactory {
8688
Result<~[Path], IoError>;
8789
fn pipe_init(&mut self, ipc: bool) -> Result<~RtioUnboundPipeObject, IoError>;
8890
fn spawn(&mut self, config: ProcessConfig)
89-
-> Result<(~RtioProcessObject, ~[Option<RtioPipeObject>]), IoError>;
91+
-> Result<(~RtioProcessObject, ~[Option<~RtioPipeObject>]), IoError>;
92+
93+
fn unix_bind<P: PathLike>(&mut self, path: &P) ->
94+
Result<~RtioUnixListenerObject, IoError>;
95+
fn unix_connect<P: PathLike>(&mut self, path: &P) ->
96+
Result<~RtioPipeObject, IoError>;
9097
}
9198

9299
pub trait RtioTcpListener : RtioSocket {
@@ -154,3 +161,13 @@ pub trait RtioPipe {
154161
fn read(&mut self, buf: &mut [u8]) -> Result<uint, IoError>;
155162
fn write(&mut self, buf: &[u8]) -> Result<(), IoError>;
156163
}
164+
165+
pub trait RtioUnixListener {
166+
fn listen(self) -> Result<~RtioUnixAcceptorObject, IoError>;
167+
}
168+
169+
pub trait RtioUnixAcceptor {
170+
fn accept(&mut self) -> Result<~RtioPipeObject, IoError>;
171+
fn accept_simultaneously(&mut self) -> Result<(), IoError>;
172+
fn dont_accept_simultaneously(&mut self) -> Result<(), IoError>;
173+
}

0 commit comments

Comments
 (0)