feat: support for flux minicluster #11

Merged
merged 8 commits into from
Oct 23, 2024
1 change: 1 addition & 0 deletions Dockerfile
@@ -21,5 +21,6 @@ RUN python3 -m pip install --break-system-packages -r requirements.txt && \
python3 -m pip install --break-system-packages -e .

# ensemble-server start --workers 10 --port <port>
ENV PYTHONUNBUFFERED=1
ENTRYPOINT ["ensemble-server"]
CMD ["start"]
4 changes: 4 additions & 0 deletions Makefile
@@ -17,6 +17,10 @@ python: python ## Generate python proto files in python
docker-build:
docker build -t ${IMG} .

.PHONY: kind-load
kind-load: docker-build # For local development with kind
kind load docker-image ${IMG}

.PHONY: arm-build
arm-build:
docker buildx build --platform linux/arm64 --build-arg ARCH=aarch_64 -t ${ARMIMG} .
54 changes: 50 additions & 4 deletions README.md
@@ -32,6 +32,27 @@ This design will be translated into more consolidated design documentation. For
- **Plugins**: A plugin is a collection of custom actions that are typically associated with a particular application. For example, a plugin for LAMMPS might know how to check LAMMPS output and act on a specific parsing of a result. Plugins are used equivalently to custom functions, and can accept arguments.
- **Metrics** are summary metrics collected for groups of jobs, for customized algorithms. To support this, we use online (or streaming) ML algorithms for things like mean, IQR, etc. While there could be a named entity called an algorithm, since it's just a set of rules (triggers and actions) that means we don't need to explicitly define them (yet). I can see at some point creating "packaged" rule sets that are called that.


#### Logging

By default, the logger pretty-prints only the "main" events (triggers and submissions). To see ALL events from Flux (good for debugging), turn on debug logging:

```yaml
logging:
debug: true
```

To have the heartbeat fire at a regular interval, set `heartbeat` to the number of seconds between beats:

```yaml
logging:
debug: true
heartbeat: 60
```

Note that the heartbeat is off by default (set to 0 seconds) unless you include a grow or shrink action. In that case, it turns on and defaults to 60 seconds, unless you've specified another interval.
If you have grow/shrink and explicitly set the heartbeat to 0, it will still default to 60 seconds, because grow/shrink won't work as expected without the heartbeat.
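
The resolution order described above can be sketched in a few lines of Python. This is a minimal illustration rather than the library's own code: `effective_heartbeat` and its parameter names are hypothetical, and the 60 second default is the value stated in the note.

```python
def effective_heartbeat(logging_cfg, uses_grow_or_shrink, default_seconds=60):
    """Return the heartbeat interval in seconds (0 means disabled)."""
    # A missing, null, or 0 heartbeat all count as "not set"
    heartbeat = (logging_cfg or {}).get("heartbeat") or 0
    # Grow/shrink rules need the heartbeat, so fall back to the default
    if not heartbeat and uses_grow_or_shrink:
        return default_seconds
    return heartbeat


print(effective_heartbeat({}, False))                # 0
print(effective_heartbeat({"heartbeat": 3}, True))   # 3
print(effective_heartbeat({"heartbeat": 0}, True))   # 60
```

The last call shows why an explicit 0 does not disable the heartbeat when grow or shrink is in play: 0 is indistinguishable from "unset" in this scheme.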

#### Rules

A rule defines a trigger and action to take. The library is event driven, meaning that the queue is expected to send events, and we don't do any polling.
@@ -65,15 +86,29 @@ Note that yes, this means "submit" is both an action and an event.

The design of a rule is to have an action, and the action is something your ensemble can be tasked to do when an event is triggered. Right now we support the following:

- **submit**: submit a job
- **terminate**: terminate the member. This is usually what you want to call to have the entire thing exit on 0
- **custom**: run a custom function that will receive known kwargs, and then should return an action (read more below)
**Flux and MiniCluster**

- *submit*: submit a job
- *terminate*: terminate the member. This is usually what you want to call to have the entire thing exit on 0
- *custom*: run a custom function that will receive known kwargs, and then should return an action (read more below)

**MiniCluster Only**

- *grow*: grow (or request) the cluster to scale up
- *shrink*: shrink (or request) the cluster to scale down

For the scale operations, since scaling raises the issue of resource contention between different ensembles, we assume (to start) that a request to scale:

1. Should only happen once. If it's granted, great, if not, we aren't going to ask again.
2. Does not need to consider another level of scheduler (e.g., fair share)

I started thinking about putting a second scheduler (or fair share algorithm) in the gRPC service, but realized this adds another level of complexity that, although we might work on it later, is not warranted yet.
Also note that since scale operation triggers might not be linked to job events (e.g., if we want to trigger when a job group has been in the queue for too long), we added support for a heartbeat. The heartbeat isn't a trigger in and of itself, but when it fires, it runs through the rules that are relevant to queue metrics.
We see "submit" as two examples in the above, which is a common thing you'd want to do! For each action, you should minimally define the "name" and a "label" that typically corresponds to a job group.
You can also optionally define "repetitions," which are the number of times the action should be run before expiring. If you want a backoff period between repetitions, set "backoff" to a non zero value.
By default, when no repetitions or backoff are set, the action is assumed to have a repetition of 1. It will be run once! Let's now look at a custom action. Here is what your function should look like in your `ensemble.yaml`


##### Actions


@@ -157,6 +192,9 @@ ensemble-server start
# Run the hello-world example ensemble! it will submit and monitor job events, etc
ensemble run examples/hello-world.yaml

# Here is how to add on the fly debug (logging->debug true)
ensemble run --debug examples/hello-world.yaml

# This example shows using repetitions and backoff
ensemble run examples/backoff-example.yaml

@@ -165,6 +203,14 @@ ensemble run examples/custom-action-example.yaml

# This shows termination, which is necessary for when you want an exit
ensemble run examples/terminate-example.yaml

# Run a heartbeat every 3 seconds.
# This will trigger a check to see if actions need to be performed
ensemble run examples/heartbeat-example.yaml

# Grow/shrink requires a minicluster (flux doesn't support it) but we can mock it here
# Note that a --name with <namespace>/<name> is required
ensemble run --name default/ensemble --executor minicluster examples/grow-shrink-example.yaml
```

Right now, this will run any rules with "start" triggers, which for this hello world example includes a few hello world jobs! You'll then be able to watch and see flux events coming in!
2 changes: 1 addition & 1 deletion ensemble/__init__.py
@@ -1 +1 @@
-__version__ = "0.0.15"
+__version__ = "0.0.16"
24 changes: 23 additions & 1 deletion ensemble/client/__init__.py
@@ -7,6 +7,7 @@
import sys

import ensemble
import ensemble.defaults as defaults


def get_parser():
@@ -42,9 +43,30 @@ def get_parser():
run.add_argument(
"--executor",
help="Executor to use (defaults to flux)",
-choices=["flux"],
+choices=defaults.supported_members,
default="flux",
)
run.add_argument(
"--name",
help="Identifier for member (required for minicluster)",
)
run.add_argument(
"--debug",
help="Enable debug logging for the config",
action="store_true",
default=False,
)
run.add_argument(
"--port",
help=f"Port to run application (defaults to {defaults.port})",
default=defaults.port,
type=int,
)
run.add_argument(
"--host",
help="Host with server (defaults to localhost)",
default="localhost",
)

for command in [run]:
command.add_argument(
8 changes: 6 additions & 2 deletions ensemble/client/run.py
@@ -2,7 +2,11 @@


def main(args, parser, extra, subparser):
# Assemble options
options = {"name": args.name, "port": args.port, "host": args.host}

# This will raise an error if the member type (e.g., minicluster) is not known
-member = members.get_member(args.executor)
-member.load(args.config)
+member = members.get_member(args.executor, options=options)

+member.load(args.config, args.debug)
member.start()
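
The option handling above can be tried in isolation with a small standalone sketch. Everything here is illustrative: `build_parser` and `assemble_options` are hypothetical helpers, the flag names and defaults are copied from this diff, and the early `<namespace>/<name>` check is an assumption based on the README note. Whether the member class performs an equivalent check is not visible in these hunks.

```python
import argparse

# Mirrors defaults.supported_members from this diff
SUPPORTED_MEMBERS = ["flux", "minicluster"]


def build_parser():
    """Build a parser with the same flags the diff adds to `run`."""
    parser = argparse.ArgumentParser(prog="ensemble-run-sketch")
    parser.add_argument("config")
    parser.add_argument("--executor", choices=SUPPORTED_MEMBERS, default="flux")
    parser.add_argument("--name")
    parser.add_argument("--port", type=int, default=50051)
    parser.add_argument("--host", default="localhost")
    return parser


def assemble_options(args):
    """Hypothetical helper: validate --name early, then build the options dict."""
    if args.executor == "minicluster":
        # Assumed rule from the README: --name must be <namespace>/<name>
        parts = (args.name or "").split("/")
        if len(parts) != 2 or not all(parts):
            raise ValueError("--name must look like <namespace>/<name> for minicluster")
    return {"name": args.name, "port": args.port, "host": args.host}


args = build_parser().parse_args(
    ["--name", "default/ensemble", "--executor", "minicluster", "examples/grow-shrink-example.yaml"]
)
print(assemble_options(args))
```

Failing before the member is created keeps a malformed name from surfacing later as a less obvious error.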
34 changes: 33 additions & 1 deletion ensemble/config/config.py
@@ -5,6 +5,7 @@

import jsonschema

import ensemble.defaults as defaults
import ensemble.utils as utils
from ensemble import schema
from ensemble.config.types import Rule
@@ -14,12 +15,22 @@
script_template = """from ensemble.config.types import Action, Rule
"""

# These are the actions that warrant the heartbeat
heartbeat_actions = {"grow", "shrink"}

-def load_config(config_path):

+def load_config(config_path, debug=False):
"""
Load the config path, validating with the schema
"""
cfg = utils.read_yaml(config_path)

# On the fly debugging
if debug:
if "logging" not in cfg:
cfg["logging"] = {}
cfg["logging"]["debug"] = True

jsonschema.validate(cfg, schema=schema.ensemble_config_schema)
return EnsembleConfig(cfg)

@@ -34,6 +45,9 @@ def __init__(self, cfg):
self._cfg = cfg
self.jobs = {}
self.rules = {}

# By default, we don't require a heartbeat
self.require_heartbeat = False
self.parse()

# Cache of action names
@@ -43,6 +57,20 @@ def __init__(self, cfg):
def debug_logging(self):
return self._cfg.get("logging", {}).get("debug") is True

@property
def heartbeat(self):
"""
Get the heartbeat seconds.

If heartbeat actions are defined and no heartbeat is set, we require
it and default to 60. Otherwise, we allow it unset (0) or a user
specified value.
"""
heartbeat = self._cfg.get("logging", {}).get("heartbeat") or 0
if not heartbeat and self.require_heartbeat:
heartbeat = defaults.heartbeat_seconds
return heartbeat

def pretty_job(self, name):
"""
Pretty print a job (for the logger) across a single line
@@ -105,6 +133,10 @@ def parse(self):
for rule in self._cfg["rules"]:
rule = Rule(rule, self.custom)

# If the rule action is in the heartbeat set, we require heartbeat
if rule.action.name in heartbeat_actions:
self.require_heartbeat = True

# Group rules with common trigger together
if rule.trigger not in self.rules:
self.rules[rule.trigger] = []
63 changes: 62 additions & 1 deletion ensemble/config/types.py
@@ -1,3 +1,5 @@
import re

import ensemble.defaults as defaults


@@ -20,9 +22,60 @@ def name(self):
def trigger(self):
return self._rule["trigger"]

    def run_when(self, value):
        """
        Check if "when" is relevant to be run now, return True/False
        to say to run or not.
        """
        # No when is set, so we just continue assuming there is no when
        if self.when is None:
            return True

        # If we have a direct value, we check for equality
        if isinstance(self.when, (int, float)):
            return value == self.when

        # Otherwise, parse for an inequality or equality. The pattern also
        # accepts "=" and "==", and a comparator with a decimal point
        match = re.search(
            r"(?P<inequality>[<>]=?|==?)\s*(?P<comparator>-?\d+(?:\.\d+)?)",
            self.when,
        )
        if match is None:
            raise ValueError(f"Invalid expression {self.when} for rule when")

        # This could technically be a float value
        comparator = float(match["comparator"])
        inequality = match["inequality"]

        # Evaluate! Not sure there is a better way than this :)
        if inequality == "<":
            return value < comparator
        if inequality == "<=":
            return value <= comparator
        if inequality == ">":
            return value > comparator
        if inequality == ">=":
            return value >= comparator
        if inequality in ["==", "="]:
            return value == comparator
        raise ValueError(f"Invalid inequality {inequality} for rule when")

    def check_when(self):
        """
        Ensure we have a valid inequality before running anything!
        """
        # If a number, we require it to be greater than or equal to 0
        if isinstance(self.when, (int, float)):
            if self.when < 0:
                raise ValueError(f"when: for rule {self} must be >= 0")
            return

        # Check running the function with a value, should not raise error
        try:
            self.run_when(10)
        except Exception as err:
            raise ValueError(f"when: for rule {self} is not valid: {err}")

@property
def when(self):
-return self._rule["when"]
+return self._rule.get("when")

def validate(self):
"""
@@ -33,6 +86,8 @@ def validate(self):
raise ValueError(
f"Rule trigger {self.trigger} has invalid action name {self.action.name}"
)
# Ensure we have a valid number or inequality
self.check_when()
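
As an alternative to a chain of `if` statements, a `when` expression can be evaluated with a lookup table from the standard `operator` module. This is a hedged sketch, not the code in this PR: `OPERATORS`, `WHEN_PATTERN`, and the standalone `run_when` function are illustrative names, and the strictness of the pattern (anchored, numeric comparators only) is an assumption of this example.

```python
import operator
import re

# Map each supported symbol to its comparison function
OPERATORS = {
    "<": operator.lt,
    "<=": operator.le,
    ">": operator.gt,
    ">=": operator.ge,
    "==": operator.eq,
    "=": operator.eq,
}

# Two-character operators are listed first so "<=" is not read as "<".
# The pattern is anchored, so trailing junk is rejected instead of ignored.
WHEN_PATTERN = re.compile(r"^\s*(<=|>=|==|<|>|=)\s*(-?\d+(?:\.\d+)?)\s*$")


def run_when(when, value):
    """Return True if `value` satisfies the `when` condition."""
    if when is None:
        return True
    # A bare number means "equal to this number"
    if isinstance(when, (int, float)):
        return value == when
    match = WHEN_PATTERN.match(str(when))
    if match is None:
        raise ValueError(f"Invalid when expression: {when!r}")
    symbol, number = match.groups()
    return OPERATORS[symbol](value, float(number))


print(run_when("> 5", 10))     # True
print(run_when("<= 1.5", 2))   # False
print(run_when(3, 3))          # True
```

With a table, supporting a new operator is a one-line change, and an unparseable expression fails with a single clear error.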


class Action:
@@ -123,6 +178,12 @@ def finished(self):
"""
return self.repetitions <= 0

    def value(self, default=1):
        """
        Return a value, with some default (default is 1)
        """
        return self._action.get("value", default)

@property
def label(self):
return self._action.get("label")
5 changes: 4 additions & 1 deletion ensemble/defaults.py
@@ -1,7 +1,10 @@
workers = 10
port = 50051

-valid_actions = ["submit", "custom", "terminate"]
+supported_members = ["flux", "minicluster"]
+valid_actions = ["submit", "custom", "terminate", "grow", "shrink"]
heartbeat_seconds = 60
service_account_file = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"

job_events = [
"job-depend",
53 changes: 53 additions & 0 deletions ensemble/heartbeat.py
@@ -0,0 +1,53 @@
import signal
import threading


class GracefulExit(Exception):
    """
    Ensure if we press control+C, it doesn't throw up.
    """

    pass


class QueueHeartbeat(threading.Thread):
    """
    The Queue Heartbeat triggers at a user specified interval, with the
    intention to be able to run events that might not be linked to jobs.
    """

    def __init__(self, interval_seconds, callback, **kwargs):
        super().__init__()
        self.stop_event = threading.Event()
        self.interval_seconds = interval_seconds
        self.callback = callback
        self.kwargs = kwargs

    def run(self):
        """
        Run the heartbeat function, and exit gracefully
        """
        while not self.stop_event.wait(self.interval_seconds):
            try:
                self.callback(**self.kwargs)
            except (KeyboardInterrupt, GracefulExit):
                break

    def stop(self):
        """
        Explicit stop of the thread, like that will happen :)
        """
        self.stop_event.set()


def signal_handler(signum, frame):
    """
    The signal handler follows an expected pattern, but
    just calls a graceful exit
    """
    raise GracefulExit()


# Signals for our heartbeat
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
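
To see the `Event.wait` loop in action without the rest of the library, here is a minimal, self-contained sketch of the same pattern. The class `Heartbeat`, the callback `check_queue`, and the 0.01 second interval are illustrative stand-ins, and `daemon=True` is a choice of this sketch rather than something the file above sets.

```python
import threading


class Heartbeat(threading.Thread):
    """Call `callback(**kwargs)` every `interval_seconds` until stopped."""

    def __init__(self, interval_seconds, callback, **kwargs):
        # daemon=True so a forgotten heartbeat never blocks interpreter exit
        super().__init__(daemon=True)
        self.stop_event = threading.Event()
        self.interval_seconds = interval_seconds
        self.callback = callback
        self.kwargs = kwargs

    def run(self):
        # Event.wait returns False on timeout (time to beat) and True once
        # stop() has set the event, which ends the loop promptly
        while not self.stop_event.wait(self.interval_seconds):
            self.callback(**self.kwargs)

    def stop(self):
        self.stop_event.set()


beats = []
enough = threading.Event()


def check_queue(label):
    """Stand-in for the real work, e.g. evaluating queue-metric rules."""
    beats.append(label)
    if len(beats) >= 3:
        enough.set()


heartbeat = Heartbeat(0.01, check_queue, label="demo")
heartbeat.start()
enough.wait(timeout=5)       # block until three beats have fired
heartbeat.stop()
heartbeat.join(timeout=5)
print(len(beats) >= 3)
```

Waiting on the event instead of sleeping means `stop()` takes effect immediately rather than after the current interval elapses.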