Skip to content

Commit 3386a3b

Browse files
committed
Drop the full message queue when a Port is gone
This fixes the deadlock where you send ports on channels, but then the receiving task fails, dropping the port, but the queue isn't dropped so if you then wait for data you'll wait forever.
1 parent e2b1808 commit 3386a3b

File tree

3 files changed

+175
-40
lines changed

3 files changed

+175
-40
lines changed

src/libstd/comm/mod.rs

Lines changed: 168 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,6 @@
227227

228228
use cast;
229229
use clone::Clone;
230-
use container::Container;
231230
use int;
232231
use iter::Iterator;
233232
use kinds::Send;
@@ -295,32 +294,32 @@ pub struct PortIterator<'a, T> {
295294
/// task
296295
#[no_freeze] // can't share chans in an arc
297296
pub struct Chan<T> {
298-
priv inner: UnsafeArc<SingleInner<T>>,
297+
priv inner: UnsafeArc<Stream<T>>,
299298
}
300299

301300
/// The sending-half of Rust's channel type. This half can be shared among many
302301
/// tasks by creating copies of itself through the `clone` method.
303302
#[no_freeze] // technically this implementation is shareable, but it shouldn't
304303
// be required to be shareable in an arc
305304
pub struct SharedChan<T> {
306-
priv inner: UnsafeArc<SharedInner<T>>,
305+
priv inner: UnsafeArc<Shared<T>>,
307306
}
308307

309308
///////////////////////////////////////////////////////////////////////////////
310309
// Internal struct definitions
311310
///////////////////////////////////////////////////////////////////////////////
312311

313312
enum PortInner<T> {
314-
Single(UnsafeArc<SingleInner<T>>),
315-
Shared(UnsafeArc<SharedInner<T>>),
313+
SingleInner(UnsafeArc<Stream<T>>),
314+
SharedInner(UnsafeArc<Shared<T>>),
316315
}
317316

318-
struct SingleInner<T> {
317+
struct Stream<T> {
319318
queue: spsc::Queue<T>,
320319
packet: Packet,
321320
}
322321

323-
struct SharedInner<T> {
322+
struct Shared<T> {
324323
queue: mpsc::Queue<T>,
325324
packet: Packet,
326325
}
@@ -339,6 +338,11 @@ struct Packet {
339338
select_next: *mut Packet,
340339
select_prev: *mut Packet,
341340
recv_cnt: int,
341+
342+
// See the discussion in Port::drop and the channel send methods for what
343+
// these are used for
344+
go_home: AtomicBool,
345+
sender_drain: AtomicInt,
342346
}
343347

344348
///////////////////////////////////////////////////////////////////////////////
@@ -351,8 +355,8 @@ static RESCHED_FREQ: int = 200;
351355
impl<T: Send> PortInner<T> {
352356
fn packet<'a>(&'a mut self) -> &'a mut Packet {
353357
match *self {
354-
Single(ref arc) => unsafe { &mut (*arc.get()).packet },
355-
Shared(ref arc) => unsafe { &mut (*arc.get()).packet },
358+
SingleInner(ref arc) => unsafe { &mut (*arc.get()).packet },
359+
SharedInner(ref arc) => unsafe { &mut (*arc.get()).packet },
356360
}
357361
}
358362
}
@@ -370,6 +374,9 @@ impl Packet {
370374
select_next: 0 as *mut Packet,
371375
select_prev: 0 as *mut Packet,
372376
recv_cnt: 0,
377+
378+
go_home: AtomicBool::new(false),
379+
sender_drain: AtomicInt::new(0),
373380
}
374381
}
375382

@@ -530,11 +537,11 @@ impl<T: Send> Chan<T> {
530537
pub fn new() -> (Port<T>, Chan<T>) {
531538
// arbitrary 128 size cache -- this is just a max cache size, not a
532539
// maximum buffer size
533-
let (a, b) = UnsafeArc::new2(SingleInner {
540+
let (a, b) = UnsafeArc::new2(Stream {
534541
queue: spsc::Queue::new(128),
535542
packet: Packet::new(),
536543
});
537-
(Port { inner: Single(a) }, Chan { inner: b })
544+
(Port { inner: SingleInner(a) }, Chan { inner: b })
538545
}
539546

540547
/// Sends a value along this channel to be received by the corresponding
@@ -584,14 +591,35 @@ impl<T: Send> Chan<T> {
584591
fn try(&self, t: T, can_resched: bool) -> bool {
585592
unsafe {
586593
let inner = self.inner.get();
594+
595+
// See the discussion in Port::drop for what's going on here
596+
if (*inner).packet.go_home.load(Relaxed) { return false }
597+
587598
(*inner).queue.push(t);
588599
match (*inner).packet.increment() {
589600
// As described above, -1 == wakeup
590601
-1 => { (*inner).packet.wakeup(can_resched); true }
591602
// Also as above, SPSC queues must be >= -2
592603
-2 => true,
593-
// We succeeded if we sent data
594-
DISCONNECTED => (*inner).queue.is_empty(),
604+
605+
DISCONNECTED => {
606+
// After the go_home check, the port could have been
607+
// dropped, so we need to be sure to drain the queue here
608+
// (we own the queue now that the port is gone). Note that
609+
// it is only possible for there to be one item in the queue
610+
// because the port ensures that there is 0 data in the
611+
// queue when it flags disconnected, and we could have only
612+
// pushed on one more item
613+
let first = (*inner).queue.pop();
614+
let second = (*inner).queue.pop();
615+
assert!(second.is_none());
616+
617+
match first {
618+
Some(..) => false, // we failed to send the data
619+
None => true, // we successfully sent data
620+
}
621+
}
622+
595623
// In order to prevent starvation of other tasks in situations
596624
// where a task sends repeatedly without ever receiving, we
597625
// occassionally yield instead of doing a send immediately.
@@ -626,11 +654,11 @@ impl<T: Send> SharedChan<T> {
626654
/// same time. All data sent on any channel will become available on the
627655
/// provided port as well.
628656
pub fn new() -> (Port<T>, SharedChan<T>) {
629-
let (a, b) = UnsafeArc::new2(SharedInner {
657+
let (a, b) = UnsafeArc::new2(Shared {
630658
queue: mpsc::Queue::new(),
631659
packet: Packet::new(),
632660
});
633-
(Port { inner: Shared(a) }, SharedChan { inner: b })
661+
(Port { inner: SharedInner(a) }, SharedChan { inner: b })
634662
}
635663

636664
/// Equivalent method to `send` on the `Chan` type (using the same
@@ -645,6 +673,10 @@ impl<T: Send> SharedChan<T> {
645673
/// semantics)
646674
pub fn try_send(&self, t: T) -> bool {
647675
unsafe {
676+
let inner = self.inner.get();
677+
// See Port::drop for what's going on
678+
if (*inner).packet.go_home.load(Relaxed) { return false }
679+
648680
// Note that the multiple sender case is a little tricker
649681
// semantically than the single sender case. The logic for
650682
// incrementing is "add and if disconnected store disconnected".
@@ -670,15 +702,49 @@ impl<T: Send> SharedChan<T> {
670702
// preflight check serves as the definitive "this will never be
671703
// received". Once we get beyond this check, we have permanently
672704
// entered the realm of "this may be received"
673-
let inner = self.inner.get();
674705
if (*inner).packet.cnt.load(Relaxed) < DISCONNECTED + 1024 {
675706
return false
676707
}
677708

678709
(*inner).queue.push(t);
679710
match (*inner).packet.increment() {
680-
DISCONNECTED => {} // oh well, we tried
681711
-1 => { (*inner).packet.wakeup(true); }
712+
713+
// In this case, we have possibly failed to send our data, and
714+
// we need to consider re-popping the data in order to fully
715+
// destroy it. We must arbitrate among the multiple senders,
716+
// however, because the queues that we're using are
717+
// single-consumer queues. In order to do this, all exiting
718+
// pushers will use an atomic count in order to count those
719+
// flowing through. Pushers who see 0 are required to drain as
720+
// much as possible, and then can only exit when they are the
721+
// only pusher (otherwise they must try again).
722+
n if n < DISCONNECTED + 1024 => {
723+
if (*inner).packet.sender_drain.fetch_add(1, SeqCst) == 0 {
724+
loop {
725+
// drain the queue
726+
loop {
727+
match (*inner).queue.pop() {
728+
mpsc::Data(..) => {}
729+
mpsc::Empty => break,
730+
mpsc::Inconsistent => Thread::yield_now(),
731+
}
732+
}
733+
// maybe we're done, if we're not the last ones
734+
// here, then we need to go try again.
735+
if (*inner).packet.sender_drain.compare_and_swap(
736+
1, 0, SeqCst) == 1 {
737+
break
738+
}
739+
}
740+
741+
// At this point, there may still be data on the queue,
742+
// but only if the count hasn't been incremented and
743+
// some other sender hasn't finished pushing data just
744+
// yet.
745+
}
746+
}
747+
682748
n => {
683749
if n > 0 && n % RESCHED_FREQ == 0 {
684750
let task: ~Task = Local::take();
@@ -763,8 +829,8 @@ impl<T: Send> Port<T> {
763829
}
764830

765831
let ret = match this.inner {
766-
Single(ref mut arc) => unsafe { (*arc.get()).queue.pop() },
767-
Shared(ref mut arc) => match unsafe { (*arc.get()).queue.pop() } {
832+
SingleInner(ref mut arc) => unsafe { (*arc.get()).queue.pop() },
833+
SharedInner(ref mut arc) => match unsafe { (*arc.get()).queue.pop() } {
768834
mpsc::Data(t) => Some(t),
769835
mpsc::Empty => None,
770836

@@ -868,10 +934,69 @@ impl<'a, T: Send> Iterator<T> for PortIterator<'a, T> {
868934
#[unsafe_destructor]
869935
impl<T: Send> Drop for Port<T> {
870936
fn drop(&mut self) {
871-
// All we need to do is store that we're disconnected. If the channel
872-
// half has already disconnected, then we'll just deallocate everything
873-
// when the shared packet is deallocated.
874-
self.inner.packet().cnt.store(DISCONNECTED, SeqCst);
937+
// Dropping a port seems like a fairly trivial thing. In theory all we
938+
// need to do is flag that we're disconnected and then everything else
939+
// can take over (we don't have anyone to wake up).
940+
//
941+
// The catch for Ports is that we want to drop the entire contents of
942+
// the queue. There are multiple reasons for having this property, the
943+
// largest of which is that if another port is waiting in this channel
944+
// (but not received yet), then waiting on that port will cause a
945+
// deadlock.
946+
//
947+
// So if we accept that we must now destroy the entire contents of the
948+
// queue, this code may make a bit more sense. The tricky part is that
949+
// we can't let any in-flight sends go un-dropped, we have to make sure
950+
// *everything* is dropped and nothing new will come onto the channel.
951+
952+
// The first thing we do is set a flag saying that we're done for. All
953+
// sends are gated on this flag, so we're immediately guaranteed that
954+
// there are a bounded number of active sends that we'll have to deal
955+
// with.
956+
self.inner.packet().go_home.store(true, Relaxed);
957+
958+
// Now that we're guaranteed to deal with a bounded number of senders,
959+
// we need to drain the queue. This draining process happens atomically
960+
// with respect to the "count" of the channel. If the count is nonzero
961+
// (with steals taken into account), then there must be data on the
962+
// channel. In this case we drain everything and then try again. We will
963+
// continue to fail while active senders send data while we're dropping
964+
// data, but eventually we're guaranteed to break out of this loop
965+
// (because there is a bounded number of senders).
966+
let mut steals = self.inner.packet().steals;
967+
while {
968+
let cnt = self.inner.packet().cnt.compare_and_swap(
969+
steals, DISCONNECTED, SeqCst);
970+
cnt != DISCONNECTED && cnt != steals
971+
} {
972+
match self.inner {
973+
SingleInner(ref mut arc) => {
974+
loop {
975+
match unsafe { (*arc.get()).queue.pop() } {
976+
Some(..) => { steals += 1; }
977+
None => break
978+
}
979+
}
980+
}
981+
SharedInner(ref mut arc) => {
982+
// See the discussion in 'try_recv_inc' for why we yield
983+
// control of this thread.
984+
loop {
985+
match unsafe { (*arc.get()).queue.pop() } {
986+
mpsc::Data(..) => { steals += 1; }
987+
mpsc::Empty => break,
988+
mpsc::Inconsistent => Thread::yield_now(),
989+
}
990+
}
991+
}
992+
}
993+
}
994+
995+
// At this point in time, we have gated all future senders from sending,
996+
// and we have flagged the channel as being disconnected. The senders
997+
// still have some responsibility, however, because some sends may not
998+
// complete until after we flag the disconnection. There are more
999+
// details in the sending methods that see DISCONNECTED
8751000
}
8761001
}
8771002

@@ -1322,4 +1447,24 @@ mod test {
13221447
drop(chan);
13231448
assert_eq!(count_port.recv(), 4);
13241449
})
1450+
1451+
test!(fn pending_dropped() {
1452+
let (a, b) = Chan::new();
1453+
let (c, d) = Chan::<()>::new();
1454+
b.send(d);
1455+
drop(a);
1456+
c.recv();
1457+
} #[should_fail])
1458+
1459+
test!(fn pending_dropped_stress() {
1460+
let (a, b) = Chan::new();
1461+
let (c, d) = Chan::new();
1462+
do spawn {
1463+
while b.try_send(~3) { continue }
1464+
d.send(());
1465+
}
1466+
a.recv();
1467+
drop(a);
1468+
c.recv();
1469+
})
13251470
}

src/libstd/sync/mpsc_queue.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -156,22 +156,23 @@ impl<T: Send> Drop for Queue<T> {
156156
mod tests {
157157
use prelude::*;
158158

159-
use super::{Queue, Data, Empty, Inconsistent};
160159
use native;
160+
use super::{Queue, Data, Empty, Inconsistent};
161+
use sync::arc::UnsafeArc;
161162

162163
#[test]
163164
fn test_full() {
164165
let mut q = Queue::new();
165-
p.push(~1);
166-
p.push(~2);
166+
q.push(~1);
167+
q.push(~2);
167168
}
168169

169170
#[test]
170171
fn test() {
171172
let nthreads = 8u;
172173
let nmsgs = 1000u;
173174
let mut q = Queue::new();
174-
match c.pop() {
175+
match q.pop() {
175176
Empty => {}
176177
Inconsistent | Data(..) => fail!()
177178
}

src/libstd/sync/spsc_queue.rs

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -193,17 +193,6 @@ impl<T: Send> Queue<T> {
193193
return ret;
194194
}
195195
}
196-
197-
/// Tests whether this queue is empty or not. Remember that there can only
198-
/// be one tester/popper, and also keep in mind that the answer returned
199-
/// from this is likely to change if it is `false`.
200-
pub fn is_empty(&self) -> bool {
201-
unsafe {
202-
let tail = self.tail;
203-
let next = (*tail).next.load(Acquire);
204-
return next.is_null();
205-
}
206-
}
207196
}
208197

209198
#[unsafe_destructor]
@@ -223,8 +212,9 @@ impl<T: Send> Drop for Queue<T> {
223212
#[cfg(test)]
224213
mod test {
225214
use prelude::*;
226-
use super::Queue;
227215
use native;
216+
use super::Queue;
217+
use sync::arc::UnsafeArc;
228218

229219
#[test]
230220
fn smoke() {
@@ -272,7 +262,6 @@ mod test {
272262
let (a, b) = UnsafeArc::new2(Queue::new(bound));
273263
let (port, chan) = Chan::new();
274264
do native::task::spawn {
275-
let mut c = c;
276265
for _ in range(0, 100000) {
277266
loop {
278267
match unsafe { (*b.get()).pop() } {

0 commit comments

Comments
 (0)