
Minor Chan tweaks #11413


Closed · wants to merge 2 commits
1 change: 1 addition & 0 deletions src/libgreen/lib.rs
@@ -47,6 +47,7 @@ use task::GreenTask;

mod macros;
mod simple;
+mod message_queue;

pub mod basic;
pub mod context;
61 changes: 61 additions & 0 deletions src/libgreen/message_queue.rs

Member: Is there any reason this is bottled up in libgreen? I feel like this would be more generally useful as the safe layer on top of the Queue types.

Member Author: We can certainly add convenience wrappers if the need arises, but right now I don't imagine these queues will be widely used, and we should probably make them semi-private or unstable anyway.

@@ -0,0 +1,61 @@
// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use mpsc = std::sync::mpsc_queue;
use std::sync::arc::UnsafeArc;

pub enum PopResult<T> {
    Inconsistent,
    Empty,
    Data(T),
}

pub fn queue<T: Send>() -> (Consumer<T>, Producer<T>) {
    let (a, b) = UnsafeArc::new2(mpsc::Queue::new());
    (Consumer { inner: a }, Producer { inner: b })
}

pub struct Producer<T> {
    priv inner: UnsafeArc<mpsc::Queue<T>>,
}

pub struct Consumer<T> {
    priv inner: UnsafeArc<mpsc::Queue<T>>,
}

impl<T: Send> Consumer<T> {
    pub fn pop(&mut self) -> PopResult<T> {
        match unsafe { (*self.inner.get()).pop() } {
            mpsc::Inconsistent => Inconsistent,
            mpsc::Empty => Empty,
            mpsc::Data(t) => Data(t),
        }
    }

    pub fn casual_pop(&mut self) -> Option<T> {
        match unsafe { (*self.inner.get()).pop() } {
            mpsc::Inconsistent => None,
            mpsc::Empty => None,
            mpsc::Data(t) => Some(t),
        }
    }
}

impl<T: Send> Producer<T> {
    pub fn push(&mut self, t: T) {
        unsafe { (*self.inner.get()).push(t); }
    }
}

impl<T: Send> Clone for Producer<T> {
    fn clone(&self) -> Producer<T> {
        Producer { inner: self.inner.clone() }
    }
}
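
To ground the review exchange above, here is a minimal usage sketch of the wrapper (illustrative, not part of this PR; written in the same pre-1.0 dialect as the diff, with the payload type chosen arbitrarily):

use msgq = message_queue;

fn sketch() {
    // The consumer stays with the owning scheduler; producers are cloned
    // into handles held by other threads.
    let (mut consumer, producer) = msgq::queue();
    let mut handle = producer.clone();
    handle.push(42);

    // Drain until Empty. Inconsistent means a sender was interrupted
    // mid-push, so the caller retries rather than treating it as drained.
    loop {
        match consumer.pop() {
            msgq::Data(n) => { let _ = n; /* handle the message */ }
            msgq::Empty => break,
            msgq::Inconsistent => continue,
        }
    }
}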
16 changes: 8 additions & 8 deletions src/libgreen/sched.rs
@@ -17,14 +17,14 @@ use std::rt::task::Task;
use std::sync::deque;
use std::unstable::mutex::Mutex;
use std::unstable::raw;
-use mpsc = std::sync::mpsc_queue;

use TaskState;
use context::Context;
use coroutine::Coroutine;
use sleeper_list::SleeperList;
use stack::StackPool;
use task::{TypeSched, GreenTask, HomeSched, AnySched};
+use msgq = message_queue;

/// A scheduler is responsible for coordinating the execution of Tasks
/// on a single thread. The scheduler runs inside a slightly modified
@@ -47,9 +47,9 @@ pub struct Scheduler {
    /// The queue of incoming messages from other schedulers.
    /// These are enqueued by SchedHandles after which a remote callback
    /// is triggered to handle the message.
-   message_queue: mpsc::Consumer<SchedMessage, ()>,
+   message_queue: msgq::Consumer<SchedMessage>,
    /// Producer used to clone sched handles from
-   message_producer: mpsc::Producer<SchedMessage, ()>,
+   message_producer: msgq::Producer<SchedMessage>,
    /// A shared list of sleeping schedulers. We'll use this to wake
    /// up schedulers when pushing work onto the work queue.
    sleeper_list: SleeperList,
@@ -143,7 +143,7 @@ impl Scheduler {
                   state: TaskState)
                   -> Scheduler {

-       let (consumer, producer) = mpsc::queue(());
+       let (consumer, producer) = msgq::queue();
        let mut sched = Scheduler {
            pool_id: pool_id,
            sleeper_list: sleeper_list,
@@ -215,7 +215,7 @@ impl Scheduler {

        // Should not have any messages
        let message = stask.sched.get_mut_ref().message_queue.pop();
-       rtassert!(match message { mpsc::Empty => true, _ => false });
+       rtassert!(match message { msgq::Empty => true, _ => false });

        stask.task.get_mut_ref().destroyed = true;
    }
@@ -340,8 +340,8 @@ impl Scheduler {
            //
            // I have chosen to take route #2.
            match self.message_queue.pop() {
-               mpsc::Data(t) => Some(t),
-               mpsc::Empty | mpsc::Inconsistent => None
+               msgq::Data(t) => Some(t),
+               msgq::Empty | msgq::Inconsistent => None
            }
        };

@@ -849,7 +849,7 @@ pub enum SchedMessage {

pub struct SchedHandle {
    priv remote: ~RemoteCallback,
-   priv queue: mpsc::Producer<SchedMessage, ()>,
+   priv queue: msgq::Producer<SchedMessage>,
    sched_id: uint
}
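
A sketch of how a handle would deliver a message with the new types (hypothetical; `send_sketch` is not part of this PR, and the `fire` call assumes the RemoteCallback trait's usual wakeup method):

impl SchedHandle {
    pub fn send_sketch(&mut self, msg: SchedMessage) {
        self.queue.push(msg);   // lock-free enqueue onto the scheduler's queue
        self.remote.fire();     // wake the remote scheduler so it drains it
    }
}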

51 changes: 26 additions & 25 deletions src/librustuv/queue.rs
@@ -24,6 +24,7 @@ use std::cast;
use std::libc::{c_void, c_int};
use std::rt::task::BlockedTask;
use std::unstable::sync::LittleLock;
+use std::sync::arc::UnsafeArc;
use mpsc = std::sync::mpsc_queue;

use async::AsyncWatcher;
@@ -39,46 +40,46 @@ enum Message {
struct State {
    handle: *uvll::uv_async_t,
    lock: LittleLock, // see comments in async_cb for why this is needed
+   queue: mpsc::Queue<Message>,
}

/// This structure is intended to be stored next to the event loop, and it is
/// used to create new `Queue` structures.
pub struct QueuePool {
-   priv producer: mpsc::Producer<Message, State>,
-   priv consumer: mpsc::Consumer<Message, State>,
+   priv queue: UnsafeArc<State>,
    priv refcnt: uint,
}

/// This type is used to send messages back to the original event loop.
pub struct Queue {
-   priv queue: mpsc::Producer<Message, State>,
+   priv queue: UnsafeArc<State>,
}

extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) {
    assert_eq!(status, 0);
-   let state: &mut QueuePool = unsafe {
+   let pool: &mut QueuePool = unsafe {
        cast::transmute(uvll::get_data_for_uv_handle(handle))
    };
-   let packet = unsafe { state.consumer.packet() };
+   let state: &mut State = unsafe { cast::transmute(pool.queue.get()) };

    // Remember that there is no guarantee about how many times an async
    // callback is called with relation to the number of sends, so process the
    // entire queue in a loop.
    loop {
-       match state.consumer.pop() {
+       match state.queue.pop() {
            mpsc::Data(Task(task)) => {
                task.wake().map(|t| t.reawaken(true));
            }
            mpsc::Data(Increment) => unsafe {
-               if state.refcnt == 0 {
-                   uvll::uv_ref((*packet).handle);
+               if pool.refcnt == 0 {
+                   uvll::uv_ref(state.handle);
                }
-               state.refcnt += 1;
+               pool.refcnt += 1;
            },
            mpsc::Data(Decrement) => unsafe {
-               state.refcnt -= 1;
-               if state.refcnt == 0 {
-                   uvll::uv_unref((*packet).handle);
+               pool.refcnt -= 1;
+               if pool.refcnt == 0 {
+                   uvll::uv_unref(state.handle);
                }
            },
            mpsc::Empty | mpsc::Inconsistent => break
@@ -99,24 +100,24 @@ extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) {
    // If we acquire the mutex here, then we are guaranteed that there are no
    // longer any senders which are holding on to their handles, so we can
    // safely allow the event loop to exit.
-   if state.refcnt == 0 {
+   if pool.refcnt == 0 {
        unsafe {
-           let _l = (*packet).lock.lock();
+           let _l = state.lock.lock();
        }
    }
}

impl QueuePool {
    pub fn new(loop_: &mut Loop) -> ~QueuePool {
        let handle = UvHandle::alloc(None::<AsyncWatcher>, uvll::UV_ASYNC);
-       let (c, p) = mpsc::queue(State {
+       let state = UnsafeArc::new(State {
            handle: handle,
            lock: LittleLock::new(),
+           queue: mpsc::Queue::new(),
        });
        let q = ~QueuePool {
-           producer: p,
-           consumer: c,
            refcnt: 0,
+           queue: state,
        };

        unsafe {
@@ -132,23 +133,23 @@ impl QueuePool {
    pub fn queue(&mut self) -> Queue {
        unsafe {
            if self.refcnt == 0 {
-               uvll::uv_ref((*self.producer.packet()).handle);
+               uvll::uv_ref((*self.queue.get()).handle);
            }
            self.refcnt += 1;
        }
-       Queue { queue: self.producer.clone() }
+       Queue { queue: self.queue.clone() }
    }

    pub fn handle(&self) -> *uvll::uv_async_t {
-       unsafe { (*self.producer.packet()).handle }
+       unsafe { (*self.queue.get()).handle }
    }
}

impl Queue {
    pub fn push(&mut self, task: BlockedTask) {
-       self.queue.push(Task(task));
        unsafe {
-           uvll::uv_async_send((*self.queue.packet()).handle);
+           (*self.queue.get()).queue.push(Task(task));
+           uvll::uv_async_send((*self.queue.get()).handle);
        }
    }
}
@@ -161,7 +162,7 @@ impl Clone for Queue {
        // and if the queue is dropped later on it'll see the increment for the
        // decrement anyway.
        unsafe {
-           cast::transmute_mut(self).queue.push(Increment);
+           (*self.queue.get()).queue.push(Increment);
        }
        Queue { queue: self.queue.clone() }
    }
@@ -172,9 +173,9 @@ impl Drop for Queue {
        // See the comments in the async_cb function for why there is a lock
        // that is acquired only on a drop.
        unsafe {
-           let state = self.queue.packet();
+           let state = self.queue.get();
            let _l = (*state).lock.lock();
-           self.queue.push(Decrement);
+           (*state).queue.push(Decrement);
            uvll::uv_async_send((*state).handle);
        }
    }
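
Condensing the locking comments above into one place, the shutdown handshake looks like this (an illustrative sketch assembled from the diff, not new code; the function names are hypothetical):

// Sender side: Queue::drop holds the lock across enqueue + wakeup.
fn sender_drop(state: *mut State) {
    unsafe {
        let _l = (*state).lock.lock();        // taken before the final push
        (*state).queue.push(Decrement);       // last message from this handle
        uvll::uv_async_send((*state).handle); // wake the event loop
    }                                         // lock released here
}

// Event-loop side: async_cb only touches the lock once refcnt hits zero.
fn loop_side(pool: &mut QueuePool, state: &mut State) {
    if pool.refcnt == 0 {
        // Blocks until every sender has left sender_drop, so the event loop
        // (and the State behind the UnsafeArc) can be torn down safely.
        unsafe { let _l = state.lock.lock(); }
    }
}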
}
Expand Down