Skip to content

Commit 7165128

Browse files
committed
Add a Worker type which allows executing code in the background
1 parent b711179 commit 7165128

File tree

2 files changed

+171
-0
lines changed

2 files changed

+171
-0
lines changed

src/librustc_data_structures/sync.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ use std::marker::PhantomData;
2323
use std::ops::{Deref, DerefMut};
2424
use crate::owning_ref::{Erased, OwningRef};
2525

26+
pub mod worker;
27+
2628
pub fn serial_join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
2729
where A: FnOnce() -> RA,
2830
B: FnOnce() -> RB
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
use super::{Lrc, Lock};
2+
3+
pub trait Worker: super::Send {
4+
type Message: super::Send;
5+
type Result: super::Send;
6+
7+
fn message(&mut self, msg: Self::Message);
8+
fn complete(self) -> Self::Result;
9+
}
10+
11+
pub use executor::WorkerExecutor;
12+
13+
#[cfg(parallel_compiler)]
14+
mod executor {
15+
use super::*;
16+
use crate::jobserver;
17+
use parking_lot::Condvar;
18+
use std::mem;
19+
20+
struct WorkerQueue<T: Worker> {
21+
scheduled: bool,
22+
complete: bool,
23+
messages: Vec<T::Message>,
24+
result: Option<T::Result>,
25+
}
26+
27+
/// Allows executing a worker on any Rayon thread,
28+
/// sending it messages and waiting for it to complete its computation.
29+
pub struct WorkerExecutor<T: Worker> {
30+
queue: Lock<WorkerQueue<T>>,
31+
worker: Lock<Option<T>>,
32+
#[cfg(parallel_compiler)]
33+
cond_var: Condvar,
34+
}
35+
36+
impl<T: Worker> WorkerExecutor<T> {
37+
pub fn new(worker: T) -> Self {
38+
WorkerExecutor {
39+
queue: Lock::new(WorkerQueue {
40+
scheduled: false,
41+
complete: false,
42+
messages: Vec::new(),
43+
result: None,
44+
}),
45+
worker: Lock::new(Some(worker)),
46+
#[cfg(parallel_compiler)]
47+
cond_var: Condvar::new(),
48+
}
49+
}
50+
51+
fn try_run_worker(&self) {
52+
if let Some(mut worker) = self.worker.try_lock() {
53+
self.run_worker(&mut *worker);
54+
}
55+
}
56+
57+
fn run_worker(&self, worker: &mut Option<T>) {
58+
let worker_ref = if let Some(worker_ref) = worker.as_mut() {
59+
worker_ref
60+
} else {
61+
return
62+
};
63+
loop {
64+
let msgs = {
65+
let mut queue = self.queue.lock();
66+
let msgs = mem::replace(&mut queue.messages, Vec::new());
67+
if msgs.is_empty() {
68+
queue.scheduled = false;
69+
if queue.complete {
70+
queue.result = Some(worker.take().unwrap().complete());
71+
self.cond_var.notify_all();
72+
}
73+
break;
74+
}
75+
msgs
76+
};
77+
for msg in msgs {
78+
worker_ref.message(msg);
79+
}
80+
}
81+
}
82+
83+
pub fn complete(&self) -> T::Result {
84+
let mut queue = self.queue.lock();
85+
assert!(!queue.complete);
86+
queue.complete = true;
87+
if !queue.scheduled {
88+
// The worker is not scheduled to run, just run it on the current thread.
89+
queue.scheduled = true;
90+
mem::drop(queue);
91+
self.run_worker(&mut *self.worker.lock());
92+
queue = self.queue.lock();
93+
} else if let Some(mut worker) = self.worker.try_lock() {
94+
// Try to run the worker on the current thread.
95+
// It was scheduled to run, but it may not have started yet.
96+
// If we are using a single thread, it may never start at all.
97+
mem::drop(queue);
98+
self.run_worker(&mut *worker);
99+
queue = self.queue.lock();
100+
} else {
101+
// The worker must be running on some other thread,
102+
// and will eventually look at the queue again, since queue.scheduled is true.
103+
// Wait for it.
104+
105+
#[cfg(parallel_compiler)]
106+
{
107+
// Wait for the result
108+
jobserver::release_thread();
109+
self.cond_var.wait(&mut queue);
110+
jobserver::acquire_thread();
111+
}
112+
}
113+
queue.result.take().unwrap()
114+
}
115+
116+
fn queue_message(&self, msg: T::Message) -> bool {
117+
let mut queue = self.queue.lock();
118+
queue.messages.push(msg);
119+
let was_scheduled = queue.scheduled;
120+
if !was_scheduled {
121+
queue.scheduled = true;
122+
}
123+
was_scheduled
124+
}
125+
126+
pub fn message_in_pool(self: &Lrc<Self>, msg: T::Message)
127+
where
128+
T: 'static
129+
{
130+
if !self.queue_message(msg) {
131+
let this = self.clone();
132+
#[cfg(parallel_compiler)]
133+
rayon::spawn(move || this.try_run_worker());
134+
#[cfg(not(parallel_compiler))]
135+
this.try_run_worker();
136+
}
137+
}
138+
}
139+
}
140+
141+
#[cfg(not(parallel_compiler))]
142+
mod executor {
143+
use super::*;
144+
145+
pub struct WorkerExecutor<T: Worker> {
146+
worker: Lock<Option<T>>,
147+
}
148+
149+
impl<T: Worker> WorkerExecutor<T> {
150+
pub fn new(worker: T) -> Self {
151+
WorkerExecutor {
152+
worker: Lock::new(Some(worker)),
153+
}
154+
}
155+
156+
#[inline]
157+
pub fn complete(&self) -> T::Result {
158+
self.worker.lock().take().unwrap().complete()
159+
}
160+
161+
#[inline]
162+
pub fn message_in_pool(self: &Lrc<Self>, msg: T::Message)
163+
where
164+
T: 'static
165+
{
166+
self.worker.lock().as_mut().unwrap().message(msg);
167+
}
168+
}
169+
}

0 commit comments

Comments
 (0)