Skip to content

Commit d4ad2b5

Browse files
committed
test: getting ready for testing with operator
Signed-off-by: vsoch <[email protected]>
1 parent dc24180 commit d4ad2b5

File tree

11 files changed

+101
-38
lines changed

11 files changed

+101
-38
lines changed

README.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,9 @@ ensemble-server start
192192
# Run the hello-world example ensemble! It will submit jobs and monitor job events, etc.
193193
ensemble run examples/hello-world.yaml
194194
195+
# Here is how to enable debug logging on the fly (equivalent to logging->debug true)
196+
ensemble run --debug examples/hello-world.yaml
197+
195198
# This example shows using repetitions and backoff
196199
ensemble run examples/backoff-example.yaml
197200
@@ -206,11 +209,8 @@ ensemble run examples/terminate-example.yaml
206209
ensemble run examples/heartbeat-example.yaml
207210
208211
# Grow/shrink requires a minicluster (flux doesn't support it) but we can mock it here
209-
# TODO - add greater than parsing, then run example here, then in operator
210-
ensemble run --executor minicluster examples/grow-shrink-example.yaml
211-
212-
# Here is how to add on the fly debug (logging->debug true)
213-
ensemble run --debug --executor minicluster examples/grow-shrink-example.yaml
212+
# Note that a --name with <namespace>/<name> is required
213+
ensemble run --name default/ensemble --executor minicluster examples/grow-shrink-example.yaml
214214
```
215215

216216
Right now, this will run any rules with "start" triggers, which for this hello world example includes a few hello world jobs! You'll then be able to watch and see flux events coming in!

ensemble/client/__init__.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,27 @@ def get_parser():
4646
choices=defaults.supported_members,
4747
default="flux",
4848
)
49+
run.add_argument(
50+
"--name",
51+
help="Identifier for member (required for minicluster)",
52+
)
4953
run.add_argument(
5054
"--debug",
5155
help="Enable debug logging for the config",
5256
action="store_true",
5357
default=False,
5458
)
59+
run.add_argument(
60+
"--port",
61+
help=f"Port to run application (defaults to {defaults.port})",
62+
default=defaults.port,
63+
type=int,
64+
)
65+
run.add_argument(
66+
"--host",
67+
help="Host with server (defaults to localhost)",
68+
default="localhost",
69+
)
5570

5671
for command in [run]:
5772
command.add_argument(

ensemble/client/run.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,11 @@
22

33

44
def main(args, parser, extra, subparser):
5+
# Assemble options
6+
options = {"name": args.name, "port": args.port, "host": args.host}
7+
58
# This will raise an error if the member type (e.g., minicluster) is not known
6-
member = members.get_member(args.executor)
9+
member = members.get_member(args.executor, options=options)
10+
711
member.load(args.config, args.debug)
812
member.start()

ensemble/members/base.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,8 @@ def __init__(self, **options):
1616
"""
1717
Create a new member type (e.g., FluxQueue)
1818
"""
19-
# Set options as attributes
20-
for key, value in options.items():
21-
setattr(self, key, value)
19+
# Set options
20+
self.options = options
2221

2322
# Common queue metrics
2423
self.metrics = QueueMetrics()

ensemble/members/flux/minicluster.py

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,35 +13,76 @@ class FluxMiniClusterQueue(MemberBase):
1313
member supports scale up and scale down for actions.
1414
"""
1515

16-
# TODO for ensemble operator - we need the ensemble name to identify it
17-
# perhaps generate randomly or similar?
16+
def __init__(self, **kwargs):
17+
"""
18+
Create a new flux MiniCluster
19+
20+
Since we need to communicate back to the grpc service, we require a name
21+
and namespace. This should be provided via the name kwargs, which is parsed
22+
into both. E.g., --name default/ensemble. If a namespace is not provided,
23+
default is assumed.
24+
"""
25+
super().__init__(**kwargs)
26+
self.set_identifier()
27+
28+
def set_identifier(self):
29+
"""
30+
Get the name/namespace of the MiniCluster
31+
"""
32+
if "name" not in self.options or not self.options["name"]:
33+
raise ValueError("A --name (namespace/name) is required for a minicluster")
34+
name = self.options["name"]
35+
namespace = "default"
36+
if "/" in name:
37+
namespace, name = name.split("/")
38+
self.options["name"] = name
39+
self.options["namespace"] = namespace
40+
41+
@property
42+
def host(self):
43+
"""
44+
Host can be customized with options, and defaults to localhost:50051
45+
"""
46+
host = self.options.get("host") or "localhost"
47+
port = self.options.get("port") or 50051
48+
return f"{host}:{port}"
1849

1950
@property
2051
def client(self):
2152
"""
2253
Ensure we have a connection to the service client.
2354
"""
24-
# Do we have the ensemble client?
25-
# TODO allow to customize host, etc.
2655
if hasattr(self, "_client"):
2756
return self._client
28-
self._client = EnsembleClient()
57+
self._client = EnsembleClient(host=self.host)
2958
return self._client
3059

3160
def grow(self, rule, record=None):
3261
"""
3362
Request to the API to grow the MiniCluster
3463
"""
64+
# We need to send over the name and namespace
65+
name = self.options["name"]
66+
namespace = self.options["namespace"]
67+
3568
# For now use the ensemble type as the name
3669
# TODO this needs to be caught and decided upon - retry?
37-
response = self.client.action_request(self.name, "grow", {})
70+
response = self.client.action_request(
71+
name=name, namespace=namespace, action="grow", payload={}
72+
)
3873
print(response)
3974

4075
def shrink(self, rule, record=None):
4176
"""
4277
Request to the API to shrink the MiniCluster
4378
"""
44-
response = self.client.action_request(self.name, "shrink", {})
79+
# We need to send over the name and namespace
80+
name = self.options["name"]
81+
namespace = self.options["namespace"]
82+
83+
response = self.client.action_request(
84+
name=name, namespace=namespace, action="shrink", payload={}
85+
)
4586
print(response)
4687

4788
@property

ensemble/members/flux/queue.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ class FluxQueue(MemberBase):
5959

6060
def __init__(
6161
self,
62-
refresh_interval=0,
6362
summary_frequency=10,
6463
**kwargs,
6564
):
@@ -68,11 +67,9 @@ def __init__(
6867
6968
Parameters:
7069
summary_frequency (int): how often (events) to show summary
71-
refresh_interval (int) : number of seconds to refresh all metrics (0 indicates no refresh)
7270
to 60 seconds. If you set to 0, it will not be set.
7371
"""
7472
self.handle = flux.Flux()
75-
self.refresh_interval = refresh_interval
7673

7774
# How often on job completions to summarize?
7875
self.summary_freqency = summary_frequency

ensemble/protos/ensemble_service_pb2.py

Lines changed: 10 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ensemble/protos/ensemble_service_pb2.pyi

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,24 @@ from typing import ClassVar as _ClassVar, Optional as _Optional, Union as _Union
66
DESCRIPTOR: _descriptor.FileDescriptor
77

88
class StatusRequest(_message.Message):
9-
__slots__ = ("member",)
9+
__slots__ = ("member", "namespace")
1010
MEMBER_FIELD_NUMBER: _ClassVar[int]
11+
NAMESPACE_FIELD_NUMBER: _ClassVar[int]
1112
member: str
12-
def __init__(self, member: _Optional[str] = ...) -> None: ...
13+
namespace: str
14+
def __init__(self, member: _Optional[str] = ..., namespace: _Optional[str] = ...) -> None: ...
1315

1416
class ActionRequest(_message.Message):
15-
__slots__ = ("member", "action", "payload")
17+
__slots__ = ("member", "namespace", "action", "payload")
1618
MEMBER_FIELD_NUMBER: _ClassVar[int]
19+
NAMESPACE_FIELD_NUMBER: _ClassVar[int]
1720
ACTION_FIELD_NUMBER: _ClassVar[int]
1821
PAYLOAD_FIELD_NUMBER: _ClassVar[int]
1922
member: str
23+
namespace: str
2024
action: str
2125
payload: str
22-
def __init__(self, member: _Optional[str] = ..., action: _Optional[str] = ..., payload: _Optional[str] = ...) -> None: ...
26+
def __init__(self, member: _Optional[str] = ..., namespace: _Optional[str] = ..., action: _Optional[str] = ..., payload: _Optional[str] = ...) -> None: ...
2327

2428
class Response(_message.Message):
2529
__slots__ = ("payload", "status")

ensemble/schema.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,15 @@
88
"type": "object",
99
# The only required thing is the algorithm, since we can respond to jobs that
1010
# are submitted otherwise in the queue.
11-
"required": ["rules"],
11+
"required": ["rules", "jobs"],
1212
"properties": {
13-
"attributes": {"$ref": "#/definitions/algorithm"},
1413
"jobs": {"$ref": "#/definitions/jobs"},
1514
"rules": {"$ref": "#/definitions/rules"},
15+
"logging": {"$ref": "#/definitions/logging"},
16+
"custom": {"type": "string"},
1617
"additionalProperties": False,
1718
},
1819
"definitions": {
19-
"custom": {"type": "string"},
2020
"logging": {
2121
"type": "object",
2222
"properties": {

ensemble/server.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ def RequestAction(self, request, context):
8383
Request that an action be performed according to an algorithm.
8484
"""
8585
print(f"Member {request.member}")
86+
print(f"Namespace {request.namespace}")
8687
print(f"Action {request.action}")
8788
print(f"Payload {request.payload}")
8889

protos/ensemble-service.proto

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,15 @@ message StatusRequest {
1414

1515
// This is the ensemble member type (e.g., minicluster)
1616
string member = 1;
17+
string namespace = 2;
1718
}
1819

1920
// ActionRequest requests an action
2021
message ActionRequest {
2122
string member = 1;
22-
string action = 2;
23-
string payload = 3;
23+
string namespace = 2;
24+
string action = 3;
25+
string payload = 4;
2426
}
2527

2628
message Response {

0 commit comments

Comments
 (0)