Skip to content

Commit 5c87745

Browse files
Merge pull request #1788 from Kobzol/s3-upload-refactor
Refactor self-profile S3 uploading
2 parents 70af968 + cd54cc4 commit 5c87745

File tree

3 files changed

+180
-158
lines changed

3 files changed

+180
-158
lines changed

collector/src/compile/benchmark/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,7 @@ impl Benchmark {
416416
self.name,
417417
benchmark_start.elapsed().as_secs_f64()
418418
);
419-
419+
processor.postprocess_results().await;
420420
Ok(())
421421
}
422422
}

collector/src/compile/execute/bencher.rs

Lines changed: 174 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,30 @@ use crate::compile::benchmark::scenario::Scenario;
44
use crate::compile::benchmark::BenchmarkName;
55
use crate::compile::execute;
66
use crate::compile::execute::{
7-
rustc, DeserializeStatError, PerfTool, ProcessOutputData, Processor, Retry, SelfProfile,
8-
SelfProfileFiles, Stats, Upload,
7+
rustc, DeserializeStatError, PerfTool, ProcessOutputData, Processor, Retry, SelfProfileFiles,
8+
Stats,
99
};
1010
use crate::toolchain::Toolchain;
1111
use crate::utils::git::get_rustc_perf_commit;
12+
use anyhow::Context;
13+
use database::CollectionId;
1214
use futures::stream::FuturesUnordered;
13-
use futures::StreamExt;
15+
use futures::{future, StreamExt};
16+
use std::collections::VecDeque;
1417
use std::future::Future;
15-
use std::path::PathBuf;
18+
use std::io::Read;
19+
use std::path::{Path, PathBuf};
1620
use std::pin::Pin;
1721
use std::process::Command;
1822
use std::{env, process};
1923

24+
/// A self-profile captured during one benchmark run, kept until
/// `postprocess_results` records it in the database and uploads its files.
pub struct RecordedSelfProfile {
25+
// Collection id returned by `insert_stats` for the run that produced this profile.
collection: CollectionId,
26+
// Database scenario the run was recorded under.
scenario: database::Scenario,
27+
// Database profile returned by `insert_stats` for the run.
profile: database::Profile,
28+
// Self-profile files handed to `SelfProfileS3Upload::new` when uploading.
files: SelfProfileFiles,
29+
}
30+
2031
// Tools usable with the benchmarking subcommands.
2132
#[derive(Clone, Copy, Debug, PartialEq)]
2233
pub enum Bencher {
@@ -31,10 +42,10 @@ pub struct BenchProcessor<'a> {
3142
conn: &'a mut dyn database::Connection,
3243
artifact: &'a database::ArtifactId,
3344
artifact_row_id: database::ArtifactIdNumber,
34-
upload: Option<Upload>,
3545
is_first_collection: bool,
3646
is_self_profile: bool,
3747
tries: u8,
48+
self_profiles: Vec<RecordedSelfProfile>,
3849
}
3950

4051
impl<'a> BenchProcessor<'a> {
@@ -63,14 +74,14 @@ impl<'a> BenchProcessor<'a> {
6374
}
6475

6576
BenchProcessor {
66-
upload: None,
6777
conn,
6878
benchmark,
6979
artifact,
7080
artifact_row_id,
7181
is_first_collection: true,
7282
is_self_profile,
7383
tries: 0,
84+
self_profiles: vec![],
7485
}
7586
}
7687

@@ -79,8 +90,8 @@ impl<'a> BenchProcessor<'a> {
7990
scenario: database::Scenario,
8091
profile: Profile,
8192
backend: CodegenBackend,
82-
stats: (Stats, Option<SelfProfile>, Option<SelfProfileFiles>),
83-
) {
93+
stats: Stats,
94+
) -> (CollectionId, database::Profile) {
8495
let version = get_rustc_perf_commit();
8596

8697
let collection = self.conn.collection_id(&version).await;
@@ -97,41 +108,8 @@ impl<'a> BenchProcessor<'a> {
97108
CodegenBackend::Cranelift => database::CodegenBackend::Cranelift,
98109
};
99110

100-
if let Some(files) = stats.2 {
101-
if env::var_os("RUSTC_PERF_UPLOAD_TO_S3").is_some() {
102-
// FIXME: Record codegen backend in the self profile name
103-
104-
// We can afford to have the uploads run concurrently with
105-
// rustc. Generally speaking, they take up almost no CPU time
106-
// (just copying data into the network). Plus, during
107-
// self-profile data timing noise doesn't matter as much. (We'll
108-
// be migrating to instructions soon, hopefully, where the
109-
// upload will cause even less noise). We may also opt at some
110-
// point to defer these uploads entirely to the *end* or
111-
// something like that. For now though this works quite well.
112-
if let Some(u) = self.upload.take() {
113-
u.wait();
114-
}
115-
let prefix = PathBuf::from("self-profile")
116-
.join(self.artifact_row_id.0.to_string())
117-
.join(self.benchmark.0.as_str())
118-
.join(profile.to_string())
119-
.join(scenario.to_id());
120-
self.upload = Some(Upload::new(prefix, collection, files));
121-
self.conn
122-
.record_raw_self_profile(
123-
collection,
124-
self.artifact_row_id,
125-
self.benchmark.0.as_str(),
126-
profile,
127-
scenario,
128-
)
129-
.await;
130-
}
131-
}
132-
133111
let mut buf = FuturesUnordered::new();
134-
for (stat, value) in stats.0.iter() {
112+
for (stat, value) in stats.iter() {
135113
buf.push(self.conn.record_statistic(
136114
collection,
137115
self.artifact_row_id,
@@ -145,6 +123,7 @@ impl<'a> BenchProcessor<'a> {
145123
}
146124

147125
while let Some(()) = buf.next().await {}
126+
(collection, profile)
148127
}
149128

150129
pub async fn measure_rustc(&mut self, toolchain: &Toolchain) -> anyhow::Result<()> {
@@ -196,36 +175,28 @@ impl<'a> Processor for BenchProcessor<'a> {
196175
}
197176
}
198177

199-
let fut = match data.scenario {
200-
Scenario::Full => self.insert_stats(
201-
database::Scenario::Empty,
202-
data.profile,
203-
data.backend,
204-
res,
205-
),
206-
Scenario::IncrFull => self.insert_stats(
207-
database::Scenario::IncrementalEmpty,
208-
data.profile,
209-
data.backend,
210-
res,
211-
),
212-
Scenario::IncrUnchanged => self.insert_stats(
213-
database::Scenario::IncrementalFresh,
214-
data.profile,
215-
data.backend,
216-
res,
217-
),
178+
let scenario = match data.scenario {
179+
Scenario::Full => database::Scenario::Empty,
180+
Scenario::IncrFull => database::Scenario::IncrementalEmpty,
181+
Scenario::IncrUnchanged => database::Scenario::IncrementalFresh,
218182
Scenario::IncrPatched => {
219183
let patch = data.patch.unwrap();
220-
self.insert_stats(
221-
database::Scenario::IncrementalPatch(patch.name),
222-
data.profile,
223-
data.backend,
224-
res,
225-
)
184+
database::Scenario::IncrementalPatch(patch.name)
226185
}
227186
};
228-
fut.await;
187+
let (collection_id, profile) = self
188+
.insert_stats(scenario, data.profile, data.backend, res.0)
189+
.await;
190+
191+
if let Some(files) = res.2 {
192+
self.self_profiles.push(RecordedSelfProfile {
193+
collection: collection_id,
194+
scenario,
195+
profile,
196+
files,
197+
});
198+
}
199+
229200
Ok(Retry::No)
230201
}
231202
Err(DeserializeStatError::NoOutput(output)) => {
@@ -250,4 +221,139 @@ impl<'a> Processor for BenchProcessor<'a> {
250221
}
251222
})
252223
}
224+
225+
/// Records and uploads the self-profiles collected in `self.self_profiles`.
///
/// Does nothing unless the `RUSTC_PERF_UPLOAD_TO_S3` environment variable is
/// set. When it is set, every collected profile is first registered through
/// `record_raw_self_profile`, and then its files are uploaded with
/// `SelfProfileS3Upload`, keeping at most 10 uploads pending at a time.
///
/// Panics if an upload fails, because `SelfProfileS3Upload::wait` panics on an
/// unsuccessful exit status.
fn postprocess_results<'b>(&'b mut self) -> Pin<Box<dyn Future<Output = ()> + 'b>> {
226+
Box::pin(async move {
227+
// NOTE(review): when the variable is unset, `self_profiles` is neither
// uploaded nor cleared by this function — confirm that is intended.
if env::var_os("RUSTC_PERF_UPLOAD_TO_S3").is_some() {
228+
// Phase 1: create one database-recording future per collected profile
// and await them all together.
let futs = self
229+
.self_profiles
230+
.iter()
231+
.map(|profile| {
232+
self.conn.record_raw_self_profile(
233+
profile.collection,
234+
self.artifact_row_id,
235+
self.benchmark.0.as_str(),
236+
profile.profile,
237+
profile.scenario,
238+
)
239+
})
240+
.collect::<Vec<_>>();
241+
future::join_all(futs).await;
242+
243+
// Upload profiles to S3. Buffer up to 10 uploads at a time.
244+
let mut uploads: VecDeque<SelfProfileS3Upload> = VecDeque::new();
245+
// Phase 2: `drain(..)` moves each profile out so its files can be passed
// by value to the uploader; the vector is empty afterwards.
for profile in self.self_profiles.drain(..) {
246+
// Window is full: block on the oldest pending upload before starting
// another one. `unwrap()` cannot fail because the length is 10 here.
if uploads.len() == 10 {
247+
uploads.pop_front().unwrap().wait();
248+
}
249+
250+
// FIXME: Record codegen backend in the self profile name
251+
// Key prefix: self-profile/<artifact row id>/<benchmark>/<profile>/<scenario id>
let prefix = PathBuf::from("self-profile")
252+
.join(self.artifact_row_id.0.to_string())
253+
.join(self.benchmark.0.as_str())
254+
.join(profile.profile.to_string())
255+
.join(profile.scenario.to_id());
256+
// `new` prepares the compressed payload and spawns the `aws` process;
// it does not wait for the upload to finish.
let upload =
257+
SelfProfileS3Upload::new(prefix, profile.collection, profile.files);
258+
uploads.push_back(upload);
259+
}
260+
// Block on whatever uploads are still pending.
for upload in uploads {
261+
upload.wait();
262+
}
263+
}
264+
})
265+
}
266+
}
267+
268+
/// Uploads self-profile results to S3
///
/// Field `0` is the spawned `aws s3 cp` child process; field `1` is the
/// temporary file containing the compressed data that the process uploads.
/// The temporary file is presumably stored here so that it stays alive until
/// the upload has been waited on — TODO confirm against `tempfile` semantics.
269+
struct SelfProfileS3Upload(std::process::Child, tempfile::NamedTempFile);
270+
271+
impl SelfProfileS3Upload {
272+
/// Compresses the given self-profile files into a temporary file and spawns
/// `aws s3 cp` to upload it to `s3://rustc-perf/<prefix>/<filename>`.
///
/// * `prefix` - directory part of the object key.
/// * `collection` - collection id embedded in the object file name.
/// * `files` - self-profile output to upload.
///
/// Returns as soon as the `aws` process has been spawned; call `wait` to
/// block until the upload has finished.
///
/// # Panics
///
/// Panics if the temporary file cannot be created, if reading, archiving,
/// compressing or writing the data fails, if the resulting object path is not
/// valid UTF-8, or if the `aws` process cannot be spawned.
fn new(
273+
prefix: PathBuf,
274+
collection: database::CollectionId,
275+
files: SelfProfileFiles,
276+
) -> SelfProfileS3Upload {
277+
// Files are placed at
278+
// * self-profile/<artifact id>/<benchmark>/<profile>/<scenario>
279+
// /self-profile-<collection-id>.{extension}
280+
let upload = tempfile::NamedTempFile::new()
281+
.context("create temporary file")
282+
.unwrap();
283+
// Each branch writes the compressed payload to the temporary file and
// evaluates to the object file name to use.
let filename = match files {
284+
SelfProfileFiles::Seven {
285+
string_index,
286+
string_data,
287+
events,
288+
} => {
289+
// Three-file layout: build a tar archive in memory, written through a
// `snap` frame encoder that wraps a `Vec<u8>`.
let tarball = snap::write::FrameEncoder::new(Vec::new());
290+
let mut builder = tar::Builder::new(tarball);
291+
builder.mode(tar::HeaderMode::Deterministic);
292+
293+
let append_file = |builder: &mut tar::Builder<_>,
294+
file: &Path,
295+
name: &str|
296+
-> anyhow::Result<()> {
297+
if file.exists() {
298+
// Only existing files are appended; missing files are silently ignored
299+
// because the new single-file self-profile experiment has a different structure.
300+
builder.append_path_with_name(file, name)?;
301+
}
302+
Ok(())
303+
};
304+
305+
append_file(&mut builder, &string_index, "self-profile.string_index")
306+
.expect("append string index");
307+
append_file(&mut builder, &string_data, "self-profile.string_data")
308+
.expect("append string data");
309+
append_file(&mut builder, &events, "self-profile.events").expect("append events");
310+
builder.finish().expect("complete tarball");
311+
std::fs::write(
312+
upload.path(),
313+
// First `into_inner` unwraps the tar builder to the encoder, the second
// unwraps the encoder to the underlying byte buffer.
builder
314+
.into_inner()
315+
.expect("get")
316+
.into_inner()
317+
.expect("snap success"),
318+
)
319+
.expect("wrote tarball");
320+
format!("self-profile-{}.tar.sz", collection)
321+
}
322+
SelfProfileFiles::Eight { file } => {
323+
// Single-file layout: read the whole file, then pull it through a
// reader-side `snap` frame encoder to obtain the compressed bytes.
let data = std::fs::read(file).expect("read profile data");
324+
let mut data = snap::read::FrameEncoder::new(&data[..]);
325+
let mut compressed = Vec::new();
326+
data.read_to_end(&mut compressed).expect("compressed");
327+
std::fs::write(upload.path(), &compressed).expect("write compressed profile data");
328+
329+
format!("self-profile-{}.mm_profdata.sz", collection)
330+
}
331+
};
332+
333+
// Start the upload in a separate process; it is not awaited here.
let child = Command::new("aws")
334+
.arg("s3")
335+
.arg("cp")
336+
.arg("--storage-class")
337+
.arg("INTELLIGENT_TIERING")
338+
.arg("--only-show-errors")
339+
.arg(upload.path())
340+
.arg(&format!(
341+
"s3://rustc-perf/{}",
342+
// `to_str()` returns `None` for a non-UTF-8 path, which panics here.
&prefix.join(filename).to_str().unwrap()
343+
))
344+
.spawn()
345+
.expect("spawn aws");
346+
347+
// Return the temporary file together with the child process.
SelfProfileS3Upload(child, upload)
348+
}
349+
350+
/// Blocks until the spawned upload process exits, consuming the upload.
///
/// # Panics
///
/// Panics if waiting on the child process fails or if the process exits with
/// an unsuccessful status.
fn wait(mut self) {
351+
// Timer starts here, so the logged duration covers only the time spent
// blocked in this call, not the time since the process was spawned.
let start = std::time::Instant::now();
352+
let status = self.0.wait().expect("waiting for child");
353+
if !status.success() {
354+
panic!("S3 upload failed: {:?}", status);
355+
}
356+
357+
log::trace!("uploaded to S3, additional wait: {:?}", start.elapsed());
358+
}
253359
}

0 commit comments

Comments
 (0)