Skip to content

Commit 8649990

Browse files
committed
feat: support for flux minicluster
Flux running in the MiniCluster contexts adds the cool features to grow/shrink (autoscaling), which can be paired with an actual cluster autoscaler (or not) if the mini cluster is not using all the nodes available. To support this we need a few things - first a heartbeat, and one that runs at a user specified increment (and likely this should also be exposed in the ensemble config) because it is very likely the case that the triggers are not linked to jobs (for example, "run this when the queue wait time is over X for this job group"). We then need the MiniCluster member, which is exactly the same as flux, but instead has added the grow/shrink. I think likely what I want to do is have the heartbeat disabled unless the setting is found in the ensemble config OR the action to grow/shrink is found (and we would use a default heartbeat seconds of 60). This time should obviously be tested for different applications. Signed-off-by: vsoch <[email protected]>
1 parent bb9520e commit 8649990

17 files changed

+281
-166
lines changed

Makefile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ python: python ## Generate python proto files in python
1717
docker-build:
1818
docker build -t ${IMG} .
1919

20+
.PHONY: kind-load
21+
kind-load: docker-build # For local development with kind
22+
kind load docker-image ${IMG}
23+
2024
.PHONY: arm-build
2125
arm-build:
2226
docker buildx build --platform linux/arm64 --build-arg ARCH=aarch_64 -t ${ARMIMG} .

README.md

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,15 +65,27 @@ Note that yes, this means "submit" is both an action and an event.
6565
6666
The design of a rule is to have an action, and the action is something your ensemble can be tasked to do when an event is triggered. Right now we support the following:
6767
68-
- **submit**: submit a job
69-
- **terminate**: terminate the member. This is usually what you want to call to have the entire thing exit on 0
70-
- **custom**: run a custom function that will receive known kwargs, and then should return an action (read more below)
68+
**Flux and MiniCluster**
7169
70+
- *submit*: submit a job
71+
- *terminate*: terminate the member. This is usually what you want to call to have the entire thing exit on 0
72+
- *custom*: run a custom function that will receive known kwargs, and then should return an action (read more below)
73+
74+
**MiniCluster Only**
75+
76+
- *grow*: grow (or request) the cluster to scale up
77+
- *shrink*: shrink (or request) the cluster to scale down
78+
79+
For the scale operations, since this brings in the issue of resource contention between different ensembles, we have to assume to start that a request to scale:
80+
81+
1. Should only happen once. If it's granted, great, if not, we aren't going to ask again.
82+
2. Does not need to consider another level of scheduler (e.g., fair shaire)
83+
84+
I started thinking about putting a second scheduler (or fair share algorithm) in the grpc service, but realized this was another level of complexity that although we might work on it later, is not warranted yet.
7285
We see "submit" as two examples in the above, which is a common thing you'd want to do! For each action, you should minimally define the "name" and a "label" that typically corresponds to a job group.
7386
You can also optionally define "repetitions," which are the number of times the action should be run before expiring. If you want a backoff period between repetitions, set "backoff" to a non zero value.
7487
By default, when no repetitions or backoff are set, the action is assumed to have a repetition of 1. It will be run once! Let's now look at a custom action. Here is what your function should look like in your `ensemble.yaml`
7588

76-
7789
##### Actions
7890

7991

ensemble/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "0.0.15"
1+
__version__ = "0.0.16"

ensemble/heartbeat.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import signal
2+
import threading
3+
4+
5+
class GracefulExit(Exception):
6+
"""
7+
Ensure if we press control+C, it doesn't throw up.
8+
"""
9+
10+
pass
11+
12+
13+
class QueueHeartbeat(threading.Thread):
14+
"""
15+
The Queue Heartbeat triggers at a user specified interval, with the
16+
intention to be able to run events that might not be linked to jobs.
17+
"""
18+
19+
def __init__(self, interval_seconds, callback, **kwargs):
20+
super().__init__()
21+
self.stop_event = threading.Event()
22+
self.interval_seconds = interval_seconds
23+
self.callback = callback
24+
self.kwargs = kwargs
25+
26+
def run(self):
27+
"""
28+
Run the heartbeat function, and exit gracefully
29+
"""
30+
while not self.stop_event.wait(self.interval_seconds):
31+
try:
32+
self.callback(**self.kwargs)
33+
except (KeyboardInterrupt, GracefulExit):
34+
break
35+
36+
def stop(self):
37+
"""
38+
Explicit stop of the thread, like that will happen :)
39+
"""
40+
self.stop_event.set()
41+
42+
43+
def signal_handler(signum, frame):
44+
"""
45+
The signal handler follows an expected pattern, but
46+
just calls a graceful exit
47+
"""
48+
raise GracefulExit()
49+
50+
51+
# Signals for our heartbeat
52+
signal.signal(signal.SIGINT, signal_handler)
53+
signal.signal(signal.SIGTERM, signal_handler)

ensemble/members/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,9 @@ def get_member(name, options=None):
77
if name == "flux":
88
from ensemble.members.flux.queue import FluxQueue
99

10+
return FluxQueue(**options)
11+
if name == "minicluster":
12+
from ensemble.members.flux.minicluster import FluxQueue
13+
1014
return FluxQueue(**options)
1115
raise ValueError(f"Member type {name} is not known")

ensemble/members/auth.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
from contextlib import contextmanager
2+
3+
import grpc
4+
5+
6+
@contextmanager
7+
def grpc_channel(host, use_ssl=False):
8+
"""
9+
Yield a channel, either with or without ssl, and close properly.
10+
"""
11+
if use_ssl:
12+
channel = grpc.secure_channel(host, grpc.ssl_channel_credentials())
13+
else:
14+
channel = grpc.insecure_channel(host)
15+
try:
16+
yield channel
17+
finally:
18+
channel.close()

ensemble/members/base.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,9 @@ def announce(self, message, meta=None, ljust=15, color="cyan"):
8787
def execute_action(self, rule, record=None):
8888
"""
8989
Execute a general action, it's assumed that
90-
the trigger was called if we get here.
90+
the trigger was called if we get here. Those supported by
91+
Flux and the MiniCluster: submit, custom, terminate.
92+
MiniCluster Only: shrink and grow.
9193
"""
9294
# This function checks for repetitions and backoff
9395
# periods, and determines if we should continue (to run)
@@ -101,9 +103,15 @@ def execute_action(self, rule, record=None):
101103
self.announce(f" submit {rule.action.label} ", self.cfg.pretty_job(rule.action.label))
102104
return self.submit(rule, record)
103105

104-
if rule.action.name == "custom":
105-
self.announce(f" custom {rule.action.label}")
106-
return self.custom_action(rule, record)
106+
# These all have the same logic to call the function of the same name
107+
if rule.action.name in ["custom", "grow", "shrink"]:
108+
run_action = getattr(self, rule.action.name, None)
109+
110+
# Not supported if the function does not exist
111+
if not run_action:
112+
raise NotImplementedError("Action {rule.action.name} is not supported.")
113+
self.announce(f" {rule.action.name} {rule.action.label}")
114+
run_action(rule, record)
107115

108116
# Note that terminate exits but does not otherwise touch
109117
# the queue, etc. Given a reactor, we should just stop it
@@ -153,9 +161,6 @@ def start(self, *args, **kwargs):
153161
"""
154162
raise NotImplementedError
155163

156-
def custom_action(self, rule, record=None):
157-
raise NotImplementedError
158-
159164
def submit(self, *args, **kwargs):
160165
"""
161166
Submit a job

ensemble/members/client.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import ensemble.members.auth as auth
2+
from ensemble.protos import ensemble_service_pb2, ensemble_service_pb2_grpc
3+
4+
5+
class EnsembleClient:
6+
"""
7+
The EnsembleClient is used by members to communicate with the grpc service.
8+
9+
The grpc service will receive requests to grow/shrink, etc (requests that fall outside
10+
of the scope of the ensemble and require changing the member) and issue some kind of event
11+
that can be listened to by an entity to do it (e.g., Kubernetes). Right now we support
12+
update requests (to scale up and down) and status requests (to check on state that
13+
some grpc endpoint sees).
14+
"""
15+
16+
def __init__(self, host="localhost:50051", use_ssl=False):
17+
self.host = host
18+
self.use_ssl = use_ssl
19+
20+
def update_request(self, member, action, payload):
21+
"""
22+
Send an update request to the grpc server.
23+
"""
24+
# These are submit variables. A more substantial submit script would have argparse, etc.
25+
request = ensemble_service_pb2.UpdateRequest(member=member, action=action, payload=payload)
26+
27+
with auth.grpc_channel(self.host, self.use_ssl) as channel:
28+
stub = ensemble_service_pb2_grpc.EnsembleOperatorStub(channel)
29+
response = stub.ReceiveJobs(request)
30+
31+
# Case 1: SUCCESS
32+
if response.status == 1:
33+
print("Successful update.")
34+
else:
35+
print("Issue with requesting update")
36+
return response

ensemble/members/flux/minicluster.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from ensemble.members.flux import FluxQueue as MemberBase
2+
3+
# These are triggers supported for rules
4+
rules = ["start", "metric"]
5+
6+
7+
class FluxQueue(MemberBase):
8+
"""
9+
The Flux Queue MiniCluster member type
10+
11+
It uses the FluxQueue as a base. The main difference is that this
12+
member supports scale up and scale down for actions.
13+
"""
14+
15+
def grow(self, rule, record=None):
16+
"""
17+
Request to the API to grow the MiniCluster
18+
"""
19+
print("GROW Vanessa implement me")
20+
import IPython
21+
22+
IPython.embed()
23+
24+
def shrink(self, rule, record=None):
25+
"""
26+
Request to the API to shrink the MiniCluster
27+
"""
28+
print("SHRINK Vanessa implement me")
29+
import IPython
30+
31+
IPython.embed()
32+
33+
@property
34+
def name(self):
35+
return "MiniCluster"

ensemble/members/flux/queue.py

Lines changed: 65 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@
22
import sys
33
from datetime import datetime
44

5+
from ensemble.heartbeat import QueueHeartbeat
56
from ensemble.members.base import MemberBase
67

78
try:
89
import flux
910
import flux.constants
1011
import flux.job
12+
import flux.message
1113
except ImportError:
1214
sys.exit("flux python is required to use the flux queue member")
1315

@@ -61,6 +63,7 @@ def __init__(
6163
include_inactive=False,
6264
refresh_interval=0,
6365
summary_frequency=10,
66+
heartbeat_seconds=60,
6467
**kwargs,
6568
):
6669
"""
@@ -70,6 +73,8 @@ def __init__(
7073
job_filters (dict) : key value pairs for attributes or states to filter jobs to
7174
include_inactive (bool): consider and include inactive jobs when doing initial parse
7275
refresh_interval (int) : number of seconds to refresh all metrics (0 indicates no refresh)
76+
heartbeat_seconds (int): number of seconds to heartbeat (for grow/shrink). This defaults
77+
to 60 seconds. If you set to 0, it will not be set.
7378
"""
7479
# Filter jobs to any attributes, states, or similar.
7580
self.filters = job_filters or {}
@@ -80,6 +85,7 @@ def __init__(
8085
# How often on job completions to summarize?
8186
self.summary_freqency = summary_frequency
8287
self.completion_counter = 0
88+
self.heartbeat_seconds = heartbeat_seconds
8389

8490
# The flux sentinel tells us when we have finished with the backlog
8591
# https://github.com/flux-framework/flux-core/blob/master/src/modules/job-manager/journal.c#L28-L33
@@ -98,6 +104,8 @@ def terminate(self):
98104
Custom termination function for flux.
99105
"""
100106
self.handle.reactor_stop()
107+
if self.heartbeat_seconds:
108+
self.heartbeat.stop()
101109

102110
def record_metrics(self, record):
103111
"""
@@ -258,11 +266,65 @@ def event_callback(response):
258266
flags=flux.constants.FLUX_RPC_STREAMING,
259267
)
260268
events.then(event_callback)
261-
self.handle.reactor_run()
269+
self.setup_heartbeat()
270+
self.reactor_start()
262271

263-
def custom_action(self, rule, record=None):
272+
def reactor_start(self):
264273
"""
265-
Custom action runs the action (and runs another action, if returned)
274+
Courtesy function to start the reactor and more
275+
gracefully handle keyboard interrupts.
276+
"""
277+
try:
278+
self.handle.reactor_run()
279+
except KeyboardInterrupt:
280+
self.terminate()
281+
282+
def setup_heartbeat(self):
283+
"""
284+
Setup the heartbeat - a threading.Thread
285+
"""
286+
if not self.heartbeat_seconds:
287+
return
288+
289+
def heartbeat_callback(cls):
290+
print("💗 HEARTBEAT")
291+
print(cls)
292+
293+
# Instead we are using threading, which works!
294+
self.heartbeat = QueueHeartbeat(self.heartbeat_seconds, heartbeat_callback, cls=self)
295+
self.heartbeat.start()
296+
297+
def cron_heartbeat(self):
298+
"""
299+
cron heartbeat provided by flux (does not work)
300+
"""
301+
302+
def heartbeat_callback(response):
303+
print("💗 HEARTBEAT")
304+
print(response)
305+
306+
# Create a cron heartbeat every N seconds, only if we have a heartbeat set
307+
# This is intended for grow/shrink actions that might need a regular check
308+
print(f" 💗 Creating flux heartbeat every {self.heartbeat_seconds} seconds")
309+
heartbeat = self.handle.rpc(
310+
"cron.create",
311+
{
312+
"type": "interval",
313+
"name": "heartbeat",
314+
"command": "sleep 0",
315+
"args": {"interval": self.heartbeat_seconds},
316+
},
317+
flux.constants.FLUX_NODEID_ANY,
318+
flags=flux.constants.FLUX_RPC_STREAMING,
319+
)
320+
321+
self.handle.flux_event_subscribe("cron.*")
322+
self.handle.event_subscribe("cron.*")
323+
heartbeat.then(heartbeat_callback)
324+
325+
def custom(self, rule, record=None):
326+
"""
327+
Custom runs a custom action (and runs another action, if returned)
266328
and passes forward the flux handle and other metadata.
267329
"""
268330
kwargs = {

ensemble/protos/ensemble_service_pb2.py

Lines changed: 10 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)