Skip to content

Commit 1a0dfb3

Browse files
committed
Add a jobserver based concurrency limiter
1 parent 5896e5c commit 1a0dfb3

File tree

2 files changed

+155
-0
lines changed

2 files changed

+155
-0
lines changed

src/concurrency_limiter.rs

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
use std::sync::{Arc, Condvar, Mutex};
2+
3+
use rustc_session::Session;
4+
5+
use jobserver::HelperThread;
6+
7+
pub(super) struct ConcurrencyLimiter {
8+
helper_thread: Option<HelperThread>,
9+
state: Arc<Mutex<state::ConcurrencyLimiterState>>,
10+
available_token_condvar: Arc<Condvar>,
11+
}
12+
13+
impl ConcurrencyLimiter {
14+
pub(super) fn new(sess: &Session, pending_jobs: usize) -> Self {
15+
let state = Arc::new(Mutex::new(state::ConcurrencyLimiterState::new(pending_jobs)));
16+
let available_token_condvar = Arc::new(Condvar::new());
17+
18+
let state_helper = state.clone();
19+
let available_token_condvar_helper = available_token_condvar.clone();
20+
let helper_thread = sess
21+
.jobserver
22+
.clone()
23+
.into_helper_thread(move |token| {
24+
let mut state = state_helper.lock().unwrap();
25+
state.add_new_token(token.unwrap());
26+
available_token_condvar_helper.notify_one();
27+
})
28+
.unwrap();
29+
ConcurrencyLimiter {
30+
helper_thread: Some(helper_thread),
31+
state,
32+
available_token_condvar: Arc::new(Condvar::new()),
33+
}
34+
}
35+
36+
pub(super) fn acquire(&mut self) -> ConcurrencyLimiterToken {
37+
let mut state = self.state.lock().unwrap();
38+
loop {
39+
state.assert_invariants();
40+
41+
if state.try_start_job() {
42+
return ConcurrencyLimiterToken {
43+
state: self.state.clone(),
44+
available_token_condvar: self.available_token_condvar.clone(),
45+
};
46+
}
47+
48+
self.helper_thread.as_mut().unwrap().request_token();
49+
state = self.available_token_condvar.wait(state).unwrap();
50+
}
51+
}
52+
}
53+
54+
impl Drop for ConcurrencyLimiter {
55+
fn drop(&mut self) {
56+
//
57+
self.helper_thread.take();
58+
59+
// Assert that all jobs have finished
60+
let state = Mutex::get_mut(Arc::get_mut(&mut self.state).unwrap()).unwrap();
61+
state.assert_done();
62+
}
63+
}
64+
65+
#[derive(Debug)]
66+
pub(super) struct ConcurrencyLimiterToken {
67+
state: Arc<Mutex<state::ConcurrencyLimiterState>>,
68+
available_token_condvar: Arc<Condvar>,
69+
}
70+
71+
impl Drop for ConcurrencyLimiterToken {
72+
fn drop(&mut self) {
73+
let mut state = self.state.lock().unwrap();
74+
state.job_finished();
75+
self.available_token_condvar.notify_one();
76+
}
77+
}
78+
79+
mod state {
80+
use jobserver::Acquired;
81+
82+
#[derive(Debug)]
83+
pub(super) struct ConcurrencyLimiterState {
84+
pending_jobs: usize,
85+
active_jobs: usize,
86+
87+
// None is used to represent the implicit token, Some to represent explicit tokens
88+
tokens: Vec<Option<Acquired>>,
89+
}
90+
91+
impl ConcurrencyLimiterState {
92+
pub(super) fn new(pending_jobs: usize) -> Self {
93+
ConcurrencyLimiterState { pending_jobs, active_jobs: 0, tokens: vec![None] }
94+
}
95+
96+
pub(super) fn assert_invariants(&self) {
97+
// There must be no excess active jobs
98+
assert!(self.active_jobs <= self.pending_jobs);
99+
100+
// There may not be more active jobs than there are tokens
101+
assert!(self.active_jobs <= self.tokens.len());
102+
}
103+
104+
pub(super) fn assert_done(&self) {
105+
assert_eq!(self.pending_jobs, 0);
106+
assert_eq!(self.active_jobs, 0);
107+
}
108+
109+
pub(super) fn add_new_token(&mut self, token: Acquired) {
110+
self.tokens.push(Some(token));
111+
self.drop_excess_capacity();
112+
}
113+
114+
pub(super) fn try_start_job(&mut self) -> bool {
115+
if self.active_jobs < self.tokens.len() {
116+
// Using existing token
117+
self.job_started();
118+
return true;
119+
}
120+
121+
false
122+
}
123+
124+
pub(super) fn job_started(&mut self) {
125+
self.assert_invariants();
126+
self.active_jobs += 1;
127+
self.drop_excess_capacity();
128+
self.assert_invariants();
129+
}
130+
131+
pub(super) fn job_finished(&mut self) {
132+
self.assert_invariants();
133+
self.pending_jobs -= 1;
134+
self.active_jobs -= 1;
135+
self.assert_invariants();
136+
self.drop_excess_capacity();
137+
self.assert_invariants();
138+
}
139+
140+
fn drop_excess_capacity(&mut self) {
141+
self.assert_invariants();
142+
if self.active_jobs == self.pending_jobs {
143+
// Drop all excess tokens
144+
self.tokens.truncate(std::cmp::max(self.active_jobs, 1));
145+
} else {
146+
// Keep some excess tokens to satisfy requests faster
147+
const MAX_EXTRA_CAPACITY: usize = 2;
148+
self.tokens.truncate(std::cmp::max(self.active_jobs + MAX_EXTRA_CAPACITY, 1));
149+
}
150+
self.assert_invariants();
151+
}
152+
}
153+
}

src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#![warn(unused_lifetimes)]
55
#![warn(unreachable_pub)]
66

7+
extern crate jobserver;
78
#[macro_use]
89
extern crate rustc_middle;
910
extern crate rustc_ast;
@@ -53,6 +54,7 @@ mod cast;
5354
mod codegen_i128;
5455
mod common;
5556
mod compiler_builtins;
57+
mod concurrency_limiter;
5658
mod config;
5759
mod constant;
5860
mod debuginfo;

0 commit comments

Comments
 (0)