Skip to content
Merged
2 changes: 1 addition & 1 deletion GEMstack/knowledge/defaults/computation_graph.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ components:
inputs: [vehicle, roadgraph]
outputs: vehicle_lane
- parking_detection:
inputs: obstacles
inputs: all
outputs: [goal, obstacles]
- sign_detection:
inputs: [vehicle, roadgraph]
Expand Down
2 changes: 1 addition & 1 deletion GEMstack/knowledge/defaults/current.yaml
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this cleared with the control team?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I spoke with Jugal. We had an out outdated pid_p value that we have now updated to match Controls. We do however need crosstrack_gain set to 0.5 and @Jugthegreat has agreed that this is an acceptable change.

Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ control:
pure_pursuit:
lookahead: 2.0
lookahead_scale: 3.0
crosstrack_gain: 1.0
crosstrack_gain: 0.5
desired_speed: trajectory
longitudinal_control:
pid_p: 1.0
Expand Down
2 changes: 1 addition & 1 deletion GEMstack/onboard/perception/cone_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,4 +506,4 @@ def box_to_fake_obstacle(box):


if __name__ == '__main__':
pass
pass
27 changes: 19 additions & 8 deletions GEMstack/onboard/perception/parking_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from sensor_msgs.msg import PointCloud2
from typing import Dict
from ..component import Component
from ...state import ObjectPose, ObjectFrameEnum, Obstacle, ObstacleMaterialEnum
from ...state import ObjectPose, ObjectFrameEnum, Obstacle, ObstacleMaterialEnum, ObstacleStateEnum, AllState
from ..interface.gem import GEMInterface
from .utils.constants import *
from .utils.math_utils import *
Expand Down Expand Up @@ -77,7 +77,7 @@ def rate(self) -> float:
return 10.0 # Hz

def state_inputs(self) -> list:
return ['obstacles']
return ['all']

def state_outputs(self) -> list:
return ['goal', 'obstacles']
Expand Down Expand Up @@ -149,14 +149,14 @@ def visualize(self,
self.pub_cones_centers_pc2.publish(ros_cones_centers_pc2)


def update(self, cone_obstacles: Dict[str, Obstacle]):
def update(self, state: AllState):
# Initial variables
parking_goals = []
best_parking_spots = []
parking_obstacles_poses = []
parking_obstacles_dims = []
grouped_ordered_ground_centers_2D = []

cone_obstacles = state.obstacles
# Populate cone points
cone_pts_3D = []
for cone in cone_obstacles.values():
Expand Down Expand Up @@ -231,7 +231,7 @@ def update(self, cone_obstacles: Dict[str, Obstacle]):
yaw=yaw,
pitch=0.0,
roll=0.0,
frame=ObjectFrameEnum.CURRENT
frame=ObjectFrameEnum.START
)
new_obstacle = Obstacle(
pose=obstacle_pose,
Expand All @@ -240,7 +240,7 @@ def update(self, cone_obstacles: Dict[str, Obstacle]):
material=ObstacleMaterialEnum.BARRIER,
collidable=True
)
parking_obstacles[obstacle_id] = new_obstacle
parking_obstacles[f"parking_obstacle_{obstacle_id}"] = new_obstacle
obstacle_id += 1

# Constructing goal pose
Expand All @@ -253,8 +253,19 @@ def update(self, cone_obstacles: Dict[str, Obstacle]):
yaw=yaw,
pitch=0.0,
roll=0.0,
frame=ObjectFrameEnum.CURRENT
frame=ObjectFrameEnum.START
)

DISTANCE_THRESHOLD = 8.0
if self.euclidean_distance((x,y), state) > DISTANCE_THRESHOLD: # we are not close enough to the parking spot
return None

new_state = [goal_pose, parking_obstacles]
return new_state
return new_state

def euclidean_distance(self, point, state):
x, y = point
vehicle_x = state.vehicle.pose.x
vehicle_y = state.vehicle.pose.y
distance = np.sqrt((x - vehicle_x) ** 2 + (y - vehicle_y) ** 2)
return distance
267 changes: 267 additions & 0 deletions GEMstack/onboard/planning/astar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
""" generic A-Star path searching algorithm (https://github.com/jrialland/python-astar/blob/master/tests/basic/test_basic.py) """
# @TODO make it so we return the best path as discussed in class
# In class, we discussed that we should have a terminal function
# which approximates the cost_to_go and returns the path to the node
# with the least cost_to_go + terminal_cost

from abc import ABC, abstractmethod
from typing import Callable, Dict, Iterable, Union, TypeVar, Generic
from math import inf as infinity
from operator import attrgetter
import heapq

# introduce generic type
T = TypeVar("T")


################################################################################
class SearchNode(Generic[T]):
"""Representation of a search node"""

__slots__ = ("data", "gscore", "fscore", "tscore", "closed", "came_from", "in_openset", "cache")

def __init__(
self, data: T, gscore: float = infinity, fscore: float = infinity, tscore:float = infinity
) -> None:
self.data = data
self.gscore = gscore
self.fscore = fscore
self.tscore = tscore
self.closed = False
self.in_openset = False
self.came_from: Union[None, SearchNode[T]] = None
self.cache: Any = None

def __lt__(self, b: "SearchNode[T]") -> bool:
"""Natural order is based on the fscore value & is used by heapq operations"""
return self.fscore < b.fscore


################################################################################
class SearchNodeDict(Dict[T, SearchNode[T]]):
"""A dict that returns a new SearchNode when a key is missing"""

def __missing__(self, k) -> SearchNode[T]:
v = SearchNode(k)
self.__setitem__(k, v)
return v


################################################################################
SNType = TypeVar("SNType", bound=SearchNode)


class OpenSet(Generic[SNType]):
def __init__(self) -> None:
self.heap: list[SNType] = []

def push(self, item: SNType) -> None:
item.in_openset = True
heapq.heappush(self.heap, item)

def pop(self) -> SNType:
item = heapq.heappop(self.heap)
item.in_openset = False
return item

def remove(self, item: SNType) -> None:
idx = self.heap.index(item)
item.in_openset = False
item = self.heap.pop()
if idx < len(self.heap):
self.heap[idx] = item
# Fix heap invariants
heapq._siftup(self.heap, idx)
heapq._siftdown(self.heap, 0, idx)

def __len__(self) -> int:
return len(self.heap)


################################################################################*


class AStar(ABC, Generic[T]):
__slots__ = ()

@abstractmethod
def heuristic_cost_estimate(self, current: T, goal: T) -> float:
"""
Computes the estimated (rough) distance between a node and the goal.
The second parameter is always the goal.

This method must be implemented in a subclass.
"""
raise NotImplementedError

@abstractmethod
def terminal_cost_estimate(self, current: T, goal: T) -> float:
"""Computes the estimated distance between a node and the goal.
This function is called after all iterations of A* have been run
and is used to determine the closest node to the goal found so far.

This method must be implemented in a subclass.

Args:
current (T): Current T
goal (T): goal T

Returns:
float: _description_
"""
raise NotImplementedError

def distance_between(self, n1: T, n2: T) -> float:
"""
Gives the real distance between two adjacent nodes n1 and n2 (i.e n2
belongs to the list of n1's neighbors).
n2 is guaranteed to belong to the list returned by the call to neighbors(n1).

This method (or "path_distance_between") must be implemented in a subclass.
"""
raise NotImplementedError

def path_distance_between(self, n1: SearchNode[T], n2: SearchNode[T]) -> float:
"""
Gives the real distance between the node n1 and its neighbor n2.
n2 is guaranteed to belong to the list returned by the call to
path_neighbors(n1).

Calls "distance_between"`by default.
"""
return self.distance_between(n1.data, n2.data)

def neighbors(self, node: T) -> Iterable[T]:
"""
For a given node, returns (or yields) the list of its neighbors.

This method (or "path_neighbors") must be implemented in a subclass.
"""
raise NotImplementedError

def path_neighbors(self, node: SearchNode[T]) -> Iterable[T]:
"""
For a given node, returns (or yields) the list of its reachable neighbors.
Calls "neighbors" by default.
"""
return self.neighbors(node.data)

def _neighbors(self, current: SearchNode[T], search_nodes: SearchNodeDict[T]) -> Iterable[SearchNode]:
return (search_nodes[n] for n in self.path_neighbors(current))

def is_goal_reached(self, current: T, goal: T) -> bool:
"""
Returns true when we can consider that 'current' is the goal.
The default implementation simply compares `current == goal`, but this
method can be overwritten in a subclass to provide more refined checks.
"""
return current == goal

def reconstruct_path(self, last: SearchNode, reversePath=False) -> Iterable[T]:
def _gen():
current = last
while current:
yield current.data
current = current.came_from

if reversePath:
return _gen()
else:
return reversed(list(_gen()))

def astar(
self, start: T, goal: T, reversePath: bool = False, iterations: int = 5000
) -> Union[Iterable[T], None]:
if self.is_goal_reached(start, goal):
return [start]

openSet: OpenSet[SearchNode[T]] = OpenSet()
searchNodes: SearchNodeDict[T] = SearchNodeDict()
startNode = searchNodes[start] = SearchNode(
start, gscore=0.0, fscore=self.heuristic_cost_estimate(start, goal)
)
openSet.push(startNode)
bestNode = startNode

iteration = 0

while openSet and iteration < iterations:
current = openSet.pop()

if self.is_goal_reached(current.data, goal):
return self.reconstruct_path(current, reversePath)

current.closed = True

for neighbor in self._neighbors(current, searchNodes):
if neighbor.closed:
continue

gscore = current.gscore + self.path_distance_between(current, neighbor)

if gscore >= neighbor.gscore:
continue

fscore = gscore + self.heuristic_cost_estimate(
neighbor.data, goal
)
tscore = self.terminal_cost_estimate(
neighbor.data, goal
)

# print(f"Checking node: {neighbor.data} with tscore {tscore}")
if tscore < bestNode.tscore:
# print(f"Found a better node: {neighbor.data} with tscore {tscore}")
bestNode = neighbor

if neighbor.in_openset:
if neighbor.fscore < fscore:
# the new path to this node isn't better
continue

# we have to remove the item from the heap, as its score has changed
openSet.remove(neighbor)

# update the node
neighbor.came_from = current
neighbor.gscore = gscore
neighbor.fscore = fscore
neighbor.tscore = tscore

openSet.push(neighbor)

iteration += 1

# print("Warning: A* search failed to find a path")
return self.reconstruct_path(bestNode, reversePath)


################################################################################
U = TypeVar("U")


def find_path(
start: U,
goal: U,
neighbors_fnct: Callable[[U], Iterable[U]],
reversePath=False,
heuristic_cost_estimate_fnct: Callable[[U, U], float] = lambda a, b: infinity,
distance_between_fnct: Callable[[U, U], float] = lambda a, b: 1.0,
is_goal_reached_fnct: Callable[[U, U], bool] = lambda a, b: a == b,
) -> Union[Iterable[U], None]:
"""A non-class version of the path finding algorithm"""

class FindPath(AStar):
def heuristic_cost_estimate(self, current: U, goal: U) -> float:
return heuristic_cost_estimate_fnct(current, goal) # type: ignore

def distance_between(self, n1: U, n2: U) -> float:
return distance_between_fnct(n1, n2)

def neighbors(self, node) -> Iterable[U]:
return neighbors_fnct(node) # type: ignore

def is_goal_reached(self, current: U, goal: U) -> bool:
return is_goal_reached_fnct(current, goal)

return FindPath().astar(start, goal, reversePath)
4 changes: 0 additions & 4 deletions GEMstack/onboard/planning/parking_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ def update(self, state: AllState):
# Calculate elapsed time since initialization.
elapsed_time = time.time() - self.start_time

# Reading goal from state
# print(f"\n AllState (parking goal): {state.goal} \n")
# print(f"AllState (parking obstacles): {state.obstacles} \n")

# After a goal is detected, change the mission plan to use PARKING.
if state.goal:
print("\n Parking goal available. Entering PARKING mode......")
Expand Down
Loading
Loading