feat: add support for terminate #9

Merged: 1 commit, Oct 19, 2024
2 changes: 1 addition & 1 deletion .devcontainer/Dockerfile
@@ -23,7 +23,7 @@ RUN curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v28.2

# For easier Python development.
RUN python3 -m pip install IPython && \
python3 -m pip install -r /requirements.txt
python3 -m pip install -r /requirements.txt && \
python3 -m pip install -r /dev-requirements.txt

# Assuming installing to /usr/local
4 changes: 4 additions & 0 deletions README.md
@@ -66,6 +66,7 @@ Note that yes, this means "submit" is both an action and an event.
The design of a rule is to have an action, and the action is something your ensemble can be tasked to do when an event is triggered. Right now we support the following:

- **submit**: submit a job
- **terminate**: terminate the member. This is usually what you want to call to have the entire thing exit on 0
- **custom**: run a custom function that will receive known kwargs, and then should return an action (read more below)

We see "submit" as two examples in the above, which is a common thing you'd want to do! For each action, you should minimally define the "name" and a "label" that typically corresponds to a job group.
@@ -161,6 +162,9 @@ ensemble run examples/backoff-example.yaml

# This shows a custom action
ensemble run examples/custom-action-example.yaml

# This shows termination, which is necessary for when you want an exit
ensemble run examples/terminate-example.yaml
```

Right now, this will run any rules with "start" triggers, which for this hello world example includes a few hello world jobs! You'll then be able to watch and see flux events coming in!
2 changes: 1 addition & 1 deletion ensemble/__init__.py
@@ -1 +1 @@
__version__ = "0.0.14"
__version__ = "0.0.15"
4 changes: 2 additions & 2 deletions ensemble/config/types.py
@@ -1,4 +1,4 @@
valid_actions = ["submit", "custom"]
import ensemble.defaults as defaults


class Rule:
@@ -29,7 +29,7 @@ def validate(self):
Validate the rule and associated action
"""
# Is the action name valid?
if self.action.name not in valid_actions:
if self.action.name not in defaults.valid_actions:
raise ValueError(
f"Rule trigger {self.trigger} has invalid action name {self.action.name}"
)
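
The validation change above can be sketched in isolation. This is a minimal sketch, not the project's code: `VALID_ACTIONS` stands in for `ensemble.defaults.valid_actions`, and the `Action` and `Rule` dataclasses and the `is_valid` helper are hypothetical simplifications. Only the list contents and the error message are taken from the diff.

```python
from dataclasses import dataclass

# Stand-in for ensemble.defaults.valid_actions (contents copied from the diff)
VALID_ACTIONS = ["submit", "custom", "terminate"]


@dataclass
class Action:
    # Hypothetical, simplified action: just a name and an optional label
    name: str
    label: str = ""


@dataclass
class Rule:
    # Hypothetical, simplified rule: a trigger plus one action
    trigger: str
    action: Action

    def validate(self):
        # Reject any action name that is missing from the shared list
        if self.action.name not in VALID_ACTIONS:
            raise ValueError(
                f"Rule trigger {self.trigger} has invalid action name {self.action.name}"
            )


def is_valid(rule):
    # Hypothetical helper that turns the exception into a boolean
    try:
        rule.validate()
    except ValueError:
        return False
    return True
```

Keeping the list in one shared module means that a new action such as "terminate" only has to be registered in a single place for validation to accept it.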
2 changes: 2 additions & 0 deletions ensemble/defaults.py
@@ -1,6 +1,8 @@
workers = 10
port = 50051

valid_actions = ["submit", "custom", "terminate"]

job_events = [
"job-depend",
"job-sched",
16 changes: 15 additions & 1 deletion ensemble/members/base.py
@@ -29,6 +29,14 @@ def __init__(self, **options):
def name(self):
raise NotImplementedError

def terminate(self):
"""
Custom termination function

Often the executor needs custom logic to work.
"""
pass

def record_metrics(self, event):
"""
Record group metrics for the event.
@@ -95,7 +103,13 @@ def execute_action(self, rule, record=None):

if rule.action.name == "custom":
self.announce(f" custom {rule.action.label}")
self.custom_action(rule, record)
return self.custom_action(rule, record)

# Note that terminate exits but does not otherwise touch
# the queue, etc. Given a reactor, we should just stop it
if rule.action.name == "terminate":
self.announce(" terminate ensemble session")
self.terminate()

def execute_metric_action(self, rule, record=None):
"""
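
The terminate dispatch in this hunk pairs a no-op hook on the base member with an executor-specific override. The following is a rough sketch of that pattern under stated assumptions: `MemberBase`, `FakeHandle`, `FakeQueue`, and `make_rule` are hypothetical names, `announce` simply collects messages, and the fake handle only imitates the `reactor_stop()` call that the Flux member makes on its handle in this PR.

```python
from types import SimpleNamespace


class MemberBase:
    """Hypothetical, trimmed-down member that mirrors the dispatch in the diff."""

    def __init__(self):
        self.messages = []

    def announce(self, message):
        # Assumption: the real announce logs; here we just collect messages
        self.messages.append(message)

    def terminate(self):
        # Default hook does nothing; executors override it with their own logic
        pass

    def custom_action(self, rule, record=None):
        # Placeholder: a real custom function would return an action
        return None

    def execute_action(self, rule, record=None):
        if rule.action.name == "custom":
            return self.custom_action(rule, record)
        if rule.action.name == "terminate":
            self.announce(" terminate ensemble session")
            self.terminate()


class FakeHandle:
    """Hypothetical stand-in for an event loop handle exposing reactor_stop()."""

    def __init__(self):
        self.running = True

    def reactor_stop(self):
        self.running = False


class FakeQueue(MemberBase):
    """Hypothetical executor that stops its event loop on terminate."""

    def __init__(self, handle):
        super().__init__()
        self.handle = handle

    def terminate(self):
        self.handle.reactor_stop()


def make_rule(name):
    # Hypothetical helper that builds an object with rule.action.name
    action = SimpleNamespace(name=name, label="")
    return SimpleNamespace(trigger="metric", action=action)
```

With this shape the base class never needs to know how a given executor shuts down: it announces the action and calls the hook, and each member decides what stopping means.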
12 changes: 8 additions & 4 deletions ensemble/members/flux/queue.py
@@ -93,13 +93,19 @@ def __init__(
def name(self):
return "FluxQueue"

def terminate(self):
"""
Custom termination function for flux.
"""
self.handle.reactor_stop()

def record_metrics(self, record):
"""
Parse a Flux event and record metrics for the group.
"""
# If we are picking up a queue backlog, we might be missing the id
# An alternative would be to assume the job isn't of interest and
# return early, but let's assume it is for now.
# We have to assume we are only interested in the context that is
# seen by the ensemble runner.
if record["id"] not in self.jobids:
return

@@ -127,8 +133,6 @@ def record_metrics(self, record):
for rule in self.iter_rules("metric"):
self.execute_rule(rule, record)

# TODO custom triggers -> actions!

def record_start_metrics(self, event, record):
"""
We typically want to keep the job start time for the
39 changes: 39 additions & 0 deletions examples/terminate-example.yaml
@@ -0,0 +1,39 @@
# Termination is necessary if you want the event loop to end (and exit with 0)
logging:
debug: false

# I made these the size of the cluster so we trigger scaling at least once
# We currently assume these are uniform (so no order enforced beyond the listing here)
jobs:
- name: echo
command: echo hello world
count: 5
nodes: 1
- name: sleep
command: sleep 10
count: 5
nodes: 1

rules:
# 1. This rule says to submit the sleep jobs when we start
- trigger: start
action:
name: submit
label: sleep

# This says to submit the echo hello world jobs when we have 3
# successfully completed sleep jobs. This dot notation is a path into the
# models data structure.
- trigger: metric
name: count.sleep.success
when: 3
action:
name: submit
label: echo

# Terminate when we have 5 successful echo jobs
- trigger: metric
name: count.echo.success
when: 5
action:
name: terminate
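
The metric rules in this example name a dotted path such as `count.echo.success` and a `when` value. As an illustration only, the sketch below resolves such a path against a nested dictionary and compares the result with `when`. The `lookup` and `should_fire` helpers and the shape of the `metrics` dictionary are assumptions, and treating `when` as an equality check is a guess; the diff does not show how ensemble evaluates it.

```python
def lookup(metrics, path):
    # Walk the nested dictionary one dotted segment at a time
    value = metrics
    for key in path.split("."):
        value = value[key]
    return value


def should_fire(rule, metrics):
    # Assumption: a rule fires when the metric equals the "when" value.
    # A missing or non-dictionary path segment means the rule does not fire.
    try:
        return lookup(metrics, rule["name"]) == rule["when"]
    except (KeyError, TypeError):
        return False


# Hypothetical metrics snapshot after all echo jobs have succeeded
metrics = {"count": {"sleep": {"success": 5}, "echo": {"success": 5}}}

# The terminate rule from the example, expressed as a dictionary
terminate_rule = {
    "trigger": "metric",
    "name": "count.echo.success",
    "when": 5,
    "action": {"name": "terminate"},
}
```

Here `should_fire(terminate_rule, metrics)` evaluates to true because the path resolves to 5, which matches `when`.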