Commit bf08022

Merge pull request #11 from converged-computing/add-support-minicluster-autoscale
feat: support for flux minicluster
2 parents bb9520e + 1c14f37 commit bf08022

26 files changed: 895 additions and 226 deletions

Dockerfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,6 @@ RUN python3 -m pip install --break-system-packages -r requirements.txt && \
2121
python3 -m pip install --break-system-packages -e .
2222

2323
# ensemble-server start --workers 10 --port <port>
24+
ENV PYTHONUNBUFFERED=1
2425
ENTRYPOINT ["ensemble-server"]
2526
CMD ["start"]

Makefile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ python: python ## Generate python proto files in python
1717
docker-build:
1818
docker build -t ${IMG} .
1919

20+
.PHONY: kind-load
21+
kind-load: docker-build # For local development with kind
22+
kind load docker-image ${IMG}
23+
2024
.PHONY: arm-build
2125
arm-build:
2226
docker buildx build --platform linux/arm64 --build-arg ARCH=aarch_64 -t ${ARMIMG} .

README.md

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,27 @@ This design will be translated into more consolidated design documentation. For
3232
- **Plugins**: A plugin is a collection of custom actions that are typically associated with a particular application. For example, a plugin for LAMMPS might know how to check LAMMPS output and act on a specific parsing of a result. Plugins are used equivalently to custom functions, and can accept arguments.
3333
- **Metrics** are summary metrics collected for groups of jobs, for customized algorithms. To support this, we use online (or streaming) ML algorithms for things like mean, IQR, etc. While there could be a named entity called an algorithm, since it's just a set of rules (triggers and actions) that means we don't need to explicitly define them (yet). I can see at some point creating "packaged" rule sets that are called that.
3434

35+
36+
#### Logging
37+
38+
By default, the logger pretty-prints only the "main" events (triggers and submissions). To see all events from Flux (useful for debugging), turn on debug logging:
39+
40+
```yaml
41+
logging:
42+
debug: true
43+
```
44+
45+
To have the event heartbeat fire at a fixed interval (in seconds), set `heartbeat`:
46+
47+
```yaml
48+
logging:
49+
debug: true
50+
heartbeat: 60
51+
```
52+
53+
Note that the heartbeat is off by default (set to 0 seconds) unless you include a grow or shrink action. In that case, it turns on and defaults to 60 seconds, unless you've specified another interval.
54+
If you have a grow or shrink action and explicitly set the heartbeat to 0, it will still default to 60 seconds, because grow and shrink won't work as expected without the heartbeat.
55+
3556
#### Rules
3657
3758
A rule defines a trigger and action to take. The library is event driven, meaning that the queue is expected to send events, and we don't do any polling.
@@ -65,15 +86,29 @@ Note that yes, this means "submit" is both an action and an event.
6586
6687
The design of a rule is to have an action, and the action is something your ensemble can be tasked to do when an event is triggered. Right now we support the following:
6788
68-
- **submit**: submit a job
69-
- **terminate**: terminate the member. This is usually what you want to call to have the entire thing exit on 0
70-
- **custom**: run a custom function that will receive known kwargs, and then should return an action (read more below)
89+
**Flux and MiniCluster**
90+
91+
- *submit*: submit a job
92+
- *terminate*: terminate the member. This is usually what you want to call to have the entire thing exit on 0
93+
- *custom*: run a custom function that will receive known kwargs, and then should return an action (read more below)
94+
95+
**MiniCluster Only**
96+
97+
- *grow*: grow (or request) the cluster to scale up
98+
- *shrink*: shrink (or request) the cluster to scale down
99+
100+
Scale operations raise the issue of resource contention between different ensembles, so to start we assume that a request to scale:
71101
102+
1. Should only happen once. If it's granted, great, if not, we aren't going to ask again.
103+
2. Does not need to consider another level of scheduler (e.g., fair share)
104+
105+
I started thinking about putting a second scheduler (or fair share algorithm) in the grpc service, but realized this would add another level of complexity that, although we might work on it later, is not warranted yet.
106+
Also note that since scale operation triggers might not be linked to job events (e.g., if we want to trigger when a job group has been in the queue for too long) we added support for a heartbeat. The heartbeat
107+
isn't a trigger in and of itself, but when it runs, it will run through rules that are relevant to queue metrics.
72108
We see "submit" used in two of the examples above, which is a common thing you'd want to do! For each action, you should minimally define the "name" and a "label" that typically corresponds to a job group.
73109
You can also optionally define "repetitions," which are the number of times the action should be run before expiring. If you want a backoff period between repetitions, set "backoff" to a non zero value.
74110
By default, when no repetitions or backoff are set, the action is assumed to have a repetition of 1. It will be run once! Let's now look at a custom action. Here is what your function should look like in your `ensemble.yaml`
75111

76-
77112
##### Actions
78113

79114

@@ -157,6 +192,9 @@ ensemble-server start
157192
# Run the hello-world example ensemble! it will submit and monitor job events, etc
158193
ensemble run examples/hello-world.yaml
159194
195+
# Enable debug logging on the fly (equivalent to setting logging -> debug: true)
196+
ensemble run --debug examples/hello-world.yaml
197+
160198
# This example shows using repetitions and backoff
161199
ensemble run examples/backoff-example.yaml
162200
@@ -165,6 +203,14 @@ ensemble run examples/custom-action-example.yaml
165203
166204
# This shows termination, which is necessary for when you want an exit
167205
ensemble run examples/terminate-example.yaml
206+
207+
# Run a heartbeat every 3 seconds.
208+
# This will trigger a check to see if actions need to be performed
209+
ensemble run examples/heartbeat-example.yaml
210+
211+
# Grow/shrink requires a minicluster (flux doesn't support it) but we can mock it here
212+
# Note that a --name with <namespace>/<name> is required
213+
ensemble run --name default/ensemble --executor minicluster examples/grow-shrink-example.yaml
168214
```
169215

170216
Right now, this will run any rules with "start" triggers, which for this hello world example includes a few hello world jobs! You'll then be able to watch and see flux events coming in!

ensemble/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "0.0.15"
1+
__version__ = "0.0.16"

ensemble/client/__init__.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import sys
88

99
import ensemble
10+
import ensemble.defaults as defaults
1011

1112

1213
def get_parser():
@@ -42,9 +43,30 @@ def get_parser():
4243
run.add_argument(
4344
"--executor",
4445
help="Executor to use (defaults to flux)",
45-
choices=["flux"],
46+
choices=defaults.supported_members,
4647
default="flux",
4748
)
49+
run.add_argument(
50+
"--name",
51+
help="Identifier for member (required for minicluster)",
52+
)
53+
run.add_argument(
54+
"--debug",
55+
help="Enable debug logging for the config",
56+
action="store_true",
57+
default=False,
58+
)
59+
run.add_argument(
60+
"--port",
61+
help=f"Port to run application (defaults to {defaults.port})",
62+
default=defaults.port,
63+
type=int,
64+
)
65+
run.add_argument(
66+
"--host",
67+
help="Host with server (defaults to localhost)",
68+
default="localhost",
69+
)
4870

4971
for command in [run]:
5072
command.add_argument(

ensemble/client/run.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@
22

33

44
def main(args, parser, extra, subparser):
5+
# Assemble options
6+
options = {"name": args.name, "port": args.port, "host": args.host}
7+
58
# This will raise an error if the member type (e.g., minicluster) is not known
6-
member = members.get_member(args.executor)
7-
member.load(args.config)
9+
member = members.get_member(args.executor, options=options)
10+
11+
member.load(args.config, args.debug)
812
member.start()
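
How the new client flags flow into the `options` dictionary can be tried without the ensemble package. The following is a minimal standalone sketch, not the library's parser: `build_parser` and `assemble_options` are hypothetical names, the positional `config` argument is an assumption inferred from `args.config`, and the member list and port are copied from `ensemble/defaults.py` in this commit.

```python
import argparse

# Values copied from ensemble/defaults.py in this commit
SUPPORTED_MEMBERS = ["flux", "minicluster"]
DEFAULT_PORT = 50051


def build_parser():
    """Hypothetical flat parser mirroring the flags added to `ensemble run`."""
    parser = argparse.ArgumentParser(prog="ensemble-sketch")
    # Assumption: the config path is a positional argument
    parser.add_argument("config")
    parser.add_argument("--executor", choices=SUPPORTED_MEMBERS, default="flux")
    parser.add_argument("--name", help="Identifier for member (required for minicluster)")
    parser.add_argument("--debug", action="store_true", default=False)
    parser.add_argument("--port", type=int, default=DEFAULT_PORT)
    parser.add_argument("--host", default="localhost")
    return parser


def assemble_options(args):
    """Collect the connection options the same way run.py does."""
    return {"name": args.name, "port": args.port, "host": args.host}


if __name__ == "__main__":
    args = build_parser().parse_args(
        ["--name", "default/ensemble", "--executor", "minicluster", "grow-shrink.yaml"]
    )
    print(assemble_options(args))
```

Keeping the connection details in one dictionary means a member type that does not need them (such as flux) can ignore the options it does not use.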

ensemble/config/config.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import jsonschema
77

8+
import ensemble.defaults as defaults
89
import ensemble.utils as utils
910
from ensemble import schema
1011
from ensemble.config.types import Rule
@@ -14,12 +15,22 @@
1415
script_template = """from ensemble.config.types import Action, Rule
1516
"""
1617

18+
# These are the actions that warrant the heartbeat
19+
heartbeat_actions = {"grow", "shrink"}
1720

18-
def load_config(config_path):
21+
22+
def load_config(config_path, debug=False):
1923
"""
2024
Load the config path, validating with the schema
2125
"""
2226
cfg = utils.read_yaml(config_path)
27+
28+
# On the fly debugging
29+
if debug:
30+
if "logging" not in cfg:
31+
cfg["logging"] = {}
32+
cfg["logging"]["debug"] = True
33+
2334
jsonschema.validate(cfg, schema=schema.ensemble_config_schema)
2435
return EnsembleConfig(cfg)
2536

@@ -34,6 +45,9 @@ def __init__(self, cfg):
3445
self._cfg = cfg
3546
self.jobs = {}
3647
self.rules = {}
48+
49+
# By default, we don't require a heartbeat
50+
self.require_heartbeat = False
3751
self.parse()
3852

3953
# Cache of action names
@@ -43,6 +57,20 @@ def __init__(self, cfg):
4357
def debug_logging(self):
4458
return self._cfg.get("logging", {}).get("debug") is True
4559

60+
@property
61+
def heartbeat(self):
62+
"""
63+
Get the heartbeat seconds.
64+
65+
If heartbeat actions are defined and no heartbeat is set, we require
66+
it and default to 60. Otherwise, we allow it unset (0) or a user
67+
specified value.
68+
"""
69+
heartbeat = self._cfg.get("logging", {}).get("heartbeat") or 0
70+
if not heartbeat and self.require_heartbeat:
71+
heartbeat = defaults.heartbeat_seconds
72+
return heartbeat
73+
4674
def pretty_job(self, name):
4775
"""
4876
Pretty print a job (for the logger) across a single line
@@ -105,6 +133,10 @@ def parse(self):
105133
for rule in self._cfg["rules"]:
106134
rule = Rule(rule, self.custom)
107135

136+
# If the rule action is in the heartbeat set, we require heartbeat
137+
if rule.action.name in heartbeat_actions:
138+
self.require_heartbeat = True
139+
108140
# Group rules with common trigger together
109141
if rule.trigger not in self.rules:
110142
self.rules[rule.trigger] = []
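
The heartbeat resolution added in this file can be checked in isolation. The sketch below reimplements the same decision as a free function, assuming a plain dictionary config and a list of action names; `resolve_heartbeat` is a hypothetical helper for illustration and is not part of the library.

```python
# Mirrors heartbeat_actions in config.py and heartbeat_seconds in defaults.py
HEARTBEAT_ACTIONS = {"grow", "shrink"}
DEFAULT_HEARTBEAT_SECONDS = 60


def resolve_heartbeat(cfg, action_names):
    """
    Return the heartbeat interval in seconds (hypothetical helper).

    A missing or zero heartbeat stays at 0 unless some rule uses a
    grow or shrink action, in which case the default of 60 is applied.
    """
    require_heartbeat = any(name in HEARTBEAT_ACTIONS for name in action_names)
    heartbeat = cfg.get("logging", {}).get("heartbeat") or 0
    if not heartbeat and require_heartbeat:
        heartbeat = DEFAULT_HEARTBEAT_SECONDS
    return heartbeat


if __name__ == "__main__":
    # No scale actions and nothing configured: heartbeat stays off
    print(resolve_heartbeat({}, ["submit"]))
    # A grow action with an explicit 0 still falls back to the default
    print(resolve_heartbeat({"logging": {"heartbeat": 0}}, ["grow"]))
    # A user-specified interval is kept as is
    print(resolve_heartbeat({"logging": {"heartbeat": 3}}, ["shrink"]))
```

Because a configured value of 0 is treated the same as an unset value, there is no way to disable the heartbeat once a grow or shrink action is present, which matches the behavior described in the README.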

ensemble/config/types.py

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import re
2+
13
import ensemble.defaults as defaults
24

35

@@ -20,9 +22,60 @@ def name(self):
2022
def trigger(self):
2123
return self._rule["trigger"]
2224

25+
def run_when(self, value):
26+
"""
27+
Check if "when" is relevant to be run now, return True/False
28+
to say to run or not.
29+
"""
30+
# No "when" is set, so the rule is always eligible to run
31+
if self.when is None:
32+
return True
33+
34+
# If we have a direct value, we check for equality
35+
number = (int, float)
36+
if isinstance(self.when, number) and value != self.when:
37+
return False
38+
if isinstance(self.when, number) and value == self.when:
39+
return True
40+
41+
# Otherwise, parse for inequality
42+
match = re.search(r"(?P<inequality>[<>]=?|==?)\s*(?P<comparator>[\w.]+)", self.when).groupdict()
43+
44+
# This could technically be a float value
45+
comparator = float(match["comparator"])
46+
inequality = match["inequality"]
47+
assert inequality in {"<", ">", "<=", ">=", "==", "="}
48+
49+
# Evaluate! Not sure there is a better way than this :)
50+
if inequality == "<":
51+
return value < comparator
52+
if inequality == "<=":
53+
return value <= comparator
54+
if inequality == ">":
55+
return value > comparator
56+
if inequality == ">=":
57+
return value >= comparator
58+
if inequality in ["==", "="]:
59+
return value == comparator
60+
raise ValueError(f"Invalid inequality {inequality} for rule when")
61+
62+
def check_when(self):
63+
"""
64+
Ensure we have a valid inequality before running anything!
65+
"""
66+
# If a number, we require it to be greater than or equal to 0
67+
if isinstance(self.when, int) and self.when >= 0:
68+
return
69+
70+
# Check running the function with a value, should not raise error
71+
try:
72+
self.run_when(10)
73+
except Exception as err:
74+
raise ValueError(f"when: for rule {self} is not valid: {err}")
75+
2376
@property
2477
def when(self):
25-
return self._rule["when"]
78+
return self._rule.get("when")
2679

2780
def validate(self):
2881
"""
@@ -33,6 +86,8 @@ def validate(self):
3386
raise ValueError(
3487
f"Rule trigger {self.trigger} has invalid action name {self.action.name}"
3588
)
89+
# Ensure we have a valid number or inequality
90+
self.check_when()
3691

3792

3893
class Action:
@@ -123,6 +178,12 @@ def finished(self):
123178
"""
124179
return self.repetitions <= 0
125180

181+
def value(self, default=1):
182+
"""
183+
Return a value, with some default (default is 1)
184+
"""
185+
return self._action.get("value", default)
186+
126187
@property
127188
def label(self):
128189
return self._action.get("label")
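
The `when` check in `run_when` can also be written as a lookup table instead of a chain of `if` statements. This is a sketch of that alternative under stated assumptions, not the code in this commit: `evaluate_when`, `OPERATORS`, and `WHEN_PATTERN` are hypothetical names, and the pattern here is stricter than the one in the diff (it anchors the whole string and accepts only non-negative decimal numbers).

```python
import operator
import re

# Map each supported symbol to the matching comparison function
OPERATORS = {
    "<": operator.lt,
    "<=": operator.le,
    ">": operator.gt,
    ">=": operator.ge,
    "==": operator.eq,
    "=": operator.eq,
}

# Assumption: comparators are non-negative integers or decimals
WHEN_PATTERN = re.compile(
    r"^\s*(?P<inequality>[<>]=?|==?)\s*(?P<comparator>\d+(?:\.\d+)?)\s*$"
)


def evaluate_when(when, value):
    """Return True if a rule with this "when" should run for the given value."""
    # No condition means the rule always runs
    if when is None:
        return True
    # A bare number is an equality check
    if isinstance(when, (int, float)):
        return value == when
    match = WHEN_PATTERN.match(when)
    if match is None:
        raise ValueError(f"Invalid when expression: {when!r}")
    compare = OPERATORS[match["inequality"]]
    return compare(value, float(match["comparator"]))


if __name__ == "__main__":
    print(evaluate_when(">= 10", 10))
    print(evaluate_when("< 2.5", 3))
```

Raising `ValueError` on a failed match gives validation a specific error to report, rather than relying on an attribute error from a `None` match object.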

ensemble/defaults.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
workers = 10
22
port = 50051
33

4-
valid_actions = ["submit", "custom", "terminate"]
4+
supported_members = ["flux", "minicluster"]
5+
valid_actions = ["submit", "custom", "terminate", "grow", "shrink"]
6+
heartbeat_seconds = 60
7+
service_account_file = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
58

69
job_events = [
710
"job-depend",

ensemble/heartbeat.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import signal
2+
import threading
3+
4+
5+
class GracefulExit(Exception):
6+
"""
7+
Ensure that pressing Control+C exits cleanly instead of dumping a traceback.
8+
"""
9+
10+
pass
11+
12+
13+
class QueueHeartbeat(threading.Thread):
14+
"""
15+
The Queue Heartbeat triggers at a user specified interval, with the
16+
intention to be able to run events that might not be linked to jobs.
17+
"""
18+
19+
def __init__(self, interval_seconds, callback, **kwargs):
20+
super().__init__()
21+
self.stop_event = threading.Event()
22+
self.interval_seconds = interval_seconds
23+
self.callback = callback
24+
self.kwargs = kwargs
25+
26+
def run(self):
27+
"""
28+
Run the heartbeat function, and exit gracefully
29+
"""
30+
while not self.stop_event.wait(self.interval_seconds):
31+
try:
32+
self.callback(**self.kwargs)
33+
except (KeyboardInterrupt, GracefulExit):
34+
break
35+
36+
def stop(self):
37+
"""
38+
Explicit stop of the thread, like that will happen :)
39+
"""
40+
self.stop_event.set()
41+
42+
43+
def signal_handler(signum, frame):
44+
"""
45+
The signal handler follows an expected pattern, but
46+
just calls a graceful exit
47+
"""
48+
raise GracefulExit()
49+
50+
51+
# Signals for our heartbeat
52+
signal.signal(signal.SIGINT, signal_handler)
53+
signal.signal(signal.SIGTERM, signal_handler)
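
The interval loop in `QueueHeartbeat` relies on `threading.Event.wait`, which returns `False` on timeout and `True` once the event is set, so one call serves as both the sleep and the stop check. A minimal standalone sketch of that pattern follows; `IntervalThread`, `demo`, and the `label` keyword are hypothetical names for illustration, and the short interval is only there to keep the demonstration quick.

```python
import threading
import time


class IntervalThread(threading.Thread):
    """Hypothetical stand-in for QueueHeartbeat, without signal handling."""

    def __init__(self, interval_seconds, callback, **kwargs):
        # Daemon thread so a forgotten stop() cannot block interpreter exit
        super().__init__(daemon=True)
        self.stop_event = threading.Event()
        self.interval_seconds = interval_seconds
        self.callback = callback
        self.kwargs = kwargs

    def run(self):
        # wait() returns False on timeout (keep ticking) and True once
        # stop() has set the event (leave the loop)
        while not self.stop_event.wait(self.interval_seconds):
            self.callback(**self.kwargs)

    def stop(self):
        self.stop_event.set()


def demo(interval=0.05, duration=0.3):
    """Run the thread briefly and return the recorded ticks."""
    ticks = []

    def record(label):
        ticks.append(label)

    beat = IntervalThread(interval, record, label="tick")
    beat.start()
    time.sleep(duration)
    beat.stop()
    beat.join(timeout=1)
    return ticks


if __name__ == "__main__":
    print(len(demo()), "ticks recorded")
```

One caveat worth keeping in mind: Python runs signal handlers in the main thread, so an exception raised by a handler such as `signal_handler` surfaces there rather than inside the worker's `run` loop. A caller would likely need to catch it in the main thread and call `stop()` on the heartbeat.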
