Skip to content

Commit c669c76

Browse files
committed
---
yaml --- r: 129531 b: refs/heads/snap-stage3 c: 110168d h: refs/heads/master i: 129529: 7c200b8 129527: 3a255b4 v: v3
1 parent e98b320 commit c669c76

File tree

11 files changed

+482
-49
lines changed

11 files changed

+482
-49
lines changed

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
---
22
refs/heads/master: 566b470e138e929e8a93d613372db1ba177c494f
33
refs/heads/snap-stage1: e33de59e47c5076a89eadeb38f4934f58a3618a6
4-
refs/heads/snap-stage3: 833277e2d2c7cf5fcf995ef413412a21b4756ca7
4+
refs/heads/snap-stage3: 110168de2a7b529a7c4839ca1e19c4c42f68be12
55
refs/heads/try: 80b45ddbd351f0a4a939c3a3c4e20b4defec4b35
66
refs/tags/release-0.1: 1f5c5126e96c79d22cb7862f75304136e204f105
77
refs/heads/ndm: f3868061cd7988080c30d6d5bf352a5a5fe2460b

branches/snap-stage3/src/libnative/io/net.rs

Lines changed: 73 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,13 @@ use std::mem;
1414
use std::rt::mutex;
1515
use std::rt::rtio;
1616
use std::rt::rtio::{IoResult, IoError};
17+
use std::sync::atomics;
1718

1819
use super::{retry, keep_going};
1920
use super::c;
2021
use super::util;
22+
use super::file::FileDesc;
23+
use super::process;
2124

2225
////////////////////////////////////////////////////////////////////////////////
2326
// sockaddr and misc bindings
@@ -479,9 +482,26 @@ impl TcpListener {
479482
pub fn fd(&self) -> sock_t { self.inner.fd }
480483

481484
pub fn native_listen(self, backlog: int) -> IoResult<TcpAcceptor> {
485+
try!(util::set_nonblocking(self.fd(), true));
482486
match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
483487
-1 => Err(last_error()),
484-
_ => Ok(TcpAcceptor { listener: self, deadline: 0 })
488+
489+
#[cfg(unix)]
490+
_ => {
491+
let (reader, writer) = try!(process::pipe());
492+
try!(util::set_nonblocking(reader.fd(), true));
493+
try!(util::set_nonblocking(writer.fd(), true));
494+
try!(util::set_nonblocking(self.fd(), true));
495+
Ok(TcpAcceptor {
496+
inner: Arc::new(AcceptorInner {
497+
listener: self,
498+
reader: reader,
499+
writer: writer,
500+
closed: atomics::AtomicBool::new(false),
501+
}),
502+
deadline: 0,
503+
})
504+
}
485505
}
486506
}
487507
}
@@ -502,31 +522,46 @@ impl rtio::RtioSocket for TcpListener {
502522
}
503523

504524
pub struct TcpAcceptor {
505-
listener: TcpListener,
525+
inner: Arc<AcceptorInner>,
506526
deadline: u64,
507527
}
508528

529+
#[cfg(unix)]
530+
struct AcceptorInner {
531+
listener: TcpListener,
532+
reader: FileDesc,
533+
writer: FileDesc,
534+
closed: atomics::AtomicBool,
535+
}
536+
509537
impl TcpAcceptor {
510-
pub fn fd(&self) -> sock_t { self.listener.fd() }
538+
pub fn fd(&self) -> sock_t { self.inner.listener.fd() }
511539

540+
#[cfg(unix)]
512541
pub fn native_accept(&mut self) -> IoResult<TcpStream> {
513-
if self.deadline != 0 {
514-
try!(util::await(self.fd(), Some(self.deadline), util::Readable));
515-
}
516-
unsafe {
517-
let mut storage: libc::sockaddr_storage = mem::zeroed();
518-
let storagep = &mut storage as *mut libc::sockaddr_storage;
519-
let size = mem::size_of::<libc::sockaddr_storage>();
520-
let mut size = size as libc::socklen_t;
521-
match retry(|| {
522-
libc::accept(self.fd(),
523-
storagep as *mut libc::sockaddr,
524-
&mut size as *mut libc::socklen_t) as libc::c_int
525-
}) as sock_t {
526-
-1 => Err(last_error()),
527-
fd => Ok(TcpStream::new(Inner::new(fd))),
542+
let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
543+
544+
while !self.inner.closed.load(atomics::SeqCst) {
545+
unsafe {
546+
let mut storage: libc::sockaddr_storage = mem::zeroed();
547+
let storagep = &mut storage as *mut libc::sockaddr_storage;
548+
let size = mem::size_of::<libc::sockaddr_storage>();
549+
let mut size = size as libc::socklen_t;
550+
match retry(|| {
551+
libc::accept(self.fd(),
552+
storagep as *mut libc::sockaddr,
553+
&mut size as *mut libc::socklen_t) as libc::c_int
554+
}) as sock_t {
555+
-1 if util::wouldblock() => {}
556+
-1 => return Err(last_error()),
557+
fd => return Ok(TcpStream::new(Inner::new(fd))),
558+
}
528559
}
560+
try!(util::await([self.fd(), self.inner.reader.fd()],
561+
deadline, util::Readable));
529562
}
563+
564+
Err(util::eof())
530565
}
531566
}
532567

@@ -546,6 +581,24 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
546581
fn set_timeout(&mut self, timeout: Option<u64>) {
547582
self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
548583
}
584+
585+
fn clone(&self) -> Box<rtio::RtioTcpAcceptor + Send> {
586+
box TcpAcceptor {
587+
inner: self.inner.clone(),
588+
deadline: 0,
589+
} as Box<rtio::RtioTcpAcceptor + Send>
590+
}
591+
592+
#[cfg(unix)]
593+
fn close_accept(&mut self) -> IoResult<()> {
594+
self.inner.closed.store(true, atomics::SeqCst);
595+
let mut fd = FileDesc::new(self.inner.writer.fd(), false);
596+
match fd.inner_write([0]) {
597+
Ok(..) => Ok(()),
598+
Err(..) if util::wouldblock() => Ok(()),
599+
Err(e) => Err(e),
600+
}
601+
}
549602
}
550603

551604
////////////////////////////////////////////////////////////////////////////////
@@ -817,7 +870,7 @@ pub fn read<T>(fd: sock_t,
817870
// With a timeout, first we wait for the socket to become
818871
// readable using select(), specifying the relevant timeout for
819872
// our previously set deadline.
820-
try!(util::await(fd, deadline, util::Readable));
873+
try!(util::await([fd], deadline, util::Readable));
821874

822875
// At this point, we're still within the timeout, and we've
823876
// determined that the socket is readable (as returned by
@@ -871,7 +924,7 @@ pub fn write<T>(fd: sock_t,
871924
while written < buf.len() && (write_everything || written == 0) {
872925
// As with read(), first wait for the socket to be ready for
873926
// the I/O operation.
874-
match util::await(fd, deadline, util::Writable) {
927+
match util::await([fd], deadline, util::Writable) {
875928
Err(ref e) if e.code == libc::EOF as uint && written > 0 => {
876929
assert!(deadline.is_some());
877930
return Err(util::short_write(written, "short write"))

branches/snap-stage3/src/libnative/io/pipe_unix.rs

Lines changed: 70 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@ use std::mem;
1515
use std::rt::mutex;
1616
use std::rt::rtio;
1717
use std::rt::rtio::{IoResult, IoError};
18+
use std::sync::atomics;
1819

1920
use super::retry;
2021
use super::net;
2122
use super::util;
2223
use super::c;
23-
use super::file::fd_t;
24+
use super::process;
25+
use super::file::{fd_t, FileDesc};
2426

2527
fn unix_socket(ty: libc::c_int) -> IoResult<fd_t> {
2628
match unsafe { libc::socket(libc::AF_UNIX, ty, 0) } {
@@ -225,7 +227,23 @@ impl UnixListener {
225227
pub fn native_listen(self, backlog: int) -> IoResult<UnixAcceptor> {
226228
match unsafe { libc::listen(self.fd(), backlog as libc::c_int) } {
227229
-1 => Err(super::last_error()),
228-
_ => Ok(UnixAcceptor { listener: self, deadline: 0 })
230+
231+
#[cfg(unix)]
232+
_ => {
233+
let (reader, writer) = try!(process::pipe());
234+
try!(util::set_nonblocking(reader.fd(), true));
235+
try!(util::set_nonblocking(writer.fd(), true));
236+
try!(util::set_nonblocking(self.fd(), true));
237+
Ok(UnixAcceptor {
238+
inner: Arc::new(AcceptorInner {
239+
listener: self,
240+
reader: reader,
241+
writer: writer,
242+
closed: atomics::AtomicBool::new(false),
243+
}),
244+
deadline: 0,
245+
})
246+
}
229247
}
230248
}
231249
}
@@ -240,29 +258,45 @@ impl rtio::RtioUnixListener for UnixListener {
240258
}
241259

242260
pub struct UnixAcceptor {
243-
listener: UnixListener,
261+
inner: Arc<AcceptorInner>,
244262
deadline: u64,
245263
}
246264

265+
#[cfg(unix)]
266+
struct AcceptorInner {
267+
listener: UnixListener,
268+
reader: FileDesc,
269+
writer: FileDesc,
270+
closed: atomics::AtomicBool,
271+
}
272+
247273
impl UnixAcceptor {
248-
fn fd(&self) -> fd_t { self.listener.fd() }
274+
fn fd(&self) -> fd_t { self.inner.listener.fd() }
249275

250276
pub fn native_accept(&mut self) -> IoResult<UnixStream> {
251-
if self.deadline != 0 {
252-
try!(util::await(self.fd(), Some(self.deadline), util::Readable));
253-
}
254-
let mut storage: libc::sockaddr_storage = unsafe { mem::zeroed() };
255-
let storagep = &mut storage as *mut libc::sockaddr_storage;
256-
let size = mem::size_of::<libc::sockaddr_storage>();
257-
let mut size = size as libc::socklen_t;
258-
match retry(|| unsafe {
259-
libc::accept(self.fd(),
260-
storagep as *mut libc::sockaddr,
261-
&mut size as *mut libc::socklen_t) as libc::c_int
262-
}) {
263-
-1 => Err(super::last_error()),
264-
fd => Ok(UnixStream::new(Arc::new(Inner::new(fd))))
277+
let deadline = if self.deadline == 0 {None} else {Some(self.deadline)};
278+
279+
while !self.inner.closed.load(atomics::SeqCst) {
280+
unsafe {
281+
let mut storage: libc::sockaddr_storage = mem::zeroed();
282+
let storagep = &mut storage as *mut libc::sockaddr_storage;
283+
let size = mem::size_of::<libc::sockaddr_storage>();
284+
let mut size = size as libc::socklen_t;
285+
match retry(|| {
286+
libc::accept(self.fd(),
287+
storagep as *mut libc::sockaddr,
288+
&mut size as *mut libc::socklen_t) as libc::c_int
289+
}) {
290+
-1 if util::wouldblock() => {}
291+
-1 => return Err(super::last_error()),
292+
fd => return Ok(UnixStream::new(Arc::new(Inner::new(fd)))),
293+
}
294+
}
295+
try!(util::await([self.fd(), self.inner.reader.fd()],
296+
deadline, util::Readable));
265297
}
298+
299+
Err(util::eof())
266300
}
267301
}
268302

@@ -273,6 +307,24 @@ impl rtio::RtioUnixAcceptor for UnixAcceptor {
273307
fn set_timeout(&mut self, timeout: Option<u64>) {
274308
self.deadline = timeout.map(|a| ::io::timer::now() + a).unwrap_or(0);
275309
}
310+
311+
fn clone(&self) -> Box<rtio::RtioUnixAcceptor + Send> {
312+
box UnixAcceptor {
313+
inner: self.inner.clone(),
314+
deadline: 0,
315+
} as Box<rtio::RtioUnixAcceptor + Send>
316+
}
317+
318+
#[cfg(unix)]
319+
fn close_accept(&mut self) -> IoResult<()> {
320+
self.inner.closed.store(true, atomics::SeqCst);
321+
let mut fd = FileDesc::new(self.inner.writer.fd(), false);
322+
match fd.inner_write([0]) {
323+
Ok(..) => Ok(()),
324+
Err(..) if util::wouldblock() => Ok(()),
325+
Err(e) => Err(e),
326+
}
327+
}
276328
}
277329

278330
impl Drop for UnixListener {

branches/snap-stage3/src/libnative/io/pipe_windows.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,10 @@ use super::c;
9999
use super::util;
100100
use super::file::to_utf16;
101101

102-
struct Event(libc::HANDLE);
102+
pub struct Event(libc::HANDLE);
103103

104104
impl Event {
105-
fn new(manual_reset: bool, initial_state: bool) -> IoResult<Event> {
105+
pub fn new(manual_reset: bool, initial_state: bool) -> IoResult<Event> {
106106
let event = unsafe {
107107
libc::CreateEventW(ptr::mut_null(),
108108
manual_reset as libc::BOOL,
@@ -116,7 +116,7 @@ impl Event {
116116
}
117117
}
118118

119-
fn handle(&self) -> libc::HANDLE { let Event(handle) = *self; handle }
119+
pub fn handle(&self) -> libc::HANDLE { let Event(handle) = *self; handle }
120120
}
121121

122122
impl Drop for Event {

branches/snap-stage3/src/libnative/io/process.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ impl Drop for Process {
191191
}
192192
}
193193

194-
fn pipe() -> IoResult<(file::FileDesc, file::FileDesc)> {
194+
pub fn pipe() -> IoResult<(file::FileDesc, file::FileDesc)> {
195195
#[cfg(unix)] use libc::EMFILE as ERROR;
196196
#[cfg(windows)] use libc::WSAEMFILE as ERROR;
197197
struct Closer { fd: libc::c_int }

branches/snap-stage3/src/libnative/io/util.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
// except according to those terms.
1010

1111
use libc;
12+
use std::cmp;
1213
use std::mem;
1314
use std::os;
1415
use std::ptr;
@@ -166,10 +167,15 @@ pub fn connect_timeout(fd: net::sock_t,
166167
}
167168
}
168169

169-
pub fn await(fd: net::sock_t, deadline: Option<u64>,
170+
pub fn await(fds: &[net::sock_t], deadline: Option<u64>,
170171
status: SocketStatus) -> IoResult<()> {
171172
let mut set: c::fd_set = unsafe { mem::zeroed() };
172-
c::fd_set(&mut set, fd);
173+
let mut max = 0;
174+
for &fd in fds.iter() {
175+
c::fd_set(&mut set, fd);
176+
max = cmp::max(max, fd + 1);
177+
}
178+
173179
let (read, write) = match status {
174180
Readable => (&mut set as *mut _, ptr::mut_null()),
175181
Writable => (ptr::mut_null(), &mut set as *mut _),
@@ -188,8 +194,7 @@ pub fn await(fd: net::sock_t, deadline: Option<u64>,
188194
&mut tv as *mut _
189195
}
190196
};
191-
let n = if cfg!(windows) {1} else {fd as libc::c_int + 1};
192-
let r = unsafe { c::select(n, read, write, ptr::mut_null(), tvp) };
197+
let r = unsafe { c::select(max, read, write, ptr::mut_null(), tvp) };
193198
r
194199
}) {
195200
-1 => Err(last_error()),

branches/snap-stage3/src/librustc/back/link.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1312,8 +1312,6 @@ fn link_natively(sess: &Session, trans: &CrateTranslation, dylib: bool,
13121312
sess.note(str::from_utf8(output.as_slice()).unwrap());
13131313
sess.abort_if_errors();
13141314
}
1315-
debug!("linker stderr:\n{}", str::from_utf8_owned(prog.error).unwrap());
1316-
debug!("linker stdout:\n{}", str::from_utf8_owned(prog.output).unwrap());
13171315
},
13181316
Err(e) => {
13191317
sess.err(format!("could not exec the linker `{}`: {}",

branches/snap-stage3/src/librustrt/rtio.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,8 @@ pub trait RtioTcpAcceptor : RtioSocket {
246246
fn accept_simultaneously(&mut self) -> IoResult<()>;
247247
fn dont_accept_simultaneously(&mut self) -> IoResult<()>;
248248
fn set_timeout(&mut self, timeout: Option<u64>);
249+
fn clone(&self) -> Box<RtioTcpAcceptor + Send>;
250+
fn close_accept(&mut self) -> IoResult<()>;
249251
}
250252

251253
pub trait RtioTcpStream : RtioSocket {
@@ -335,6 +337,8 @@ pub trait RtioUnixListener {
335337
pub trait RtioUnixAcceptor {
336338
fn accept(&mut self) -> IoResult<Box<RtioPipe + Send>>;
337339
fn set_timeout(&mut self, timeout: Option<u64>);
340+
fn clone(&self) -> Box<RtioUnixAcceptor + Send>;
341+
fn close_accept(&mut self) -> IoResult<()>;
338342
}
339343

340344
pub trait RtioTTY {

0 commit comments

Comments
 (0)