Skip to content

Commit 8a8c2ba

Browse files
kiukchungfacebook-github-bot
authored andcommitted
Create process_allocator binary for OSS
Summary: Adds the OSS equivalent of `hyperactor_meta mesh-worker` called `process_allocator`. Also created a meta version of it (but haven't switched over to it in the job launcher). Reviewed By: colin2328 Differential Revision: D75299505 fbshipit-source-id: 1b4a89ddd2da424ce8f8ded9343d134979ebabb5
1 parent 13d7be6 commit 8a8c2ba

File tree

5 files changed

+255
-2
lines changed

5 files changed

+255
-2
lines changed

monarch_hyperactor/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# @generated by autocargo from //monarch/monarch_hyperactor:monarch_hyperactor
1+
# @generated by autocargo from //monarch/monarch_hyperactor:[monarch_hyperactor,process_allocator-oss]
22

33
[package]
44
name = "monarch_hyperactor"
@@ -11,6 +11,7 @@ license = "BSD-3"
1111
anyhow = "1.0.95"
1212
async-trait = "0.1.86"
1313
bincode = "1.3.3"
14+
clap = { version = "4.5.38", features = ["derive", "env", "string", "unicode", "wrap_help"] }
1415
hyperactor = { version = "0.0.0", path = "../hyperactor" }
1516
hyperactor_extension = { version = "0.0.0", path = "../hyperactor_extension" }
1617
hyperactor_mesh = { version = "0.0.0", path = "../hyperactor_mesh" }
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
use std::result::Result;
10+
11+
use clap::Parser;
12+
use clap::command;
13+
use hyperactor::channel::ChannelAddr;
14+
use hyperactor_mesh::alloc::remoteprocess::RemoteProcessAllocator;
15+
use tokio::process::Command;
16+
#[derive(Parser, Debug)]
17+
#[command(about = "Runs hyperactor's process allocator")]
18+
pub struct Args {
19+
#[arg(
20+
long,
21+
default_value = "[::]",
22+
help = "The address bind to. The process allocator runs on `bind_addr:port`"
23+
)]
24+
pub addr: String,
25+
26+
#[arg(
27+
long,
28+
default_value_t = 26600,
29+
help = "Port to bind to. The process allocator runs on `bind_addr:port`"
30+
)]
31+
pub port: u16,
32+
33+
#[arg(
34+
long,
35+
default_value = "monarch_bootstrap",
36+
help = "The path to the binary that this process allocator spawns on an `allocate` request"
37+
)]
38+
pub program: String,
39+
}
40+
41+
pub fn main_impl(
42+
serve_address: ChannelAddr,
43+
program: String,
44+
) -> tokio::task::JoinHandle<Result<(), anyhow::Error>> {
45+
tracing::info!("bind address is: {}", serve_address);
46+
tracing::info!("program to spawn on allocation request: [{}]", &program);
47+
48+
tokio::spawn(async {
49+
RemoteProcessAllocator::new()
50+
.start(Command::new(program), serve_address)
51+
.await
52+
})
53+
}
54+
55+
#[cfg(test)]
56+
mod tests {
57+
use std::collections::HashSet;
58+
59+
use clap::Parser;
60+
use hyperactor::WorldId;
61+
use hyperactor::channel::ChannelTransport;
62+
use hyperactor_mesh::alloc;
63+
use hyperactor_mesh::alloc::Alloc;
64+
use hyperactor_mesh::alloc::remoteprocess;
65+
use ndslice::shape;
66+
67+
use super::*;
68+
69+
#[tokio::test]
70+
async fn test_args_defaults() -> Result<(), anyhow::Error> {
71+
let args = vec!["process_allocator"];
72+
73+
let parsed_args = Args::parse_from(args);
74+
75+
assert_eq!(parsed_args.addr, "[::]");
76+
assert_eq!(parsed_args.port, 26600);
77+
assert_eq!(parsed_args.program, "monarch_bootstrap");
78+
Ok(())
79+
}
80+
81+
#[tokio::test]
82+
async fn test_args() -> Result<(), anyhow::Error> {
83+
let args = vec![
84+
"process_allocator",
85+
"--addr=127.0.0.1",
86+
"--port=29500",
87+
"--program=/bin/echo",
88+
];
89+
90+
let parsed_args = Args::parse_from(args);
91+
92+
assert_eq!(parsed_args.addr, "127.0.0.1");
93+
assert_eq!(parsed_args.port, 29500);
94+
assert_eq!(parsed_args.program, "/bin/echo");
95+
Ok(())
96+
}
97+
98+
#[tokio::test]
99+
async fn test_main_impl() -> Result<(), anyhow::Error> {
100+
hyperactor::initialize();
101+
102+
let serve_address = ChannelAddr::any(ChannelTransport::Unix);
103+
let program = String::from("/bin/date"); // date is usually a unix built-in command
104+
let server_handle = main_impl(serve_address.clone(), program);
105+
106+
let spec = alloc::AllocSpec {
107+
// NOTE: x cannot be more than 1 since we created a single process-allocator server instance!
108+
shape: shape! { x=1, y=4 },
109+
constraints: alloc::AllocConstraints::none(),
110+
};
111+
112+
let mut initializer = remoteprocess::MockRemoteProcessAllocInitializer::new();
113+
initializer.expect_initialize_alloc().return_once(move || {
114+
Ok(vec![remoteprocess::RemoteProcessAllocHost {
115+
hostname: serve_address.to_string(),
116+
id: serve_address.to_string(),
117+
}])
118+
});
119+
120+
let heartbeat = std::time::Duration::from_millis(100);
121+
let world_id = WorldId("__unused__".to_string());
122+
123+
let mut alloc = remoteprocess::RemoteProcessAlloc::new(
124+
spec.clone(),
125+
world_id,
126+
ChannelTransport::Unix,
127+
0,
128+
heartbeat,
129+
initializer,
130+
)
131+
.await
132+
.unwrap();
133+
134+
// make sure we accounted for `world_size` number of Created and Stopped proc states
135+
let world_size = spec.shape.slice().iter().count();
136+
let mut created_ranks: HashSet<usize> = HashSet::new();
137+
let mut stopped_ranks: HashSet<usize> = HashSet::new();
138+
139+
while created_ranks.len() < world_size || stopped_ranks.len() < world_size {
140+
let proc_state = alloc.next().await.unwrap();
141+
match proc_state {
142+
alloc::ProcState::Created { proc_id, coords: _ } => {
143+
// alloc.next() will keep creating procs and incrementing rank id
144+
// so we mod the rank by world_size to map it to its logical rank
145+
created_ranks.insert(proc_id.rank() % world_size);
146+
}
147+
alloc::ProcState::Stopped { proc_id, reason: _ } => {
148+
stopped_ranks.insert(proc_id.rank() % world_size);
149+
}
150+
_ => {}
151+
}
152+
}
153+
154+
let expected_ranks: HashSet<usize> = (0..world_size).collect();
155+
assert_eq!(created_ranks, expected_ranks);
156+
assert_eq!(stopped_ranks, expected_ranks);
157+
158+
server_handle.abort();
159+
Ok(())
160+
}
161+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
mod common;
10+
11+
use clap::Parser;
12+
use common::Args;
13+
use common::main_impl;
14+
use hyperactor::channel::ChannelAddr;
15+
16+
#[tokio::main]
17+
async fn main() {
18+
let args = Args::parse();
19+
hyperactor::initialize();
20+
21+
let bind = format!("{}:{}", args.addr, args.port);
22+
let socket_addr: std::net::SocketAddr = bind.parse().unwrap();
23+
let serve_address = ChannelAddr::Tcp(socket_addr);
24+
25+
let _ = main_impl(serve_address, args.program).await.unwrap();
26+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
* All rights reserved.
4+
*
5+
* This source code is licensed under the BSD-style license found in the
6+
* LICENSE file in the root directory of this source tree.
7+
*/
8+
9+
// std::time::SystemTime::now is used by #[cli::main] suppress the warning since
10+
// it is used around the CLI not in hyperactor core code
11+
#![allow(clippy::disallowed_methods)]
12+
13+
mod common;
14+
15+
use anyhow::Result;
16+
use cli::ExitCode;
17+
use common::Args;
18+
use common::main_impl;
19+
use fbinit::FacebookInit;
20+
use hyperactor::channel::ChannelAddr;
21+
use hyperactor_meta_lib::system_resolution::canonicalize_hostname;
22+
use hyperactor_telemetry::env::Env;
23+
use hyperactor_telemetry::env::HYPERACTOR_EXECUTION_ID_ENV;
24+
use hyperactor_telemetry::env::MAST_HPC_JOB_NAME_ENV;
25+
26+
fn hostname() -> String {
27+
canonicalize_hostname(
28+
hostname::get()
29+
.ok()
30+
.and_then(|hostname| hostname.into_string().ok())
31+
.expect("failed to retrieve hostname")
32+
.as_str(),
33+
)
34+
}
35+
36+
#[cli::main("process_allocator")]
37+
async fn main(_fb: FacebookInit, args: Args) -> Result<ExitCode> {
38+
match Env::current() {
39+
Env::Mast => {
40+
let job_name =
41+
std::env::var(MAST_HPC_JOB_NAME_ENV).expect("MAST_HPC_JOB_NAME not set in MAST");
42+
std::env::set_var(HYPERACTOR_EXECUTION_ID_ENV, job_name);
43+
}
44+
_ => {}
45+
}
46+
hyperactor::initialize();
47+
48+
let current_host = hostname();
49+
tracing::info!(
50+
"NOTE: argument `--addr` is ignored when running internally at Meta! \
51+
Process allocator runs on current host: `{}:{}` using Meta-TLS over TCP",
52+
&current_host,
53+
args.port
54+
);
55+
let serve_address = ChannelAddr::MetaTls(current_host, args.port);
56+
let result = main_impl(serve_address, args.program).await;
57+
58+
match result {
59+
Ok(_) => Ok(ExitCode::SUCCESS),
60+
Err(e) => {
61+
tracing::error!("Error running process allocator: {:?}", e);
62+
Ok(ExitCode::FAILURE)
63+
}
64+
}
65+
}

tools/components/hyperactor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ def proc_mesh(
4141
mesh_role = specs.Role(
4242
name=mesh.name,
4343
image=image,
44-
entrypoint="hyperactor", # TODO install hyperactor cargo and add ~/.cargo/bin to PATH in Dockerfile
44+
entrypoint="process_allocator", # 'cargo install monarch_hyperactor' to get this binary
4545
args=[
4646
"mesh-worker",
4747
f"--port={port}",

0 commit comments

Comments
 (0)