Skip to content

Commit 443879f

Browse files
toffalettibrson
authored andcommitted
---
yaml --- r: 95674 b: refs/heads/dist-snap c: 5876e21 h: refs/heads/master v: v3
1 parent 97fd7c5 commit 443879f

File tree

4 files changed

+212
-57
lines changed

4 files changed

+212
-57
lines changed

[refs]

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ refs/heads/try: c274a6888410ce3e357e014568b43310ed787d36
66
refs/tags/release-0.1: 1f5c5126e96c79d22cb7862f75304136e204f105
77
refs/heads/ndm: f3868061cd7988080c30d6d5bf352a5a5fe2460b
88
refs/heads/try2: 147ecfdd8221e4a4d4e090486829a06da1e0ca3c
9-
refs/heads/dist-snap: bf0e6eb346665d779ad012f7def9b4948c5c6b26
9+
refs/heads/dist-snap: 5876e21225f0cf34e8caa40b18db56fa716e8c92
1010
refs/tags/release-0.2: c870d2dffb391e14efb05aa27898f1f6333a9596
1111
refs/tags/release-0.3: b5f0d0f648d9a6153664837026ba1be43d3e2503
1212
refs/heads/try3: 9387340aab40a73e8424c48fd42f0c521a4875c0

branches/dist-snap/src/libstd/rt/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,9 @@ mod message_queue;
139139
/// A mostly lock-free multi-producer, single consumer queue.
140140
mod mpsc_queue;
141141

142+
/// A lock-free multi-producer, multi-consumer bounded queue.
143+
mod mpmc_bounded_queue;
144+
142145
/// A parallel data structure for tracking sleeping schedulers.
143146
mod sleeper_list;
144147

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
/* Multi-producer/multi-consumer bounded queue
2+
* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
3+
* Redistribution and use in source and binary forms, with or without
4+
* modification, are permitted provided that the following conditions are met:
5+
*
6+
* 1. Redistributions of source code must retain the above copyright notice,
7+
* this list of conditions and the following disclaimer.
8+
*
9+
* 2. Redistributions in binary form must reproduce the above copyright
10+
* notice, this list of conditions and the following disclaimer in the
11+
* documentation and/or other materials provided with the distribution.
12+
*
13+
* THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
14+
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
15+
* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
16+
* SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
17+
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
18+
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
19+
* PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
20+
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
21+
* OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
22+
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
23+
*
24+
* The views and conclusions contained in the software and documentation are
25+
* those of the authors and should not be interpreted as representing official
26+
* policies, either expressed or implied, of Dmitry Vyukov.
27+
*/
28+
29+
use unstable::sync::UnsafeArc;
30+
use unstable::atomics::{AtomicUint,Relaxed,Release,Acquire};
31+
use option::*;
32+
use vec;
33+
use clone::Clone;
34+
use kinds::Send;
35+
use num::{Exponential,Algebraic,Round};
36+
37+
struct Node<T> {
38+
sequence: AtomicUint,
39+
value: Option<T>,
40+
}
41+
42+
struct State<T> {
43+
buffer: ~[Node<T>],
44+
mask: uint,
45+
enqueue_pos: AtomicUint,
46+
dequeue_pos: AtomicUint,
47+
}
48+
49+
struct Queue<T> {
50+
priv state: UnsafeArc<State<T>>,
51+
}
52+
53+
impl<T: Send> State<T> {
54+
fn with_capacity(capacity: uint) -> State<T> {
55+
let capacity = if capacity < 2 || (capacity & (capacity - 1)) != 0 {
56+
// use next power of 2 as capacity
57+
2f64.pow(&((capacity as f64).log2().floor()+1f64)) as uint
58+
} else {
59+
capacity
60+
};
61+
let buffer = do vec::from_fn(capacity) |i:uint| {
62+
Node{sequence:AtomicUint::new(i),value:None}
63+
};
64+
State{
65+
buffer: buffer,
66+
mask: capacity-1,
67+
enqueue_pos: AtomicUint::new(0),
68+
dequeue_pos: AtomicUint::new(0),
69+
}
70+
}
71+
72+
fn push(&mut self, value: T) -> bool {
73+
let mask = self.mask;
74+
let mut pos = self.enqueue_pos.load(Relaxed);
75+
loop {
76+
let node = &mut self.buffer[pos & mask];
77+
let seq = node.sequence.load(Acquire);
78+
let diff: int = seq as int - pos as int;
79+
80+
if diff == 0 {
81+
let enqueue_pos = self.enqueue_pos.compare_and_swap(pos, pos+1, Relaxed);
82+
if enqueue_pos == pos {
83+
node.value = Some(value);
84+
node.sequence.store(pos+1, Release);
85+
break
86+
} else {
87+
pos = enqueue_pos;
88+
}
89+
} else if (diff < 0) {
90+
return false
91+
} else {
92+
pos = self.enqueue_pos.load(Relaxed);
93+
}
94+
}
95+
true
96+
}
97+
98+
fn pop(&mut self) -> Option<T> {
99+
let mask = self.mask;
100+
let mut pos = self.dequeue_pos.load(Relaxed);
101+
loop {
102+
let node = &mut self.buffer[pos & mask];
103+
let seq = node.sequence.load(Acquire);
104+
let diff: int = seq as int - (pos + 1) as int;
105+
if diff == 0 {
106+
let dequeue_pos = self.dequeue_pos.compare_and_swap(pos, pos+1, Relaxed);
107+
if dequeue_pos == pos {
108+
let value = node.value.take();
109+
node.sequence.store(pos + mask + 1, Release);
110+
return value
111+
} else {
112+
pos = dequeue_pos;
113+
}
114+
} else if diff < 0 {
115+
return None
116+
} else {
117+
pos = self.dequeue_pos.load(Relaxed);
118+
}
119+
}
120+
}
121+
}
122+
123+
impl<T: Send> Queue<T> {
124+
pub fn with_capacity(capacity: uint) -> Queue<T> {
125+
Queue{
126+
state: UnsafeArc::new(State::with_capacity(capacity))
127+
}
128+
}
129+
130+
pub fn push(&mut self, value: T) -> bool {
131+
unsafe { (*self.state.get()).push(value) }
132+
}
133+
134+
pub fn pop(&mut self) -> Option<T> {
135+
unsafe { (*self.state.get()).pop() }
136+
}
137+
}
138+
139+
impl<T: Send> Clone for Queue<T> {
140+
fn clone(&self) -> Queue<T> {
141+
Queue {
142+
state: self.state.clone()
143+
}
144+
}
145+
}
146+
147+
#[cfg(test)]
148+
mod tests {
149+
use prelude::*;
150+
use option::*;
151+
use task;
152+
use comm;
153+
use super::Queue;
154+
155+
#[test]
156+
fn test() {
157+
let nthreads = 8u;
158+
let nmsgs = 1000u;
159+
let mut q = Queue::with_capacity(nthreads*nmsgs);
160+
assert_eq!(None, q.pop());
161+
162+
for _ in range(0, nthreads) {
163+
let (port, chan) = comm::stream();
164+
chan.send(q.clone());
165+
do task::spawn_sched(task::SingleThreaded) {
166+
let mut q = port.recv();
167+
for i in range(0, nmsgs) {
168+
assert!(q.push(i));
169+
}
170+
}
171+
}
172+
173+
let mut completion_ports = ~[];
174+
for _ in range(0, nthreads) {
175+
let (completion_port, completion_chan) = comm::stream();
176+
completion_ports.push(completion_port);
177+
let (port, chan) = comm::stream();
178+
chan.send(q.clone());
179+
do task::spawn_sched(task::SingleThreaded) {
180+
let mut q = port.recv();
181+
let mut i = 0u;
182+
loop {
183+
match q.pop() {
184+
None => {},
185+
Some(_) => {
186+
i += 1;
187+
if i == nmsgs { break }
188+
}
189+
}
190+
}
191+
completion_chan.send(i);
192+
}
193+
}
194+
195+
for completion_port in completion_ports.iter() {
196+
assert_eq!(nmsgs, completion_port.recv());
197+
}
198+
}
199+
}

branches/dist-snap/src/libstd/rt/sleeper_list.rs

Lines changed: 9 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -11,84 +11,37 @@
1111
//! Maintains a shared list of sleeping schedulers. Schedulers
1212
//! use this to wake each other up.
1313
14-
use container::Container;
15-
use vec::OwnedVector;
16-
use option::{Option, Some, None};
17-
use cell::Cell;
18-
use unstable::sync::{UnsafeArc, LittleLock};
1914
use rt::sched::SchedHandle;
15+
use rt::mpmc_bounded_queue::Queue;
16+
use option::*;
2017
use clone::Clone;
2118

2219
pub struct SleeperList {
23-
priv state: UnsafeArc<State>
24-
}
25-
26-
struct State {
27-
count: uint,
28-
stack: ~[SchedHandle],
29-
lock: LittleLock
20+
priv q: Queue<SchedHandle>,
3021
}
3122

3223
impl SleeperList {
3324
pub fn new() -> SleeperList {
34-
SleeperList {
35-
state: UnsafeArc::new(State {
36-
count: 0,
37-
stack: ~[],
38-
lock: LittleLock::new()
39-
})
40-
}
25+
SleeperList{q: Queue::with_capacity(8*1024)}
4126
}
4227

43-
pub fn push(&mut self, handle: SchedHandle) {
44-
let handle = Cell::new(handle);
45-
unsafe {
46-
let state = self.state.get();
47-
do (*state).lock.lock {
48-
(*state).count += 1;
49-
(*state).stack.push(handle.take());
50-
}
51-
}
28+
pub fn push(&mut self, value: SchedHandle) {
29+
assert!(self.q.push(value))
5230
}
5331

5432
pub fn pop(&mut self) -> Option<SchedHandle> {
55-
unsafe {
56-
let state = self.state.get();
57-
do (*state).lock.lock {
58-
if !(*state).stack.is_empty() {
59-
(*state).count -= 1;
60-
Some((*state).stack.pop())
61-
} else {
62-
None
63-
}
64-
}
65-
}
33+
self.q.pop()
6634
}
6735

68-
/// A pop that may sometimes miss enqueued elements, but is much faster
69-
/// to give up without doing any synchronization
7036
pub fn casual_pop(&mut self) -> Option<SchedHandle> {
71-
unsafe {
72-
let state = self.state.get();
73-
// NB: Unsynchronized check
74-
if (*state).count == 0 { return None; }
75-
do (*state).lock.lock {
76-
if !(*state).stack.is_empty() {
77-
// NB: count is also protected by the lock
78-
(*state).count -= 1;
79-
Some((*state).stack.pop())
80-
} else {
81-
None
82-
}
83-
}
84-
}
37+
self.q.pop()
8538
}
8639
}
8740

8841
impl Clone for SleeperList {
8942
fn clone(&self) -> SleeperList {
9043
SleeperList {
91-
state: self.state.clone()
44+
q: self.q.clone()
9245
}
9346
}
9447
}

0 commit comments

Comments
 (0)