diff --git a/GEMstack/knowledge/defaults/computation_graph.yaml b/GEMstack/knowledge/defaults/computation_graph.yaml index 505b8568..43516d4c 100644 --- a/GEMstack/knowledge/defaults/computation_graph.yaml +++ b/GEMstack/knowledge/defaults/computation_graph.yaml @@ -22,7 +22,7 @@ components: inputs: [vehicle, roadgraph] outputs: vehicle_lane - parking_detection: - inputs: obstacles + inputs: all outputs: [goal, obstacles] - sign_detection: inputs: [vehicle, roadgraph] diff --git a/GEMstack/knowledge/defaults/current.yaml b/GEMstack/knowledge/defaults/current.yaml index c886288b..149080f5 100644 --- a/GEMstack/knowledge/defaults/current.yaml +++ b/GEMstack/knowledge/defaults/current.yaml @@ -14,7 +14,7 @@ control: pure_pursuit: lookahead: 2.0 lookahead_scale: 3.0 - crosstrack_gain: 1.0 + crosstrack_gain: 0.5 desired_speed: trajectory longitudinal_control: pid_p: 1.0 diff --git a/GEMstack/onboard/perception/cone_detection.py b/GEMstack/onboard/perception/cone_detection.py index 42b74bb6..ab83d03d 100644 --- a/GEMstack/onboard/perception/cone_detection.py +++ b/GEMstack/onboard/perception/cone_detection.py @@ -506,4 +506,4 @@ def box_to_fake_obstacle(box): if __name__ == '__main__': - pass + pass \ No newline at end of file diff --git a/GEMstack/onboard/perception/parking_detection.py b/GEMstack/onboard/perception/parking_detection.py index 3c935b46..76a3c9e0 100644 --- a/GEMstack/onboard/perception/parking_detection.py +++ b/GEMstack/onboard/perception/parking_detection.py @@ -4,7 +4,7 @@ from sensor_msgs.msg import PointCloud2 from typing import Dict from ..component import Component -from ...state import ObjectPose, ObjectFrameEnum, Obstacle, ObstacleMaterialEnum +from ...state import ObjectPose, ObjectFrameEnum, Obstacle, ObstacleMaterialEnum, ObstacleStateEnum, AllState from ..interface.gem import GEMInterface from .utils.constants import * from .utils.math_utils import * @@ -77,7 +77,7 @@ def rate(self) -> float: return 10.0 # Hz def state_inputs(self) -> list: - return ['obstacles'] + return ['all'] def state_outputs(self) -> list: return ['goal', 'obstacles'] @@ -149,14 +149,14 @@ def visualize(self, self.pub_cones_centers_pc2.publish(ros_cones_centers_pc2) - def update(self, cone_obstacles: Dict[str, Obstacle]): + def update(self, state: AllState): # Initial variables parking_goals = [] best_parking_spots = [] parking_obstacles_poses = [] parking_obstacles_dims = [] grouped_ordered_ground_centers_2D = [] - + cone_obstacles = state.obstacles # Populate cone points cone_pts_3D = [] for cone in cone_obstacles.values(): @@ -231,7 +231,7 @@ def update(self, cone_obstacles: Dict[str, Obstacle]): yaw=yaw, pitch=0.0, roll=0.0, - frame=ObjectFrameEnum.CURRENT + frame=ObjectFrameEnum.START ) new_obstacle = Obstacle( pose=obstacle_pose, @@ -240,7 +240,7 @@ def update(self, cone_obstacles: Dict[str, Obstacle]): material=ObstacleMaterialEnum.BARRIER, collidable=True ) - parking_obstacles[obstacle_id] = new_obstacle + parking_obstacles[f"parking_obstacle_{obstacle_id}"] = new_obstacle obstacle_id += 1 # Constructing goal pose @@ -253,8 +253,19 @@ def update(self, cone_obstacles: Dict[str, Obstacle]): yaw=yaw, pitch=0.0, roll=0.0, - frame=ObjectFrameEnum.CURRENT + frame=ObjectFrameEnum.START ) + DISTANCE_THRESHOLD = 8.0 + if self.euclidean_distance((x,y), state) > DISTANCE_THRESHOLD: # we are not close enough to the parking spot + return None + new_state = [goal_pose, parking_obstacles] - return new_state \ No newline at end of file + return new_state + + def euclidean_distance(self, point, state): + x, y = point + vehicle_x = state.vehicle.pose.x + vehicle_y = state.vehicle.pose.y + distance = np.sqrt((x - vehicle_x) ** 2 + (y - vehicle_y) ** 2) + return distance \ No newline at end of file diff --git a/GEMstack/onboard/planning/astar.py b/GEMstack/onboard/planning/astar.py new file mode 100644 index 00000000..2704a450 --- /dev/null +++ b/GEMstack/onboard/planning/astar.py @@ -0,0 +1,267 @@ +""" generic A-Star path searching algorithm (https://github.com/jrialland/python-astar/blob/master/tests/basic/test_basic.py) """ +# @TODO make it so we return the best path as discussed in class +# In class, we discussed that we should have a terminal function +# which approximates the cost_to_go and returns the path to the node +# with the least cost_to_go + terminal_cost + +from abc import ABC, abstractmethod +from typing import Callable, Dict, Iterable, Union, TypeVar, Generic +from math import inf as infinity +from operator import attrgetter +import heapq + +# introduce generic type +T = TypeVar("T") + + +################################################################################ +class SearchNode(Generic[T]): + """Representation of a search node""" + + __slots__ = ("data", "gscore", "fscore", "tscore", "closed", "came_from", "in_openset", "cache") + + def __init__( + self, data: T, gscore: float = infinity, fscore: float = infinity, tscore:float = infinity + ) -> None: + self.data = data + self.gscore = gscore + self.fscore = fscore + self.tscore = tscore + self.closed = False + self.in_openset = False + self.came_from: Union[None, SearchNode[T]] = None + self.cache: Any = None + + def __lt__(self, b: "SearchNode[T]") -> bool: + """Natural order is based on the fscore value & is used by heapq operations""" + return self.fscore < b.fscore + + +################################################################################ +class SearchNodeDict(Dict[T, SearchNode[T]]): + """A dict that returns a new SearchNode when a key is missing""" + + def __missing__(self, k) -> SearchNode[T]: + v = SearchNode(k) + self.__setitem__(k, v) + return v + + +################################################################################ +SNType = TypeVar("SNType", bound=SearchNode) + + +class OpenSet(Generic[SNType]): + def __init__(self) -> None: + self.heap: list[SNType] = [] + + def push(self, item: SNType) -> None: + item.in_openset = True + heapq.heappush(self.heap, item) + + def pop(self) -> SNType: + item = heapq.heappop(self.heap) + item.in_openset = False + return item + + def remove(self, item: SNType) -> None: + idx = self.heap.index(item) + item.in_openset = False + item = self.heap.pop() + if idx < len(self.heap): + self.heap[idx] = item + # Fix heap invariants + heapq._siftup(self.heap, idx) + heapq._siftdown(self.heap, 0, idx) + + def __len__(self) -> int: + return len(self.heap) + + +################################################################################* + + +class AStar(ABC, Generic[T]): + __slots__ = () + + @abstractmethod + def heuristic_cost_estimate(self, current: T, goal: T) -> float: + """ + Computes the estimated (rough) distance between a node and the goal. + The second parameter is always the goal. + + This method must be implemented in a subclass. + """ + raise NotImplementedError + + @abstractmethod + def terminal_cost_estimate(self, current: T, goal: T) -> float: + """Computes the estimated distance between a node and the goal. + This function is called after all iterations of A* have been run + and is used to determine the closest node to the goal found so far. + + This method must be implemented in a subclass. + + Args: + current (T): Current T + goal (T): goal T + + Returns: + float: _description_ + """ + raise NotImplementedError + + def distance_between(self, n1: T, n2: T) -> float: + """ + Gives the real distance between two adjacent nodes n1 and n2 (i.e n2 + belongs to the list of n1's neighbors). + n2 is guaranteed to belong to the list returned by the call to neighbors(n1). + + This method (or "path_distance_between") must be implemented in a subclass. + """ + raise NotImplementedError + + def path_distance_between(self, n1: SearchNode[T], n2: SearchNode[T]) -> float: + """ + Gives the real distance between the node n1 and its neighbor n2. + n2 is guaranteed to belong to the list returned by the call to + path_neighbors(n1). + + Calls "distance_between"`by default. + """ + return self.distance_between(n1.data, n2.data) + + def neighbors(self, node: T) -> Iterable[T]: + """ + For a given node, returns (or yields) the list of its neighbors. + + This method (or "path_neighbors") must be implemented in a subclass. + """ + raise NotImplementedError + + def path_neighbors(self, node: SearchNode[T]) -> Iterable[T]: + """ + For a given node, returns (or yields) the list of its reachable neighbors. + Calls "neighbors" by default. + """ + return self.neighbors(node.data) + + def _neighbors(self, current: SearchNode[T], search_nodes: SearchNodeDict[T]) -> Iterable[SearchNode]: + return (search_nodes[n] for n in self.path_neighbors(current)) + + def is_goal_reached(self, current: T, goal: T) -> bool: + """ + Returns true when we can consider that 'current' is the goal. + The default implementation simply compares `current == goal`, but this + method can be overwritten in a subclass to provide more refined checks. + """ + return current == goal + + def reconstruct_path(self, last: SearchNode, reversePath=False) -> Iterable[T]: + def _gen(): + current = last + while current: + yield current.data + current = current.came_from + + if reversePath: + return _gen() + else: + return reversed(list(_gen())) + + def astar( + self, start: T, goal: T, reversePath: bool = False, iterations: int = 5000 + ) -> Union[Iterable[T], None]: + if self.is_goal_reached(start, goal): + return [start] + + openSet: OpenSet[SearchNode[T]] = OpenSet() + searchNodes: SearchNodeDict[T] = SearchNodeDict() + startNode = searchNodes[start] = SearchNode( + start, gscore=0.0, fscore=self.heuristic_cost_estimate(start, goal) + ) + openSet.push(startNode) + bestNode = startNode + + iteration = 0 + + while openSet and iteration < iterations: + current = openSet.pop() + + if self.is_goal_reached(current.data, goal): + return self.reconstruct_path(current, reversePath) + + current.closed = True + + for neighbor in self._neighbors(current, searchNodes): + if neighbor.closed: + continue + + gscore = current.gscore + self.path_distance_between(current, neighbor) + + if gscore >= neighbor.gscore: + continue + + fscore = gscore + self.heuristic_cost_estimate( + neighbor.data, goal + ) + tscore = self.terminal_cost_estimate( + neighbor.data, goal + ) + + # print(f"Checking node: {neighbor.data} with tscore {tscore}") + if tscore < bestNode.tscore: + # print(f"Found a better node: {neighbor.data} with tscore {tscore}") + bestNode = neighbor + + if neighbor.in_openset: + if neighbor.fscore < fscore: + # the new path to this node isn't better + continue + + # we have to remove the item from the heap, as its score has changed + openSet.remove(neighbor) + + # update the node + neighbor.came_from = current + neighbor.gscore = gscore + neighbor.fscore = fscore + neighbor.tscore = tscore + + openSet.push(neighbor) + + iteration += 1 + + # print("Warning: A* search failed to find a path") + return self.reconstruct_path(bestNode, reversePath) + + +################################################################################ +U = TypeVar("U") + + +def find_path( + start: U, + goal: U, + neighbors_fnct: Callable[[U], Iterable[U]], + reversePath=False, + heuristic_cost_estimate_fnct: Callable[[U, U], float] = lambda a, b: infinity, + distance_between_fnct: Callable[[U, U], float] = lambda a, b: 1.0, + is_goal_reached_fnct: Callable[[U, U], bool] = lambda a, b: a == b, +) -> Union[Iterable[U], None]: + """A non-class version of the path finding algorithm""" + + class FindPath(AStar): + def heuristic_cost_estimate(self, current: U, goal: U) -> float: + return heuristic_cost_estimate_fnct(current, goal) # type: ignore + + def distance_between(self, n1: U, n2: U) -> float: + return distance_between_fnct(n1, n2) + + def neighbors(self, node) -> Iterable[U]: + return neighbors_fnct(node) # type: ignore + + def is_goal_reached(self, current: U, goal: U) -> bool: + return is_goal_reached_fnct(current, goal) + + return FindPath().astar(start, goal, reversePath) diff --git a/GEMstack/onboard/planning/parking_component.py b/GEMstack/onboard/planning/parking_component.py index 6860fd35..df2e6ea9 100644 --- a/GEMstack/onboard/planning/parking_component.py +++ b/GEMstack/onboard/planning/parking_component.py @@ -24,10 +24,6 @@ def update(self, state: AllState): # Calculate elapsed time since initialization. elapsed_time = time.time() - self.start_time - # Reading goal from state - # print(f"\n AllState (parking goal): {state.goal} \n") - # print(f"AllState (parking obstacles): {state.obstacles} \n") - # After a goal is detected, change the mission plan to use PARKING. if state.goal: print("\n Parking goal available. Entering PARKING mode......") diff --git a/GEMstack/onboard/planning/parking_motion_planning.py b/GEMstack/onboard/planning/parking_motion_planning.py new file mode 100644 index 00000000..9a156d9a --- /dev/null +++ b/GEMstack/onboard/planning/parking_motion_planning.py @@ -0,0 +1,486 @@ +# File: GEMstack/onboard/planning/longitudinal_planning.py +from typing import List, Tuple +import math +from ..component import Component +from ...state import AllState, VehicleState, EntityRelation, EntityRelationEnum, Path, Trajectory, Route, \ + ObjectFrameEnum +from ...utils import serialization, settings +from ...mathutils import transforms +import numpy as np + +DEBUG = False # Set to False to disable debug output + + +def generate_dense_points(points: List[Tuple[float, float]], density: int = 10) -> List[Tuple[float, float]]: + if not points: + return [] + if len(points) == 1: + return points.copy() + + dense_points = [points[0]] + for i in range(len(points) - 1): + p0 = points[i] + p1 = points[i + 1] + dx = p1[0] - p0[0] + dy = p1[1] - p0[1] + seg_length = math.hypot(dx, dy) + + n_interp = int(round(seg_length * density)) + + for j in range(1, n_interp + 1): + fraction = j / (n_interp + 1) + x_interp = p0[0] + fraction * dx + y_interp = p0[1] + fraction * dy + dense_points.append((x_interp, y_interp)) + + dense_points.append(p1) + + return dense_points + + +def compute_cumulative_distances(points: List[List[float]]) -> List[float]: + s_vals = [0.0] + for i in range(1, len(points)): + dx = points[i][0] - points[i - 1][0] + dy = points[i][1] - points[i - 1][1] + ds = math.hypot(dx, dy) + s_vals.append(s_vals[-1] + ds) + + if DEBUG: + print("[DEBUG] compute_cumulative_distances: s_vals =", s_vals) + + return s_vals + + +def longitudinal_plan(path, acceleration, deceleration, max_speed, current_speed): + path_normalized = path.arc_length_parameterize() + points = list(path_normalized.points) + dense_points = generate_dense_points(points) + s_vals = compute_cumulative_distances(dense_points) + L = s_vals[-1] # Total path length + stopping_distance = (current_speed ** 2) / (2 * deceleration) + + if DEBUG: + print("[DEBUG] compute_cumulative_distances: s_vals =", s_vals) + print("[DEBUG] longitudinal_plan: Total path length L =", L) + print("[DEBUG] longitudinal_plan: Braking distance needed =", stopping_distance) + + if stopping_distance > L: # Case where there is not enough stopping distance to stop before path ends (calls emergency brake) + return longitudinal_brake(path, deceleration, current_speed) + + if current_speed > max_speed: # Case where car is exceeding the max speed so we need to slow down (do initial slowdown) + if DEBUG: + print(f"[DEBUG] Handling case where current_speed ({current_speed:.2f}) > max_speed ({max_speed:.2f})") + + # Initial deceleration phase to reach max_speed + initial_decel_distance = (current_speed ** 2 - max_speed ** 2) / (2 * deceleration) + initial_decel_time = (current_speed - max_speed) / deceleration + remaining_distance = L - initial_decel_distance + + if DEBUG: + print( + f"[DEBUG] Phase 1 - Initial Decel: distance = {initial_decel_distance:.2f}, time = {initial_decel_time:.2f}") + print(f"[DEBUG] Remaining distance after reaching max_speed: {remaining_distance:.2f}") + + # Calculate final deceleration distance needed to stop from max_speed + final_decel_distance = (max_speed ** 2) / (2 * deceleration) + cruise_distance = remaining_distance - final_decel_distance + + if DEBUG: + print(f"[DEBUG] Phase 2 - Cruise: distance = {cruise_distance:.2f}") + print(f"[DEBUG] Phase 3 - Final Decel: distance = {final_decel_distance:.2f}") + + times = [] + for s in s_vals: + if s <= initial_decel_distance: # Phase 1: Initial deceleration to max_speed + v = math.sqrt(current_speed ** 2 - 2 * deceleration * s) + t = (current_speed - v) / deceleration + if DEBUG: # Print every 10m + print(f"[DEBUG] Initial Decel: s = {s:.2f}, v = {v:.2f}, t = {t:.2f}") + + elif s <= initial_decel_distance + cruise_distance: # Phase 2: Cruise at max_speed + s_in_cruise = s - initial_decel_distance + t = initial_decel_time + s_in_cruise / max_speed + if DEBUG: # Print every 10m + print(f"[DEBUG] Cruise: s = {s:.2f}, v = {max_speed:.2f}, t = {t:.2f}") + + else: # Phase 3: Final deceleration to stop + s_in_final_decel = s - (initial_decel_distance + cruise_distance) + v = math.sqrt(max(max_speed ** 2 - 2 * deceleration * s_in_final_decel, 0.0)) + t = initial_decel_time + cruise_distance / max_speed + (max_speed - v) / deceleration + if DEBUG: # Print every 10m + print(f"[DEBUG] Final Decel: s = {s:.2f}, v = {v:.2f}, t = {t:.2f}") + + times.append(t) + + if DEBUG: + print("[DEBUG] Trajectory complete: Three phases executed") + print(f"[DEBUG] Total time: {times[-1]:.2f}") + + return Trajectory(frame=path.frame, points=dense_points, times=times) + + if acceleration <= 0: + if DEBUG: + print(f"[DEBUG] No acceleration allowed. Current speed: {current_speed:.2f}") + + # Pure deceleration phase + s_decel = (current_speed ** 2) / (2 * deceleration) + T_decel = current_speed / deceleration + + if DEBUG: + print(f"[DEBUG] Will maintain speed until s_decel: {s_decel:.2f}") + print(f"[DEBUG] Total deceleration time will be: {T_decel:.2f}") + + times = [] + for s in s_vals: + if s <= L - s_decel: # Maintain current speed until deceleration point + t_point = s / current_speed + if DEBUG: + print(f"[DEBUG] Constant Speed Phase: s = {s:.2f}, v = {current_speed:.2f}, t = {t_point:.2f}") + else: # Deceleration phase + s_in_decel = s - (L - s_decel) + v = math.sqrt(max(current_speed ** 2 - 2 * deceleration * s_in_decel, 0.0)) + t_point = (L - s_decel) / current_speed + (current_speed - v) / deceleration + if DEBUG: + print(f"[DEBUG] Deceleration Phase: s = {s:.2f}, v = {v:.2f}, t = {t_point:.2f}") + + times.append(t_point) + + return Trajectory(frame=path.frame, points=dense_points, times=times) + + # Determine max possible peak speed given distance + v_peak_possible = math.sqrt( + (2 * acceleration * deceleration * L + deceleration * current_speed ** 2) / (acceleration + deceleration)) + v_target = min(max_speed, v_peak_possible) + + if DEBUG: + print("[DEBUG] longitudinal_plan: v_peak_possible =", v_peak_possible, "v_target =", v_target) + + # Compute acceleration phase + s_accel = max(0.0, (v_target ** 2 - current_speed ** 2) / (2 * acceleration)) + t_accel = max(0.0, (v_target - current_speed) / acceleration) + + # Compute deceleration phase + s_decel = max(0.0, (v_target ** 2) / (2 * deceleration)) + t_decel = max(0.0, v_target / deceleration) + + # Compute cruise phase + s_cruise = max(0.0, L - s_accel - s_decel) + t_cruise = s_cruise / v_target if v_target > 0 else 0.0 + + if DEBUG: + print("[DEBUG] longitudinal_plan: s_accel =", s_accel, "t_accel =", t_accel) + print("[DEBUG] longitudinal_plan: s_decel =", s_decel, "t_decel =", t_decel) + print("[DEBUG] longitudinal_plan: s_cruise =", s_cruise, "t_cruise =", t_cruise) + + times = [] + for s in s_vals: + if s <= s_accel: # Acceleration phase + v = math.sqrt(current_speed ** 2 + 2 * acceleration * s) + t_point = (v - current_speed) / acceleration + + if DEBUG: + print(f"[DEBUG] Acceleration Phase: s = {s:.2f}, v = {v:.2f}, t = {t_point:.2f}") + + elif s <= s_accel + s_cruise: # Cruise phase + t_point = t_accel + (s - s_accel) / v_target + + if DEBUG: + print(f"[DEBUG] Cruise Phase: s = {s:.2f}, t = {t_point:.2f}") + + else: # Deceleration phase + s_decel_phase = s - s_accel - s_cruise + v_decel = math.sqrt(max(v_target ** 2 - 2 * deceleration * s_decel_phase, 0.0)) + t_point = t_accel + t_cruise + (v_target - v_decel) / deceleration + + if t_point < times[-1]: # Ensure time always increases + t_point = times[-1] + 0.01 # Small time correction step + + if DEBUG: + print(f"[DEBUG] Deceleration Phase: s = {s:.2f}, v = {v_decel:.2f}, t = {t_point:.2f}") + + times.append(t_point) + + if DEBUG: + print("[DEBUG] longitudinal_plan: Final times =", times) + + return Trajectory(frame=path.frame, points=dense_points, times=times) + + +def longitudinal_brake(path: Path, deceleration: float, current_speed: float, + emergency_decel: float = 8.0) -> Trajectory: + # Vehicle already stopped - maintain position + if current_speed <= 0: + print("[DEBUG] longitudinal_brake: Zero velocity case! ", [path.points[0]] * len(path.points)) + return Trajectory( + frame=path.frame, + points=[path.points[0]] * len(path.points), + times=[float(i) for i in range(len(path.points))] + ) + + # Get total path length + path_length = sum( + np.linalg.norm(np.array(path.points[i + 1]) - np.array(path.points[i])) + for i in range(len(path.points) - 1) + ) + + # Calculate stopping distance with normal deceleration + T_stop_normal = current_speed / deceleration + s_stop_normal = current_speed * T_stop_normal - 0.5 * deceleration * (T_stop_normal ** 2) + + # Check if emergency braking is needed + if s_stop_normal > path_length: + if DEBUG: + print("[DEBUG] longitudinal_brake: Emergency braking needed!") + print(f"[DEBUG] longitudinal_brake: Normal stopping distance: {s_stop_normal:.2f}m") + print(f"[DEBUG] longitudinal_brake: Available distance: {path_length:.2f}m") + + # Calculate emergency braking parameters + T_stop = current_speed / emergency_decel + s_stop = current_speed * T_stop - 0.5 * emergency_decel * (T_stop ** 2) + + if DEBUG: + print(f"[DEBUG] longitudinal_brake: Emergency stopping distance: {s_stop:.2f}m") + print(f"[DEBUG] longitudinal_brake: Emergency stopping time: {T_stop:.2f}s") + + decel_to_use = emergency_decel + + else: + if DEBUG: + print("[DEBUG] longitudinal_brake: Normal braking sufficient") + T_stop = T_stop_normal + decel_to_use = deceleration + + # Generate time points (use more points for smoother trajectory) + num_points = max(len(path.points), 50) + times = np.linspace(0, T_stop, num_points) + + # Calculate distances at each time point using physics equation + distances = current_speed * times - 0.5 * decel_to_use * (times ** 2) + + # Generate points along the path + points = [] + for d in distances: + if d <= path_length: + points.append(path.eval(d)) + else: + points.append(path.eval(d)) + + if DEBUG: + print(f"[DEBUG] longitudinal_brake: Using deceleration of {decel_to_use:.2f} m/s²") + print(f"[DEBUG] longitudinal_brake: Final stopping time: {T_stop:.2f}s") + + return Trajectory(frame=path.frame, points=points, times=times.tolist()) + + times = [] + for s in s_vals: + if s <= s_accel: # Acceleration phase + v = math.sqrt(current_speed ** 2 + 2 * acceleration * s) + t_point = (v - current_speed) / acceleration + + if DEBUG: + print(f"[DEBUG] Acceleration Phase: s = {s:.2f}, v = {v:.2f}, t = {t_point:.2f}") + + elif s <= s_accel + s_cruise: # Cruise phase + t_point = t_accel + (s - s_accel) / v_target + + if DEBUG: + print(f"[DEBUG] Cruise Phase: s = {s:.2f}, t = {t_point:.2f}") + + else: # Deceleration phase + s_decel_phase = s - s_accel - s_cruise + v_decel = math.sqrt(max(v_target ** 2 - 2 * deceleration * s_decel_phase, 0.0)) + t_point = t_accel + t_cruise + (v_target - v_decel) / deceleration + + if t_point < times[-1]: # Ensure time always increases + t_point = times[-1] + 0.01 # Small time correction step + + if DEBUG: + print(f"[DEBUG] Deceleration Phase: s = {s:.2f}, v = {v_decel:.2f}, t = {t_point:.2f}") + + times.append(t_point) + + if DEBUG: + print("[DEBUG] longitudinal_plan: Final times =", times) + + return Trajectory(frame=path.frame, points=dense_points, times=times) + +def longitudinal_brake(path: Path, deceleration: float, current_speed: float, emergency_decel: float = 8.0) -> Trajectory: + # Vehicle already stopped - maintain position + if current_speed <= 0: + print("[DEBUG] longitudinal_brake: Zero velocity case! ", [path.points[0]] * len(path.points)) + return Trajectory( + frame=path.frame, + points=[path.points[0]] * len(path.points), + times=[float(i) for i in range(len(path.points))] + ) + + # Get total path length + path_length = sum( + np.linalg.norm(np.array(path.points[i+1]) - np.array(path.points[i])) + for i in range(len(path.points)-1) + ) + + # Calculate stopping distance with normal deceleration + T_stop_normal = current_speed / deceleration + s_stop_normal = current_speed * T_stop_normal - 0.5 * deceleration * (T_stop_normal ** 2) + + # Check if emergency braking is needed + if s_stop_normal > path_length: + if DEBUG: + print("[DEBUG] longitudinal_brake: Emergency braking needed!") + print(f"[DEBUG] longitudinal_brake: Normal stopping distance: {s_stop_normal:.2f}m") + print(f"[DEBUG] longitudinal_brake: Available distance: {path_length:.2f}m") + + # Calculate emergency braking parameters + T_stop = current_speed / emergency_decel + s_stop = current_speed * T_stop - 0.5 * emergency_decel * (T_stop ** 2) + + if DEBUG: + print(f"[DEBUG] longitudinal_brake: Emergency stopping distance: {s_stop:.2f}m") + print(f"[DEBUG] longitudinal_brake: Emergency stopping time: {T_stop:.2f}s") + + decel_to_use = emergency_decel + + else: + if DEBUG: + print("[DEBUG] longitudinal_brake: Normal braking sufficient") + T_stop = T_stop_normal + decel_to_use = deceleration + + # Generate time points (use more points for smoother trajectory) + num_points = max(len(path.points), 50) + times = np.linspace(0, T_stop, num_points) + + # Calculate distances at each time point using physics equation + distances = current_speed * times - 0.5 * decel_to_use * (times ** 2) + + # Generate points along the path + points = [] + for d in distances: + if d <= path_length: + points.append(path.eval(d)) + else: + points.append(path.eval(d)) + + if DEBUG: + print(f"[DEBUG] longitudinal_brake: Using deceleration of {decel_to_use:.2f} m/s²") + print(f"[DEBUG] longitudinal_brake: Final stopping time: {T_stop:.2f}s") + + return Trajectory(frame=path.frame, points=points, times=times.tolist()) + +class YieldTrajectoryPlanner(Component): + """Follows the given route. Brakes if the ego–vehicle must yield + (e.g. to a pedestrian) or if the end of the route is near; otherwise, + it accelerates (or cruises) toward a desired speed. + """ + + def __init__(self): + self.route_progress = None + self.t_last = None + self.acceleration = settings.get("run.drive.planning.motion_planning.largs.acceleration", 5) + self.desired_speed = settings.get("run.drive.planning.motion_planning.largs.desired_speed",2.0) + self.deceleration = settings.get("run.drive.planning.motion_planning.largs.deceleration", 2.0) + self.emergency_brake = settings.get("run.drive.planning.motion_planning.largs.emergency_brake", 8.0) + + def state_inputs(self): + return ['all'] + + def state_outputs(self) -> List[str]: + return ['trajectory'] + + def rate(self): + return 10.0 + + def update(self, state: AllState): + vehicle = state.vehicle # type: VehicleState + route = state.route # type: Route + t = state.t + + if DEBUG: + print("[DEBUG] YieldTrajectoryPlanner.update: t =", t) + + if self.t_last is None: + self.t_last = t + dt = t - self.t_last + if DEBUG: + print("[DEBUG] YieldTrajectoryPlanner.update: dt =", dt) + + curr_x = vehicle.pose.x + curr_y = vehicle.pose.y + curr_v = vehicle.v + if DEBUG: + print(f"[DEBUG] YieldTrajectoryPlanner.update: Vehicle position = ({curr_x}, {curr_y}), speed = {curr_v}, ") + + # Determine progress along the route. + if self.route_progress is None: + self.route_progress = 0.0 + _, closest_parameter = route.closest_point_local( + [curr_x, curr_y], + (self.route_progress - 5.0, self.route_progress + 5.0) + ) + if DEBUG: + print("[DEBUG] YieldTrajectoryPlanner.update: Closest parameter on route =", closest_parameter) + self.route_progress = closest_parameter + + # Extract a 10 m segment of the route for planning lookahead. + route_with_lookahead = route.trim(closest_parameter, closest_parameter + 10.0) + if DEBUG: + print("[DEBUG] YieldTrajectoryPlanner.update: Route Lookahead =", route_with_lookahead) + + print("[DEBUG] state", state.relations) + # Check whether any yield relations (e.g. due to pedestrians) require braking. + stay_braking = False + pointSet = set() + for i in range(len(route_with_lookahead.points)): + if tuple(route_with_lookahead.points[i]) in pointSet: + stay_braking = True + break + pointSet.add(tuple(route_with_lookahead.points[i])) + + should_brake = any( + r.type == EntityRelationEnum.STOPPING_AT and r.obj1 == '' + for r in state.relations + ) + should_decelerate = any( + r.type == EntityRelationEnum.YIELDING and r.obj1 == '' + for r in state.relations + ) if should_brake == False else False + + should_accelerate = (not should_brake and not should_decelerate and curr_v < self.desired_speed) + + if DEBUG: + print("[DEBUG] YieldTrajectoryPlanner.update: stay_braking =", stay_braking) + print("[DEBUG] YieldTrajectoryPlanner.update: should_brake =", should_brake) + print("[DEBUG] YieldTrajectoryPlanner.update: should_accelerate =", should_accelerate) + print("[DEBUG] YieldTrajectoryPlanner.update: should_decelerate =", should_decelerate) + + if stay_braking: + traj = longitudinal_brake(route_with_lookahead, 0.0, 0.0, 0.0) + if DEBUG: + print("[DEBUG] YieldTrajectoryPlanner.update: Using longitudinal_brake (stay braking).") + elif should_brake: + traj = longitudinal_brake(route_with_lookahead, self.emergency_brake, curr_v) + if DEBUG: + print("[DEBUG] YieldTrajectoryPlanner.update: Using longitudinal_brake.") + elif should_decelerate: + traj = longitudinal_brake(route_with_lookahead, self.deceleration, curr_v) + if DEBUG: + print("[DEBUG] YieldTrajectoryPlanner.update: Using longitudinal_brake.") + elif should_accelerate: + traj = longitudinal_plan(route_with_lookahead, self.acceleration, + self.deceleration, self.desired_speed, curr_v) + if DEBUG: + print("[DEBUG] YieldTrajectoryPlanner.update: Using longitudinal_plan (accelerate).") + else: + # Maintain current speed if not accelerating or braking. + traj = longitudinal_plan(route_with_lookahead, 0.0, self.deceleration, self.desired_speed, curr_v) + if DEBUG: + print( + "[DEBUG] YieldTrajectoryPlanner.update: Maintaining current speed with longitudinal_plan (0 accel).") + + self.t_last = t + if DEBUG: + print('[DEBUG] Current Velocity of the Car: LOOK!', curr_v, self.desired_speed) + print("[DEBUG] YieldTrajectoryPlanner.update: Returning trajectory with", len(traj.points), "points.") + return traj diff --git a/GEMstack/onboard/planning/parking_route_planner.py b/GEMstack/onboard/planning/parking_route_planner.py new file mode 100644 index 00000000..b8bde391 --- /dev/null +++ b/GEMstack/onboard/planning/parking_route_planner.py @@ -0,0 +1,359 @@ +from typing import List, Tuple, Union, Dict +from ..component import Component +from ...state import AllState, VehicleState, Path, Trajectory, Route, ObjectFrameEnum, AgentState, Obstacle, ObjectPose, PhysicalObject +from ...utils import serialization, settings +from ...mathutils.transforms import vector_madd +from GEMstack.mathutils.dubins import DubinsCar, SecondOrderDubinsCar, DubinsCarIntegrator +from ...mathutils.dynamics import IntegratorControlSpace +from ...mathutils import collisions +from .astar import AStar +from .longitudinal_planning import longitudinal_plan +from . import reed_shepp +from visualization_msgs.msg import Marker +from geometry_msgs.msg import Point +import rospy +import time + + + +import numpy as np +import math + + +def generate_action_set(max_vel, max_turn_rate): + return [ + (max_vel, -max_turn_rate), (max_vel, 0.0), (max_vel, max_turn_rate), + (-max_vel, -max_turn_rate), (-max_vel, 0.0), (-max_vel, max_turn_rate) + ] +class ParkingSolverFirstOrderDubins(AStar): + """sample use of the astar algorithm. In this exemple we work on a maze made of ascii characters, + and a 'node' is just a (x,y) tuple that represents a reachable position""" + + def __init__(self, vehicle=None, obstacles=None, actions=None): + self._obstacles = obstacles + + # Vehicle model + self._vehicle = None + + # settings + self.T = settings.get("run.drive.planning._route_planner.dubins.integrator.time_step", 1.5) + self.dt = settings.get("run.drive.planning._route_planner.dubins.integrator.dt", .25) + self.max_v = settings.get("run.drive.planning._route_planner.dubins.actions.max_velocity", 0.75) + self.max_turn_rate = settings.get("run.drive.planning._route_planner.dubins.actions.max_turn_rate", 0.3) + self.tolerance = settings.get("run.drive.planning._route_planner.dubins.tolerance", 0.5) + self.heuristic_string = settings.get("run.drive.planning._route_planner.astar.heuristic", "reeds_shepp") + + self.vehicle = DubinsCar() #x = (tx,ty,theta) and u = (fwd_velocity,turnRate). + self.vehicle_sim = DubinsCarIntegrator(self.vehicle, self.T, self.dt) + self._actions = generate_action_set(self.max_v, self.max_turn_rate) + + + @property + def vehicle(self): + return self._vehicle + + @vehicle.setter + def vehicle(self, vehicle): + self._vehicle = vehicle + + @property + def obstacles(self): + return self._obstacles + + @obstacles.setter + def obstacles(self, obstacles): + self._obstacles = obstacles + + @property + def actions(self): + return self._actions + + @actions.setter + def actions(self, actions): + # @TODO Add a validity checker for the actions with respect to the vehicle constraints + self._actions = actions + + def is_goal_reached(self, current: List[float], goal: List[float]): + return np.linalg.norm(np.array([current[0], current[1]]) - np.array([goal[0], goal[1]])) < 0.5 + + def heuristic_cost_estimate(self, state_1, state_2): + """run the correct heuristic function based on the heuristic_string""" + + if self.heuristic_string == "reeds_shepp": + return self.reed_shepp(state_1, state_2) # Using turning radius of 1.0 + + elif self.heuristic_string == "approx_reeds_shepp": + return self.approx_reeds_shepp(state_1, state_2) # Using turning radius of 1.0 + + elif self.heuristic_string == "euclidian": + return self.euclidian_cost_esimate(state_1, state_2) + + else: + raise Exception(f"Unknown heuristic function: {self.heuristic_string}") + + def euclidian_cost_esimate(self, state_1, state_2): + """computes the 'direct' distance between two (x,y) tuples""" + # Extract position and orientation from states + (x1, y1, theta1, t) = state_1 + (x2, y2, theta2, t) = state_2 + + return math.hypot(x2 - x1, y2 - y1, theta2 - theta1) + + def approx_reeds_shepp(self, state_1, state_2): + # Extract position and orientation from states + (x1, y1, theta1, t1) = state_1 + (x2, y2, theta2, t2) = state_2 + + # Create start and goal configurations for Reeds-Shepp + start = (x1, y1, theta1) + goal = (x2, y2, theta2) + + # Calculate Reeds-Shepp path length + path_length_cost = path_length(start, goal, 1.0) # Using turning radius of 1.0 + + # # Add penalties for velocity and time differences + # velocity_penalty = abs(v2 - v1) * 0.1 + # time_penalty = abs(t2 - t1) * 0.01 + + return path_length_cost # + velocity_penalty + time_penalty + + def reed_shepp(self, state_1, state_2): + # Extract position and orientation from states + (x1, y1, theta1, t1) = state_1 + (x2, y2, theta2, t2) = state_2 + + # Create start and goal configurations for Reeds-Shepp + start = (x1, y1, theta1) + goal = (x2, y2, theta2) + + # Calculate Reeds-Shepp path length + path = reed_shepp.get_optimal_path(start, goal) + + return reed_shepp.path_length(path) # Using turning radius of 1.0 + + def terminal_cost_estimate(self, state_1, state_2): + """computes the 'direct' distance between two (x,y) tuples""" + # Extract position and orientation from states + return self.reed_shepp(state_1, state_2) + return self.approx_reeds_shepp(state_1, state_2) + + def distance_between(self, state_1, state_2): + """ + The states here are (x,y,theta,v,dtheta,t) + """ + (x1, y1, theta1, t) = state_1 + (x2, y2, theta2, t) = state_2 + + return math.hypot(x2 - x1, y2 - y1) + + def neighbors(self, node): + """ for a given configuration of the car in the maze, returns up to 4 adjacent(north,east,south,west) + nodes that can be reached (=any adjacent coordinate that is not a wall) + """ + print(f"[DEBUG] neighbors() called on node {node}") + neighbors = [] + # print(f"Node: {node}") + for control in self.actions: + next_state = self.vehicle_sim.nextState(node[:3], control) + # print(f"next state: {next_state}") + next_state = np.append(next_state, node[3] + self.vehicle_sim.T) + next_state = np.round(next_state, 3) + if self.is_valid_neighbor([next_state]): + # print(f"Accepted: {next_state}") + neighbors.append(tuple(next_state)) + else: + print(f"Rejected first order (collision): {next_state}") + return neighbors + + def is_valid_neighbor(self, path): + """check if any points along the path are in collision + with any of the known obstacles + @TODO We are not currently using the geometry of the vehicle, + define a geometry for the vehicle and then use polygon_itersects_polygon + @TODO Consider performing a linear search on the trajectory + and checking for collission on each of these configurations + + Args: + path (_type_): _description_ + """ + for obstacle in self.obstacles: + # print(f"Obstacle: {obstacle}") + for point in path: + vehicle_object = self.state_to_object(point) + if collisions.polygon_intersects_polygon_2d(vehicle_object.polygon_parent(), obstacle.polygon_parent()): + # print("Collision detected") + return False + return True + + def state_to_object(self, state): + """Convert a state to a polygon. The state is of the form (x,y,theta,v,dtheta,t) + + Args: + state (_type_): _description_ + + Returns: + _type_: _description_ + """ + x = state[0] + y = state[1] + theta = state[2] + t = state[3] + + pose = ObjectPose(frame=ObjectFrameEnum.CURRENT,t=t, x=x,y=y,z=0,yaw=theta) + + temp_obj = PhysicalObject(pose=pose, + dimensions=self.vehicle.to_object().dimensions, + outline=self.vehicle.to_object().outline) + + return temp_obj + + +class ParkingPlanner(): + """_summary_ + + Args: + Component (_type_): _description_ + """ + def __init__(self): + + self.planner = ParkingSolverFirstOrderDubins() + + self.iterations = settings.get("run.drive.planning._route_planner.astar.iterations", 200) + + def state_inputs(self): + return ['all'] + + def state_outputs(self) -> List[str]: + return ['route'] + + def rate(self): + return 1.0 + + def vehicle_state_to_second_order(self, vehicle_state: VehicleState) -> Tuple[float, float]: + """Takes a vehicle state and outputs the state of a second order dubins car + + Args: + vehicle_state (VehicleState): _description_ + + Returns: + Tuple[float, float]: _description_ + """ + vehicle_state.pose = vehicle_state.pose.to_frame() + x = vehicle_state.pose.x + y = vehicle_state.pose.y + theta = vehicle_state.pose.yaw # check that this is correct + v = vehicle_state.v + dtheta = vehicle_state.heading_rate + t = 0 + + return (x,y,theta,v,dtheta,t) + + def vehicle_state_to_first_order(self, vehicle_state: VehicleState) -> Tuple[float, float]: + """Takes a vehicle state and outputs the state of a second order dubins car + + Args: + vehicle_state (VehicleState): _description_ + + Returns: + Tuple[float, float]: _description_ + """ + x = vehicle_state.pose.x + y = vehicle_state.pose.y + theta = vehicle_state.pose.yaw # check that this is correct + v = vehicle_state.v + dtheta = vehicle_state.heading_rate + t = 0 + + return (x,y,theta,t) + + def pose_to_first_order(self, pose: ObjectPose) -> Tuple[float, float]: + """Takes a vehicle state and outputs the state of a second order dubins car + + Args: + vehicle_state (VehicleState): _description_ + + Returns: + Tuple[float, float]: _description_ + """ + x = pose.x + y = pose.y + theta = pose.yaw # check that this is correct + t = 0 + + return (x,y,theta,t) + + def update(self, state : AllState) -> Route: + """_summary_ + + Args: + state (AllState): _description_ + + Returns: + Trajectory: _description_ + """ + vehicle = state.vehicle # type: VehicleState + obstacles = state.obstacles # type: Dict[str, Obstacle] + agents = state.agents # type: Dict[str, AgentState] + all_obstacles = {**agents, **obstacles} + goal_pose = state.goal + assert goal_pose.frame == ObjectFrameEnum.START + + start_state = state.vehicle.to_frame(ObjectFrameEnum.START, current_pose = state.vehicle.pose,start_pose_abs = state.start_vehicle_pose) + + + start = self.vehicle_state_to_first_order(start_state) + goal = self.pose_to_first_order(goal_pose) + + # Update the planner + self.planner.obstacles = list(all_obstacles.values()) + self.planner.vehicle = vehicle + + # Compute the new trajectory and return it + print(f"Iteartions ------------------------------ {self.iterations}") + res = list(self.planner.astar(start, goal, reversePath=False, iterations=self.iterations)) + # points = [state[:2] for state in res] # change here to return the theta as well + points = [] + for state in res: + points.append((state[0], state[1])) + times = [state[3] for state in res] + path = Path(frame=vehicle.pose.frame, points=points) + + route = Path(frame=vehicle.pose.frame, points=points) + return route + + + from rospy.exceptions import ROSInitException + + def create_trajectory_line_marker(self, traj: Trajectory, frame_id="map", marker_id=0, color=(0.0, 0.0, 1.0, 1.0)) -> Marker: + marker = Marker() + marker.header.frame_id = frame_id + + try: + marker.header.stamp = rospy.Time.now() + except ROSInitException: + # fallback to zero time (safe default in RViz) + marker.header.stamp = rospy.Time(0) + + marker.ns = "trajectory" + marker.id = marker_id + marker.type = Marker.LINE_STRIP + marker.action = Marker.ADD + + marker.scale.x = 0.1 # Line width + marker.color.r, marker.color.g, marker.color.b, marker.color.a = color + + for pt in traj.points: + p = Point(x=pt[0], y=pt[1], z=pt[2] if len(pt) > 2 else 0.0) + marker.points.append(p) + + return marker + + + def visualize_trajectory(self, traj: Trajectory, frame_id="vehicle"): + + self.marker_pub = rospy.Publisher("trajectory_markers", Marker, queue_size=1, latch=True) + rospy.sleep(0.5) # ensure publisher is ready + + line_marker = self.create_trajectory_line_marker(traj, frame_id=frame_id) + self.marker_pub.publish(line_marker) + diff --git a/GEMstack/onboard/planning/pure_pursuit.py b/GEMstack/onboard/planning/pure_pursuit.py index e4b6f4f4..e4cb0f48 100644 --- a/GEMstack/onboard/planning/pure_pursuit.py +++ b/GEMstack/onboard/planning/pure_pursuit.py @@ -2,7 +2,7 @@ from ...utils import settings from ...mathutils import transforms from ...knowledge.vehicle.dynamics import acceleration_to_pedal_positions -from ...state.vehicle import VehicleState,ObjectFrameEnum +from ...state.vehicle import VehicleState,ObjectFrameEnum, VehicleGearEnum from ...state.trajectory import Path,Trajectory,compute_headings from ...knowledge.vehicle.geometry import front2steer from ..interface.gem import GEMVehicleCommand @@ -105,9 +105,13 @@ def compute(self, state : VehicleState, component : Component = None): else: desired_x,desired_y = self.path.eval(des_parameter) desired_yaw = np.arctan2(desired_y-curr_y,desired_x-curr_x) - #print("Desired point",(desired_x,desired_y)," with lookahead distance",self.look_ahead + self.look_ahead_scale * speed) - #print("Current yaw",curr_yaw,"desired yaw",desired_yaw) - + + print("Desired point",(desired_x,desired_y)," with lookahead distance",self.look_ahead + self.look_ahead_scale * speed) + print("Current x",curr_x,"Current y",curr_y) + print("Current yaw",curr_yaw,"Desired yaw",desired_yaw) + print("Parameter : ", self.current_path_parameter) + print("Last index : ", self.path.domain()[1]) + # distance between the desired point and the vehicle L = transforms.vector2_dist((desired_x,desired_y),(curr_x,curr_y)) @@ -142,6 +146,7 @@ def compute(self, state : VehicleState, component : Component = None): #past the end, just stop desired_speed = 0.0 feedforward_accel = -2.0 + f_delta = 0 else: if self.desired_speed_source=='path': current_trajectory_time = self.trajectory.parameter_to_time(self.current_path_parameter) @@ -165,6 +170,17 @@ def compute(self, state : VehicleState, component : Component = None): else: #decay speed when crosstrack error is high desired_speed *= np.exp(-abs(ct_error)*0.4) + + if len(self.trajectory.points) < 2 or self.current_path_parameter >= self.path.domain()[1]: + if component is not None: + component.debug_event('Past the end of trajectory') + #past the end, just stop + desired_speed = 0.0 + feedforward_accel = -2.0 + f_delta = 0 + + + if desired_speed > self.speed_limit: desired_speed = self.speed_limit output_accel = self.pid_speed.advance(e = desired_speed - speed, t = t, feedforward_term=feedforward_accel) @@ -205,6 +221,7 @@ def state_outputs(self): return [] def update(self, vehicle : VehicleState, trajectory: Trajectory): + vehicle.gear = 1 self.pure_pursuit.set_path(trajectory) accel,wheel_angle = self.pure_pursuit.compute(vehicle, self) #print("Desired wheel angle",wheel_angle) diff --git a/GEMstack/onboard/planning/reed_shepp.py b/GEMstack/onboard/planning/reed_shepp.py new file mode 100644 index 00000000..8d5496e6 --- /dev/null +++ b/GEMstack/onboard/planning/reed_shepp.py @@ -0,0 +1,473 @@ +""" +https://github.com/nathanlct/reeds-shepp-curves + +Implementation of the optimal path formulas given in the following paper: + +OPTIMAL PATHS FOR A CAR THAT GOES BOTH FORWARDS AND BACKWARDS +J. A. REEDS AND L. A. SHEPP + +notes: there are some typos in the formulas given in the paper; +some formulas have been adapted (cf http://msl.cs.uiuc.edu/~lavalle/cs326a/rs.c) + +Each of the 12 functions (each representing 4 of the 48 possible words) +have 3 arguments x, y and phi, the goal position and angle (in degrees) of the +object given it starts at position (0, 0) and angle 0, and returns the +corresponding path (if it exists) as a list of PathElements (or an empty list). + +(actually there are less than 48 possible words but this code is not optimized) + +MIT License + +Copyright (c) 2019 Nathan Lichtlé + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +""" + +import math +from enum import Enum +from dataclasses import dataclass, replace + +def M(theta): + """ + Return the angle phi = theta mod (2 pi) such that -pi <= theta < pi. + """ + theta = theta % (2*math.pi) + if theta < -math.pi: return theta + 2*math.pi + if theta >= math.pi: return theta - 2*math.pi + return theta + +def R(x, y): + """ + Return the polar coordinates (r, theta) of the point (x, y). + """ + r = math.sqrt(x*x + y*y) + theta = math.atan2(y, x) + return r, theta + +def change_of_basis(p1, p2): + """ + Given p1 = (x1, y1, theta1) and p2 = (x2, y2, theta2) represented in a + coordinate system with origin (0, 0) and rotation 0 (in degrees), return + the position and rotation of p2 in the coordinate system which origin + (x1, y1) and rotation theta1. + """ + theta1 = deg2rad(p1[2]) + dx = p2[0] - p1[0] + dy = p2[1] - p1[1] + new_x = dx * math.cos(theta1) + dy * math.sin(theta1) + new_y = -dx * math.sin(theta1) + dy * math.cos(theta1) + new_theta = p2[2] - p1[2] + return new_x, new_y, new_theta + +def rad2deg(rad): + return 180 * rad / math.pi + +def deg2rad(deg): + return math.pi * deg / 180 + +def sign(x): + return 1 if x >= 0 else -1 + +class Steering(Enum): + LEFT = -1 + RIGHT = 1 + STRAIGHT = 0 + + +class Gear(Enum): + FORWARD = 1 + BACKWARD = -1 + + +@dataclass(eq=True) +class PathElement: + param: float + steering: Steering + gear: Gear + + @classmethod + def create(cls, param: float, steering: Steering, gear: Gear): + if param >= 0: + return cls(param, steering, gear) + else: + return cls(-param, steering, gear).reverse_gear() + + def __repr__(self): + s = "{ Steering: " + self.steering.name + "\tGear: " + self.gear.name \ + + "\tdistance: " + str(round(self.param, 2)) + " }" + return s + + def reverse_steering(self): + steering = Steering(-self.steering.value) + return replace(self, steering=steering) + + def reverse_gear(self): + gear = Gear(-self.gear.value) + return replace(self, gear=gear) + + +def path_length(path): + """ + this one's obvious + """ + return sum([e.param for e in path]) + + +def get_optimal_path(start, end): + """ + Return the shortest path from start to end among those that exist + """ + paths = get_all_paths(start, end) + return min(paths, key=path_length) + + +def get_all_paths(start, end): + """ + Return a list of all the paths from start to end generated by the + 12 functions and their variants + """ + path_fns = [path1, path2, path3, path4, path5, path6, \ + path7, path8, path9, path10, path11, path12] + paths = [] + + # get coordinates of end in the set of axis where start is (0,0,0) + x, y, theta = change_of_basis(start, end) + + for get_path in path_fns: + # get the four variants for each path type, cf article + paths.append(get_path(x, y, theta)) + paths.append(timeflip(get_path(-x, y, -theta))) + paths.append(reflect(get_path(x, -y, -theta))) + paths.append(reflect(timeflip(get_path(-x, -y, theta)))) + + # remove path elements that have parameter 0 + for i in range(len(paths)): + paths[i] = list(filter(lambda e: e.param != 0, paths[i])) + + # remove empty paths + paths = list(filter(None, paths)) + + return paths + + +def timeflip(path): + """ + timeflip transform described around the end of the article + """ + new_path = [e.reverse_gear() for e in path] + return new_path + + +def reflect(path): + """ + reflect transform described around the end of the article + """ + new_path = [e.reverse_steering() for e in path] + return new_path + + +def path1(x, y, phi): + """ + Formula 8.1: CSC (same turns) + """ + phi = deg2rad(phi) + path = [] + + u, t = R(x - math.sin(phi), y - 1 + math.cos(phi)) + v = M(phi - t) + + path.append(PathElement.create(t, Steering.LEFT, Gear.FORWARD)) + path.append(PathElement.create(u, Steering.STRAIGHT, Gear.FORWARD)) + path.append(PathElement.create(v, Steering.LEFT, Gear.FORWARD)) + + return path + + +def path2(x, y, phi): + """ + Formula 8.2: CSC (opposite turns) + """ + phi = M(deg2rad(phi)) + path = [] + + rho, t1 = R(x + math.sin(phi), y - 1 - math.cos(phi)) + + if rho * rho >= 4: + u = math.sqrt(rho * rho - 4) + t = M(t1 + math.atan2(2, u)) + v = M(t - phi) + + path.append(PathElement.create(t, Steering.LEFT, Gear.FORWARD)) + path.append(PathElement.create(u, Steering.STRAIGHT, Gear.FORWARD)) + path.append(PathElement.create(v, Steering.RIGHT, Gear.FORWARD)) + + return path + + +def path3(x, y, phi): + """ + Formula 8.3: C|C|C + """ + phi = deg2rad(phi) + path = [] + + xi = x - math.sin(phi) + eta = y - 1 + math.cos(phi) + rho, theta = R(xi, eta) + + if rho <= 4: + A = math.acos(rho / 4) + t = M(theta + math.pi/2 + A) + u = M(math.pi - 2*A) + v = M(phi - t - u) + + path.append(PathElement.create(t, Steering.LEFT, Gear.FORWARD)) + path.append(PathElement.create(u, Steering.RIGHT, Gear.BACKWARD)) + path.append(PathElement.create(v, Steering.LEFT, Gear.FORWARD)) + + return path + + +def path4(x, y, phi): + """ + Formula 8.4 (1): C|CC + """ + phi = deg2rad(phi) + path = [] + + xi = x - math.sin(phi) + eta = y - 1 + math.cos(phi) + rho, theta = R(xi, eta) + + if rho <= 4: + A = math.acos(rho / 4) + t = M(theta + math.pi/2 + A) + u = M(math.pi - 2*A) + v = M(t + u - phi) + + path.append(PathElement.create(t, Steering.LEFT, Gear.FORWARD)) + path.append(PathElement.create(u, Steering.RIGHT, Gear.BACKWARD)) + path.append(PathElement.create(v, Steering.LEFT, Gear.BACKWARD)) + + return path + + +def path5(x, y, phi): + """ + Formula 8.4 (2): CC|C + """ + phi = deg2rad(phi) + path = [] + + xi = x - math.sin(phi) + eta = y - 1 + math.cos(phi) + rho, theta = R(xi, eta) + + if rho <= 4: + u = math.acos(1 - rho*rho/8) + A = math.asin(2 * math.sin(u) / rho) + t = M(theta + math.pi/2 - A) + v = M(t - u - phi) + + path.append(PathElement.create(t, Steering.LEFT, Gear.FORWARD)) + path.append(PathElement.create(u, Steering.RIGHT, Gear.FORWARD)) + path.append(PathElement.create(v, Steering.LEFT, Gear.BACKWARD)) + + return path + + +def path6(x, y, phi): + """ + Formula 8.7: CCu|CuC + """ + phi = deg2rad(phi) + path = [] + + xi = x + math.sin(phi) + eta = y - 1 - math.cos(phi) + rho, theta = R(xi, eta) + + if rho <= 4: + if rho <= 2: + A = math.acos((rho + 2) / 4) + t = M(theta + math.pi/2 + A) + u = M(A) + v = M(phi - t + 2*u) + else: + A = math.acos((rho - 2) / 4) + t = M(theta + math.pi/2 - A) + u = M(math.pi - A) + v = M(phi - t + 2*u) + + path.append(PathElement.create(t, Steering.LEFT, Gear.FORWARD)) + path.append(PathElement.create(u, Steering.RIGHT, Gear.FORWARD)) + path.append(PathElement.create(u, Steering.LEFT, Gear.BACKWARD)) + path.append(PathElement.create(v, Steering.RIGHT, Gear.BACKWARD)) + + return path + + +def path7(x, y, phi): + """ + Formula 8.8: C|CuCu|C + """ + phi = deg2rad(phi) + path = [] + + xi = x + math.sin(phi) + eta = y - 1 - math.cos(phi) + rho, theta = R(xi, eta) + u1 = (20 - rho*rho) / 16 + + if rho <= 6 and 0 <= u1 <= 1: + u = math.acos(u1) + A = math.asin(2 * math.sin(u) / rho) + t = M(theta + math.pi/2 + A) + v = M(t - phi) + + path.append(PathElement.create(t, Steering.LEFT, Gear.FORWARD)) + path.append(PathElement.create(u, Steering.RIGHT, Gear.BACKWARD)) + path.append(PathElement.create(u, Steering.LEFT, Gear.BACKWARD)) + path.append(PathElement.create(v, Steering.RIGHT, Gear.FORWARD)) + + return path + + +def path8(x, y, phi): + """ + Formula 8.9 (1): C|C[pi/2]SC + """ + phi = deg2rad(phi) + path = [] + + xi = x - math.sin(phi) + eta = y - 1 + math.cos(phi) + rho, theta = R(xi, eta) + + if rho >= 2: + u = math.sqrt(rho*rho - 4) - 2 + A = math.atan2(2, u+2) + t = M(theta + math.pi/2 + A) + v = M(t - phi + math.pi/2) + + path.append(PathElement.create(t, Steering.LEFT, Gear.FORWARD)) + path.append(PathElement.create(math.pi/2, Steering.RIGHT, Gear.BACKWARD)) + path.append(PathElement.create(u, Steering.STRAIGHT, Gear.BACKWARD)) + path.append(PathElement.create(v, Steering.LEFT, Gear.BACKWARD)) + + return path + + +def path9(x, y, phi): + """ + Formula 8.9 (2): CSC[pi/2]|C + """ + phi = deg2rad(phi) + path = [] + + xi = x - math.sin(phi) + eta = y - 1 + math.cos(phi) + rho, theta = R(xi, eta) + + if rho >= 2: + u = math.sqrt(rho*rho - 4) - 2 + A = math.atan2(u+2, 2) + t = M(theta + math.pi/2 - A) + v = M(t - phi - math.pi/2) + + path.append(PathElement.create(t, Steering.LEFT, Gear.FORWARD)) + path.append(PathElement.create(u, Steering.STRAIGHT, Gear.FORWARD)) + path.append(PathElement.create(math.pi/2, Steering.RIGHT, Gear.FORWARD)) + path.append(PathElement.create(v, Steering.LEFT, Gear.BACKWARD)) + + return path + + +def path10(x, y, phi): + """ + Formula 8.10 (1): C|C[pi/2]SC + """ + phi = deg2rad(phi) + path = [] + + xi = x + math.sin(phi) + eta = y - 1 - math.cos(phi) + rho, theta = R(xi, eta) + + if rho >= 2: + t = M(theta + math.pi/2) + u = rho - 2 + v = M(phi - t - math.pi/2) + + path.append(PathElement.create(t, Steering.LEFT, Gear.FORWARD)) + path.append(PathElement.create(math.pi/2, Steering.RIGHT, Gear.BACKWARD)) + path.append(PathElement.create(u, Steering.STRAIGHT, Gear.BACKWARD)) + path.append(PathElement.create(v, Steering.RIGHT, Gear.BACKWARD)) + + return path + + +def path11(x, y, phi): + """ + Formula 8.10 (2): CSC[pi/2]|C + """ + phi = deg2rad(phi) + path = [] + + xi = x + math.sin(phi) + eta = y - 1 - math.cos(phi) + rho, theta = R(xi, eta) + + if rho >= 2: + t = M(theta) + u = rho - 2 + v = M(phi - t - math.pi/2) + + path.append(PathElement.create(t, Steering.LEFT, Gear.FORWARD)) + path.append(PathElement.create(u, Steering.STRAIGHT, Gear.FORWARD)) + path.append(PathElement.create(math.pi/2, Steering.LEFT, Gear.FORWARD)) + path.append(PathElement.create(v, Steering.RIGHT, Gear.BACKWARD)) + + return path + + +def path12(x, y, phi): + """ + Formula 8.11: C|C[pi/2]SC[pi/2]|C + """ + phi = deg2rad(phi) + path = [] + + xi = x + math.sin(phi) + eta = y - 1 - math.cos(phi) + rho, theta = R(xi, eta) + + if rho >= 4: + u = math.sqrt(rho*rho - 4) - 4 + A = math.atan2(2, u+4) + t = M(theta + math.pi/2 + A) + v = M(t - phi) + + path.append(PathElement.create(t, Steering.LEFT, Gear.FORWARD)) + path.append(PathElement.create(math.pi/2, Steering.RIGHT, Gear.BACKWARD)) + path.append(PathElement.create(u, Steering.STRAIGHT, Gear.BACKWARD)) + path.append(PathElement.create(math.pi/2, Steering.LEFT, Gear.BACKWARD)) + path.append(PathElement.create(v, Steering.RIGHT, Gear.FORWARD)) + + return path \ No newline at end of file diff --git a/GEMstack/onboard/planning/route_planning_component.py b/GEMstack/onboard/planning/route_planning_component.py index 9f9ab2da..ef3bf5c0 100644 --- a/GEMstack/onboard/planning/route_planning_component.py +++ b/GEMstack/onboard/planning/route_planning_component.py @@ -7,11 +7,14 @@ from GEMstack.state.mission import MissionEnum from GEMstack.state.all import AllState from GEMstack.state.physical_object import ObjectFrameEnum, ObjectPose -from GEMstack.state.route import PlannerEnum, Route +from GEMstack.state.route import PlannerEnum, Route, Path +from .parking_route_planner import ParkingPlanner +from .longitudinal_planning import longitudinal_plan + from GEMstack.state.vehicle import VehicleState from GEMstack.state.intent import VehicleIntentEnum -from .planner import optimized_kinodynamic_rrt_planning -from .map_utils import load_pgm_to_occupancy_grid +# from .planner import optimized_kinodynamic_rrt_planning +# from .map_utils import load_pgm_to_occupancy_grid from .rrt_star import RRTStar from typing import List from ..component import Component @@ -23,7 +26,7 @@ import rospy from sensor_msgs.msg import Image from cv_bridge import CvBridge -from .occupancy_grid2 import OccupancyGrid2 +# from .occupancy_grid2 import OccupancyGrid2 import cv2 @@ -179,8 +182,11 @@ class RoutePlanningComponent(Component): """Reads a route from disk and returns it as the desired route.""" def __init__(self): print("Route Planning Component init") - self.planner = None self.route = None + self.planner = None + self.compute_parking_route = False + self.done_computing = False + self.already_computed = False def state_inputs(self): return ["all"] @@ -192,22 +198,42 @@ def rate(self): return 10.0 def update(self, state: AllState): - # print("Route Planner's mission:", state.mission_plan.planner_type.value) - # print("type of mission plan:", type(PlannerEnum.RRT_STAR)) - # print("Route Planner's mission:", state.mission_plan.planner_type.value == PlannerEnum.RRT_STAR.value) - # print("Route Planner's mission:", state.mission_plan.planner_type.value == PlannerEnum.PARKING.value) - # print("Mission plan:", state.mission_plan) - # print("Vehicle x:", state.vehicle.pose.x) - # print("Vehicle y:", state.vehicle.pose.y) - # print("Vehicle yaw:", state.vehicle.pose.yaw) - if state.mission_plan.type.name == "PARKING": - print("I am in PARKING mode") - # Return a route after doing some processing based on mission plan REMOVE ONCE OTHER PLANNERS ARE IMPLEMENTED + + if state.mission_plan.type.name == "PARKING" and not self.compute_parking_route: + # print("I am in BRAKING mode") + + desired_points = [(state.vehicle.pose.x, state.vehicle.pose.y), + (state.vehicle.pose.x, state.vehicle.pose.y)] + desired_path = Path(state.vehicle.pose.frame, desired_points) + + desired_path = desired_path.to_frame(state.vehicle.pose.frame, current_pose=state.vehicle.pose, start_pose_abs=state.start_vehicle_pose) + + self.compute_parking_route = True + + return desired_path + elif state.mission_plan.type.name == "PARKING" and self.compute_parking_route and not self.done_computing: + # print("I am in PARKING mode") + state.vehicle.pose.yaw = 0 # needed this to avoid a weird error in the parking planner + + if not self.already_computed: + self.planner = ParkingPlanner() + self.done_computing = True + self.route = self.planner.update(state) + self.route = self.route.to_frame(ObjectFrameEnum.START, current_pose=state.vehicle.pose, start_pose_abs=state.start_vehicle_pose) + self.planner.visualize_trajectory(self.route) + self.already_computed = True + self.route.points = self.route.points[:-2] # temporary fix to avoid going too far into the parking space, next perception PR will resolve + return self.route + elif state.mission_plan.type.name == "SCANNING": - print("I am in SCANNING mode") - else: - print("Unknown mode") + # print("I am in SCANNING mode") + desired_points = [(state.vehicle.pose.x, state.vehicle.pose.y), + (state.vehicle.pose.x + 1, state.vehicle.pose.y)] + desired_path = Path(ObjectFrameEnum.START, desired_points) + + desired_path = desired_path.to_frame(state.vehicle.pose.frame, current_pose=state.vehicle.pose, start_pose_abs=state.start_vehicle_pose) + return desired_path return self.route diff --git a/launch/parking_detection.yaml b/launch/parking_detection.yaml index 4745bdc1..611d51f8 100644 --- a/launch/parking_detection.yaml +++ b/launch/parking_detection.yaml @@ -13,20 +13,19 @@ recovery: drive: perception: state_estimation : GNSSStateEstimator - # agent_detection : cone_detection.ConeDetector3D obstacle_detection : type: cone_detection.ConeDetector3D - args: - camera_name: front_right #[front, front_right] - camera_calib_file: ./GEMstack/knowledge/calibration/cameras.yaml + # args: + # camera_name: front_right #[front, front_right] + # camera_calib_file: ./GEMstack/knowledge/calibration/cameras.yaml - # optional overrides - enable_tracking: False - visualize_2d: False - use_cyl_roi: False - save_data: False - orientation: False - use_start_frame: False + # # optional overrides + # enable_tracking: False + # visualize_2d: False + # use_cyl_roi: False + # save_data: False + # orientation: False + # use_start_frame: False parking_detection: type: parking_detection.ParkingSpotsDetector3D args: @@ -37,6 +36,30 @@ drive: planning: _mission_planner: type: parking_component.ParkingSim + _route_planner: + type: route_planning_component.RoutePlanningComponent + dubins: + integrator: + timestep: 1 + dt: .25 + max_velocity: 0.75 + max_turning_rate: 0.3 + tolerance: 0.25 + astar: + iterations: 20000 + heuristic: reeds_shepp + motion_planning: + type: longitudinal_planning.YieldTrajectoryPlanner + # type: yield_spline_planner.SplinePlanner + largs: + acceleration: 5 + deceleration: 2 + desired_speed: 2 + trajectory_tracking: + type: pure_pursuit.PurePursuitTrajectoryTracker + print: False + + #usually can keep this constant computation_graph: !include "../GEMstack/knowledge/defaults/computation_graph.yaml" @@ -84,12 +107,58 @@ variants: camera_name: front_right #[front, front_right] camera_calib_file: ./GEMstack/knowledge/calibration/cameras.yaml visualize_3d: True + planning: + _mission_planner: + type: parking_component.ParkingSim + # _route_planner: + # type: route_planning_component.RoutePlanningComponent + + fake_sim: + run: + mode: simulation + vehicle_interface: + type: gem_simulator.GEMDoubleIntegratorSimulationInterface + args: + scene: !relative_path '../scenes/xyhead_demo.yaml' + visualization: !include "klampt_visualization.yaml" + drive: + perception: + state_estimation : OmniscientStateEstimator + obstacle_detection : + type: cone_detection.FakConeDetector + # agent_detection : cone_detection_simple.ConeDetectorSimple3D + parking_detection: + type: parking_detection.ParkingSpotsDetector3D + args: + camera_name: front_right #[front, front_right] + camera_calib_file: ./GEMstack/knowledge/calibration/cameras.yaml + visualize_3d: True planning: _mission_planner: type: parking_component.ParkingSim _route_planner: type: route_planning_component.RoutePlanningComponent - + dubins: + integrator: + timestep: 1 + dt: .25 + max_velocity: 0.75 + max_turning_rate: 0.3 + tolerance: 0.25 + astar: + iterations: 500 + heuristic: reeds_shepp + motion_planning: + type: parking_motion_planning.YieldTrajectoryPlanner + # type: yield_spline_planner.SplinePlanner + largs: + acceleration: 5 + deceleration: 2 + desired_speed: 2 + trajectory_tracking: + type: pure_pursuit.PurePursuitTrajectoryTracker + print: False + sim: run: mode: simulation