Skip to content

Commit 20eacda

Browse files
authored
Merge pull request #53 from pietroalbini/multiple-learning
Avoid learning the same job multiple times
2 parents a1a198e + 4d64493 commit 20eacda

File tree

1 file changed

+92
-34
lines changed

1 file changed

+92
-34
lines changed

src/bin/server/worker.rs

Lines changed: 92 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -2,39 +2,25 @@ use super::{QueueItem, QueueItemKind};
22

33
use crate::rla;
44
use crate::rla::ci::{self, BuildCommit, CiPlatform};
5-
use std::collections::VecDeque;
5+
use std::collections::{HashSet, VecDeque};
6+
use std::hash::Hash;
67
use std::path::PathBuf;
78
use std::str;
89

9-
// We keep track of the last several unique job IDs. This is because
10-
// Azure sends us a notification for every individual builder's
11-
// state (around 70 notifications/job as of this time), but we want
12-
// to only process a given job once.
13-
//
14-
// You might ask -- why is this not a HashSet/HashMap? That would
15-
// also work, but be a little more complicated to remove things
16-
// from. We would need to keep track of order somehow to remove the
17-
// oldest job ID. An attempt at such an API was tried in PR #29, but
18-
// ultimately scrapped as too complicated.
19-
//
20-
// We keep few enough elements in this "set" that a Vec isn't too bad.
21-
//
22-
// Note: Don't update this number too high, as we O(n) loop through it on every
23-
// notification from GitHub (twice).
24-
const KEEP_IDS: usize = 16;
25-
2610
pub struct Worker {
2711
debug_post: Option<(String, u32)>,
2812
index_file: PathBuf,
2913
index: rla::Index,
3014
extract_config: rla::extract::Config,
3115
github: rla::github::Client,
3216
queue: crossbeam::channel::Receiver<QueueItem>,
33-
notified: VecDeque<u64>,
3417
ci: Box<dyn CiPlatform + Send>,
3518
repo: String,
3619
secondary_repos: Vec<String>,
3720
query_builds_from_primary_repo: bool,
21+
22+
recently_notified: RecentlySeen<u64>,
23+
recently_learned: RecentlySeen<String>,
3824
}
3925

4026
impl Worker {
@@ -66,12 +52,14 @@ impl Worker {
6652
index_file,
6753
extract_config: Default::default(),
6854
github: rla::github::Client::new()?,
69-
notified: VecDeque::new(),
7055
queue,
7156
ci,
7257
repo,
7358
secondary_repos,
7459
query_builds_from_primary_repo,
60+
61+
recently_notified: RecentlySeen::new(32),
62+
recently_learned: RecentlySeen::new(256),
7563
})
7664
}
7765

@@ -133,10 +121,6 @@ impl Worker {
133121

134122
info!("started processing");
135123

136-
if self.notified.contains(&build_id) {
137-
info!("ignoring recently notified build");
138-
return Ok(());
139-
}
140124
let query_from = if self.query_builds_from_primary_repo {
141125
&self.repo
142126
} else {
@@ -160,14 +144,7 @@ impl Worker {
160144

161145
// Avoid processing the same build multiple times.
162146
if !outcome.is_passed() {
163-
info!("preparing report");
164-
self.report_failed(build.as_ref())?;
165-
166-
info!("marked as notified");
167-
self.notified.push_front(build_id);
168-
if self.notified.len() > KEEP_IDS {
169-
self.notified.pop_back();
170-
}
147+
self.report_failed(build_id, build.as_ref())?;
171148
}
172149
if build.pr_number().is_none() && build.branch_name() == "auto" {
173150
info!("learning from the log");
@@ -179,8 +156,13 @@ impl Worker {
179156
Ok(())
180157
}
181158

182-
fn report_failed(&mut self, build: &dyn rla::ci::Build) -> rla::Result<()> {
183-
debug!("Preparing report...");
159+
fn report_failed(&mut self, build_id: u64, build: &dyn rla::ci::Build) -> rla::Result<()> {
160+
if self.recently_notified.recently_witnessed(&build_id) {
161+
info!("avoided reporting recently notified build");
162+
return Ok(());
163+
}
164+
165+
info!("preparing report");
184166

185167
let job = match build.jobs().iter().find(|j| j.outcome().is_failed()) {
186168
Some(job) => *job,
@@ -301,6 +283,9 @@ impl Worker {
301283
[I'm a bot](https://github.com/rust-lang/rust-log-analyzer)! I can only do what humans tell me to, so if this was not helpful or you have suggestions for improvements, please ping or otherwise contact **`@rust-lang/infra`**. ([Feature Requests](https://github.com/rust-lang/rust-log-analyzer/issues?q=is%3Aopen+is%3Aissue+label%3Afeature-request))
302284
"#, opening = opening, html_url = job.html_url(), log_url = pretty_log_url, raw_log_url = raw_log_url, log = extracted))?;
303285

286+
info!("marked build {} as recently notified", build_id);
287+
self.recently_notified.store(build_id);
288+
304289
Ok(())
305290
}
306291

@@ -310,6 +295,11 @@ impl Worker {
310295
continue;
311296
}
312297

298+
if self.recently_learned.recently_witnessed(&job.id()) {
299+
info!("Skipped already processed {}", job);
300+
return Ok(());
301+
}
302+
313303
debug!("Processing {}...", job);
314304

315305
match ci::download_log(self.ci.as_ref(), *job, self.github.internal()) {
@@ -320,6 +310,7 @@ impl Worker {
320310
1,
321311
);
322312
}
313+
self.recently_learned.store(job.id());
323314
}
324315
None => {
325316
warn!(
@@ -341,3 +332,70 @@ impl Worker {
341332
Ok(())
342333
}
343334
}
335+
336+
/// Keeps track of the recently seen IDs for both the failed build reports and the learned jobs.
337+
/// Only the most recent IDs are stored, to avoid growing the memory usage endlessly.
338+
///
339+
/// Internally this uses both an HashSet to provide fast lookups and a VecDeque to know which old
340+
/// jobs needs to be removed.
341+
struct RecentlySeen<T: Clone + Eq + Hash> {
342+
size: usize,
343+
lookup: HashSet<T>,
344+
removal: VecDeque<T>,
345+
}
346+
347+
impl<T: Clone + Eq + Hash> RecentlySeen<T> {
348+
fn new(size: usize) -> Self {
349+
Self {
350+
size,
351+
lookup: HashSet::with_capacity(size),
352+
removal: VecDeque::with_capacity(size),
353+
}
354+
}
355+
356+
fn recently_witnessed(&self, key: &T) -> bool {
357+
self.lookup.contains(key)
358+
}
359+
360+
fn store(&mut self, key: T) {
361+
if self.lookup.contains(&key) {
362+
return;
363+
}
364+
if self.removal.len() >= self.size {
365+
if let Some(item) = self.removal.pop_back() {
366+
self.lookup.remove(&item);
367+
}
368+
}
369+
self.lookup.insert(key.clone());
370+
self.removal.push_front(key);
371+
}
372+
}
373+
374+
#[cfg(test)]
375+
mod tests {
376+
use super::*;
377+
378+
#[test]
379+
fn test_recently_seen() {
380+
let mut recently = RecentlySeen::new(2);
381+
382+
assert!(!recently.recently_witnessed(&0));
383+
assert!(!recently.recently_witnessed(&1));
384+
assert!(!recently.recently_witnessed(&2));
385+
386+
recently.store(0);
387+
assert!(recently.recently_witnessed(&0));
388+
assert!(!recently.recently_witnessed(&1));
389+
assert!(!recently.recently_witnessed(&2));
390+
391+
recently.store(1);
392+
assert!(recently.recently_witnessed(&0));
393+
assert!(recently.recently_witnessed(&1));
394+
assert!(!recently.recently_witnessed(&2));
395+
396+
recently.store(2);
397+
assert!(!recently.recently_witnessed(&0));
398+
assert!(recently.recently_witnessed(&1));
399+
assert!(recently.recently_witnessed(&2));
400+
}
401+
}

0 commit comments

Comments
 (0)