Skip to content

Commit 368818b

Browse files
committed
---
yaml --- r: 124277 b: refs/heads/try c: af40d3056e5382eaf57dd2b32b7533409d688e2f h: refs/heads/master i: 124275: 8a78dd5 v: v3
1 parent b01f7b8 commit 368818b

File tree

5 files changed

+293
-176
lines changed

5 files changed

+293
-176
lines changed

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
refs/heads/master: 6c35d513cea468b30759b4f78becf28f11a123c0
33
refs/heads/snap-stage1: e33de59e47c5076a89eadeb38f4934f58a3618a6
44
refs/heads/snap-stage3: afbcbbc77ffc6b10053bc543daf7d2e05d68cc01
5-
refs/heads/try: 5a7ecff2d9c362befddac7c4e036b155fae3bd93
5+
refs/heads/try: af40d3056e5382eaf57dd2b32b7533409d688e2f
66
refs/tags/release-0.1: 1f5c5126e96c79d22cb7862f75304136e204f105
77
refs/heads/ndm: f3868061cd7988080c30d6d5bf352a5a5fe2460b
88
refs/heads/try2: 147ecfdd8221e4a4d4e090486829a06da1e0ca3c

branches/try/src/librustuv/access.rs

Lines changed: 48 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,38 +22,40 @@ use std::ty::Unsafe;
2222

2323
use homing::HomingMissile;
2424

25-
pub struct Access {
26-
inner: Arc<Unsafe<Inner>>,
25+
pub struct Access<T> {
26+
inner: Arc<Unsafe<Inner<T>>>,
2727
}
2828

29-
pub struct Guard<'a> {
30-
access: &'a mut Access,
29+
pub struct Guard<'a, T> {
30+
access: &'a mut Access<T>,
3131
missile: Option<HomingMissile>,
3232
}
3333

34-
struct Inner {
34+
struct Inner<T> {
3535
queue: Vec<(BlockedTask, uint)>,
3636
held: bool,
3737
closed: bool,
38+
data: T,
3839
}
3940

40-
impl Access {
41-
pub fn new() -> Access {
41+
impl<T: Send> Access<T> {
42+
pub fn new(data: T) -> Access<T> {
4243
Access {
4344
inner: Arc::new(Unsafe::new(Inner {
4445
queue: vec![],
4546
held: false,
4647
closed: false,
48+
data: data,
4749
}))
4850
}
4951
}
5052

5153
pub fn grant<'a>(&'a mut self, token: uint,
52-
missile: HomingMissile) -> Guard<'a> {
54+
missile: HomingMissile) -> Guard<'a, T> {
5355
// This unsafety is actually OK because the homing missile argument
5456
// guarantees that we're on the same event loop as all the other objects
5557
// attempting to get access granted.
56-
let inner: &mut Inner = unsafe { &mut *self.inner.get() };
58+
let inner = unsafe { &mut *self.inner.get() };
5759

5860
if inner.held {
5961
let t: Box<Task> = Local::take();
@@ -69,6 +71,15 @@ impl Access {
6971
Guard { access: self, missile: Some(missile) }
7072
}
7173

74+
pub fn unsafe_get(&self) -> *mut T {
75+
unsafe { &mut (*self.inner.get()).data as *mut _ }
76+
}
77+
78+
// Safe version which requires proof that you are on the home scheduler.
79+
pub fn get_mut<'a>(&'a mut self, _missile: &HomingMissile) -> &'a mut T {
80+
unsafe { &mut *self.unsafe_get() }
81+
}
82+
7283
pub fn close(&self, _missile: &HomingMissile) {
7384
// This unsafety is OK because with a homing missile we're guaranteed to
7485
// be the only task looking at the `closed` flag (and are therefore
@@ -82,35 +93,55 @@ impl Access {
8293
// is only safe to invoke while on the home event loop, and there is no
8394
// guarantee that this i being invoked on the home event loop.
8495
pub unsafe fn dequeue(&mut self, token: uint) -> Option<BlockedTask> {
85-
let inner: &mut Inner = &mut *self.inner.get();
96+
let inner = &mut *self.inner.get();
8697
match inner.queue.iter().position(|&(_, t)| t == token) {
8798
Some(i) => Some(inner.queue.remove(i).unwrap().val0()),
8899
None => None,
89100
}
90101
}
102+
103+
/// Test whether this access is closed, using a homing missile to prove
104+
/// that it's safe
105+
pub fn is_closed(&self, _missile: &HomingMissile) -> bool {
106+
unsafe { (*self.inner.get()).closed }
107+
}
91108
}
92109

93-
impl Clone for Access {
94-
fn clone(&self) -> Access {
110+
impl<T: Send> Clone for Access<T> {
111+
fn clone(&self) -> Access<T> {
95112
Access { inner: self.inner.clone() }
96113
}
97114
}
98115

99-
impl<'a> Guard<'a> {
116+
impl<'a, T: Send> Guard<'a, T> {
100117
pub fn is_closed(&self) -> bool {
101118
// See above for why this unsafety is ok, it just applies to the read
102119
// instead of the write.
103120
unsafe { (*self.access.inner.get()).closed }
104121
}
105122
}
106123

124+
impl<'a, T: Send> Deref<T> for Guard<'a, T> {
125+
fn deref<'a>(&'a self) -> &'a T {
126+
// A guard represents exclusive access to a piece of data, so it's safe
127+
// to hand out shared and mutable references
128+
unsafe { &(*self.access.inner.get()).data }
129+
}
130+
}
131+
132+
impl<'a, T: Send> DerefMut<T> for Guard<'a, T> {
133+
fn deref_mut<'a>(&'a mut self) -> &'a mut T {
134+
unsafe { &mut (*self.access.inner.get()).data }
135+
}
136+
}
137+
107138
#[unsafe_destructor]
108-
impl<'a> Drop for Guard<'a> {
139+
impl<'a, T> Drop for Guard<'a, T> {
109140
fn drop(&mut self) {
110141
// This guard's homing missile is still armed, so we're guaranteed to be
111142
// on the same I/O event loop, so this unsafety should be ok.
112143
assert!(self.missile.is_some());
113-
let inner: &mut Inner = unsafe {
144+
let inner: &mut Inner<T> = unsafe {
114145
mem::transmute(self.access.inner.get())
115146
};
116147

@@ -133,7 +164,8 @@ impl<'a> Drop for Guard<'a> {
133164
}
134165
}
135166

136-
impl Drop for Inner {
167+
#[unsafe_destructor]
168+
impl<T> Drop for Inner<T> {
137169
fn drop(&mut self) {
138170
assert!(!self.held);
139171
assert_eq!(self.queue.len(), 0);

branches/try/src/librustuv/net.rs

Lines changed: 68 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use stream::StreamWatcher;
2222
use super::{Loop, Request, UvError, Buf, status_to_io_result,
2323
uv_error_to_io_error, UvHandle, slice_to_uv_buf,
2424
wait_until_woken_after, wakeup};
25-
use timeout::{AccessTimeout, AcceptTimeout, ConnectCtx};
25+
use timeout::{AccessTimeout, ConnectCtx, AcceptTimeout};
2626
use uvio::UvIoFactory;
2727
use uvll;
2828

@@ -159,20 +159,20 @@ pub struct TcpWatcher {
159159
// stream object, so we use these access guards in order to arbitrate among
160160
// multiple concurrent reads and writes. Note that libuv *can* read and
161161
// write simultaneously, it just can't read and read simultaneously.
162-
read_access: AccessTimeout,
163-
write_access: AccessTimeout,
162+
read_access: AccessTimeout<()>,
163+
write_access: AccessTimeout<()>,
164164
}
165165

166166
pub struct TcpListener {
167167
home: HomeHandle,
168-
handle: *mut uvll::uv_pipe_t,
169-
outgoing: Sender<Result<Box<rtio::RtioTcpStream + Send>, IoError>>,
170-
incoming: Receiver<Result<Box<rtio::RtioTcpStream + Send>, IoError>>,
168+
handle: *mut uvll::uv_tcp_t,
171169
}
172170

173171
pub struct TcpAcceptor {
174-
listener: Box<TcpListener>,
175-
timeout: AcceptTimeout,
172+
home: HomeHandle,
173+
handle: *mut uvll::uv_tcp_t,
174+
access: AcceptTimeout<Box<rtio::RtioTcpStream + Send>>,
175+
refcount: Refcount,
176176
}
177177

178178
// TCP watchers (clients/streams)
@@ -193,8 +193,8 @@ impl TcpWatcher {
193193
handle: handle,
194194
stream: StreamWatcher::new(handle, true),
195195
refcount: Refcount::new(),
196-
read_access: AccessTimeout::new(),
197-
write_access: AccessTimeout::new(),
196+
read_access: AccessTimeout::new(()),
197+
write_access: AccessTimeout::new(()),
198198
}
199199
}
200200

@@ -291,7 +291,7 @@ impl rtio::RtioTcpStream for TcpWatcher {
291291
let task = {
292292
let m = self.fire_homing_missile();
293293
self.read_access.access.close(&m);
294-
self.stream.cancel_read(uvll::EOF as libc::ssize_t)
294+
self.stream.cancel_read(uvll::EOF as libc::ssize_t)
295295
};
296296
let _ = task.map(|t| t.reawaken());
297297
Ok(())
@@ -354,12 +354,9 @@ impl TcpListener {
354354
assert_eq!(unsafe {
355355
uvll::uv_tcp_init(io.uv_loop(), handle)
356356
}, 0);
357-
let (tx, rx) = channel();
358357
let l = box TcpListener {
359358
home: io.make_handle(),
360359
handle: handle,
361-
outgoing: tx,
362-
incoming: rx,
363360
};
364361
let (addr, _len) = addr_to_sockaddr(address);
365362
let res = unsafe {
@@ -389,16 +386,20 @@ impl rtio::RtioSocket for TcpListener {
389386
}
390387

391388
impl rtio::RtioTcpListener for TcpListener {
392-
fn listen(~self) -> Result<Box<rtio::RtioTcpAcceptor + Send>, IoError> {
389+
fn listen(mut ~self) -> Result<Box<rtio::RtioTcpAcceptor + Send>, IoError> {
390+
let _m = self.fire_homing_missile();
391+
393392
// create the acceptor object from ourselves
394-
let mut acceptor = box TcpAcceptor {
395-
listener: self,
396-
timeout: AcceptTimeout::new(),
397-
};
393+
let acceptor = (box TcpAcceptor {
394+
handle: self.handle,
395+
home: self.home.clone(),
396+
access: AcceptTimeout::new(),
397+
refcount: Refcount::new(),
398+
}).install();
399+
self.handle = 0 as *mut _;
398400

399-
let _m = acceptor.fire_homing_missile();
400401
// FIXME: the 128 backlog should be configurable
401-
match unsafe { uvll::uv_listen(acceptor.listener.handle, 128, listen_cb) } {
402+
match unsafe { uvll::uv_listen(acceptor.handle, 128, listen_cb) } {
402403
0 => Ok(acceptor as Box<rtio::RtioTcpAcceptor + Send>),
403404
n => Err(uv_error_to_io_error(UvError(n))),
404405
}
@@ -407,7 +408,7 @@ impl rtio::RtioTcpListener for TcpListener {
407408

408409
extern fn listen_cb(server: *mut uvll::uv_stream_t, status: c_int) {
409410
assert!(status != uvll::ECANCELED);
410-
let tcp: &mut TcpListener = unsafe { UvHandle::from_uv_handle(&server) };
411+
let tcp: &mut TcpAcceptor = unsafe { UvHandle::from_uv_handle(&server) };
411412
let msg = match status {
412413
0 => {
413414
let loop_ = Loop::wrap(unsafe {
@@ -419,11 +420,15 @@ extern fn listen_cb(server: *mut uvll::uv_stream_t, status: c_int) {
419420
}
420421
n => Err(uv_error_to_io_error(UvError(n)))
421422
};
422-
tcp.outgoing.send(msg);
423+
424+
// If we're running then we have exclusive access, so the unsafe_get() is ok
425+
unsafe { tcp.access.push(msg); }
423426
}
424427

425428
impl Drop for TcpListener {
426429
fn drop(&mut self) {
430+
if self.handle.is_null() { return }
431+
427432
let _m = self.fire_homing_missile();
428433
self.close();
429434
}
@@ -432,40 +437,68 @@ impl Drop for TcpListener {
432437
// TCP acceptors (bound servers)
433438

434439
impl HomingIO for TcpAcceptor {
435-
fn home<'r>(&'r mut self) -> &'r mut HomeHandle { self.listener.home() }
440+
fn home<'r>(&'r mut self) -> &'r mut HomeHandle { &mut self.home }
436441
}
437442

438443
impl rtio::RtioSocket for TcpAcceptor {
439444
fn socket_name(&mut self) -> Result<rtio::SocketAddr, IoError> {
440445
let _m = self.fire_homing_missile();
441-
socket_name(Tcp, self.listener.handle)
446+
socket_name(Tcp, self.handle)
442447
}
443448
}
444449

450+
impl UvHandle<uvll::uv_tcp_t> for TcpAcceptor {
451+
fn uv_handle(&self) -> *mut uvll::uv_tcp_t { self.handle }
452+
}
453+
445454
impl rtio::RtioTcpAcceptor for TcpAcceptor {
446455
fn accept(&mut self) -> Result<Box<rtio::RtioTcpStream + Send>, IoError> {
447-
self.timeout.accept(&self.listener.incoming)
456+
let m = self.fire_homing_missile();
457+
let loop_ = self.uv_loop();
458+
self.access.accept(m, &loop_)
448459
}
449460

450461
fn accept_simultaneously(&mut self) -> Result<(), IoError> {
451462
let _m = self.fire_homing_missile();
452463
status_to_io_result(unsafe {
453-
uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 1)
464+
uvll::uv_tcp_simultaneous_accepts(self.handle, 1)
454465
})
455466
}
456467

457468
fn dont_accept_simultaneously(&mut self) -> Result<(), IoError> {
458469
let _m = self.fire_homing_missile();
459470
status_to_io_result(unsafe {
460-
uvll::uv_tcp_simultaneous_accepts(self.listener.handle, 0)
471+
uvll::uv_tcp_simultaneous_accepts(self.handle, 0)
461472
})
462473
}
463474

464475
fn set_timeout(&mut self, ms: Option<u64>) {
465476
let _m = self.fire_homing_missile();
466-
match ms {
467-
None => self.timeout.clear(),
468-
Some(ms) => self.timeout.set_timeout(ms, &mut *self.listener),
477+
let loop_ = self.uv_loop();
478+
self.access.set_timeout(ms, &loop_, &self.home);
479+
}
480+
481+
fn clone(&self) -> Box<rtio::RtioTcpAcceptor + Send> {
482+
box TcpAcceptor {
483+
refcount: self.refcount.clone(),
484+
home: self.home.clone(),
485+
handle: self.handle,
486+
access: self.access.clone(),
487+
} as Box<rtio::RtioTcpAcceptor + Send>
488+
}
489+
490+
fn close_accept(&mut self) -> Result<(), IoError> {
491+
let m = self.fire_homing_missile();
492+
self.access.close(m);
493+
Ok(())
494+
}
495+
}
496+
497+
impl Drop for TcpAcceptor {
498+
fn drop(&mut self) {
499+
let _m = self.fire_homing_missile();
500+
if self.refcount.decrement() {
501+
self.close();
469502
}
470503
}
471504
}
@@ -480,8 +513,8 @@ pub struct UdpWatcher {
480513

481514
// See above for what these fields are
482515
refcount: Refcount,
483-
read_access: AccessTimeout,
484-
write_access: AccessTimeout,
516+
read_access: AccessTimeout<()>,
517+
write_access: AccessTimeout<()>,
485518

486519
blocked_sender: Option<BlockedTask>,
487520
}
@@ -505,8 +538,8 @@ impl UdpWatcher {
505538
handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
506539
home: io.make_handle(),
507540
refcount: Refcount::new(),
508-
read_access: AccessTimeout::new(),
509-
write_access: AccessTimeout::new(),
541+
read_access: AccessTimeout::new(()),
542+
write_access: AccessTimeout::new(()),
510543
blocked_sender: None,
511544
};
512545
assert_eq!(unsafe {

0 commit comments

Comments
 (0)