Skip to content

Avoid learning the same job multiple times #53

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 15, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 92 additions & 34 deletions src/bin/server/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,25 @@ use super::{QueueItem, QueueItemKind};

use crate::rla;
use crate::rla::ci::{self, BuildCommit, CiPlatform};
use std::collections::VecDeque;
use std::collections::{HashSet, VecDeque};
use std::hash::Hash;
use std::path::PathBuf;
use std::str;

// We keep track of the last several unique job IDs. This is because
// Azure sends us a notification for every individual builder's
// state (around 70 notifications/job as of this time), but we want
// to only process a given job once.
//
// You might ask -- why is this not a HashSet/HashMap? That would
// also work, but be a little more complicated to remove things
// from. We would need to keep track of order somehow to remove the
// oldest job ID. An attempt at such an API was tried in PR #29, but
// ultimately scrapped as too complicated.
//
// We keep few enough elements in this "set" that a Vec isn't too bad.
//
// Note: Don't update this number too high, as we O(n) loop through it on every
// notification from GitHub (twice).
const KEEP_IDS: usize = 16;

pub struct Worker {
debug_post: Option<(String, u32)>,
index_file: PathBuf,
index: rla::Index,
extract_config: rla::extract::Config,
github: rla::github::Client,
queue: crossbeam::channel::Receiver<QueueItem>,
notified: VecDeque<u64>,
ci: Box<dyn CiPlatform + Send>,
repo: String,
secondary_repos: Vec<String>,
query_builds_from_primary_repo: bool,

recently_notified: RecentlySeen<u64>,
recently_learned: RecentlySeen<String>,
}

impl Worker {
Expand Down Expand Up @@ -66,12 +52,14 @@ impl Worker {
index_file,
extract_config: Default::default(),
github: rla::github::Client::new()?,
notified: VecDeque::new(),
queue,
ci,
repo,
secondary_repos,
query_builds_from_primary_repo,

recently_notified: RecentlySeen::new(32),
recently_learned: RecentlySeen::new(256),
})
}

Expand Down Expand Up @@ -133,10 +121,6 @@ impl Worker {

info!("started processing");

if self.notified.contains(&build_id) {
info!("ignoring recently notified build");
return Ok(());
}
let query_from = if self.query_builds_from_primary_repo {
&self.repo
} else {
Expand All @@ -160,14 +144,7 @@ impl Worker {

// Avoid processing the same build multiple times.
if !outcome.is_passed() {
info!("preparing report");
self.report_failed(build.as_ref())?;

info!("marked as notified");
self.notified.push_front(build_id);
if self.notified.len() > KEEP_IDS {
self.notified.pop_back();
}
self.report_failed(build_id, build.as_ref())?;
}
if build.pr_number().is_none() && build.branch_name() == "auto" {
info!("learning from the log");
Expand All @@ -179,8 +156,13 @@ impl Worker {
Ok(())
}

fn report_failed(&mut self, build: &dyn rla::ci::Build) -> rla::Result<()> {
debug!("Preparing report...");
fn report_failed(&mut self, build_id: u64, build: &dyn rla::ci::Build) -> rla::Result<()> {
if self.recently_notified.recently_witnessed(&build_id) {
info!("avoided reporting recently notified build");
return Ok(());
}

info!("preparing report");

let job = match build.jobs().iter().find(|j| j.outcome().is_failed()) {
Some(job) => *job,
Expand Down Expand Up @@ -301,6 +283,9 @@ impl Worker {
[I'm a bot](https://github.com/rust-lang/rust-log-analyzer)! I can only do what humans tell me to, so if this was not helpful or you have suggestions for improvements, please ping or otherwise contact **`@rust-lang/infra`**. ([Feature Requests](https://github.com/rust-lang/rust-log-analyzer/issues?q=is%3Aopen+is%3Aissue+label%3Afeature-request))
"#, opening = opening, html_url = job.html_url(), log_url = pretty_log_url, raw_log_url = raw_log_url, log = extracted))?;

info!("marked build {} as recently notified", build_id);
self.recently_notified.store(build_id);

Ok(())
}

Expand All @@ -310,6 +295,11 @@ impl Worker {
continue;
}

if self.recently_learned.recently_witnessed(&job.id()) {
info!("Skipped already processed {}", job);
return Ok(());
}

debug!("Processing {}...", job);

match ci::download_log(self.ci.as_ref(), *job, self.github.internal()) {
Expand All @@ -320,6 +310,7 @@ impl Worker {
1,
);
}
self.recently_learned.store(job.id());
}
None => {
warn!(
Expand All @@ -341,3 +332,70 @@ impl Worker {
Ok(())
}
}

/// Keeps track of the recently seen IDs for both the failed build reports and the learned jobs.
/// Only the most recent IDs are stored, to avoid growing the memory usage endlessly.
///
/// Internally this uses both a `HashSet` to provide fast lookups and a `VecDeque` to know which
/// old IDs need to be removed.
struct RecentlySeen<T: Clone + Eq + Hash> {
    /// Maximum number of IDs remembered at any one time.
    size: usize,
    /// Every ID currently remembered; answers membership queries.
    lookup: HashSet<T>,
    /// The same IDs in insertion order: newest at the front, oldest at the back.
    removal: VecDeque<T>,
}

impl<T: Clone + Eq + Hash> RecentlySeen<T> {
    /// Creates an empty tracker that remembers at most `size` IDs.
    ///
    /// Both internal collections are preallocated for `size` elements.
    fn new(size: usize) -> Self {
        Self {
            size,
            lookup: HashSet::with_capacity(size),
            removal: VecDeque::with_capacity(size),
        }
    }

    /// Returns `true` if `key` was stored and has not been evicted since.
    fn recently_witnessed(&self, key: &T) -> bool {
        self.lookup.contains(key)
    }

    /// Remembers `key`, evicting the oldest remembered ID when the tracker is full.
    ///
    /// Storing an ID that is already remembered is a no-op: it is neither duplicated nor
    /// moved to the front of the eviction order. A tracker created with a `size` of zero
    /// never remembers anything.
    fn store(&mut self, key: T) {
        // Without this guard a zero-sized tracker would still retain one ID: there is nothing
        // to evict from the empty queue, and the insertion below would go ahead regardless.
        if self.size == 0 || self.lookup.contains(&key) {
            return;
        }
        if self.removal.len() >= self.size {
            // Make room by forgetting the oldest ID in both collections.
            if let Some(oldest) = self.removal.pop_back() {
                self.lookup.remove(&oldest);
            }
        }
        // The clone is needed because the ID lives in both the set and the queue.
        self.lookup.insert(key.clone());
        self.removal.push_front(key);
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Walks a two-slot `RecentlySeen` through three insertions, checking after every step
    /// which of the keys 0, 1 and 2 are reported as witnessed.
    #[test]
    fn test_recently_seen() {
        let mut recently = RecentlySeen::new(2);

        // Nothing has been stored yet, so no key may be reported.
        for key in 0..3 {
            assert!(!recently.recently_witnessed(&key));
        }

        // Each step stores one key, then lists the expected answers for keys 0, 1 and 2.
        // The third insertion overflows the two slots and pushes out key 0.
        let steps = [
            (0, [true, false, false]),
            (1, [true, true, false]),
            (2, [false, true, true]),
        ];
        for (stored, expected) in steps.iter() {
            recently.store(*stored);
            for (key, witnessed) in (0..).zip(expected.iter()) {
                assert_eq!(recently.recently_witnessed(&key), *witnessed);
            }
        }
    }
}