Skip to content

Commit bb9520e

Browse files
authored
Merge pull request #9 from converged-computing/add-terminate-action
feat: add support for terminate
2 parents 59e441e + 32063c8 commit bb9520e

File tree

8 files changed

+72
-9
lines changed

8 files changed

+72
-9
lines changed

.devcontainer/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ RUN curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v28.2
2323

2424
# For easier Python development.
2525
RUN python3 -m pip install IPython && \
26-
python3 -m pip install -r /requirements.txt
26+
python3 -m pip install -r /requirements.txt && \
2727
python3 -m pip install -r /dev-requirements.txt
2828

2929
# Assuming installing to /usr/local

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ Note that yes, this means "submit" is both an action and an event.
6666
The design of a rule is to have an action, and the action is something your ensemble can be tasked to do when an event is triggered. Right now we support the following:
6767
6868
- **submit**: submit a job
69+
- **terminate**: terminate the member. This is usually what you want to call to have the entire thing exit on 0
6970
- **custom**: run a custom function that will receive known kwargs, and then should return an action (read more below)
7071
7172
We see "submit" as two examples in the above, which is a common thing you'd want to do! For each action, you should minimally define the "name" and a "label" that typically corresponds to a job group.
@@ -161,6 +162,9 @@ ensemble run examples/backoff-example.yaml
161162
162163
# This shows a custom action
163164
ensemble run examples/custom-action-example.yaml
165+
166+
# This shows termination, which is necessary for when you want an exit
167+
ensemble run examples/terminate-example.yaml
164168
```
165169

166170
Right now, this will run any rules with "start" triggers, which for this hello world example includes a few hello world jobs! You'll then be able to watch and see flux events coming in!

ensemble/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "0.0.14"
1+
__version__ = "0.0.15"

ensemble/config/types.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
valid_actions = ["submit", "custom"]
1+
import ensemble.defaults as defaults
22

33

44
class Rule:
@@ -29,7 +29,7 @@ def validate(self):
2929
Validate the rule and associated action
3030
"""
3131
# Is the action name valid?
32-
if self.action.name not in valid_actions:
32+
if self.action.name not in defaults.valid_actions:
3333
raise ValueError(
3434
f"Rule trigger {self.trigger} has invalid action name {self.action.name}"
3535
)

ensemble/defaults.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
workers = 10
22
port = 50051
33

4+
valid_actions = ["submit", "custom", "terminate"]
5+
46
job_events = [
57
"job-depend",
68
"job-sched",

ensemble/members/base.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,14 @@ def __init__(self, **options):
2929
def name(self):
3030
raise NotImplementedError
3131

32+
def terminate(self):
33+
"""
34+
Custom termination function
35+
36+
Often the executor needs custom logic to work.
37+
"""
38+
pass
39+
3240
def record_metrics(self, event):
3341
"""
3442
Record group metrics for the event.
@@ -95,7 +103,13 @@ def execute_action(self, rule, record=None):
95103

96104
if rule.action.name == "custom":
97105
self.announce(f" custom {rule.action.label}")
98-
self.custom_action(rule, record)
106+
return self.custom_action(rule, record)
107+
108+
# Note that terminate exits but does not otherwise touch
109+
# the queue, etc. Given a reactor, we should just stop it
110+
if rule.action.name == "terminate":
111+
self.announce(" terminate ensemble session")
112+
self.terminate()
99113

100114
def execute_metric_action(self, rule, record=None):
101115
"""

ensemble/members/flux/queue.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,13 +93,19 @@ def __init__(
9393
def name(self):
9494
return "FluxQueue"
9595

96+
def terminate(self):
97+
"""
98+
Custom termination function for flux.
99+
"""
100+
self.handle.reactor_stop()
101+
96102
def record_metrics(self, record):
97103
"""
98104
Parse a Flux event and record metrics for the group.
99105
"""
100106
# If we are picking up a queue backlog, we might be missing the id
101-
# An alternative would be to assume the job isn't of interest and
102-
# return early, but let's assume it is for now.
107+
# We have to assume we are only interested in the context that is
108+
# seen by the ensemble runner.
103109
if record["id"] not in self.jobids:
104110
return
105111

@@ -127,8 +133,6 @@ def record_metrics(self, record):
127133
for rule in self.iter_rules("metric"):
128134
self.execute_rule(rule, record)
129135

130-
# TODO custom triggers -> actions!
131-
132136
def record_start_metrics(self, event, record):
133137
"""
134138
We typically want to keep the job start time for the

examples/terminate-example.yaml

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# Termination is necessary if you want the event loop to end (and exit with 0)
2+
logging:
3+
debug: false
4+
5+
# I made these the size of the cluster so we trigger scaling at least once
6+
# We currently assume these are uniform (so no order enforced beyond the listing here)
7+
jobs:
8+
- name: echo
9+
command: echo hello world
10+
count: 5
11+
nodes: 1
12+
- name: sleep
13+
command: sleep 10
14+
count: 5
15+
nodes: 1
16+
17+
rules:
18+
# 1. This rule says to submit the sleep jobs when we start
19+
- trigger: start
20+
action:
21+
name: submit
22+
label: sleep
23+
24+
# This says to submit the echo hello world jobs when we have 3
25+
# successfully completed sleep jobs. This dot notation is a path into the
26+
# models data structure.
27+
- trigger: metric
28+
name: count.sleep.success
29+
when: 3
30+
action:
31+
name: submit
32+
label: echo
33+
34+
# Terminate when we have 5 successful echo jobs
35+
- trigger: metric
36+
name: count.echo.success
37+
when: 5
38+
action:
39+
name: terminate

0 commit comments

Comments
 (0)