Skip to content
This repository was archived by the owner on May 28, 2025. It is now read-only.

Commit 2924fd2

Browse files
committed
Implement custom QoS-aware thread pool
This code replaces the thread pool implementation we were using previously (from the `threadpool` crate). By making the thread pool aware of QoS, each job spawned on the thread pool can have a different QoS class. This commit also replaces every QoS class used previously with Default as a temporary measure so that each usage can be chosen deliberately.
1 parent f6e3a87 commit 2924fd2

File tree

14 files changed

+184
-93
lines changed

14 files changed

+184
-93
lines changed

Cargo.lock

Lines changed: 1 addition & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/flycheck/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ impl FlycheckHandle {
9090
) -> FlycheckHandle {
9191
let actor = FlycheckActor::new(id, sender, config, workspace_root);
9292
let (sender, receiver) = unbounded::<StateChange>();
93-
let thread = stdx::thread::Builder::new(stdx::thread::QoSClass::Utility)
93+
let thread = stdx::thread::Builder::new(stdx::thread::QoSClass::Default)
9494
.name("Flycheck".to_owned())
9595
.spawn(move || actor.run(receiver))
9696
.expect("failed to spawn thread");
@@ -409,7 +409,7 @@ impl CargoHandle {
409409

410410
let (sender, receiver) = unbounded();
411411
let actor = CargoActor::new(sender, stdout, stderr);
412-
let thread = stdx::thread::Builder::new(stdx::thread::QoSClass::Utility)
412+
let thread = stdx::thread::Builder::new(stdx::thread::QoSClass::Default)
413413
.name("CargoHandle".to_owned())
414414
.spawn(move || actor.run())
415415
.expect("failed to spawn thread");

crates/ide/src/prime_caches.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ pub(crate) fn parallel_prime_caches(
8181
let worker = prime_caches_worker.clone();
8282
let db = db.snapshot();
8383

84-
stdx::thread::Builder::new(stdx::thread::QoSClass::Utility)
84+
stdx::thread::Builder::new(stdx::thread::QoSClass::Default)
8585
.allow_leak(true)
8686
.spawn(move || Cancelled::catch(|| worker(db)))
8787
.expect("failed to spawn thread");

crates/rust-analyzer/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ oorandom = "11.1.3"
3131
rustc-hash = "1.1.0"
3232
serde_json = { workspace = true, features = ["preserve_order"] }
3333
serde.workspace = true
34-
threadpool = "1.8.1"
3534
rayon = "1.6.1"
3635
num_cpus = "1.15.0"
3736
mimalloc = { version = "0.1.30", default-features = false, optional = true }

crates/rust-analyzer/src/bin/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ fn try_main(flags: flags::RustAnalyzer) -> Result<()> {
8585
// will make actions like hitting enter in the editor slow.
8686
// rust-analyzer does not block the editor’s render loop,
8787
// so we don’t use User Interactive.
88-
with_extra_thread("LspServer", stdx::thread::QoSClass::UserInitiated, run_server)?;
88+
with_extra_thread("LspServer", stdx::thread::QoSClass::Default, run_server)?;
8989
}
9090
flags::RustAnalyzerCmd::Parse(cmd) => cmd.run()?,
9191
flags::RustAnalyzerCmd::Symbols(cmd) => cmd.run()?,

crates/rust-analyzer/src/dispatch.rs

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::{fmt, panic, thread};
44
use ide::Cancelled;
55
use lsp_server::ExtractError;
66
use serde::{de::DeserializeOwned, Serialize};
7+
use stdx::thread::QoSClass;
78

89
use crate::{
910
global_state::{GlobalState, GlobalStateSnapshot},
@@ -102,7 +103,7 @@ impl<'a> RequestDispatcher<'a> {
102103
None => return self,
103104
};
104105

105-
self.global_state.task_pool.handle.spawn({
106+
self.global_state.task_pool.handle.spawn(QoSClass::Default, {
106107
let world = self.global_state.snapshot();
107108
move || {
108109
let result = panic::catch_unwind(move || {
@@ -128,6 +129,44 @@ impl<'a> RequestDispatcher<'a> {
128129
&mut self,
129130
f: fn(GlobalStateSnapshot, R::Params) -> Result<R::Result>,
130131
) -> &mut Self
132+
where
133+
R: lsp_types::request::Request + 'static,
134+
R::Params: DeserializeOwned + panic::UnwindSafe + Send + fmt::Debug,
135+
R::Result: Serialize,
136+
{
137+
self.on_with_qos::<R>(QoSClass::Default, f)
138+
}
139+
140+
/// Dispatches a latency-sensitive request onto the thread pool.
141+
pub(crate) fn on_latency_sensitive<R>(
142+
&mut self,
143+
f: fn(GlobalStateSnapshot, R::Params) -> Result<R::Result>,
144+
) -> &mut Self
145+
where
146+
R: lsp_types::request::Request + 'static,
147+
R::Params: DeserializeOwned + panic::UnwindSafe + Send + fmt::Debug,
148+
R::Result: Serialize,
149+
{
150+
self.on_with_qos::<R>(QoSClass::Default, f)
151+
}
152+
153+
pub(crate) fn finish(&mut self) {
154+
if let Some(req) = self.req.take() {
155+
tracing::error!("unknown request: {:?}", req);
156+
let response = lsp_server::Response::new_err(
157+
req.id,
158+
lsp_server::ErrorCode::MethodNotFound as i32,
159+
"unknown request".to_string(),
160+
);
161+
self.global_state.respond(response);
162+
}
163+
}
164+
165+
fn on_with_qos<R>(
166+
&mut self,
167+
qos_class: QoSClass,
168+
f: fn(GlobalStateSnapshot, R::Params) -> Result<R::Result>,
169+
) -> &mut Self
131170
where
132171
R: lsp_types::request::Request + 'static,
133172
R::Params: DeserializeOwned + panic::UnwindSafe + Send + fmt::Debug,
@@ -138,7 +177,7 @@ impl<'a> RequestDispatcher<'a> {
138177
None => return self,
139178
};
140179

141-
self.global_state.task_pool.handle.spawn({
180+
self.global_state.task_pool.handle.spawn(qos_class, {
142181
let world = self.global_state.snapshot();
143182
move || {
144183
let result = panic::catch_unwind(move || {
@@ -155,18 +194,6 @@ impl<'a> RequestDispatcher<'a> {
155194
self
156195
}
157196

158-
pub(crate) fn finish(&mut self) {
159-
if let Some(req) = self.req.take() {
160-
tracing::error!("unknown request: {:?}", req);
161-
let response = lsp_server::Response::new_err(
162-
req.id,
163-
lsp_server::ErrorCode::MethodNotFound as i32,
164-
"unknown request".to_string(),
165-
);
166-
self.global_state.respond(response);
167-
}
168-
}
169-
170197
fn parse<R>(&mut self) -> Option<(lsp_server::Request, R::Params, String)>
171198
where
172199
R: lsp_types::request::Request,

crates/rust-analyzer/src/handlers/notification.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ fn run_flycheck(state: &mut GlobalState, vfs_path: VfsPath) -> bool {
291291
}
292292
Ok(())
293293
};
294-
state.task_pool.handle.spawn_with_sender(move |_| {
294+
state.task_pool.handle.spawn_with_sender(stdx::thread::QoSClass::Default, move |_| {
295295
if let Err(e) = std::panic::catch_unwind(task) {
296296
tracing::error!("flycheck task panicked: {e:?}")
297297
}

crates/rust-analyzer/src/main_loop.rs

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -397,7 +397,7 @@ impl GlobalState {
397397
tracing::debug!(%cause, "will prime caches");
398398
let num_worker_threads = self.config.prime_caches_num_threads();
399399

400-
self.task_pool.handle.spawn_with_sender({
400+
self.task_pool.handle.spawn_with_sender(stdx::thread::QoSClass::Default, {
401401
let analysis = self.snapshot().analysis;
402402
move |sender| {
403403
sender.send(Task::PrimeCaches(PrimeCachesProgress::Begin)).unwrap();
@@ -678,7 +678,24 @@ impl GlobalState {
678678
.on_sync::<lsp_types::request::SelectionRangeRequest>(handlers::handle_selection_range)
679679
.on_sync::<lsp_ext::MatchingBrace>(handlers::handle_matching_brace)
680680
.on_sync::<lsp_ext::OnTypeFormatting>(handlers::handle_on_type_formatting)
681-
// All other request handlers:
681+
// We can’t run latency-sensitive request handlers which do semantic
682+
// analysis on the main thread because that would block other
683+
// requests. Instead, we run these request handlers on higher QoS
684+
// threads in the threadpool.
685+
.on_latency_sensitive::<lsp_types::request::Completion>(handlers::handle_completion)
686+
.on_latency_sensitive::<lsp_types::request::ResolveCompletionItem>(
687+
handlers::handle_completion_resolve,
688+
)
689+
.on_latency_sensitive::<lsp_types::request::SemanticTokensFullRequest>(
690+
handlers::handle_semantic_tokens_full,
691+
)
692+
.on_latency_sensitive::<lsp_types::request::SemanticTokensFullDeltaRequest>(
693+
handlers::handle_semantic_tokens_full_delta,
694+
)
695+
.on_latency_sensitive::<lsp_types::request::SemanticTokensRangeRequest>(
696+
handlers::handle_semantic_tokens_range,
697+
)
698+
// All other request handlers
682699
.on::<lsp_ext::FetchDependencyList>(handlers::fetch_dependency_list)
683700
.on::<lsp_ext::AnalyzerStatus>(handlers::handle_analyzer_status)
684701
.on::<lsp_ext::SyntaxTree>(handlers::handle_syntax_tree)
@@ -706,8 +723,6 @@ impl GlobalState {
706723
.on::<lsp_types::request::GotoTypeDefinition>(handlers::handle_goto_type_definition)
707724
.on_no_retry::<lsp_types::request::InlayHintRequest>(handlers::handle_inlay_hints)
708725
.on::<lsp_types::request::InlayHintResolveRequest>(handlers::handle_inlay_hints_resolve)
709-
.on::<lsp_types::request::Completion>(handlers::handle_completion)
710-
.on::<lsp_types::request::ResolveCompletionItem>(handlers::handle_completion_resolve)
711726
.on::<lsp_types::request::CodeLensRequest>(handlers::handle_code_lens)
712727
.on::<lsp_types::request::CodeLensResolve>(handlers::handle_code_lens_resolve)
713728
.on::<lsp_types::request::FoldingRangeRequest>(handlers::handle_folding_range)
@@ -725,15 +740,6 @@ impl GlobalState {
725740
.on::<lsp_types::request::CallHierarchyOutgoingCalls>(
726741
handlers::handle_call_hierarchy_outgoing,
727742
)
728-
.on::<lsp_types::request::SemanticTokensFullRequest>(
729-
handlers::handle_semantic_tokens_full,
730-
)
731-
.on::<lsp_types::request::SemanticTokensFullDeltaRequest>(
732-
handlers::handle_semantic_tokens_full_delta,
733-
)
734-
.on::<lsp_types::request::SemanticTokensRangeRequest>(
735-
handlers::handle_semantic_tokens_range,
736-
)
737743
.on::<lsp_types::request::WillRenameFiles>(handlers::handle_will_rename_files)
738744
.on::<lsp_ext::Ssr>(handlers::handle_ssr)
739745
.finish();
@@ -781,7 +787,7 @@ impl GlobalState {
781787
tracing::trace!("updating notifications for {:?}", subscriptions);
782788

783789
let snapshot = self.snapshot();
784-
self.task_pool.handle.spawn(move || {
790+
self.task_pool.handle.spawn(stdx::thread::QoSClass::Default, move || {
785791
let _p = profile::span("publish_diagnostics");
786792
let diagnostics = subscriptions
787793
.into_iter()

crates/rust-analyzer/src/reload.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ impl GlobalState {
185185
pub(crate) fn fetch_workspaces(&mut self, cause: Cause) {
186186
tracing::info!(%cause, "will fetch workspaces");
187187

188-
self.task_pool.handle.spawn_with_sender({
188+
self.task_pool.handle.spawn_with_sender(stdx::thread::QoSClass::Default, {
189189
let linked_projects = self.config.linked_projects();
190190
let detached_files = self.config.detached_files().to_vec();
191191
let cargo_config = self.config.cargo();
@@ -260,7 +260,7 @@ impl GlobalState {
260260
tracing::info!(%cause, "will fetch build data");
261261
let workspaces = Arc::clone(&self.workspaces);
262262
let config = self.config.cargo();
263-
self.task_pool.handle.spawn_with_sender(move |sender| {
263+
self.task_pool.handle.spawn_with_sender(stdx::thread::QoSClass::Default, move |sender| {
264264
sender.send(Task::FetchBuildData(BuildDataProgress::Begin)).unwrap();
265265

266266
let progress = {
@@ -280,7 +280,7 @@ impl GlobalState {
280280
let dummy_replacements = self.config.dummy_replacements().clone();
281281
let proc_macro_clients = self.proc_macro_clients.clone();
282282

283-
self.task_pool.handle.spawn_with_sender(move |sender| {
283+
self.task_pool.handle.spawn_with_sender(stdx::thread::QoSClass::Default, move |sender| {
284284
sender.send(Task::LoadProcMacros(ProcMacroProgress::Begin)).unwrap();
285285

286286
let dummy_replacements = &dummy_replacements;

crates/rust-analyzer/src/task_pool.rs

Lines changed: 11 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,76 +1,42 @@
1-
//! A thin wrapper around `ThreadPool` to make sure that we join all things
2-
//! properly.
3-
use std::sync::{Arc, Barrier};
1+
//! A thin wrapper around [`stdx::thread::Pool`] which threads a sender through spawned jobs.
2+
//! It is used in [`crate::global_state::GlobalState`] throughout the main loop.
43
54
use crossbeam_channel::Sender;
5+
use stdx::thread::{Pool, QoSClass};
66

77
pub(crate) struct TaskPool<T> {
88
sender: Sender<T>,
9-
inner: threadpool::ThreadPool,
9+
pool: Pool,
1010
}
1111

1212
impl<T> TaskPool<T> {
1313
pub(crate) fn new_with_threads(sender: Sender<T>, threads: usize) -> TaskPool<T> {
14-
const STACK_SIZE: usize = 8 * 1024 * 1024;
15-
16-
let inner = threadpool::Builder::new()
17-
.thread_name("Worker".into())
18-
.thread_stack_size(STACK_SIZE)
19-
.num_threads(threads)
20-
.build();
21-
22-
// Set QoS of all threads in threadpool.
23-
let barrier = Arc::new(Barrier::new(threads + 1));
24-
for _ in 0..threads {
25-
let barrier = barrier.clone();
26-
inner.execute(move || {
27-
stdx::thread::set_current_thread_qos_class(stdx::thread::QoSClass::Utility);
28-
barrier.wait();
29-
});
30-
}
31-
barrier.wait();
32-
33-
TaskPool { sender, inner }
14+
TaskPool { sender, pool: Pool::new(threads) }
3415
}
3516

36-
pub(crate) fn spawn<F>(&mut self, task: F)
17+
pub(crate) fn spawn<F>(&mut self, qos_class: QoSClass, task: F)
3718
where
3819
F: FnOnce() -> T + Send + 'static,
3920
T: Send + 'static,
4021
{
41-
self.inner.execute({
22+
self.pool.spawn(qos_class, {
4223
let sender = self.sender.clone();
43-
move || {
44-
if stdx::thread::IS_QOS_AVAILABLE {
45-
debug_assert_eq!(
46-
stdx::thread::get_current_thread_qos_class(),
47-
Some(stdx::thread::QoSClass::Utility)
48-
);
49-
}
50-
51-
sender.send(task()).unwrap()
52-
}
24+
move || sender.send(task()).unwrap()
5325
})
5426
}
5527

56-
pub(crate) fn spawn_with_sender<F>(&mut self, task: F)
28+
pub(crate) fn spawn_with_sender<F>(&mut self, qos_class: QoSClass, task: F)
5729
where
5830
F: FnOnce(Sender<T>) + Send + 'static,
5931
T: Send + 'static,
6032
{
61-
self.inner.execute({
33+
self.pool.spawn(qos_class, {
6234
let sender = self.sender.clone();
6335
move || task(sender)
6436
})
6537
}
6638

6739
pub(crate) fn len(&self) -> usize {
68-
self.inner.queued_count()
69-
}
70-
}
71-
72-
impl<T> Drop for TaskPool<T> {
73-
fn drop(&mut self) {
74-
self.inner.join()
40+
self.pool.len()
7541
}
7642
}

crates/stdx/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ libc = "0.2.135"
1616
backtrace = { version = "0.3.65", optional = true }
1717
always-assert = { version = "0.1.2", features = ["log"] }
1818
jod-thread = "0.1.2"
19+
crossbeam-channel = "0.5.5"
1920
# Think twice before adding anything here
2021

2122
[target.'cfg(windows)'.dependencies]

crates/stdx/src/thread.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
1414
use std::fmt;
1515

16+
mod pool;
17+
pub use pool::Pool;
18+
1619
pub fn spawn<F, T>(qos_class: QoSClass, f: F) -> JoinHandle<T>
1720
where
1821
F: FnOnce() -> T,
@@ -152,6 +155,8 @@ pub enum QoSClass {
152155
/// performance, responsiveness and efficiency.
153156
Utility,
154157

158+
Default,
159+
155160
/// TLDR: tasks that block using your app
156161
///
157162
/// Contract:
@@ -229,6 +234,7 @@ mod imp {
229234
let c = match class {
230235
QoSClass::UserInteractive => libc::qos_class_t::QOS_CLASS_USER_INTERACTIVE,
231236
QoSClass::UserInitiated => libc::qos_class_t::QOS_CLASS_USER_INITIATED,
237+
QoSClass::Default => libc::qos_class_t::QOS_CLASS_DEFAULT,
232238
QoSClass::Utility => libc::qos_class_t::QOS_CLASS_UTILITY,
233239
QoSClass::Background => libc::qos_class_t::QOS_CLASS_BACKGROUND,
234240
};

0 commit comments

Comments
 (0)