Skip to content

Commit 81fa1d4

Browse files
author
Tyler Neely
committed
Remove all of the dynamic control stuff for now and just use nonblocking sends + 1 second receive timeouts
1 parent ab613a5 commit 81fa1d4

File tree

1 file changed

+10
-42
lines changed

1 file changed

+10
-42
lines changed

src/task/blocking.rs

Lines changed: 10 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,6 @@ use lazy_static::lazy_static;
1414
use crate::utils::abort_on_panic;
1515

1616
const MAX_THREADS: u64 = 10_000;
17-
const MIN_WAIT_US: u64 = 10;
18-
const MAX_WAIT_US: u64 = 10_000;
19-
const WAIT_SPREAD: u64 = MAX_WAIT_US - MIN_WAIT_US;
2017

2118
static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0);
2219

@@ -52,8 +49,7 @@ lazy_static! {
5249

5350
// Create up to MAX_THREADS dynamic blocking task worker threads.
5451
// Dynamic threads will terminate themselves if they don't
55-
// receive any work after a timeout that scales down as the
56-
// total number of threads scales up.
52+
// receive any work after one second.
5753
fn maybe_create_another_blocking_thread() {
5854
// We use a `Relaxed` atomic operation because
5955
// it's just a heuristic, and would not lose correctness
@@ -63,20 +59,11 @@ fn maybe_create_another_blocking_thread() {
6359
return;
6460
}
6561

66-
// We want to give up earlier when we have more threads
67-
// to exert backpressure on the system submitting work
68-
// to do.
69-
let utilization_percent = (workers * 100) / MAX_THREADS;
70-
let relative_wait_limit = (WAIT_SPREAD * utilization_percent) / 100;
71-
72-
// higher utilization -> lower wait time
73-
let wait_limit_us = MAX_WAIT_US - relative_wait_limit;
74-
assert!(wait_limit_us >= MIN_WAIT_US);
75-
let wait_limit = Duration::from_micros(wait_limit_us);
76-
7762
thread::Builder::new()
7863
.name("async-blocking-driver-dynamic".to_string())
79-
.spawn(move || {
64+
.spawn(|| {
65+
let wait_limit = Duration::from_secs(1);
66+
8067
DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed);
8168
while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) {
8269
abort_on_panic(|| task.run());
@@ -86,43 +73,24 @@ fn maybe_create_another_blocking_thread() {
8673
.expect("cannot start a dynamic thread driving blocking tasks");
8774
}
8875

89-
// Enqueues work, blocking on a threadpool for a certain amount of
90-
// time based on the number of worker threads currently active in
91-
// the system. If we cannot send our work to the pool after the
92-
// given timeout, we will attempt to increase the number of
93-
// worker threads active in the system, up to MAX_THREADS. The
94-
// timeout is dynamic, and when we have more threads we block
95-
// for longer before spinning up another thread for backpressure.
76+
// Enqueues work, attempting to send to the threadpool in a
77+
// nonblocking way and spinning up another worker thread if
78+
// there is not a thread ready to accept the work.
9679
fn schedule(t: async_task::Task<()>) {
97-
// We use a `Relaxed` atomic operation because
98-
// it's just a heuristic, and would not lose correctness
99-
// even if it's random.
100-
let workers = DYNAMIC_THREAD_COUNT.load(Ordering::Relaxed);
101-
102-
// We want to block for longer when we have more threads to
103-
// exert backpressure on the system submitting work to do.
104-
let utilization_percent = (workers * 100) / MAX_THREADS;
105-
let relative_wait_limit = (WAIT_SPREAD * utilization_percent) / 100;
106-
107-
// higher utilization -> higher block time
108-
let wait_limit_us = MIN_WAIT_US + relative_wait_limit;
109-
assert!(wait_limit_us <= MAX_WAIT_US);
110-
let wait_limit = Duration::from_micros(wait_limit_us);
111-
112-
let first_try_result = POOL.sender.send_timeout(t, wait_limit);
80+
let first_try_result = POOL.sender.try_send(t);
11381
match first_try_result {
11482
Ok(()) => {
11583
// NICEEEE
11684
}
117-
Err(crossbeam::channel::SendTimeoutError::Timeout(t)) => {
85+
Err(crossbeam::channel::TrySendError::Full(t)) => {
11886
// We were not able to send to the channel within our
11987
// budget. Try to spin up another thread, and then
12088
// block without a time limit on the submission of
12189
// the task.
12290
maybe_create_another_blocking_thread();
12391
POOL.sender.send(t).unwrap()
12492
}
125-
Err(crossbeam::channel::SendTimeoutError::Disconnected(_)) => {
93+
Err(crossbeam::channel::TrySendError::Disconnected(_)) => {
12694
panic!(
12795
"unable to send to blocking threadpool \
12896
due to receiver disconnection"

0 commit comments

Comments
 (0)