Skip to content

Commit 4ef9d7b

Browse files
committed
add state machine
state_machine test state_machine_update
1 parent 6477929 commit 4ef9d7b

File tree

3 files changed

+434
-0
lines changed

3 files changed

+434
-0
lines changed
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
from state_machine import StateMachine
2+
3+
4+
class Robot:
5+
def __init__(self):
6+
self.battery = 100
7+
self.task_progress = 0
8+
9+
# Initialize state machine
10+
self.machine = StateMachine(self, "robot_sm")
11+
12+
# Add state transition rules
13+
self.machine.add_transition(
14+
src_state="patrolling",
15+
event="detect_task",
16+
dst_state="executing_task",
17+
guard=None,
18+
action=None,
19+
)
20+
21+
self.machine.add_transition(
22+
src_state="executing_task",
23+
event="task_complete",
24+
dst_state="patrolling",
25+
guard=None,
26+
action="reset_task",
27+
)
28+
29+
self.machine.add_transition(
30+
src_state="executing_task",
31+
event="low_battery",
32+
dst_state="returning_to_base",
33+
guard="is_battery_low",
34+
)
35+
36+
self.machine.add_transition(
37+
src_state="returning_to_base",
38+
event="reach_base",
39+
dst_state="charging",
40+
guard=None,
41+
action=None,
42+
)
43+
44+
self.machine.add_transition(
45+
src_state="charging",
46+
event="charge_complete",
47+
dst_state="patrolling",
48+
guard=None,
49+
action="battery_full",
50+
)
51+
52+
# Set initial state
53+
self.machine.set_current_state("patrolling")
54+
55+
def is_battery_low(self):
56+
"""Battery level check condition"""
57+
return self.battery < 30
58+
59+
def reset_task(self):
60+
"""Reset task progress"""
61+
self.task_progress = 0
62+
print("[Action] Task progress has been reset")
63+
64+
# Modify state entry callback naming convention (add state_ prefix)
65+
def on_enter_executing_task(self):
66+
print("\n------ Start Executing Task ------")
67+
print(f"Current battery: {self.battery}%")
68+
while self.machine.get_current_state().name == "executing_task":
69+
self.task_progress += 10
70+
self.battery -= 25
71+
print(
72+
f"Task progress: {self.task_progress}%, Remaining battery: {self.battery}%"
73+
)
74+
75+
if self.task_progress >= 100:
76+
self.machine.process("task_complete")
77+
break
78+
elif self.is_battery_low():
79+
self.machine.process("low_battery")
80+
break
81+
82+
def on_enter_returning_to_base(self):
83+
print("\nLow battery, returning to charging station...")
84+
self.machine.process("reach_base")
85+
86+
def on_enter_charging(self):
87+
print("\n------ Charging ------")
88+
self.battery = 100
89+
print("Charging complete!")
90+
self.machine.process("charge_complete")
91+
92+
93+
# Keep the test section structure the same, only modify the trigger method
94+
if __name__ == "__main__":
95+
robot = Robot()
96+
97+
print(f"Initial state: {robot.machine.get_current_state().name}")
98+
print("------------")
99+
100+
# Trigger task detection event
101+
robot.machine.process("detect_task")
102+
103+
print("\n------------")
104+
print(f"Final state: {robot.machine.get_current_state().name}")
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
from collections.abc import Callable
2+
from state_machine import StateMachine
3+
4+
5+
class EventBus:
6+
def __init__(self):
7+
self.subscribers = {}
8+
9+
def subscribe(self, event: str, callback: Callable):
10+
if event not in self.subscribers:
11+
self.subscribers[event] = []
12+
self.subscribers[event].append(callback)
13+
14+
def publish(self, event: str):
15+
if event in self.subscribers:
16+
for callback in self.subscribers[event]:
17+
callback()
18+
else:
19+
raise ValueError(f"Invalid event: {event}")
20+
21+
22+
class SlamModel:
23+
def __init__(self, event_bus: EventBus, mapping_success: bool = False):
24+
self.event_bus = event_bus
25+
self.mapping_success = mapping_success
26+
27+
def on_enter_localization(self):
28+
self.event_bus.publish("top_localization_ready_event")
29+
30+
def on_enter_mapping(self):
31+
self.mapping_success = True
32+
33+
def is_mapping_success(self):
34+
return self.mapping_success
35+
36+
37+
class PlanningModel:
38+
def __init__(self, event_bus: EventBus):
39+
self.event_bus = event_bus
40+
41+
def on_exit_working(self):
42+
self.event_bus.publish("top_stop_working_event")
43+
44+
45+
class TopModel:
46+
def __init__(self, event_bus: EventBus):
47+
self.event_bus = event_bus
48+
49+
def on_enter_pre_working(self):
50+
self.event_bus.publish("slam_start_localization_event")
51+
52+
def on_enter_working(self):
53+
self.event_bus.publish("planning_start_working_event")
54+
55+
def on_enter_mapping(self):
56+
self.event_bus.publish("planning_start_remote_control_control_event")
57+
self.event_bus.publish("slam_start_mapping_event")
58+
59+
def on_exit_mapping(self):
60+
self.event_bus.publish("planning_stop_remote_control_control_event")
61+
self.event_bus.publish("slam_stop_mapping_event")
62+
63+
64+
def main():
65+
event_bus = EventBus()
66+
67+
slam_model = SlamModel(event_bus)
68+
planning_model = PlanningModel(event_bus)
69+
top_model = TopModel(event_bus)
70+
71+
slam_machine = StateMachine(slam_model, "slam_machine")
72+
planning_machine = StateMachine(planning_model, "planning_machine")
73+
top_machine = StateMachine(top_model, "top_machine")
74+
75+
# fmt: off
76+
slam_machine.add_transition("idle", "start_localization_event", "localization", "is_mapping_success")
77+
slam_machine.add_transition("localization", "stop_localization_event", "idle")
78+
slam_machine.add_transition("idle", "start_mapping_event", "mapping")
79+
slam_machine.add_transition("mapping", "stop_mapping_event", "idle")
80+
81+
planning_machine.add_transition("idle", "start_working_event", "working")
82+
planning_machine.add_transition("idle", "stop_working_event", "idle")
83+
planning_machine.add_transition("working", "stop_working_event", "idle")
84+
planning_machine.add_transition("idle", "start_remote_control_control_event", "remote_control_control")
85+
planning_machine.add_transition("remote_control_control", "stop_remote_control_control_event", "idle")
86+
87+
top_machine.add_transition("idle", "start_working_event", "pre_working")
88+
top_machine.add_transition("pre_working", "localization_ready_event", "working")
89+
top_machine.add_transition("pre_working", "stop_working_event", "idle")
90+
top_machine.add_transition("working", "stop_working_event", "idle")
91+
top_machine.add_transition("idle", "start_mapping_event", "mapping")
92+
top_machine.add_transition("mapping", "stop_mapping_event", "idle")
93+
# fmt: on
94+
event_bus.subscribe(
95+
"slam_start_localization_event",
96+
lambda: slam_machine.process("start_localization_event"),
97+
)
98+
event_bus.subscribe(
99+
"slam_start_mapping_event",
100+
lambda: slam_machine.process("start_mapping_event"),
101+
)
102+
event_bus.subscribe(
103+
"slam_stop_mapping_event",
104+
lambda: slam_machine.process("stop_mapping_event"),
105+
)
106+
event_bus.subscribe(
107+
"planning_start_working_event",
108+
lambda: planning_machine.process("start_working_event"),
109+
)
110+
event_bus.subscribe(
111+
"planning_start_remote_control_control_event",
112+
lambda: planning_machine.process("start_remote_control_control_event"),
113+
)
114+
event_bus.subscribe(
115+
"planning_stop_remote_control_control_event",
116+
lambda: planning_machine.process("stop_remote_control_control_event"),
117+
)
118+
event_bus.subscribe(
119+
"top_localization_ready_event",
120+
lambda: top_machine.process("localization_ready_event"),
121+
)
122+
event_bus.subscribe(
123+
"top_mapping_ready_event",
124+
lambda: top_machine.process("mapping_ready_event"),
125+
)
126+
event_bus.subscribe(
127+
"top_stop_working_event",
128+
lambda: top_machine.process("stop_working_event"),
129+
)
130+
131+
def working_task():
132+
slam_machine.set_current_state("idle")
133+
planning_machine.set_current_state("idle")
134+
top_machine.set_current_state("idle")
135+
# User sends start working event
136+
top_machine.process("start_working_event")
137+
138+
# Planning Model finish the task, and send stop working event
139+
planning_machine.process("stop_working_event")
140+
141+
print("top_machine: ", top_machine.get_current_state().name)
142+
print("planning_machine: ", planning_machine.get_current_state().name)
143+
print("slam_machine: ", slam_machine.get_current_state().name)
144+
145+
working_task()
146+
147+
def mapping_task():
148+
slam_machine.set_current_state("idle")
149+
planning_machine.set_current_state("idle")
150+
top_machine.set_current_state("idle")
151+
# User sends start mapping event
152+
top_machine.process("start_mapping_event")
153+
# User sends stop mapping event
154+
top_machine.process("stop_mapping_event")
155+
156+
print("top_machine: ", top_machine.get_current_state().name)
157+
print("planning_machine: ", planning_machine.get_current_state().name)
158+
print("slam_machine: ", slam_machine.get_current_state().name)
159+
160+
mapping_task()
161+
162+
163+
if __name__ == "__main__":
164+
main()

0 commit comments

Comments
 (0)