Skip to content

Commit f980424

Browse files
committed
---
yaml --- r: 154608 b: refs/heads/try2 c: cb8df7a h: refs/heads/master v: v3
1 parent 1356ca1 commit f980424

File tree

5 files changed

+291
-174
lines changed

5 files changed

+291
-174
lines changed

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ refs/heads/snap-stage3: 78a7676898d9f80ab540c6df5d4c9ce35bb50463
55
refs/heads/try: 519addf6277dbafccbb4159db4b710c37eaa2ec5
66
refs/tags/release-0.1: 1f5c5126e96c79d22cb7862f75304136e204f105
77
refs/heads/ndm: f3868061cd7988080c30d6d5bf352a5a5fe2460b
8-
refs/heads/try2: 110168de2a7b529a7c4839ca1e19c4c42f68be12
8+
refs/heads/try2: cb8df7a8e3c53f16d22f76da50e6e2e4734bdf62
99
refs/heads/dist-snap: ba4081a5a8573875fed17545846f6f6902c8ba8d
1010
refs/tags/release-0.2: c870d2dffb391e14efb05aa27898f1f6333a9596
1111
refs/tags/release-0.3: b5f0d0f648d9a6153664837026ba1be43d3e2503

branches/try2/src/librustuv/access.rs

Lines changed: 48 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,38 +22,40 @@ use std::cell::UnsafeCell;
2222

2323
use homing::HomingMissile;
2424

25-
pub struct Access {
26-
inner: Arc<UnsafeCell<Inner>>,
25+
pub struct Access<T> {
26+
inner: Arc<UnsafeCell<Inner<T>>>,
2727
}
2828

29-
pub struct Guard<'a> {
30-
access: &'a mut Access,
29+
pub struct Guard<'a, T> {
30+
access: &'a mut Access<T>,
3131
missile: Option<HomingMissile>,
3232
}
3333

34-
struct Inner {
34+
struct Inner<T> {
3535
queue: Vec<(BlockedTask, uint)>,
3636
held: bool,
3737
closed: bool,
38+
data: T,
3839
}
3940

40-
impl Access {
41-
pub fn new() -> Access {
41+
impl<T: Send> Access<T> {
42+
pub fn new(data: T) -> Access<T> {
4243
Access {
4344
inner: Arc::new(UnsafeCell::new(Inner {
4445
queue: vec![],
4546
held: false,
4647
closed: false,
48+
data: data,
4749
}))
4850
}
4951
}
5052

5153
pub fn grant<'a>(&'a mut self, token: uint,
52-
missile: HomingMissile) -> Guard<'a> {
54+
missile: HomingMissile) -> Guard<'a, T> {
5355
// This unsafety is actually OK because the homing missile argument
5456
// guarantees that we're on the same event loop as all the other objects
5557
// attempting to get access granted.
56-
let inner: &mut Inner = unsafe { &mut *self.inner.get() };
58+
let inner = unsafe { &mut *self.inner.get() };
5759

5860
if inner.held {
5961
let t: Box<Task> = Local::take();
@@ -69,6 +71,15 @@ impl Access {
6971
Guard { access: self, missile: Some(missile) }
7072
}
7173

74+
pub fn unsafe_get(&self) -> *mut T {
75+
unsafe { &mut (*self.inner.get()).data as *mut _ }
76+
}
77+
78+
// Safe version which requires proof that you are on the home scheduler.
79+
pub fn get_mut<'a>(&'a mut self, _missile: &HomingMissile) -> &'a mut T {
80+
unsafe { &mut *self.unsafe_get() }
81+
}
82+
7283
pub fn close(&self, _missile: &HomingMissile) {
7384
// This unsafety is OK because with a homing missile we're guaranteed to
7485
// be the only task looking at the `closed` flag (and are therefore
@@ -82,35 +93,55 @@ impl Access {
8293
// is only safe to invoke while on the home event loop, and there is no
8394
// guarantee that this i being invoked on the home event loop.
8495
pub unsafe fn dequeue(&mut self, token: uint) -> Option<BlockedTask> {
85-
let inner: &mut Inner = &mut *self.inner.get();
96+
let inner = &mut *self.inner.get();
8697
match inner.queue.iter().position(|&(_, t)| t == token) {
8798
Some(i) => Some(inner.queue.remove(i).unwrap().val0()),
8899
None => None,
89100
}
90101
}
102+
103+
/// Test whether this access is closed, using a homing missile to prove
104+
/// that it's safe
105+
pub fn is_closed(&self, _missile: &HomingMissile) -> bool {
106+
unsafe { (*self.inner.get()).closed }
107+
}
91108
}
92109

93-
impl Clone for Access {
94-
fn clone(&self) -> Access {
110+
impl<T: Send> Clone for Access<T> {
111+
fn clone(&self) -> Access<T> {
95112
Access { inner: self.inner.clone() }
96113
}
97114
}
98115

99-
impl<'a> Guard<'a> {
116+
impl<'a, T: Send> Guard<'a, T> {
100117
pub fn is_closed(&self) -> bool {
101118
// See above for why this unsafety is ok, it just applies to the read
102119
// instead of the write.
103120
unsafe { (*self.access.inner.get()).closed }
104121
}
105122
}
106123

124+
impl<'a, T: Send> Deref<T> for Guard<'a, T> {
125+
fn deref<'a>(&'a self) -> &'a T {
126+
// A guard represents exclusive access to a piece of data, so it's safe
127+
// to hand out shared and mutable references
128+
unsafe { &(*self.access.inner.get()).data }
129+
}
130+
}
131+
132+
impl<'a, T: Send> DerefMut<T> for Guard<'a, T> {
133+
fn deref_mut<'a>(&'a mut self) -> &'a mut T {
134+
unsafe { &mut (*self.access.inner.get()).data }
135+
}
136+
}
137+
107138
#[unsafe_destructor]
108-
impl<'a> Drop for Guard<'a> {
139+
impl<'a, T> Drop for Guard<'a, T> {
109140
fn drop(&mut self) {
110141
// This guard's homing missile is still armed, so we're guaranteed to be
111142
// on the same I/O event loop, so this unsafety should be ok.
112143
assert!(self.missile.is_some());
113-
let inner: &mut Inner = unsafe {
144+
let inner: &mut Inner<T> = unsafe {
114145
mem::transmute(self.access.inner.get())
115146
};
116147

@@ -133,7 +164,8 @@ impl<'a> Drop for Guard<'a> {
133164
}
134165
}
135166

136-
impl Drop for Inner {
167+
#[unsafe_destructor]
168+
impl<T> Drop for Inner<T> {
137169
fn drop(&mut self) {
138170
assert!(!self.held);
139171
assert_eq!(self.queue.len(), 0);

branches/try2/src/librustuv/net.rs

Lines changed: 67 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use stream::StreamWatcher;
2222
use super::{Loop, Request, UvError, Buf, status_to_io_result,
2323
uv_error_to_io_error, UvHandle, slice_to_uv_buf,
2424
wait_until_woken_after, wakeup};
25-
use timeout::{AccessTimeout, AcceptTimeout, ConnectCtx};
25+
use timeout::{AccessTimeout, ConnectCtx, AcceptTimeout};
2626
use uvio::UvIoFactory;
2727
use uvll;
2828

@@ -158,20 +158,20 @@ pub struct TcpWatcher {
158158
// stream object, so we use these access guards in order to arbitrate among
159159
// multiple concurrent reads and writes. Note that libuv *can* read and
160160
// write simultaneously, it just can't read and read simultaneously.
161-
read_access: AccessTimeout,
162-
write_access: AccessTimeout,
161+
read_access: AccessTimeout<()>,
162+
write_access: AccessTimeout<()>,
163163
}
164164

165165
pub struct TcpListener {
166166
home: HomeHandle,
167-
handle: *mut uvll::uv_pipe_t,
168-
outgoing: Sender<Result<Box<rtio::RtioTcpStream + Send>, IoError>>,
169-
incoming: Receiver<Result<Box<rtio::RtioTcpStream + Send>, IoError>>,
167+
handle: *mut uvll::uv_tcp_t,
170168
}
171169

172170
pub struct TcpAcceptor {
173-
listener: Box<TcpListener>,
174-
timeout: AcceptTimeout,
171+
home: HomeHandle,
172+
handle: *mut uvll::uv_tcp_t,
173+
access: AcceptTimeout<Box<rtio::RtioTcpStream + Send>>,
174+
refcount: Refcount,
175175
}
176176

177177
// TCP watchers (clients/streams)
@@ -192,8 +192,8 @@ impl TcpWatcher {
192192
handle: handle,
193193
stream: StreamWatcher::new(handle, true),
194194
refcount: Refcount::new(),
195-
read_access: AccessTimeout::new(),
196-
write_access: AccessTimeout::new(),
195+
read_access: AccessTimeout::new(()),
196+
write_access: AccessTimeout::new(()),
197197
}
198198
}
199199

@@ -291,7 +291,7 @@ impl rtio::RtioTcpStream for TcpWatcher {
291291
let task = {
292292
let m = self.fire_homing_missile();
293293
self.read_access.access.close(&m);
294-
self.stream.cancel_read(uvll::EOF as libc::ssize_t)
294+
self.stream.cancel_read(uvll::EOF as libc::ssize_t)
295295
};
296296
let _ = task.map(|t| t.reawaken());
297297
Ok(())
@@ -354,12 +354,9 @@ impl TcpListener {
354354
assert_eq!(unsafe {
355355
uvll::uv_tcp_init(io.uv_loop(), handle)
356356
}, 0);
357-
let (tx, rx) = channel();
358357
let l = box TcpListener {
359358
home: io.make_handle(),
360359
handle: handle,
361-
outgoing: tx,
362-
incoming: rx,
363360
};
364361
let mut storage = unsafe { mem::zeroed() };
365362
let _len = addr_to_sockaddr(address, &mut storage);
@@ -392,15 +389,19 @@ impl rtio::RtioSocket for TcpListener {
392389
impl rtio::RtioTcpListener for TcpListener {
393390
fn listen(self: Box<TcpListener>)
394391
-> Result<Box<rtio::RtioTcpAcceptor + Send>, IoError> {
392+
let _m = self.fire_homing_missile();
393+
395394
// create the acceptor object from ourselves
396-
let mut acceptor = box TcpAcceptor {
397-
listener: self,
398-
timeout: AcceptTimeout::new(),
399-
};
395+
let acceptor = (box TcpAcceptor {
396+
handle: self.handle,
397+
home: self.home.clone(),
398+
access: AcceptTimeout::new(),
399+
refcount: Refcount::new(),
400+
}).install();
401+
self.handle = 0 as *mut _;
400402

401-
let _m = acceptor.fire_homing_missile();
402403
// FIXME: the 128 backlog should be configurable
403-
match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
404+
match unsafe { uvll::uv_listen(acceptor.handle, 128, listen_cb) } {
404405
0 => Ok(acceptor as Box<rtio::RtioTcpAcceptor + Send>),
405406
n => Err(uv_error_to_io_error(UvError(n))),
406407
}
@@ -409,7 +410,7 @@ impl rtio::RtioTcpListener for TcpListener {
409410

410411
extern fn listen_cb(server: *mut uvll::uv_stream_t, status: c_int) {
411412
assert!(status != uvll::ECANCELED);
412-
let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
413+
let tcp: &mut TcpAcceptor = unsafe { UvHandle::from_uv_handle(&server) };
413414
let msg = match status {
414415
0 => {
415416
let loop_ = Loop::wrap(unsafe {
@@ -421,11 +422,15 @@ extern fn listen_cb(server: *mut uvll::uv_stream_t, status: c_int) {
421422
}
422423
n => Err(uv_error_to_io_error(UvError(n)))
423424
};
424-
tcp.outgoing.send(msg);
425+
426+
// If we're running then we have exclusive access, so the unsafe_get() is ok
427+
unsafe { tcp.access.push(msg); }
425428
}
426429

427430
impl Drop for TcpListener {
428431
fn drop(&mut self) {
432+
if self.handle.is_null() { return }
433+
429434
let _m = self.fire_homing_missile();
430435
self.close();
431436
}
@@ -434,40 +439,68 @@ impl Drop for TcpListener {
434439
// TCP acceptors (bound servers)
435440

436441
impl HomingIO for TcpAcceptor {
437-
fn home<'r>(&'r mut self) -> &'r mut HomeHandle { self.listener.home() }
442+
fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
438443
}
439444

440445
impl rtio::RtioSocket for TcpAcceptor {
441446
fn socket_name(&mut self) -> Result<rtio::SocketAddr, IoError> {
442447
let _m = self.fire_homing_missile();
443-
socket_name(Tcp, self.listener.handle)
448+
socket_name(Tcp, self.handle)
444449
}
445450
}
446451

452+
impl UvHandle<uvll::uv_tcp_t> for TcpAcceptor {
453+
fn uv_handle(&self) -> *mut uvll::uv_tcp_t { self.handle }
454+
}
455+
447456
impl rtio::RtioTcpAcceptor for TcpAcceptor {
448457
fn accept(&mut self) -> Result<Box<rtio::RtioTcpStream + Send>, IoError> {
449-
self.timeout.accept(&self.listener.incoming)
458+
let m = self.fire_homing_missile();
459+
let loop_ = self.uv_loop();
460+
self.access.accept(m, &loop_)
450461
}
451462

452463
fn accept_simultaneously(&mut self) -> Result<(), IoError> {
453464
let _m = self.fire_homing_missile();
454465
status_to_io_result(unsafe {
455-
uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1)
466+
uvll::uv_tcp_simultaneous_accepts(self.handle, 1)
456467
})
457468
}
458469

459470
fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
460471
let _m = self.fire_homing_missile();
461472
status_to_io_result(unsafe {
462-
uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
473+
uvll::uv_tcp_simultaneous_accepts(self.handle, 0)
463474
})
464475
}
465476

466477
fn set_timeout(&mut self, ms: Option<u64>) {
467478
let _m = self.fire_homing_missile();
468-
match ms {
469-
None => self.timeout.clear(),
470-
Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
479+
let loop_ = self.uv_loop();
480+
self.access.set_timeout(ms, &loop_, &self.home);
481+
}
482+
483+
fn clone(&self) -> Box<rtio::RtioTcpAcceptor + Send> {
484+
box TcpAcceptor {
485+
refcount: self.refcount.clone(),
486+
home: self.home.clone(),
487+
handle: self.handle,
488+
access: self.access.clone(),
489+
} as Box<rtio::RtioTcpAcceptor + Send>
490+
}
491+
492+
fn close_accept(&mut self) -> Result<(), IoError> {
493+
let m = self.fire_homing_missile();
494+
self.access.close(m);
495+
Ok(())
496+
}
497+
}
498+
499+
impl Drop for TcpAcceptor {
500+
fn drop(&mut self) {
501+
let _m = self.fire_homing_missile();
502+
if self.refcount.decrement() {
503+
self.close();
471504
}
472505
}
473506
}
@@ -482,8 +515,8 @@ pub struct UdpWatcher {
482515

483516
// See above for what these fields are
484517
refcount: Refcount,
485-
read_access: AccessTimeout,
486-
write_access: AccessTimeout,
518+
read_access: AccessTimeout<()>,
519+
write_access: AccessTimeout<()>,
487520

488521
blocked_sender: Option<BlockedTask>,
489522
}
@@ -507,8 +540,8 @@ impl UdpWatcher {
507540
handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
508541
home: io.make_handle(),
509542
refcount: Refcount::new(),
510-
read_access: AccessTimeout::new(),
511-
write_access: AccessTimeout::new(),
543+
read_access: AccessTimeout::new(()),
544+
write_access: AccessTimeout::new(()),
512545
blocked_sender: None,
513546
};
514547
assert_eq!(unsafe {

0 commit comments

Comments
 (0)