3
3
use std:: fmt;
4
4
use std:: future:: Future ;
5
5
use std:: pin:: Pin ;
6
+ use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
6
7
use std:: task:: { Context , Poll } ;
7
8
use std:: thread;
9
+ use std:: time:: Duration ;
8
10
9
- use crossbeam:: channel:: { unbounded , Receiver , Sender } ;
11
+ use crossbeam:: channel:: { bounded , Receiver , Sender } ;
10
12
use lazy_static:: lazy_static;
11
13
12
14
use crate :: utils:: abort_on_panic;
13
15
16
/// Hard upper bound on the number of dynamically spawned blocking
/// worker threads (also used as the work-queue capacity below).
const MAX_THREADS: u64 = 10_000;

/// Bounds, in milliseconds, of the utilization-scaled wait used both
/// when submitting work (block-before-growing) and by idle workers
/// deciding when to terminate themselves.
const MIN_WAIT_MS: u64 = 1;
const MAX_WAIT_MS: u64 = 100;

/// Width of the wait range; utilization percent is interpolated over
/// this spread to pick a concrete wait time.
const WAIT_SPREAD: u64 = MAX_WAIT_MS - MIN_WAIT_MS;

/// Best-effort count of dynamic worker threads currently alive.
/// Read and updated with `Relaxed` ordering — it is a heuristic for
/// backpressure, not a correctness-critical invariant.
static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0);
22
+
14
23
struct Pool {
15
24
sender : Sender < async_task:: Task < ( ) > > ,
16
25
receiver : Receiver < async_task:: Task < ( ) > > ,
@@ -29,11 +38,90 @@ lazy_static! {
29
38
. expect( "cannot start a thread driving blocking tasks" ) ;
30
39
}
31
40
32
- let ( sender, receiver) = unbounded( ) ;
41
+ // We want to bound the work queue to make it more
42
+ // suitable as a backpressure mechanism.
43
+ let ( sender, receiver) = bounded( MAX_THREADS as usize ) ;
33
44
Pool { sender, receiver }
34
45
} ;
35
46
}
36
47
48
+ // Create up to 10,000 dynamic blocking task worker threads.
49
+ // Dynamic threads will terminate themselves if they don't
50
+ // receive any work after a timeout that scales down as the
51
+ // total number of threads scales up.
52
+ fn maybe_create_another_blocking_thread ( ) {
53
+ let workers = DYNAMIC_THREAD_COUNT . load ( Ordering :: Relaxed ) ;
54
+ if workers >= MAX_THREADS {
55
+ return ;
56
+ }
57
+
58
+ // We want to give up earlier when we have more threads
59
+ // to exert backpressure on the system submitting work
60
+ // to do. We use a `Relaxed` atomic operation because
61
+ // it's just a heuristic, and would not lose correctness
62
+ // even if it's random.
63
+ let utilization_percent = ( workers * 100 ) / MAX_THREADS ;
64
+ let relative_wait_limit = ( WAIT_SPREAD * utilization_percent) / 100 ;
65
+
66
+ // higher utilization -> lower wait time
67
+ let wait_limit_ms = MAX_WAIT_MS - relative_wait_limit;
68
+ assert ! ( wait_limit_ms >= MIN_WAIT_MS ) ;
69
+ let wait_limit = Duration :: from_millis ( wait_limit_ms) ;
70
+
71
+ thread:: Builder :: new ( )
72
+ . name ( "async-blocking-driver-dynamic" . to_string ( ) )
73
+ . spawn ( move || {
74
+ DYNAMIC_THREAD_COUNT . fetch_add ( 1 , Ordering :: Relaxed ) ;
75
+ while let Ok ( task) = POOL . receiver . recv_timeout ( wait_limit) {
76
+ abort_on_panic ( || task. run ( ) ) ;
77
+ }
78
+ DYNAMIC_THREAD_COUNT . fetch_sub ( 1 , Ordering :: Relaxed ) ;
79
+ } )
80
+ . expect ( "cannot start a dynamic thread driving blocking tasks" ) ;
81
+ }
82
+
83
+ // Enqueues work, blocking on a threadpool for a certain amount of
84
+ // time based on the number of worker threads currently active in
85
+ // the system. If we cannot send our work to the pool after the
86
+ // given timeout, we will attempt to increase the number of
87
+ // worker threads active in the system, up to MAX_THREADS. The
88
+ // timeout is dynamic, and when we have more threads we block
89
+ // for longer before spinning up another thread for backpressure.
90
+ fn schedule ( t : async_task:: Task < ( ) > ) {
91
+ let workers = DYNAMIC_THREAD_COUNT . load ( Ordering :: Relaxed ) ;
92
+
93
+ // We want to block for longer when we have more threads to
94
+ // exert backpressure on the system submitting work to do.
95
+ let utilization_percent = ( workers * 100 ) / MAX_THREADS ;
96
+ let relative_wait_limit = ( WAIT_SPREAD * utilization_percent) / 100 ;
97
+
98
+ // higher utilization -> higher block time
99
+ let wait_limit_ms = MIN_WAIT_MS + relative_wait_limit;
100
+ assert ! ( wait_limit_ms <= MAX_WAIT_MS ) ;
101
+ let wait_limit = Duration :: from_millis ( wait_limit_ms) ;
102
+
103
+ let first_try_result = POOL . sender . send_timeout ( t, wait_limit) ;
104
+ match first_try_result {
105
+ Ok ( ( ) ) => {
106
+ // NICEEEE
107
+ }
108
+ Err ( crossbeam:: channel:: SendTimeoutError :: Timeout ( t) ) => {
109
+ // We were not able to send to the channel within our
110
+ // budget. Try to spin up another thread, and then
111
+ // block without a time limit on the submission of
112
+ // the task.
113
+ maybe_create_another_blocking_thread ( ) ;
114
+ POOL . sender . send ( t) . unwrap ( )
115
+ }
116
+ Err ( crossbeam:: channel:: SendTimeoutError :: Disconnected ( _) ) => {
117
+ panic ! (
118
+ "unable to send to blocking threadpool \
119
+ due to receiver disconnection"
120
+ ) ;
121
+ }
122
+ }
123
+ }
124
+
37
125
/// Spawns a blocking task.
38
126
///
39
127
/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks.
42
130
F : Future < Output = R > + Send + ' static ,
43
131
R : Send + ' static ,
44
132
{
45
- let schedule = |t| POOL . sender . send ( t) . unwrap ( ) ;
46
133
let ( task, handle) = async_task:: spawn ( future, schedule, ( ) ) ;
47
134
task. schedule ( ) ;
48
135
JoinHandle ( handle)
0 commit comments