Skip to content

Commit 82b92fb

Browse files
committed
feat: grow/shrink requests are being hit
I need to put this into the ensemble operator next to have the request actually do something, like request the minicluster to scale up or down. I will also need to have a way to communicate the member name and namespace. This could either be done via discovery (requiring the kubernetes API within the ensemble python and the rbac to use it), or more simply done, just put the member name that is expected in the same namespace. More ideally there can be a registration step at the onset that generates a random name and sends it over to the grpc service to associate. Signed-off-by: vsoch <[email protected]>
1 parent d9f2df4 commit 82b92fb

File tree

11 files changed

+109
-60
lines changed

11 files changed

+109
-60
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,9 @@ ensemble run examples/heartbeat-example.yaml
208208
# Grow/shrink requires a minicluster (flux doesn't support it) but we can mock it here
209209
# TODO - add greater than parsing, then run example here, then in operator
210210
ensemble run --executor minicluster examples/grow-shrink-example.yaml
211+
212+
# Here is how to add on the fly debug (logging->debug true)
213+
ensemble run --debug --executor minicluster examples/grow-shrink-example.yaml
211214
```
212215

213216
Right now, this will run any rules with "start" triggers, which for this hello world example includes a few hello world jobs! You'll then be able to watch and see flux events coming in!

ensemble/client/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ def get_parser():
4646
choices=defaults.supported_members,
4747
default="flux",
4848
)
49+
run.add_argument(
50+
"--debug",
51+
help="Enable debug logging for the config",
52+
action="store_true",
53+
default=False,
54+
)
4955

5056
for command in [run]:
5157
command.add_argument(

ensemble/client/run.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,5 @@
44
def main(args, parser, extra, subparser):
55
# This will raise an error if the member type (e.g., minicluster) is not known
66
member = members.get_member(args.executor)
7-
member.load(args.config)
7+
member.load(args.config, args.debug)
88
member.start()

ensemble/config/config.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,18 @@
1919
heartbeat_actions = {"grow", "shrink"}
2020

2121

22-
def load_config(config_path):
22+
def load_config(config_path, debug=False):
2323
"""
2424
Load the config path, validating with the schema
2525
"""
2626
cfg = utils.read_yaml(config_path)
27+
28+
# On the fly debugging
29+
if debug:
30+
if "logging" not in cfg:
31+
cfg["logging"] = {}
32+
cfg["logging"]["debug"] = True
33+
2734
jsonschema.validate(cfg, schema=schema.ensemble_config_schema)
2835
return EnsembleConfig(cfg)
2936

ensemble/config/types.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,14 @@ def run_when(self, value):
3232
return True
3333

3434
# If we have a direct value, we check for equality
35-
if isinstance(value, int) and value != self.when:
35+
number = (int, float)
36+
if isinstance(self.when, number) and value != self.when:
3637
return False
38+
if isinstance(self.when, number) and value == self.when:
39+
return True
3740

3841
# Otherwise, parse for inequality
39-
match = re.search(r"(?P<inequality>[<>]=?)\s*(?P<comparator>\w+)", value).groupdict()
42+
match = re.search(r"(?P<inequality>[<>]=?)\s*(?P<comparator>\w+)", self.when).groupdict()
4043

4144
# This could technically be a float value
4245
comparator = float(match["comparator"])

ensemble/members/base.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ def execute_action(self, rule, record=None):
110110
# Not supported if the function does not exist
111111
if not run_action:
112112
raise NotImplementedError("Action {rule.action.name} is not supported.")
113-
self.announce(f" {rule.action.name} {rule.action.label}")
113+
self.announce(f" {rule.action.name} {rule.action.label or ''}")
114114
run_action(rule, record)
115115

116116
# Note that terminate exits but does not otherwise touch
@@ -146,11 +146,11 @@ def validate_rules(self):
146146
"""
147147
self.cfg.check_supported(self.rules_supported)
148148

149-
def load(self, config_path):
149+
def load(self, config_path, debug=False):
150150
"""
151151
Load and validate the config path
152152
"""
153-
self.cfg = cfg.load_config(config_path)
153+
self.cfg = cfg.load_config(config_path, debug)
154154
# All rules that the ensemble provides must be
155155
# supported by the queue executor
156156
self.validate_rules()

ensemble/members/client.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import json
2+
13
import ensemble.members.auth as auth
24
from ensemble.protos import ensemble_service_pb2, ensemble_service_pb2_grpc
35

@@ -17,20 +19,17 @@ def __init__(self, host="localhost:50051", use_ssl=False):
1719
self.host = host
1820
self.use_ssl = use_ssl
1921

20-
def update_request(self, member, action, payload):
22+
def action_request(self, member, action, payload):
2123
"""
22-
Send an update request to the grpc server.
24+
Send an action request to the grpc server.
2325
"""
26+
payload = json.dumps(payload)
27+
2428
# These are submit variables. A more substantial submit script would have argparse, etc.
25-
request = ensemble_service_pb2.UpdateRequest(member=member, action=action, payload=payload)
29+
request = ensemble_service_pb2.ActionRequest(member=member, action=action, payload=payload)
2630

2731
with auth.grpc_channel(self.host, self.use_ssl) as channel:
2832
stub = ensemble_service_pb2_grpc.EnsembleOperatorStub(channel)
29-
response = stub.ReceiveJobs(request)
30-
31-
# Case 1: SUCCESS
32-
if response.status == 1:
33-
print("Successful update.")
34-
else:
35-
print("Issue with requesting update")
33+
response = stub.RequestAction(request)
34+
print(f"Action request: {response.status}")
3635
return response

ensemble/members/flux/minicluster.py

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from ensemble.members.client import EnsembleClient
12
from ensemble.members.flux.queue import FluxQueue as MemberBase
23

34
# These are triggers supported for rules
@@ -12,24 +13,40 @@ class FluxMiniClusterQueue(MemberBase):
1213
member supports scale up and scale down for actions.
1314
"""
1415

16+
# TODO for ensemble operator - we need the ensemble name to identify it
17+
# perhaps generate randomly or similar?
18+
19+
@property
20+
def client(self):
21+
"""
22+
Ensure we have a connection to the service client.
23+
"""
24+
# Do we have the ensemble client?
25+
# TODO allow to customize host, etc.
26+
if hasattr(self, "_client"):
27+
return self._client
28+
self._client = EnsembleClient()
29+
return self._client
30+
1531
def grow(self, rule, record=None):
1632
"""
1733
Request to the API to grow the MiniCluster
1834
"""
19-
print("GROW Vanessa implement me")
20-
import IPython
21-
22-
IPython.embed()
35+
# For now use the ensemble type as the name
36+
# TODO this needs to be caught and decided upon - retry?
37+
response = self.client.action_request(self.name, "grow", {})
38+
print(response)
2339

2440
def shrink(self, rule, record=None):
2541
"""
2642
Request to the API to shrink the MiniCluster
2743
"""
28-
print("SHRINK Vanessa implement me")
29-
import IPython
30-
31-
IPython.embed()
44+
response = self.client.action_request(self.name, "shrink", {})
45+
print(response)
3246

3347
@property
3448
def name(self):
35-
return "MiniCluster"
49+
"""
50+
Name is used to identify the ensemble member type.
51+
"""
52+
return "minicluster"

ensemble/members/flux/queue.py

Lines changed: 37 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,6 @@ class FluxQueue(MemberBase):
5959

6060
def __init__(
6161
self,
62-
job_filters=None,
63-
include_inactive=False,
6462
refresh_interval=0,
6563
summary_frequency=10,
6664
**kwargs,
@@ -69,15 +67,11 @@ def __init__(
6967
Create a new flux handle to monitor a queue.
7068
7169
Parameters:
72-
job_filters (dict) : key value pairs for attributes or states to filter jobs to
73-
include_inactive (bool): consider and include inactive jobs when doing initial parse
70+
summary_frequency (int): how often (events) to show summary
7471
refresh_interval (int) : number of seconds to refresh all metrics (0 indicates no refresh)
7572
to 60 seconds. If you set to 0, it will not be set.
7673
"""
77-
# Filter jobs to any attributes, states, or similar.
78-
self.filters = job_filters or {}
7974
self.handle = flux.Flux()
80-
self.include_inactive = include_inactive
8175
self.refresh_interval = refresh_interval
8276

8377
# How often on job completions to summarize?
@@ -88,13 +82,19 @@ def __init__(
8882
# https://github.com/flux-framework/flux-core/blob/master/src/modules/job-manager/journal.c#L28-L33
8983
self.seen_sentinel = False
9084

85+
# Have we run the start events?
86+
self.started = False
87+
9188
# We store the job id associated with a group until it's cleaned up
9289
self.jobids = {}
9390
super().__init__(**kwargs)
9491

9592
@property
9693
def name(self):
97-
return "FluxQueue"
94+
"""
95+
Name is used to identify the ensemble member type.
96+
"""
97+
return "flux"
9898

9999
def terminate(self):
100100
"""
@@ -148,8 +148,8 @@ def record_heartbeat_metrics(self):
148148
will be more like a moving average where the same jobs (if they
149149
are still pending) get counted again, possibly increasing time.
150150
"""
151-
groups = {}
152-
for _, group in self.jobids:
151+
groups = set()
152+
for _, group in self.jobids.items():
153153
groups.add(group["name"])
154154

155155
# If we have a submit but not a start, we haven't included
@@ -231,12 +231,11 @@ def record_event(self, record):
231231
Record the event. This needs to be specific to the workload manager.
232232
"""
233233
# The sentinel tells us when the "backlog" is finished
234-
if not self.seen_sentinel:
235-
if record["id"] == -1:
236-
if self.cfg.debug_logging:
237-
print("Sentinel is seen, starting event monitoring.")
238-
self.seen_sentinel = True
239-
return
234+
if record["id"] == -1:
235+
if self.cfg.debug_logging:
236+
print("Sentinel is seen, starting event monitoring.")
237+
self.seen_sentinel = True
238+
return
240239

241240
# Record metrics for the event
242241
self.record_metrics(record)
@@ -260,21 +259,33 @@ def check_event(self, record, event):
260259
for rule in self.iter_rules(job_event):
261260
self.execute_rule(rule, record)
262261

262+
def ensure_started(self):
263+
"""
264+
Run start rules.
265+
"""
266+
if self.started:
267+
return
268+
269+
# Iterate through actions provided by the rule
270+
for rule in self.iter_rules("start"):
271+
self.execute_rule(rule)
272+
self.started = True
273+
263274
def start(self):
264275
"""
265276
Init the events subscriber (no longer pub sub but a callback)
266277
to the flux queue. See:
267278
268279
https://github.com/flux-framework/flux-core/blob/master/src/modules/job-manager/journal.c#L11-L41
269280
"""
270-
# Iterate through actions provided by the rule
271-
for rule in self.iter_rules("start"):
272-
self.execute_rule(rule)
273281

274282
def event_callback(response):
275283
"""
276284
Receive callback when a flux job posts an event.
277285
"""
286+
# This needs to happen when the event callback is running
287+
# Otherwise we get a race and can miss start events
288+
self.ensure_started()
278289
payload = response.get()
279290

280291
# Only print if the config has logging->debug set to true
@@ -312,7 +323,8 @@ def setup_heartbeat(self):
312323

313324
def heartbeat_callback(cls):
314325
print("💗 HEARTBEAT")
315-
print(cls)
326+
self.summarize()
327+
self.record_heartbeat_metrics()
316328

317329
# Instead we are using threading, which works!
318330
self.heartbeat = QueueHeartbeat(self.cfg.heartbeat, heartbeat_callback, cls=self)
@@ -374,6 +386,11 @@ def submit(self, rule, record=None):
374386
jobset = self.extract_jobs(group)
375387

376388
for job in jobset:
389+
# If the number of tasks < node count we get an error
390+
# assume the user wants one task per node
391+
if job["tasks"] < job["nodes"]:
392+
job["tasks"] = job["nodes"]
393+
377394
jobspec = flux.job.JobspecV1.from_command(
378395
command=job["command"], num_nodes=job["nodes"], num_tasks=job["tasks"]
379396
)

ensemble/server.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,9 @@ def RequestAction(self, request, context):
9191
response = ensemble_service_pb2.Response()
9292

9393
# The member primarily is directed to take the action
94-
member = members.get_member(request.member)
95-
print(member)
94+
# member = members.get_member(request.member)
95+
# print(member)
96+
print(response)
9697

9798
if request.action == "grow":
9899
print("REQUEST TO GROW")

examples/grow-shrink-example.yaml

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1-
# Note that we don't need to "turn on" the heartbeat because grow is present.
2-
# I made these the size of the cluster so we trigger scaling at least once
3-
# We currently assume these are uniform (so no order enforced beyond the listing here)
1+
logging:
2+
heartbeat: 10
3+
44
jobs:
5+
6+
# Note that echo seems to lose events here, not great
7+
# not reliable for submit at the onset at least
58
- name: echo
69
command: echo hello world
7-
count: 5
10+
count: 10
811
nodes: 1
912

1013
# Note that this takes up the entirety of our 4 node faux
@@ -15,13 +18,6 @@ jobs:
1518
count: 10
1619
nodes: 4
1720

18-
# TODO need a test case for queue time waiting...
19-
# Also need to implement when with < > case.
20-
- name: big-sleep
21-
command: sleep 10
22-
count: 5
23-
nodes: 10
24-
2521
rules:
2622
# 1. This rule says to submit the sleep jobs when we start
2723
- trigger: start
@@ -39,7 +35,7 @@ rules:
3935

4036
# When the pending time of sleep is > 5 seconds, grow the cluster.
4137
- trigger: metric
42-
name: pending-duration.mean
38+
name: mean.sleep-pending
4339
when: "> 5"
4440
action:
4541
name: grow

0 commit comments

Comments
 (0)