Skip to content

Commit d9f2df4

Browse files
committed
feat: support for inequality in rule->when
Signed-off-by: vsoch <[email protected]>
1 parent c69c35f commit d9f2df4

File tree

10 files changed

+155
-10
lines changed

10 files changed

+155
-10
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,10 @@ ensemble run examples/terminate-example.yaml
204204
# Run a heartbeat every 3 seconds.
205205
# This will trigger a check to see if actions need to be performed
206206
ensemble run examples/heartbeat-example.yaml
207+
208+
# Grow/shrink requires a minicluster (flux doesn't support it) but we can mock it here
209+
# TODO - add greater than parsing, then run example here, then in operator
210+
ensemble run --executor minicluster examples/grow-shrink-example.yaml
207211
```
208212

209213
Right now, this will run any rules with "start" triggers, which for this hello world example includes a few hello world jobs! You'll then be able to watch and see flux events coming in!

ensemble/client/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import sys
88

99
import ensemble
10+
import ensemble.defaults as defaults
1011

1112

1213
def get_parser():
@@ -42,7 +43,7 @@ def get_parser():
4243
run.add_argument(
4344
"--executor",
4445
help="Executor to use (defaults to flux)",
45-
choices=["flux"],
46+
choices=defaults.supported_members,
4647
default="flux",
4748
)
4849

ensemble/config/types.py

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import re
2+
13
import ensemble.defaults as defaults
24

35

@@ -20,9 +22,57 @@ def name(self):
2022
def trigger(self):
2123
return self._rule["trigger"]
2224

25+
def run_when(self, value):
    """
    Check if "when" is relevant to be run now.

    The rule's "when" may be unset, a plain number (checked for
    equality against value), or a string holding an operator and a
    number, e.g. "> 5", "<= 2.5", "== 3" or "= 3".

    Parameters:
        value: the current (numeric) value to test against "when".

    Returns:
        True if the rule should run for this value, False otherwise.

    Raises:
        ValueError: if "when" is a string that is not a valid
            <operator> <number> expression.
    """
    when = self.when

    # No when is set, so we just continue assuming there is no when
    if when is None:
        return True

    # Nothing to compare against. The earlier exact-match check
    # (value != when) also skipped the rule in this case.
    if value is None:
        return False

    # If "when" is a direct number, we check for equality
    if isinstance(when, (int, float)):
        return value == when

    # Otherwise, parse "when" (not the value!) for an inequality. The
    # operator may be <, <=, >, >=, == or =, and the number may be a float.
    match = re.fullmatch(
        r"(?P<inequality>[<>]=?|==?)\s*(?P<comparator>-?\d+(?:\.\d+)?)",
        str(when).strip(),
    )
    if match is None:
        raise ValueError(f"Invalid expression '{when}' for rule when")

    comparator = float(match["comparator"])
    inequality = match["inequality"]

    # Evaluate!
    if inequality == "<":
        return value < comparator
    if inequality == "<=":
        return value <= comparator
    if inequality == ">":
        return value > comparator
    if inequality == ">=":
        return value >= comparator

    # The pattern only admits "==" or "=" beyond the cases above
    return value == comparator
58+
59+
def check_when(self):
    """
    Ensure we have a valid number or inequality before running anything!

    Returns:
        None when "when" is unset, a non-negative number, or an
        expression that run_when can evaluate without raising.

    Raises:
        ValueError: if "when" is a negative number, or if evaluating
            it with run_when raises any exception (the original
            error is chained as the cause).
    """
    when = self.when

    # A rule without a "when" has nothing to validate
    if when is None:
        return

    # If a number, we require greater than or equal to 0
    if isinstance(when, (int, float)):
        if when < 0:
            raise ValueError(
                f"when: for rule {self} is not valid: {when} is less than 0"
            )
        return

    # Check running the function with a value, should not raise error.
    # The broad except is deliberate: any failure means an invalid "when".
    try:
        self.run_when(10)
    except Exception as err:
        raise ValueError(f"when: for rule {self} is not valid: {err}") from err
72+
2373
@property
2474
def when(self):
25-
return self._rule["when"]
75+
return self._rule.get("when")
2676

2777
def validate(self):
2878
"""
@@ -33,6 +83,8 @@ def validate(self):
3383
raise ValueError(
3484
f"Rule trigger {self.trigger} has invalid action name {self.action.name}"
3585
)
86+
# Ensure we have a valid number or inequality
87+
self.check_when()
3688

3789

3890
class Action:

ensemble/defaults.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
workers = 10
22
port = 50051
33

4-
valid_actions = ["submit", "custom", "terminate"]
4+
supported_members = ["flux", "minicluster"]
5+
valid_actions = ["submit", "custom", "terminate", "grow", "shrink"]
56
heartbeat_seconds = 60
67

78
job_events = [

ensemble/members/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ def get_member(name, options=None):
99

1010
return FluxQueue(**options)
1111
if name == "minicluster":
12-
from ensemble.members.flux.minicluster import FluxQueue
12+
from ensemble.members.flux.minicluster import FluxMiniClusterQueue
1313

14-
return FluxQueue(**options)
14+
return FluxMiniClusterQueue(**options)
1515
raise ValueError(f"Member type {name} is not known")

ensemble/members/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ def execute_metric_action(self, rule, record=None):
132132
item = item[path]
133133

134134
# The user set a "when" and it must match exactly.
135-
if rule.when is not None and item.get() != rule.when:
135+
if not rule.run_when(item.get()):
136136
return
137137
if self.cfg.debug_logging:
138138
print(self.metrics.models)

ensemble/members/flux/minicluster.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
from ensemble.members.flux import FluxQueue as MemberBase
1+
from ensemble.members.flux.queue import FluxQueue as MemberBase
22

33
# These are triggers supported for rules
44
rules = ["start", "metric"]
55

66

7-
class FluxQueue(MemberBase):
7+
class FluxMiniClusterQueue(MemberBase):
88
"""
99
The Flux Queue MiniCluster member type
1010

ensemble/members/flux/queue.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import shlex
22
import sys
3-
from datetime import datetime
3+
import time
44

55
from ensemble.heartbeat import QueueHeartbeat
66
from ensemble.members.base import MemberBase
@@ -138,6 +138,33 @@ def record_metrics(self, record):
138138
for rule in self.iter_rules("metric"):
139139
self.execute_rule(rule, record)
140140

141+
def record_heartbeat_metrics(self):
    """
    Heartbeat metrics cannot rely on an event, but need
    to update metrics about the queue and then check to
    see if any actions should be triggered related to that.

    For every tracked job that has a "submit" time but no "start",
    the time spent waiting so far is recorded under the metric name
    "<group name>-pending". All "metric" rules are then executed.

    Note: this doesn't reset anything from previous pending, it
    will be more like a moving average where the same jobs (if they
    are still pending) get counted again, possibly increasing time.
    """
    # Must be a set: we only want the unique group names seen
    groups = set()

    # NOTE(review): self.jobids is defined outside this view. Support both
    # a mapping of jobid -> record and an iterable of (jobid, record) pairs.
    jobids = self.jobids
    pairs = jobids.items() if hasattr(jobids, "items") else jobids

    for _, group in pairs:
        groups.add(group["name"])

        # If we have a submit but not a start, we haven't included
        # pending yet
        if "submit" in group and "start" not in group:
            time_in_queue = time.time() - group["submit"]
            group_name = f"{group['name']}-pending"
            self.metrics.record_datum(group_name, time_in_queue)

    print(f"Found active groups {groups}")

    # Now execute metric rules that might be impacted
    for rule in self.iter_rules("metric"):
        self.execute_rule(rule)
167+
141168
def record_start_metrics(self, event, record):
142169
"""
143170
We typically want to keep the job start time for the
@@ -364,7 +391,7 @@ def submit(self, rule, record=None):
364391
jobid = flux.job.submit(self.handle, jobspec)
365392

366393
# Don't rely on an event here, this is when the user (us) submits
367-
submit_time = datetime.now().timestamp()
394+
submit_time = time.time()
368395

369396
# This is the job id that will show up in events
370397
numerical = jobid.as_integer_ratio()[0]

examples/grow-shrink-example.yaml

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# Note that we don't need to "turn on" the heartbeat because grow is present.
2+
# I made these the size of the cluster so we trigger scaling at least once
3+
# We currently assume these are uniform (so no order enforced beyond the listing here)
4+
jobs:
5+
- name: echo
6+
command: echo hello world
7+
count: 5
8+
nodes: 1
9+
10+
# Note that this takes up the entirety of our 4 node faux
11+
# test cluster, so the grow will trigger when the queue time
12+
# stacks up
13+
- name: sleep
14+
command: sleep 100
15+
count: 10
16+
nodes: 4
17+
18+
# TODO need a test case for queue time waiting...
19+
# Also need to implement when with < > case.
20+
- name: big-sleep
21+
command: sleep 10
22+
count: 5
23+
nodes: 10
24+
25+
rules:
26+
# 1. This rule says to submit the echo jobs when we start
27+
- trigger: start
28+
action:
29+
name: submit
30+
label: echo
31+
32+
# This says to submit the sleep jobs when we have 5 successful echo
33+
- trigger: metric
34+
name: count.echo.success
35+
when: 5
36+
action:
37+
name: submit
38+
label: sleep
39+
40+
# When the pending time of sleep is > 5 seconds, grow the cluster.
41+
- trigger: metric
42+
name: pending-duration.mean
43+
when: "> 5"
44+
action:
45+
name: grow

examples/heartbeat-example.yaml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,13 @@ jobs:
1515
count: 5
1616
nodes: 1
1717

18+
# TODO need a test case for queue time waiting...
19+
# Also need to implement when with < > case.
20+
- name: big-sleep
21+
command: sleep 10
22+
count: 5
23+
nodes: 10
24+
1825
rules:
1926
# 1. This rule says to submit the sleep jobs when we start
2027
- trigger: start
@@ -31,3 +38,11 @@ rules:
3138
action:
3239
name: submit
3340
label: echo
41+
42+
# This says to submit the echo jobs again when we have 3 successful sleep jobs
43+
- trigger: metric
44+
name: count.sleep.success
45+
when: 3
46+
action:
47+
name: submit
48+
label: echo

0 commit comments

Comments
 (0)