Skip to content

Commit 2c33f08

Browse files
committed
adding monitor
1 parent 1fcb2d5 commit 2c33f08

File tree

3 files changed

+79
-1
lines changed

3 files changed

+79
-1
lines changed

src/rt/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::utils::abort_on_panic;
99
pub use reactor::{Reactor, Watcher};
1010
pub use runtime::Runtime;
1111

12+
mod monitor;
1213
mod reactor;
1314
mod runtime;
1415

@@ -19,5 +20,7 @@ pub static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
1920
.spawn(|| abort_on_panic(|| RUNTIME.run()))
2021
.expect("cannot start a runtime thread");
2122

23+
monitor::spawn_thread();
24+
2225
Runtime::new()
2326
});

src/rt/monitor.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
use std::cell::Cell;
2+
use std::future::Future;
3+
use std::sync::{Arc, Mutex};
4+
use std::thread;
5+
use std::time::Duration;
6+
7+
use crate::rt::RUNTIME;
8+
use crate::task;
9+
use crate::utils::abort_on_panic;
10+
11+
fn spawn_monitor_task<F>(future: F)
12+
where
13+
F: Future<Output = ()> + Send + 'static,
14+
{
15+
task::Builder::new()
16+
.spawn_in_runtime(future, &RUNTIME)
17+
.expect("cannot spawn monitor task");
18+
}
19+
20+
pub fn spawn_thread() {
21+
thread::Builder::new()
22+
.name("async-std/monitor".to_string())
23+
.spawn(|| {
24+
const PROBING_DURATION_MS: u64 = 500;
25+
const SCALING_DOWN_SEC: u64 = 1 * 60; // 1 minute
26+
27+
abort_on_panic(|| {
28+
let running = &Arc::new(Mutex::new(Cell::new(false)));
29+
30+
{
31+
let running = Arc::clone(running);
32+
spawn_monitor_task(async move {
33+
loop {
34+
running.lock().unwrap().set(true);
35+
task::sleep(Duration::from_millis(PROBING_DURATION_MS)).await;
36+
}
37+
});
38+
}
39+
40+
{
41+
spawn_monitor_task(async {
42+
loop {
43+
task::sleep(Duration::from_secs(SCALING_DOWN_SEC)).await;
44+
RUNTIME.scale_down();
45+
}
46+
});
47+
}
48+
49+
loop {
50+
running.lock().unwrap().set(false);
51+
thread::sleep(Duration::from_millis(PROBING_DURATION_MS));
52+
if !running.lock().unwrap().get() {
53+
eprintln!(
54+
"WARNING: You are blocking the runtime, please use spawn_blocking"
55+
);
56+
RUNTIME.scale_up();
57+
}
58+
}
59+
})
60+
})
61+
.expect("cannot start a monitor thread");
62+
}

src/task/builder.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::future::Future;
33
use kv_log_macro::trace;
44

55
use crate::io;
6+
use crate::rt::Runtime;
67
use crate::rt::RUNTIME;
78
use crate::task::{JoinHandle, Task};
89
use crate::utils::abort_on_panic;
@@ -29,6 +30,18 @@ impl Builder {
2930

3031
/// Spawns a task with the configured settings.
3132
pub fn spawn<F, T>(self, future: F) -> io::Result<JoinHandle<T>>
33+
where
34+
F: Future<Output = T> + Send + 'static,
35+
T: Send + 'static,
36+
{
37+
self.spawn_in_runtime(future, &RUNTIME)
38+
}
39+
40+
pub(crate) fn spawn_in_runtime<F, T>(
41+
self,
42+
future: F,
43+
rt: &'static Runtime,
44+
) -> io::Result<JoinHandle<T>>
3245
where
3346
F: Future<Output = T> + Send + 'static,
3447
T: Send + 'static,
@@ -58,7 +71,7 @@ impl Builder {
5871
future.await
5972
};
6073

61-
let schedule = move |t| RUNTIME.schedule(Runnable(t));
74+
let schedule = move |t| rt.schedule(Runnable(t));
6275
let (task, handle) = async_task::spawn(future, schedule, task);
6376
task.schedule();
6477
Ok(JoinHandle::new(handle))

0 commit comments

Comments
 (0)