Skip to content

Commit b01f7b8

Browse files
committed
---
yaml --- r: 124276 b: refs/heads/try c: 5a7ecff2d9c362befddac7c4e036b155fae3bd93 h: refs/heads/master v: v3
1 parent 8a78dd5 commit b01f7b8

File tree

10 files changed

+482
-47
lines changed

10 files changed

+482
-47
lines changed

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
refs/heads/master: 6c35d513cea468b30759b4f78becf28f11a123c0
33
refs/heads/snap-stage1: e33de59e47c5076a89eadeb38f4934f58a3618a6
44
refs/heads/snap-stage3: afbcbbc77ffc6b10053bc543daf7d2e05d68cc01
5-
refs/heads/try: 2eadfe42e58de0263286195e7560cb85337a3847
5+
refs/heads/try: 5a7ecff2d9c362befddac7c4e036b155fae3bd93
66
refs/tags/release-0.1: 1f5c5126e96c79d22cb7862f75304136e204f105
77
refs/heads/ndm: f3868061cd7988080c30d6d5bf352a5a5fe2460b
88
refs/heads/try2: 147ecfdd8221e4a4d4e090486829a06da1e0ca3c

branches/try/src/libnative/io/net.rs

Lines changed: 73 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,13 @@ use std::mem;
1414
use std::rt::mutex;
1515
use std::rt::rtio;
1616
use std::rt::rtio::{IoResult, IoError};
17+
use std::sync::atomics;
1718

1819
use super::{retry, keep_going};
1920
use super::c;
2021
use super::util;
22+
use super::file::FileDesc;
23+
use super::process;
2124

2225
////////////////////////////////////////////////////////////////////////////////
2326
// sockaddr and misc bindings
@@ -476,9 +479,26 @@ impl TcpListener {
476479
pub fn fd(&self) -> sock_t { self.inner.fd }
477480

478481
pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> {
482+
try!(util::set_nonblocking(self.fd(), true));
479483
match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
480484
-1 => Err(last_error()),
481-
_ => Ok(TcpAcceptor { listener: self, deadline: 0 })
485+
486+
#[cfg(unix)]
487+
_ => {
488+
let (reader, writer) = try!(process::pipe());
489+
try!(util::set_nonblocking(reader.fd(), true));
490+
try!(util::set_nonblocking(writer.fd(), true));
491+
try!(util::set_nonblocking(self.fd(), true));
492+
Ok(TcpAcceptor {
493+
inner: Arc::new(AcceptorInner {
494+
listener: self,
495+
reader: reader,
496+
writer: writer,
497+
closed: atomics::AtomicBool::new(false),
498+
}),
499+
deadline: 0,
500+
})
501+
}
482502
}
483503
}
484504
}
@@ -498,31 +518,46 @@ impl rtio::RtioSocket for TcpListener {
498518
}
499519

500520
pub struct TcpAcceptor {
501-
listener: TcpListener,
521+
inner: Arc<AcceptorInner>,
502522
deadline: u64,
503523
}
504524

525+
#[cfg(unix)]
526+
struct AcceptorInner {
527+
listener: TcpListener,
528+
reader: FileDesc,
529+
writer: FileDesc,
530+
closed: atomics::AtomicBool,
531+
}
532+
505533
impl TcpAcceptor {
506-
pub fn fd(&self) -> sock_t { self.listener.fd() }
534+
pub fn fd(&self) -> sock_t { self.inner.listener.fd() }
507535

536+
#[cfg(unix)]
508537
pub fn native_accept(&mut self) -> IoResult<TcpStream> {
509-
if self.deadline != 0 {
510-
try!(util::await(self.fd(), Some(self.deadline), util::Readable));
511-
}
512-
unsafe {
513-
let mut storage: libc::sockaddr_storage = mem::zeroed();
514-
let storagep = &mut storage as *mut libc::sockaddr_storage;
515-
let size = mem::size_of::<libc::sockaddr_storage>();
516-
let mut size = size as libc::socklen_t;
517-
match retry(|| {
518-
libc::accept(self.fd(),
519-
storagep as *mut libc::sockaddr,
520-
&mut size as *mut libc::socklen_t) as libc::c_int
521-
}) as sock_t {
522-
-1 => Err(last_error()),
523-
fd => Ok(TcpStream::new(Inner::new(fd))),
538+
let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
539+
540+
while !self.inner.closed.load(atomics::SeqCst) {
541+
unsafe {
542+
let mut storage: libc::sockaddr_storage = mem::zeroed();
543+
let storagep = &mut storage as *mut libc::sockaddr_storage;
544+
let size = mem::size_of::<libc::sockaddr_storage>();
545+
let mut size = size as libc::socklen_t;
546+
match retry(|| {
547+
libc::accept(self.fd(),
548+
storagep as *mut libc::sockaddr,
549+
&mut size as *mut libc::socklen_t) as libc::c_int
550+
}) as sock_t {
551+
-1 if util::wouldblock() => {}
552+
-1 => return Err(last_error()),
553+
fd => return Ok(TcpStream::new(Inner::new(fd))),
554+
}
524555
}
556+
try!(util::await([self.fd(), self.inner.reader.fd()],
557+
deadline, util::Readable));
525558
}
559+
560+
Err(util::eof())
526561
}
527562
}
528563

@@ -542,6 +577,24 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
542577
fn set_timeout(&mut self, timeout: Option<u64>) {
543578
self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
544579
}
580+
581+
fn clone(&self) -> Box<rtio::RtioTcpAcceptor + Send> {
582+
box TcpAcceptor {
583+
inner: self.inner.clone(),
584+
deadline: 0,
585+
} as Box<rtio::RtioTcpAcceptor + Send>
586+
}
587+
588+
#[cfg(unix)]
589+
fn close_accept(&mut self) -> IoResult<()> {
590+
self.inner.closed.store(true, atomics::SeqCst);
591+
let mut fd = FileDesc::new(self.inner.writer.fd(), false);
592+
match fd.inner_write([0]) {
593+
Ok(..) => Ok(()),
594+
Err(..) if util::wouldblock() => Ok(()),
595+
Err(e) => Err(e),
596+
}
597+
}
545598
}
546599

547600
////////////////////////////////////////////////////////////////////////////////
@@ -813,7 +866,7 @@ pub fn read<T>(fd: sock_t,
813866
// With a timeout, first we wait for the socket to become
814867
// readable using select(), specifying the relevant timeout for
815868
// our previously set deadline.
816-
try!(util::await(fd, deadline, util::Readable));
869+
try!(util::await([fd], deadline, util::Readable));
817870

818871
// At this point, we're still within the timeout, and we've
819872
// determined that the socket is readable (as returned by
@@ -867,7 +920,7 @@ pub fn write<T>(fd: sock_t,
867920
while written < buf.len() && (write_everything || written == 0) {
868921
// As with read(), first wait for the socket to be ready for
869922
// the I/O operation.
870-
match util::await(fd, deadline, util::Writable) {
923+
match util::await([fd], deadline, util::Writable) {
871924
Err(ref e) if e.code == libc::EOF as uint && written > 0 => {
872925
assert!(deadline.is_some());
873926
return Err(util::short_write(written, "short write"))

branches/try/src/libnative/io/pipe_unix.rs

Lines changed: 70 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@ use std::mem;
1515
use std::rt::mutex;
1616
use std::rt::rtio;
1717
use std::rt::rtio::{IoResult, IoError};
18+
use std::sync::atomics;
1819

1920
use super::retry;
2021
use super::net;
2122
use super::util;
2223
use super::c;
23-
use super::file::fd_t;
24+
use super::process;
25+
use super::file::{fd_t, FileDesc};
2426

2527
fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> {
2628
match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } {
@@ -223,7 +225,23 @@ impl UnixListener {
223225
pub fn native_listen(self, backlog: int) -> IoResult<UnixAcceptor> {
224226
match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
225227
-1 => Err(super::last_error()),
226-
_ => Ok(UnixAcceptor { listener: self, deadline: 0 })
228+
229+
#[cfg(unix)]
230+
_ => {
231+
let (reader, writer) = try!(process::pipe());
232+
try!(util::set_nonblocking(reader.fd(), true));
233+
try!(util::set_nonblocking(writer.fd(), true));
234+
try!(util::set_nonblocking(self.fd(), true));
235+
Ok(UnixAcceptor {
236+
inner: Arc::new(AcceptorInner {
237+
listener: self,
238+
reader: reader,
239+
writer: writer,
240+
closed: atomics::AtomicBool::new(false),
241+
}),
242+
deadline: 0,
243+
})
244+
}
227245
}
228246
}
229247
}
@@ -237,29 +255,45 @@ impl rtio::RtioUnixListener for UnixListener {
237255
}
238256

239257
pub struct UnixAcceptor {
240-
listener: UnixListener,
258+
inner: Arc<AcceptorInner>,
241259
deadline: u64,
242260
}
243261

262+
#[cfg(unix)]
263+
struct AcceptorInner {
264+
listener: UnixListener,
265+
reader: FileDesc,
266+
writer: FileDesc,
267+
closed: atomics::AtomicBool,
268+
}
269+
244270
impl UnixAcceptor {
245-
fn fd(&self) -> fd_t { self.listener.fd() }
271+
fn fd(&self) -> fd_t { self.inner.listener.fd() }
246272

247273
pub fn native_accept(&mut self) -> IoResult<UnixStream> {
248-
if self.deadline != 0 {
249-
try!(util::await(self.fd(), Some(self.deadline), util::Readable));
250-
}
251-
let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
252-
let storagep = &mut storage as *mut libc::sockaddr_storage;
253-
let size = mem::size_of::<libc::sockaddr_storage>();
254-
let mut size = size as libc::socklen_t;
255-
match retry(|| unsafe {
256-
libc::accept(self.fd(),
257-
storagep as *mut libc::sockaddr,
258-
&mut size as *mut libc::socklen_t) as libc::c_int
259-
}) {
260-
-1 => Err(super::last_error()),
261-
fd => Ok(UnixStream::new(Arc::new(Inner::new(fd))))
274+
let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
275+
276+
while !self.inner.closed.load(atomics::SeqCst) {
277+
unsafe {
278+
let mut storage: libc::sockaddr_storage = mem::zeroed();
279+
let storagep = &mut storage as *mut libc::sockaddr_storage;
280+
let size = mem::size_of::<libc::sockaddr_storage>();
281+
let mut size = size as libc::socklen_t;
282+
match retry(|| {
283+
libc::accept(self.fd(),
284+
storagep as *mut libc::sockaddr,
285+
&mut size as *mut libc::socklen_t) as libc::c_int
286+
}) {
287+
-1 if util::wouldblock() => {}
288+
-1 => return Err(super::last_error()),
289+
fd => return Ok(UnixStream::new(Arc::new(Inner::new(fd)))),
290+
}
291+
}
292+
try!(util::await([self.fd(), self.inner.reader.fd()],
293+
deadline, util::Readable));
262294
}
295+
296+
Err(util::eof())
263297
}
264298
}
265299

@@ -270,6 +304,24 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor {
270304
fn set_timeout(&mut self, timeout: Option<u64>) {
271305
self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
272306
}
307+
308+
fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> {
309+
box UnixAcceptor {
310+
inner: self.inner.clone(),
311+
deadline: 0,
312+
} as Box<rtio::RtioUnixAcceptor + Send>
313+
}
314+
315+
#[cfg(unix)]
316+
fn close_accept(&mut self) -> IoResult<()> {
317+
self.inner.closed.store(true, atomics::SeqCst);
318+
let mut fd = FileDesc::new(self.inner.writer.fd(), false);
319+
match fd.inner_write([0]) {
320+
Ok(..) => Ok(()),
321+
Err(..) if util::wouldblock() => Ok(()),
322+
Err(e) => Err(e),
323+
}
324+
}
273325
}
274326

275327
impl Drop for UnixListener {

branches/try/src/libnative/io/pipe_win32.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,10 @@ use super::c;
9999
use super::util;
100100
use super::file::to_utf16;
101101

102-
struct Event(libc::HANDLE);
102+
pub struct Event(libc::HANDLE);
103103

104104
impl Event {
105-
fn new(manual_reset: bool, initial_state: bool) -> IoResult<Event> {
105+
pub fn new(manual_reset: bool, initial_state: bool) -> IoResult<Event> {
106106
let event = unsafe {
107107
libc::CreateEventW(ptr::mut_null(),
108108
manual_reset as libc::BOOL,
@@ -116,7 +116,7 @@ impl Event {
116116
}
117117
}
118118

119-
fn handle(&self) -> libc::HANDLE { let Event(handle) = *self; handle }
119+
pub fn handle(&self) -> libc::HANDLE { let Event(handle) = *self; handle }
120120
}
121121

122122
impl Drop for Event {

branches/try/src/libnative/io/process.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ impl Drop for Process {
191191
}
192192
}
193193

194-
fn pipe() -> IoResult<(file::FileDesc, file::FileDesc)> {
194+
pub fn pipe() -> IoResult<(file::FileDesc, file::FileDesc)> {
195195
#[cfg(unix)] use ERROR = libc::EMFILE;
196196
#[cfg(windows)] use ERROR = libc::WSAEMFILE;
197197
struct Closer { fd: libc::c_int }

branches/try/src/libnative/io/util.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
// except according to those terms.
1010

1111
use libc;
12+
use std::cmp;
1213
use std::mem;
1314
use std::os;
1415
use std::ptr;
@@ -158,10 +159,15 @@ pub fn connect_timeout(fd: net::sock_t,
158159
}
159160
}
160161

161-
pub fn await(fd: net::sock_t, deadline: Option<u64>,
162+
pub fn await(fds: &[net::sock_t], deadline: Option<u64>,
162163
status: SocketStatus) -> IoResult<()> {
163164
let mut set: c::fd_set = unsafe { mem::zeroed() };
164-
c::fd_set(&mut set, fd);
165+
let mut max = 0;
166+
for &fd in fds.iter() {
167+
c::fd_set(&mut set, fd);
168+
max = cmp::max(max, fd + 1);
169+
}
170+
165171
let (read, write) = match status {
166172
Readable => (&mut set as *mut _, ptr::mut_null()),
167173
Writable => (ptr::mut_null(), &mut set as *mut _),
@@ -180,8 +186,7 @@ pub fn await(fd: net::sock_t, deadline: Option<u64>,
180186
&mut tv as *mut _
181187
}
182188
};
183-
let n = if cfg!(windows) {1} else {fd as libc::c_int + 1};
184-
let r = unsafe { c::select(n, read, write, ptr::mut_null(), tvp) };
189+
let r = unsafe { c::select(max, read, write, ptr::mut_null(), tvp) };
185190
r
186191
}) {
187192
-1 => Err(last_error()),

branches/try/src/librustrt/rtio.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,8 @@ pub trait RtioTcpAcceptor : RtioSocket {
246246
fn accept_simultaneously(&mut self) -> IoResult<()>;
247247
fn dont_accept_simultaneously(&mut self) -> IoResult<()>;
248248
fn set_timeout(&mut self, timeout: Option<u64>);
249+
fn clone(&self) -> Box<RtioTcpAcceptor + Send>;
250+
fn close_accept(&mut self) -> IoResult<()>;
249251
}
250252

251253
pub trait RtioTcpStream : RtioSocket {
@@ -335,6 +337,8 @@ pub trait RtioUnixListener {
335337
pub trait RtioUnixAcceptor {
336338
fn accept(&mut self) -> IoResult<Box<RtioPipe + Send>>;
337339
fn set_timeout(&mut self, timeout: Option<u64>);
340+
fn clone(&self) -> Box<RtioUnixAcceptor + Send>;
341+
fn close_accept(&mut self) -> IoResult<()>;
338342
}
339343

340344
pub trait RtioTTY {

0 commit comments

Comments
 (0)