-
Notifications
You must be signed in to change notification settings - Fork 300
Remove blocking IO #1803
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove blocking IO #1803
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,166 +1,29 @@ | ||
use std::any::Any; | ||
use std::collections::HashMap; | ||
use std::error::Error; | ||
use std::sync::RwLock; | ||
use std::sync::Arc; | ||
use std::time::Instant; | ||
|
||
use rocket::tokio::sync::RwLock; | ||
use rocket::tokio::task; | ||
|
||
type CacheItem = (Box<dyn Any + Send + Sync>, Instant); | ||
type Generator = fn() -> Result<Box<dyn Any>, Box<dyn Error>>; | ||
|
||
const CACHE_TTL_SECS: u64 = 120; | ||
|
||
lazy_static! { | ||
static ref CACHE: RwLock<HashMap<Generator, CacheItem>> = RwLock::new(HashMap::new()); | ||
} | ||
|
||
pub async fn get<T>(generator: Generator) -> Result<T, Box<dyn Error>> | ||
where | ||
T: Send + Sync + Clone + 'static, | ||
{ | ||
if let Some(cached) = get_cached(generator) { | ||
Ok(cached) | ||
} else { | ||
task::spawn_blocking(move || { | ||
update_cache::<T>(generator) | ||
// stringify the error to make it Send | ||
.map_err(|e| e.to_string()) | ||
}) | ||
.await | ||
.map_err(Box::new)? | ||
// put the previously stringified error back in a box | ||
.map_err(|e| e.as_str().into()) | ||
} | ||
} | ||
|
||
fn get_cached<T>(generator: Generator) -> Option<T> | ||
where | ||
T: Send + Sync + Clone + 'static, | ||
{ | ||
let cache = CACHE.read().unwrap(); | ||
cache.get(&generator).map(|&(ref data, timestamp)| { | ||
#[async_trait] | ||
pub trait Cache: Send + Sync + Clone + 'static { | ||
fn get_timestamp(&self) -> Instant; | ||
async fn fetch() -> Result<Self, Box<dyn Error + Send + Sync>>; | ||
async fn get(cache: &Arc<RwLock<Self>>) -> Self { | ||
let cached = cache.read().await.clone(); | ||
let timestamp = cached.get_timestamp(); | ||
if timestamp.elapsed().as_secs() > CACHE_TTL_SECS { | ||
// Update the cache in the background | ||
task::spawn_blocking(move || { | ||
let _ = update_cache::<T>(generator); | ||
let cache: Arc<_> = cache.clone(); | ||
task::spawn(async move { | ||
match Self::fetch().await { | ||
Ok(data) => *cache.write().await = data, | ||
Err(e) => eprintln!("failed to update cache: {e}"), | ||
} | ||
}); | ||
} | ||
data.downcast_ref::<T>().unwrap().clone() | ||
}) | ||
} | ||
|
||
fn update_cache<T>(generator: Generator) -> Result<T, Box<dyn Error>> | ||
where | ||
T: Send + Sync + Clone + 'static, | ||
{ | ||
if let Ok(data) = generator()?.downcast::<T>() { | ||
let cloned: T = (*data).clone(); | ||
CACHE | ||
.write() | ||
.unwrap() | ||
.insert(generator, (Box::new(cloned), Instant::now())); | ||
Ok(*data) | ||
} else { | ||
Err("the generator returned the wrong type".into()) | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use rocket::tokio; | ||
|
||
use super::{get, Generator, CACHE, CACHE_TTL_SECS}; | ||
use std::any::Any; | ||
use std::error::Error; | ||
use std::sync::atomic::{AtomicBool, Ordering}; | ||
use std::thread; | ||
use std::time::{Duration, Instant}; | ||
|
||
#[tokio::test] | ||
async fn test_cache_basic() { | ||
static GENERATOR_CALLED: AtomicBool = AtomicBool::new(false); | ||
|
||
fn generator() -> Result<Box<dyn Any>, Box<dyn Error>> { | ||
GENERATOR_CALLED.store(true, Ordering::SeqCst); | ||
Ok(Box::new("hello world")) | ||
} | ||
|
||
// The first time it will call the generator | ||
GENERATOR_CALLED.store(false, Ordering::SeqCst); | ||
assert_eq!(get::<&'static str>(generator).await.unwrap(), "hello world"); | ||
assert!(GENERATOR_CALLED.load(Ordering::SeqCst)); | ||
|
||
// The second time it won't call the generator, but reuse the latest value | ||
GENERATOR_CALLED.store(false, Ordering::SeqCst); | ||
assert_eq!(get::<&'static str>(generator).await.unwrap(), "hello world"); | ||
assert!(!GENERATOR_CALLED.load(Ordering::SeqCst)); | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_cache_refresh() { | ||
static GENERATOR_CALLED: AtomicBool = AtomicBool::new(false); | ||
|
||
fn generator() -> Result<Box<dyn Any>, Box<dyn Error>> { | ||
GENERATOR_CALLED.store(true, Ordering::SeqCst); | ||
thread::sleep(Duration::from_millis(100)); | ||
Ok(Box::new("hello world")) | ||
} | ||
|
||
// Initialize the value in the cache | ||
GENERATOR_CALLED.store(false, Ordering::SeqCst); | ||
assert_eq!(get::<&'static str>(generator).await.unwrap(), "hello world"); | ||
assert!(GENERATOR_CALLED.load(Ordering::SeqCst)); | ||
|
||
// Tweak the cache to fake an expired TTL | ||
let expired = Instant::now() - Duration::from_secs(CACHE_TTL_SECS * 2); | ||
CACHE | ||
.write() | ||
.unwrap() | ||
.get_mut(&(generator as Generator)) | ||
.unwrap() | ||
.1 = expired; | ||
|
||
// The second time it won't call the generator, but start another thread to refresh the | ||
// value in the background | ||
GENERATOR_CALLED.store(false, Ordering::SeqCst); | ||
assert_eq!(get::<&'static str>(generator).await.unwrap(), "hello world"); | ||
assert!(!GENERATOR_CALLED.load(Ordering::SeqCst)); | ||
|
||
// Then the background updater thread will finish | ||
thread::sleep(Duration::from_millis(200)); | ||
assert!(GENERATOR_CALLED.load(Ordering::SeqCst)); | ||
} | ||
|
||
#[tokio::test] | ||
async fn test_errors_skip_cache() { | ||
static GENERATOR_CALLED: AtomicBool = AtomicBool::new(false); | ||
|
||
fn generator() -> Result<Box<dyn Any>, Box<dyn Error>> { | ||
GENERATOR_CALLED.store(true, Ordering::SeqCst); | ||
Err("an error".into()) | ||
} | ||
|
||
// The first time it will call the generator | ||
GENERATOR_CALLED.store(false, Ordering::SeqCst); | ||
assert_eq!( | ||
get::<&'static str>(generator) | ||
.await | ||
.unwrap_err() | ||
.to_string(), | ||
"an error" | ||
); | ||
assert!(GENERATOR_CALLED.load(Ordering::SeqCst)); | ||
|
||
// The second time it will also call the generator | ||
GENERATOR_CALLED.store(false, Ordering::SeqCst); | ||
assert_eq!( | ||
get::<&'static str>(generator) | ||
.await | ||
.unwrap_err() | ||
.to_string(), | ||
"an error" | ||
); | ||
assert!(GENERATOR_CALLED.load(Ordering::SeqCst)); | ||
cached | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,14 +30,21 @@ mod redirect; | |
mod rust_version; | ||
mod teams; | ||
|
||
use cache::Cache; | ||
use production::User; | ||
use rocket::tokio::sync::RwLock; | ||
use rocket::State; | ||
use rust_version::RustReleasePost; | ||
use rust_version::RustVersion; | ||
use teams::encode_zulip_stream; | ||
use teams::RustTeams; | ||
|
||
use std::collections::hash_map::DefaultHasher; | ||
use std::env; | ||
use std::fs; | ||
use std::hash::Hasher; | ||
use std::path::{Path, PathBuf}; | ||
use std::sync::Arc; | ||
|
||
use rand::seq::SliceRandom; | ||
|
||
|
@@ -182,13 +189,20 @@ fn robots_txt() -> Option<content::RawText<&'static str>> { | |
} | ||
|
||
#[get("/")] | ||
async fn index() -> Template { | ||
render_index(ENGLISH.into()).await | ||
async fn index( | ||
version_cache: &State<Arc<RwLock<RustVersion>>>, | ||
release_post_cache: &State<Arc<RwLock<RustReleasePost>>>, | ||
) -> Template { | ||
render_index(ENGLISH.into(), version_cache, release_post_cache).await | ||
} | ||
|
||
#[get("/<locale>", rank = 3)] | ||
async fn index_locale(locale: SupportedLocale) -> Template { | ||
render_index(locale.0).await | ||
async fn index_locale( | ||
locale: SupportedLocale, | ||
version_cache: &State<Arc<RwLock<RustVersion>>>, | ||
release_post_cache: &State<Arc<RwLock<RustReleasePost>>>, | ||
) -> Template { | ||
render_index(locale.0, version_cache, release_post_cache).await | ||
} | ||
|
||
#[get("/<category>")] | ||
|
@@ -202,27 +216,35 @@ fn category_locale(category: Category, locale: SupportedLocale) -> Template { | |
} | ||
|
||
#[get("/governance")] | ||
async fn governance() -> Result<Template, Status> { | ||
render_governance(ENGLISH.into()).await | ||
async fn governance(teams_cache: &State<Arc<RwLock<RustTeams>>>) -> Result<Template, Status> { | ||
render_governance(ENGLISH.into(), teams_cache).await | ||
} | ||
|
||
#[get("/governance/<section>/<team>", rank = 2)] | ||
async fn team(section: String, team: String) -> Result<Template, Result<Redirect, Status>> { | ||
render_team(section, team, ENGLISH.into()).await | ||
async fn team( | ||
section: String, | ||
team: String, | ||
teams_cache: &State<Arc<RwLock<RustTeams>>>, | ||
) -> Result<Template, Result<Redirect, Status>> { | ||
render_team(section, team, ENGLISH.into(), teams_cache).await | ||
} | ||
|
||
#[get("/<locale>/governance", rank = 8)] | ||
async fn governance_locale(locale: SupportedLocale) -> Result<Template, Status> { | ||
render_governance(locale.0).await | ||
async fn governance_locale( | ||
locale: SupportedLocale, | ||
teams_cache: &State<Arc<RwLock<RustTeams>>>, | ||
) -> Result<Template, Status> { | ||
render_governance(locale.0, teams_cache).await | ||
} | ||
|
||
#[get("/<locale>/governance/<section>/<team>", rank = 12)] | ||
async fn team_locale( | ||
section: String, | ||
team: String, | ||
locale: SupportedLocale, | ||
teams_cache: &State<Arc<RwLock<RustTeams>>>, | ||
) -> Result<Template, Result<Redirect, Status>> { | ||
render_team(section, team, locale.0).await | ||
render_team(section, team, locale.0, teams_cache).await | ||
} | ||
|
||
#[get("/production/users")] | ||
|
@@ -344,19 +366,26 @@ fn concat_app_js(files: Vec<&str>) -> String { | |
String::from(&js_path[1..]) | ||
} | ||
|
||
async fn render_index(lang: String) -> Template { | ||
async fn render_index( | ||
lang: String, | ||
version_cache: &State<Arc<RwLock<RustVersion>>>, | ||
release_post_cache: &State<Arc<RwLock<RustReleasePost>>>, | ||
) -> Template { | ||
#[derive(Serialize)] | ||
struct IndexData { | ||
rust_version: String, | ||
rust_release_post: String, | ||
} | ||
|
||
let page = "index".to_string(); | ||
let release_post = rust_version::rust_release_post(release_post_cache).await; | ||
let data = IndexData { | ||
rust_version: rust_version::rust_version().await.unwrap_or_default(), | ||
rust_release_post: rust_version::rust_release_post() | ||
.await | ||
.map_or_else(String::new, |v| format!("https://blog.rust-lang.org/{}", v)), | ||
rust_version: rust_version::rust_version(version_cache).await, | ||
rust_release_post: if !release_post.is_empty() { | ||
format!("https://blog.rust-lang.org/{}", release_post) | ||
} else { | ||
String::new() | ||
}, | ||
}; | ||
let context = Context::new(page.clone(), "", true, data, lang); | ||
Template::render(page, context) | ||
|
@@ -383,8 +412,11 @@ fn render_production(lang: String) -> Template { | |
Template::render(page, context) | ||
} | ||
|
||
async fn render_governance(lang: String) -> Result<Template, Status> { | ||
match teams::index_data().await { | ||
async fn render_governance( | ||
lang: String, | ||
teams_cache: &State<Arc<RwLock<RustTeams>>>, | ||
) -> Result<Template, Status> { | ||
match teams::index_data(teams_cache).await { | ||
Ok(data) => { | ||
let page = "governance/index".to_string(); | ||
let context = Context::new(page.clone(), "governance-page-title", false, data, lang); | ||
|
@@ -402,8 +434,9 @@ async fn render_team( | |
section: String, | ||
team: String, | ||
lang: String, | ||
teams_cache: &State<Arc<RwLock<RustTeams>>>, | ||
) -> Result<Template, Result<Redirect, Status>> { | ||
match teams::page_data(§ion, &team).await { | ||
match teams::page_data(§ion, &team, teams_cache).await { | ||
Ok(data) => { | ||
let page = "governance/group".to_string(); | ||
let name = format!("governance-team-{}-name", data.team.name); | ||
|
@@ -448,7 +481,7 @@ fn render_subject(category: Category, subject: String, lang: String) -> Result<T | |
} | ||
|
||
#[launch] | ||
fn rocket() -> _ { | ||
async fn rocket() -> _ { | ||
let templating = Template::custom(|engine| { | ||
engine | ||
.handlebars | ||
|
@@ -461,9 +494,16 @@ fn rocket() -> _ { | |
.register_helper("encode-zulip-stream", Box::new(encode_zulip_stream)); | ||
}); | ||
|
||
let rust_version = RustVersion::fetch().await.unwrap_or_default(); | ||
let rust_release_post = RustReleasePost::fetch().await.unwrap_or_default(); | ||
let teams = RustTeams::fetch().await.unwrap_or_default(); | ||
Comment on lines
+497
to
+499
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a change in behavior, the caches will now already be prefilled with data at startup. Previously, I think the first request using a cache would actually block until some data could be retrieved. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that's a good call, yeah. |
||
|
||
rocket::build() | ||
.attach(templating) | ||
.attach(headers::InjectHeaders) | ||
.manage(Arc::new(RwLock::new(rust_version))) | ||
.manage(Arc::new(RwLock::new(rust_release_post))) | ||
.manage(Arc::new(RwLock::new(teams))) | ||
.mount( | ||
"/", | ||
routes![ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can we use some kind of typedef for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
trait Cached
+type Cache<T> = ...
would probably be fine. Open to other opinionsThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah that's cleaner 👍