Skip to content

Commit c69c35f

Browse files
committed
example: heartbeat example
This updates the heartbeat so it is entirely derived from the config. This can happen explicitly if the user sets logging->heartbeat to a non zero value, but it will also happen if there is a grow or shrink action used. If the user defines a grow/shrink and sets the heartbeat to 0 it will still be set to the default, 60, because grow/shrink will not work as expected without it. Signed-off-by: vsoch <[email protected]>
1 parent 8649990 commit c69c35f

File tree

6 files changed

+96
-10
lines changed

6 files changed

+96
-10
lines changed

README.md

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,27 @@ This design will be translated into more consolidated design documentation. For
3232
- **Plugins**: A plugin is a collection of custom actions that are typically associated with a particular application. For example, a plugin for LAMMPS might know how to check LAMMPS output and act on a specific parsing of a result. Plugins are used equivalently to custom functions, and can accept arguments.
3333
- **Metrics** are summary metrics collected for groups of jobs, for customized algorithms. To support this, we use online (or streaming) ML algorithms for things like mean, IQR, etc. While there could be a named entity called an algorithm, since it's just a set of rules (triggers and actions) that means we don't need to explicitly define them (yet). I can see at some point creating "packaged" rule sets that are called that.
3434

35+
36+
#### Logging
37+
38+
By default, we have a pretty printed set of the "main" events (triggers and submissions). However, to see ALL events from Flux (good for debugging) you can turn them on:
39+
40+
```yaml
41+
logging:
42+
debug: true
43+
```
44+
45+
To set the event heartbeat to fire at some increment, set it:
46+
47+
```yaml
48+
logging:
49+
debug: true
50+
heartbeat: 60
51+
```
52+
53+
Note that by default it is turned off (set to 0 seconds) unless you include a grow or shrink action. In that case, it turns on and defaults to 60, unless you've specified another interval.
54+
If you have grow/shrink and explicitly turn it off, it will still default to 60 seconds, because grow/shrink won't work as expected without the heartbeat.
55+
3556
#### Rules
3657
3758
A rule defines a trigger and action to take. The library is event driven, meaning that the queue is expected to send events, and we don't do any polling.
@@ -79,9 +100,11 @@ The design of a rule is to have an action, and the action is something your ense
79100
For the scale operations, since this brings in the issue of resource contention between different ensembles, we have to assume to start that a request to scale:
80101
81102
1. Should only happen once. If it's granted, great, if not, we aren't going to ask again.
82-
2. Does not need to consider another level of scheduler (e.g., fair shaire)
103+
2. Does not need to consider another level of scheduler (e.g., fair share)
83104
84105
I started thinking about putting a second scheduler (or fair share algorithm) in the grpc service, but realized this was another level of complexity that although we might work on it later, is not warranted yet.
106+
Also note that since scale operation triggers might not be linked to job events (e.g., if we want to trigger when a job group has been in the queue for too long) we added support for a heartbeat. The heartbeat
107+
isn't a trigger in and of itself, but when it runs, it will run through rules that are relevant to queue metrics.
85108
We see "submit" as two examples in the above, which is a common thing you'd want to do! For each action, you should minimally define the "name" and a "label" that typically corresponds to a job group.
86109
You can also optionally define "repetitions," which are the number of times the action should be run before expiring. If you want a backoff period between repetitions, set "backoff" to a non zero value.
87110
By default, when no repetitions or backoff are set, the action is assumed to have a repetition of 1. It will be run once! Let's now look at a custom action. Here is what your function should look like in your `ensemble.yaml`
@@ -177,6 +200,10 @@ ensemble run examples/custom-action-example.yaml
177200
178201
# This shows termination, which is necessary for when you want an exit
179202
ensemble run examples/terminate-example.yaml
203+
204+
# Run a heartbeat every 3 seconds.
205+
# This will trigger a check to see if actions need to be performed
206+
ensemble run examples/heartbeat-example.yaml
180207
```
181208

182209
Right now, this will run any rules with "start" triggers, which for this hello world example includes a few hello world jobs! You'll then be able to watch and see flux events coming in!

ensemble/config/config.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import jsonschema
77

8+
import ensemble.defaults as defaults
89
import ensemble.utils as utils
910
from ensemble import schema
1011
from ensemble.config.types import Rule
@@ -14,6 +15,9 @@
1415
script_template = """from ensemble.config.types import Action, Rule
1516
"""
1617

18+
# These are the actions that warrant the heartbeat
19+
heartbeat_actions = {"grow", "shrink"}
20+
1721

1822
def load_config(config_path):
1923
"""
@@ -34,6 +38,9 @@ def __init__(self, cfg):
3438
self._cfg = cfg
3539
self.jobs = {}
3640
self.rules = {}
41+
42+
# By default, we don't require a heartbeat
43+
self.require_heartbeat = False
3744
self.parse()
3845

3946
# Cache of action names
@@ -43,6 +50,20 @@ def __init__(self, cfg):
4350
def debug_logging(self):
4451
return self._cfg.get("logging", {}).get("debug") is True
4552

53+
@property
54+
def heartbeat(self):
55+
"""
56+
Get the heartbeat seconds.
57+
58+
If heartbeat actions are defined and no heartbeat is set, we require
59+
it and default to 60. Otherwise, we allow it unset (0) or a user
60+
specified value.
61+
"""
62+
heartbeat = self._cfg.get("logging", {}).get("heartbeat") or 0
63+
if not heartbeat and self.require_heartbeat:
64+
heartbeat = defaults.heartbeat_seconds
65+
return heartbeat
66+
4667
def pretty_job(self, name):
4768
"""
4869
Pretty print a job (for the logger) across a single line
@@ -105,6 +126,10 @@ def parse(self):
105126
for rule in self._cfg["rules"]:
106127
rule = Rule(rule, self.custom)
107128

129+
# If the rule action is in the heartbeat set, we require heartbeat
130+
if rule.action.name in heartbeat_actions:
131+
self.require_heartbeat = True
132+
108133
# Group rules with common trigger together
109134
if rule.trigger not in self.rules:
110135
self.rules[rule.trigger] = []

ensemble/defaults.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
port = 50051
33

44
valid_actions = ["submit", "custom", "terminate"]
5+
heartbeat_seconds = 60
56

67
job_events = [
78
"job-depend",

ensemble/members/flux/queue.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ def __init__(
6363
include_inactive=False,
6464
refresh_interval=0,
6565
summary_frequency=10,
66-
heartbeat_seconds=60,
6766
**kwargs,
6867
):
6968
"""
@@ -73,7 +72,6 @@ def __init__(
7372
job_filters (dict) : key value pairs for attributes or states to filter jobs to
7473
include_inactive (bool): consider and include inactive jobs when doing initial parse
7574
refresh_interval (int) : number of seconds to refresh all metrics (0 indicates no refresh)
76-
heartbeat_seconds (int): number of seconds to heartbeat (for grow/shrink). This defaults
7775
to 60 seconds. If you set to 0, it will not be set.
7876
"""
7977
# Filter jobs to any attributes, states, or similar.
@@ -85,7 +83,6 @@ def __init__(
8583
# How often on job completions to summarize?
8684
self.summary_freqency = summary_frequency
8785
self.completion_counter = 0
88-
self.heartbeat_seconds = heartbeat_seconds
8986

9087
# The flux sentinel tells us when we have finished with the backlog
9188
# https://github.com/flux-framework/flux-core/blob/master/src/modules/job-manager/journal.c#L28-L33
@@ -104,7 +101,7 @@ def terminate(self):
104101
Custom termination function for flux.
105102
"""
106103
self.handle.reactor_stop()
107-
if self.heartbeat_seconds:
104+
if self.cfg.heartbeat:
108105
self.heartbeat.stop()
109106

110107
def record_metrics(self, record):
@@ -283,15 +280,15 @@ def setup_heartbeat(self):
283280
"""
284281
Setup the heartbeat - a threading.Thread
285282
"""
286-
if not self.heartbeat_seconds:
283+
if not self.cfg.heartbeat:
287284
return
288285

289286
def heartbeat_callback(cls):
290287
print("💗 HEARTBEAT")
291288
print(cls)
292289

293290
# Instead we are using threading, which works!
294-
self.heartbeat = QueueHeartbeat(self.heartbeat_seconds, heartbeat_callback, cls=self)
291+
self.heartbeat = QueueHeartbeat(self.cfg.heartbeat, heartbeat_callback, cls=self)
295292
self.heartbeat.start()
296293

297294
def cron_heartbeat(self):
@@ -305,14 +302,14 @@ def heartbeat_callback(response):
305302

306303
# Create a cron heartbeat every N seconds, only if we have a heartbeat set
307304
# This is intended for grow/shrink actions that might need a regular check
308-
print(f" 💗 Creating flux heartbeat every {self.heartbeat_seconds} seconds")
305+
print(f" 💗 Creating flux heartbeat every {self.cfg.heartbeat} seconds")
309306
heartbeat = self.handle.rpc(
310307
"cron.create",
311308
{
312309
"type": "interval",
313310
"name": "heartbeat",
314311
"command": "sleep 0",
315-
"args": {"interval": self.heartbeat_seconds},
312+
"args": {"interval": self.cfg.heartbeat},
316313
},
317314
flux.constants.FLUX_NODEID_ANY,
318315
flags=flux.constants.FLUX_RPC_STREAMING,

ensemble/schema.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
"custom": {"type": "string"},
2020
"logging": {
2121
"type": "object",
22-
"properties": {"debug": {"type": "boolean", "default": False}},
22+
"properties": {
23+
"debug": {"type": "boolean", "default": False},
24+
"heartbeat": {"type": "number"},
25+
},
2326
"additionalProperties": False,
2427
},
2528
"jobs": {

examples/heartbeat-example.yaml

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# No verbose logging, but turn on the heartbeat every 3 seconds
2+
logging:
3+
debug: false
4+
heartbeat: 3
5+
6+
# I made these the size of the cluster so we trigger scaling at least once
7+
# We currently assume these are uniform (so no order enforced beyond the listing here)
8+
jobs:
9+
- name: echo
10+
command: echo hello world
11+
count: 5
12+
nodes: 1
13+
- name: sleep
14+
command: sleep 10
15+
count: 5
16+
nodes: 1
17+
18+
rules:
19+
# 1. This rule says to submit the sleep jobs when we start
20+
- trigger: start
21+
action:
22+
name: submit
23+
label: sleep
24+
25+
# This says to submit the echo hello world jobs when we have 3
26+
# successfully completed sleep jobs. This dot notation is a path into the
27+
# models data structure.
28+
- trigger: metric
29+
name: count.sleep.success
30+
when: 3
31+
action:
32+
name: submit
33+
label: echo

0 commit comments

Comments
 (0)