Skip to content

Commit dc24180

Browse files
committed
flux: heartbeat is working!
Signed-off-by: vsoch <[email protected]>
1 parent 82b92fb commit dc24180

File tree

2 files changed

+31
-32
lines changed

2 files changed

+31
-32
lines changed

ensemble/members/flux/queue.py

Lines changed: 30 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,6 @@ def terminate(self):
101101
Custom termination function for flux.
102102
"""
103103
self.handle.reactor_stop()
104-
if self.cfg.heartbeat:
105-
self.heartbeat.stop()
106104

107105
def record_metrics(self, record):
108106
"""
@@ -301,9 +299,38 @@ def event_callback(response):
301299
flags=flux.constants.FLUX_RPC_STREAMING,
302300
)
303301
events.then(event_callback)
304-
self.setup_heartbeat()
302+
self.setup_flux_heartbeat()
305303
self.reactor_start()
306304

305+
def setup_flux_heartbeat(self):
306+
"""
307+
Start the heartbeat via a flux watcher.
308+
"""
309+
# Exit early if we aren't using a heartbeat
310+
if not self.cfg.heartbeat:
311+
return
312+
313+
# Customize the heartbeat duration by reloading the module
314+
self.handle.rpc("module.remove", {"name": "heartbeat"}).get()
315+
self.handle.rpc(
316+
"module.load", {"path": "heartbeat", "args": [f"period={self.cfg.heartbeat}s"]}
317+
).get()
318+
319+
def heartbeat_callback(handle, msg_handler, msg, arg):
320+
print("💗 HEARTBEAT")
321+
self.summarize()
322+
self.record_heartbeat_metrics()
323+
324+
self.handle.event_subscribe("heartbeat.pulse")
325+
326+
# Create a message handler for the heartbeat.pulse event messages
327+
watcher = self.handle.msg_watcher_create(
328+
heartbeat_callback, flux.constants.FLUX_MSGTYPE_EVENT, "heartbeat.pulse"
329+
)
330+
# This relies on self.handle.reactor_start()
331+
# this is called in the parent function, once.
332+
watcher.start()
333+
307334
def reactor_start(self):
308335
"""
309336
Courtesy function to start the reactor and more
@@ -330,34 +357,6 @@ def heartbeat_callback(cls):
330357
self.heartbeat = QueueHeartbeat(self.cfg.heartbeat, heartbeat_callback, cls=self)
331358
self.heartbeat.start()
332359

333-
def cron_heartbeat(self):
334-
"""
335-
cron heartbeat provided by flux (does not work)
336-
"""
337-
338-
def heartbeat_callback(response):
339-
print("💗 HEARTBEAT")
340-
print(response)
341-
342-
# Create a cron heartbeat every N seconds, only if we have a heartbeat set
343-
# This is intended for grow/shrink actions that might need a regular check
344-
print(f" 💗 Creating flux heartbeat every {self.cfg.heartbeat} seconds")
345-
heartbeat = self.handle.rpc(
346-
"cron.create",
347-
{
348-
"type": "interval",
349-
"name": "heartbeat",
350-
"command": "sleep 0",
351-
"args": {"interval": self.cfg.heartbeat},
352-
},
353-
flux.constants.FLUX_NODEID_ANY,
354-
flags=flux.constants.FLUX_RPC_STREAMING,
355-
)
356-
357-
self.handle.flux_event_subscribe("cron.*")
358-
self.handle.event_subscribe("cron.*")
359-
heartbeat.then(heartbeat_callback)
360-
361360
def custom(self, rule, record=None):
362361
"""
363362
Custom runs a custom action (and runs another action, if returned)

examples/grow-shrink-example.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
logging:
2-
heartbeat: 10
2+
heartbeat: 3
33

44
jobs:
55

0 commit comments

Comments
 (0)