13
13
Position ,
14
14
)
15
15
import heapq
16
- from collections .abc import Generator
17
16
import random
18
17
from dataclasses import dataclass
19
18
from functools import total_ordering
20
-
19
+ import time
21
20
22
21
# Seed randomness for reproducibility
23
22
RANDOM_SEED = 50
@@ -55,6 +54,10 @@ def __eq__(self, other: object):
55
54
return NotImplementedError (f"Cannot compare Node with object of type: { type (other )} " )
56
55
return self .position == other .position and self .time == other .time and self .interval == other .interval
57
56
57
+ @dataclass
58
+ class EntryTimeAndInterval :
59
+ entry_time : int
60
+ interval : Interval
58
61
59
62
class NodePath :
60
63
path : list [Node ]
@@ -130,24 +133,22 @@ def plan(self, verbose: bool = False) -> NodePath:
130
133
print (f"Found path to goal after { len (expanded_list )} expansions" )
131
134
path = []
132
135
path_walker : Node = expanded_node
133
- while path_walker . parent_index != - 1 :
136
+ while True :
134
137
path .append (path_walker )
138
+ if path_walker .parent_index == - 1 :
139
+ break
135
140
path_walker = expanded_list [path_walker .parent_index ]
136
- # TODO: fix hack around bad while condiiotn
137
- path .append (path_walker )
138
141
139
142
# reverse path so it goes start -> goal
140
143
path .reverse ()
141
144
return NodePath (path )
142
145
143
146
expanded_idx = len (expanded_list )
144
147
expanded_list .append (expanded_node )
145
- visited_intervals [expanded_node .position .x , expanded_node .position .y ].append ((expanded_node .time , expanded_node .interval ))
148
+ entry_time_and_node = EntryTimeAndInterval (expanded_node .time , expanded_node .interval )
149
+ add_entry_to_visited_intervals_array (entry_time_and_node , visited_intervals , expanded_node )
146
150
147
- # if len(expanded_set) > 100:
148
- # blarg
149
-
150
- for child in self .generate_successors (expanded_node , expanded_idx , verbose , safe_intervals , visited_intervals ):
151
+ for child in self .generate_successors (expanded_node , expanded_idx , safe_intervals , visited_intervals ):
151
152
heapq .heappush (open_set , child )
152
153
153
154
raise Exception ("No path found" )
@@ -157,7 +158,7 @@ def plan(self, verbose: bool = False) -> NodePath:
157
158
"""
158
159
# TODO: is intervals being passed by ref? (i think so?)
159
160
def generate_successors (
160
- self , parent_node : Node , parent_node_idx : int , verbose : bool , intervals : np .ndarray , visited_intervals : np .ndarray
161
+ self , parent_node : Node , parent_node_idx : int , intervals : np .ndarray , visited_intervals : np .ndarray
161
162
) -> list [Node ]:
162
163
new_nodes = []
163
164
diffs = [
@@ -185,60 +186,69 @@ def generate_successors(
185
186
if interval .start_time > current_interval .end_time :
186
187
break
187
188
188
- # TODO: this bit feels wonky
189
189
# if we have already expanded a node in this interval with a <= starting time, continue
190
190
better_node_expanded = False
191
191
for visited in visited_intervals [new_pos .x , new_pos .y ]:
192
- if interval == visited [ 1 ] and visited [ 0 ] <= parent_node .time + 1 :
192
+ if interval == visited . interval and visited . entry_time <= parent_node .time + 1 :
193
193
better_node_expanded = True
194
194
break
195
195
if better_node_expanded :
196
196
continue
197
197
198
198
# We know there is some overlap. Generate successor at the earliest possible time the
199
199
# new interval can be entered
200
- # TODO: dont love the optionl usage here
201
- new_node_t = None
202
200
for possible_t in range (max (parent_node .time + 1 , interval .start_time ), min (current_interval .end_time , interval .end_time )):
203
201
if self .grid .valid_position (new_pos , possible_t ):
204
- new_node_t = possible_t
202
+ new_nodes .append (Node (
203
+ new_pos ,
204
+ # entry is max of interval start and parent node start time (get there as soon as possible)
205
+ max (parent_node .time + 1 , interval .start_time ),
206
+ self .calculate_heuristic (new_pos ),
207
+ parent_node_idx ,
208
+ interval ,
209
+ ))
210
+ # break because all t's after this will make nodes with a higher cost, the same heuristic, and are in the same interval
205
211
break
206
212
207
- if new_node_t :
208
- # TODO: should be able to break here?
209
- new_nodes .append (Node (
210
- new_pos ,
211
- # entry is max of interval start and parent node start time (get there as soon as possible)
212
- max (parent_node .time + 1 , interval .start_time ),
213
- self .calculate_heuristic (new_pos ),
214
- parent_node_idx ,
215
- interval ,
216
- ))
217
-
218
213
return new_nodes
219
214
220
215
def calculate_heuristic (self , position ) -> int :
221
216
diff = self .goal - position
222
217
return abs (diff .x ) + abs (diff .y )
223
218
224
219
220
+ def add_entry_to_visited_intervals_array (entry_time_and_interval : EntryTimeAndInterval , visited_intervals : np .ndarray , expanded_node : Node ):
221
+ # if entry is present, update entry time if better
222
+ for existing_entry_and_interval in visited_intervals [expanded_node .position .x , expanded_node .position .y ]:
223
+ if existing_entry_and_interval .interval == entry_time_and_interval .interval :
224
+ existing_entry_and_interval .entry_time = min (existing_entry_and_interval .entry_time , entry_time_and_interval .entry_time )
225
+
226
+ # Otherwise, append
227
+ visited_intervals [expanded_node .position .x , expanded_node .position .y ].append (entry_time_and_interval )
228
+
229
+
225
230
show_animation = True
226
231
verbose = False
227
232
228
233
def main ():
229
- start = Position (1 , 1 )
234
+ start = Position (1 , 18 )
230
235
goal = Position (19 , 19 )
231
236
grid_side_length = 21
237
+
238
+ start_time = time .time ()
239
+
232
240
grid = Grid (
233
241
np .array ([grid_side_length , grid_side_length ]),
234
242
num_obstacles = 250 ,
235
243
obstacle_avoid_points = [start , goal ],
236
- # obstacle_arrangement=ObstacleArrangement.ARRANGEMENT1,
237
- obstacle_arrangement = ObstacleArrangement .RANDOM ,
244
+ obstacle_arrangement = ObstacleArrangement .ARRANGEMENT1 ,
245
+ # obstacle_arrangement=ObstacleArrangement.RANDOM,
238
246
)
239
247
240
248
planner = SafeIntervalPathPlanner (grid , start , goal )
241
249
path = planner .plan (verbose )
250
+ runtime = time .time () - start_time
251
+ print (f"Planning took: { runtime :.5f} seconds" )
242
252
243
253
if verbose :
244
254
print (f"Path: { path } " )
0 commit comments