diff --git a/GEMstack/offboard/calibration/capture_lidar_zed.py b/GEMstack/offboard/calibration/capture_lidar_zed.py index 21814b793..ee11ebb1a 100644 --- a/GEMstack/offboard/calibration/capture_lidar_zed.py +++ b/GEMstack/offboard/calibration/capture_lidar_zed.py @@ -1,99 +1,107 @@ -# ROS Headers +#!/usr/bin/env python3 +""" +Capture ROS messages and save each run in its own time-stamped folder. +""" + import rospy -from sensor_msgs.msg import Image,PointCloud2 +from sensor_msgs.msg import Image, PointCloud2 import sensor_msgs.point_cloud2 as pc2 import ctypes import struct - -# OpenCV and cv2 bridge import cv2 from cv_bridge import CvBridge import numpy as np import os +import datetime lidar_points = None camera_image = None depth_image = None bridge = CvBridge() -def lidar_callback(lidar : PointCloud2): +def lidar_callback(lidar: PointCloud2): global lidar_points lidar_points = lidar -def camera_callback(img : Image): +def camera_callback(img: Image): global camera_image camera_image = img -def depth_callback(img : Image): +def depth_callback(img: Image): global depth_image depth_image = img -def pc2_to_numpy(pc2_msg, want_rgb = False): +def pc2_to_numpy(pc2_msg, want_rgb=False): gen = pc2.read_points(pc2_msg, skip_nans=True) if want_rgb: - xyzpack = np.array(list(gen),dtype=np.float32) + xyzpack = np.array(list(gen), dtype=np.float32) if xyzpack.shape[1] != 4: raise ValueError("PointCloud2 does not have points") - xyzrgb = np.empty((xyzpack.shape[0],6)) - xyzrgb[:,:3] = xyzpack[:,:3] - for i,x in enumerate(xyzpack): - rgb = x[3] - # cast float32 to int so that bitwise operations are possible - s = struct.pack('>f' ,rgb) - i = struct.unpack('>l',s)[0] - # you can get back the float value by the inverse operations - pack = ctypes.c_uint32(i).value - r = (pack & 0x00FF0000)>> 16 - g = (pack & 0x0000FF00)>> 8 + xyzrgb = np.empty((xyzpack.shape[0], 6)) + xyzrgb[:, :3] = xyzpack[:, :3] + for i, x in enumerate(xyzpack): + rgb = x[3] + s = struct.pack('>f', rgb) + i_val = struct.unpack('>l', s)[0] + pack = ctypes.c_uint32(i_val).value + r = (pack & 0x00FF0000) >> 16 + g = (pack & 0x0000FF00) >> 8 b = (pack & 0x000000FF) - #r,g,b values in the 0-255 range - xyzrgb[i,3:] = (r,g,b) + xyzrgb[i, 3:] = (r, g, b) return xyzrgb else: - return np.array(list(gen),dtype=np.float32)[:,:3] + return np.array(list(gen), dtype=np.float32)[:, :3] -def save_scan(lidar_fn,color_fn,depth_fn): - print("Saving scan to",lidar_fn,color_fn,depth_fn) - pc = pc2_to_numpy(lidar_points,want_rgb=False) - np.savez(lidar_fn,pc) - cv2.imwrite(color_fn,bridge.imgmsg_to_cv2(camera_image)) +def save_scan(lidar_fn, color_fn, depth_fn): + print("Saving scan to", lidar_fn, color_fn, depth_fn) + pc = pc2_to_numpy(lidar_points, want_rgb=False) + np.savez(lidar_fn, pc) + cv2.imwrite(color_fn, bridge.imgmsg_to_cv2(camera_image)) dimage = bridge.imgmsg_to_cv2(depth_image) dimage_non_nan = dimage[np.isfinite(dimage)] - print("Depth range",np.min(dimage_non_nan),np.max(dimage_non_nan)) - dimage = np.nan_to_num(dimage,nan=0,posinf=0,neginf=0) - dimage = (dimage/4000*0xffff) - print("Depth pixel range",np.min(dimage),np.max(dimage)) + print("Depth range", np.min(dimage_non_nan), np.max(dimage_non_nan)) + dimage = np.nan_to_num(dimage, nan=0, posinf=0, neginf=0) + dimage = (dimage / 4000 * 0xffff) + print("Depth pixel range", np.min(dimage), np.max(dimage)) dimage = dimage.astype(np.uint16) - cv2.imwrite(depth_fn,dimage) + cv2.imwrite(depth_fn, dimage) + +def main(base_folder='data', start_index=1): + # unique folder for this capture run based on the current date/time. + run_timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + run_folder = os.path.join(base_folder, run_timestamp) + os.makedirs(run_folder, exist_ok=True) + print("Capture run folder:", run_folder) + + rospy.init_node("capture_lidar_zed", disable_signals=True) + rospy.Subscriber("/lidar1/velodyne_points", PointCloud2, lidar_callback) + rospy.Subscriber("/zed2/zed_node/rgb/image_rect_color", Image, camera_callback) + rospy.Subscriber("/zed2/zed_node/depth/depth_registered", Image, depth_callback) -def main(folder='data',start_index=1): - rospy.init_node("capture_lidar_zed",disable_signals=True) - lidar_sub = rospy.Subscriber("/lidar1/velodyne_points", PointCloud2, lidar_callback) - camera_sub = rospy.Subscriber("/zed2/zed_node/rgb/image_rect_color", Image, camera_callback) - depth_sub = rospy.Subscriber("/zed2/zed_node/depth/depth_registered", Image, depth_callback) index = start_index print("Press any key to:") - print(" store lidar point clouds as npz") - print(" store color images as png") - print(" store depth images (m scaled by 0xffff/4000) as 16-bit tif") + print(" - store lidar point clouds as npz") + print(" - store color images as png") + print(" - store depth images (m scaled by 0xffff/4000) as 16-bit tif") print("Press Escape or Ctrl+C to quit") - while True: - if camera_image: - cv2.imshow("result",bridge.imgmsg_to_cv2(camera_image)) + + while not rospy.is_shutdown(): + if camera_image is not None: + cv2.imshow("result", bridge.imgmsg_to_cv2(camera_image)) key = cv2.waitKey(30) if key == -1: - #nothing - pass + pass # No key pressed. elif key == 27: - #escape - break + break # Escape key pressed. else: if lidar_points is None or camera_image is None or depth_image is None: print("Missing some messages?") else: - files = [os.path.join(folder,'lidar{}.npz'.format(index)), - os.path.join(folder,'color{}.png'.format(index)), - os.path.join(folder,'depth{}.tif'.format(index))] + files = [ + os.path.join(run_folder, 'lidar{}.npz'.format(index)), + os.path.join(run_folder, 'color{}.png'.format(index)), + os.path.join(run_folder, 'depth{}.tif'.format(index)) + ] save_scan(*files) index += 1 @@ -105,4 +113,4 @@ def main(folder='data',start_index=1): folder = sys.argv[1] if len(sys.argv) >= 3: start_index = int(sys.argv[2]) - main(folder,start_index) + main(folder, start_index) diff --git a/GEMstack/offboard/log_management/s3.py b/GEMstack/offboard/log_management/s3.py new file mode 100644 index 000000000..1a2550d00 --- /dev/null +++ b/GEMstack/offboard/log_management/s3.py @@ -0,0 +1,187 @@ +#!/usr/bin/env python3 +""" +This client interacts with S3 to upload (push) or download (pull) a specific folder. + +Before running this client make sure you have defined .env in root with following values: +AWS_ACCESS_KEY_ID=example +AWS_SECRET_ACCESS_KEY=example +AWS_DEFAULT_REGION=us-east-1 + +Requires: + pip install boto3 python-dotenv + +Example Usage: + + # Push a folder named "2025-02-12_15-30-00" inside "data/" to S3: + python3 GEMStack/offboard/log_management/s3.py \ + --action push \ + --folder 2025-02-12_15-30-00 \ + --bucket cs588 \ + --s3-prefix captures + + # Pull (download) that same folder from S3 into local "download/" directory: + python3 GEMStack/offboard/log_management/s3.py \ + --action pull \ + --folder 2025-02-12_15-30-00 \ + --bucket cs588 \ + --s3-prefix captures \ + --dest-dir download +""" + +import argparse +import boto3 +import os +import sys +from dotenv import load_dotenv + +def get_s3_client(): + """ + Initializes the S3 client using AWS credentials from environment variables. + Expects: + - AWS_ACCESS_KEY_ID + - AWS_SECRET_ACCESS_KEY + - AWS_DEFAULT_REGION + Exits if any of these are missing. + """ + # load environment variables from .env file + # override in case local config exists for AWS + load_dotenv(override=True) + + access_key = os.environ.get('AWS_ACCESS_KEY_ID') + secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY') + region = os.environ.get('AWS_DEFAULT_REGION') + + if not access_key or not secret_key or not region: + sys.exit( + "Error: AWS credentials not set. Please set AWS_ACCESS_KEY_ID, " + "AWS_SECRET_ACCESS_KEY, and AWS_DEFAULT_REGION environment variables (in .env)." + ) + + print(access_key, secret_key, region) + return boto3.client( + 's3', + aws_access_key_id=access_key, + aws_secret_access_key=secret_key, + region_name=region + ) + +def check_s3_connection(s3_client, bucket): + """ + Verifies that we can connect to S3 and access the specified bucket. + """ + try: + s3_client.head_bucket(Bucket=bucket) + print(f"Connection check: Successfully accessed bucket '{bucket}'") + except Exception as e: + sys.exit(f"Error: Could not connect to S3 bucket '{bucket}': {e}") + +def push_folder_to_s3(folder_path, bucket, s3_prefix): + """ + Walks through the folder and uploads each file to S3 under the given prefix. + Files are stored under the key: s3_prefix/folder_name/. + """ + s3_client = get_s3_client() + check_s3_connection(s3_client, bucket) + + folder_name = os.path.basename(folder_path) + files_uploaded = 0 + + for root, _, files in os.walk(folder_path): + for file in files: + local_path = os.path.join(root, file) + # determine the file's path relative to the folder being pushed. + relative_path = os.path.relpath(local_path, folder_path) + s3_key = os.path.join(s3_prefix, folder_name, relative_path) + try: + print(f"Uploading {local_path} to s3://{bucket}/{s3_key}") + s3_client.upload_file(local_path, bucket, s3_key) + files_uploaded += 1 + except Exception as e: + print(f"Error uploading {local_path}: {e}") + + if files_uploaded == 0: + sys.exit(f"Error: No files were uploaded from folder: {folder_path}") + +def pull_folder_from_s3(bucket, s3_prefix, folder_name, dest_dir): + """ + Downloads all objects from S3 whose key begins with s3_prefix/folder_name. + The folder will be recreated under `dest_dir/folder_name`. + """ + s3_client = get_s3_client() + check_s3_connection(s3_client, bucket) + + # this prefix is what we'll look for in the bucket. + prefix = os.path.join(s3_prefix, folder_name) + + paginator = s3_client.get_paginator('list_objects_v2') + pages = paginator.paginate(Bucket=bucket, Prefix=prefix) + + found_files = False + for page in pages: + if 'Contents' not in page: + continue + for obj in page['Contents']: + key = obj['Key'] + # if the object key is exactly the "folder" (a directory placeholder), skip it. + # e.g. sometimes you can have an empty key or just a trailing slash. + if key.endswith("/"): + continue + + found_files = True + # figure out the relative path to recreate the same structure locally. + relative_path = os.path.relpath(key, prefix) + local_path = os.path.join(dest_dir, folder_name, relative_path) + + # ensure the local directory exists. + os.makedirs(os.path.dirname(local_path), exist_ok=True) + + print(f"Downloading s3://{bucket}/{key} to {local_path}") + try: + s3_client.download_file(bucket, key, local_path) + except Exception as e: + print(f"Error downloading s3://{bucket}/{key}: {e}") + + if not found_files: + sys.exit(f"Error: No files found in bucket '{bucket}' with prefix '{prefix}'") + +def main(): + parser = argparse.ArgumentParser( + description="Push or pull a specific folder to/from an S3 bucket." + ) + parser.add_argument("--action", choices=["push", "pull"], required=True, + help="Choose whether to push (upload) or pull (download) a folder.") + parser.add_argument("--folder", required=True, + help="Folder name (e.g. 2025-02-12_15-30-00)") + parser.add_argument("--base-dir", default="data", + help="Local base directory where capture runs are stored (for push).") + parser.add_argument("--dest-dir", default="download", + help="Local directory to place the downloaded folder (for pull).") + parser.add_argument("--bucket", required=True, + help="S3 bucket name.") + parser.add_argument("--s3-prefix", default="captures", + help="S3 prefix (folder) where data is stored or will be uploaded.") + args = parser.parse_args() + + if args.action == "push": + # pushing from local 'base_dir/folder' to S3 + folder_path = os.path.join(args.base_dir, args.folder) + if not os.path.exists(folder_path): + sys.exit(f"Error: Folder does not exist: {folder_path}") + + # check if the folder is empty. + folder_empty = True + for _, _, files in os.walk(folder_path): + if files: + folder_empty = False + break + if folder_empty: + sys.exit(f"Error: Folder is empty: {folder_path}") + + push_folder_to_s3(folder_path, args.bucket, args.s3_prefix) + + elif args.action == "pull": + # pulling from S3 (prefix/folder) to local 'dest_dir/folder' + pull_folder_from_s3(args.bucket, args.s3_prefix, args.folder, args.dest_dir) + +if __name__ == '__main__': + main() diff --git a/GEMstack/onboard/perception/pedestrian_detection.py b/GEMstack/onboard/perception/pedestrian_detection.py new file mode 100644 index 000000000..a6bec0e59 --- /dev/null +++ b/GEMstack/onboard/perception/pedestrian_detection.py @@ -0,0 +1,147 @@ +import math +import random +from ...state import AllState,VehicleState,ObjectPose,ObjectFrameEnum,AgentState,AgentEnum,AgentActivityEnum +from ..interface.gem import GEMInterface +from ..component import Component +#from ultralytics import YOLO +#import cv2 +from typing import Dict + +def box_to_fake_agent(box): + """Creates a fake agent state from an (x,y,w,h) bounding box. + + The location and size are pretty much meaningless since this is just giving a 2D location. + """ + x,y,w,h = box + pose = ObjectPose(t=0,x=x+w/2,y=y+h/2,z=0,yaw=0,pitch=0,roll=0,frame=ObjectFrameEnum.CURRENT) + dims = (w,h,0) + return AgentState(pose=pose,dimensions=dims,outline=None,type=AgentEnum.PEDESTRIAN,activity=AgentActivityEnum.MOVING,velocity=(0,0,0),yaw_rate=0) + + +class PedestrianDetector2D(Component): + """Detects pedestrians.""" + def __init__(self,vehicle_interface : GEMInterface): + self.vehicle_interface = vehicle_interface + #self.detector = YOLO('../../knowledge/detection/yolov8n.pt') + self.last_person_boxes = [] + + def rate(self): + return 4.0 + + def state_inputs(self): + return ['vehicle'] + + def state_outputs(self): + return ['agents'] + + def initialize(self): + #tell the vehicle to use image_callback whenever 'front_camera' gets a reading, and it expects images of type cv2.Mat + #self.vehicle_interface.subscribe_sensor('front_camera',self.image_callback,cv2.Mat) + pass + + #def image_callback(self, image : cv2.Mat): + # detection_result = self.detector(image) + # self.last_person_boxes = [] + # #uncomment if you want to debug the detector... + # #for bb in self.last_person_boxes: + # # x,y,w,h = bb + # # cv2.rectangle(image, (int(x-w/2), int(y-h/2)), (int(x+w/2), int(y+h/2)), (255, 0, 255), 3) + # #cv2.imwrite("pedestrian_detections.png",image) + + def update(self, vehicle : VehicleState) -> Dict[str,AgentState]: + res = {} + for i,b in enumerate(self.last_person_boxes): + x,y,w,h = b + res['pedestrian'+str(i)] = box_to_fake_agent(b) + if len(res) > 0: + print("Detected",len(res),"pedestrians") + return res + + +class FakePedestrianDetector2D(Component): + """Triggers a pedestrian detection at some random time ranges""" + # def __init__(self,vehicle_interface : GEMInterface): + # self.vehicle_interface = vehicle_interface + # self.times = [(5.0,20.0),(30.0,35.0)] + # self.t_start = None + def __init__(self, vehicle_interface): + self.vehicle_interface = vehicle_interface + self.t_start = None + self.last_spawn_time = 0 + self.spawn_interval = 3 # Spawn a pedestrian every 3 seconds + self.max_pedestrians = 10 # Maximum pedestrians at a time + self.pedestrians = {} + self.despawn_time = 30 # Despawn after 30 seconds + self.direction_change_interval = 10 # Change direction every 10 seconds + + def rate(self): + # return 4.0 + return 10.0 + + def state_inputs(self): + return ['vehicle'] + + def state_outputs(self): + return ['agents'] + + def update(self, vehicle: VehicleState) -> Dict[str, AgentState]: + """Simulates pedestrians moving freely in all directions with varied speeds.""" + + if self.t_start is None: + self.t_start = self.vehicle_interface.time() + + current_time = self.vehicle_interface.time() - self.t_start + res = {} + + # Spawn new pedestrians + if current_time - self.last_spawn_time > self.spawn_interval: + if len(self.pedestrians) < self.max_pedestrians: + agent_id = f"pedestrian{len(self.pedestrians)}" + + # Spawn at random locations in a 20x20 area + x_start = random.uniform(-10, 10) + y_start = random.uniform(-10, 10) + + # Random walking direction (angle in radians) + direction = random.uniform(0, 2 * math.pi) + + # Assign a random speed (between 0.5 and 1.5 m/s) + speed = random.uniform(0.5, 1.5) + + self.pedestrians[agent_id] = { + "x": x_start, + "y": y_start, + "speed": speed, + "direction": direction, + "last_direction_change": current_time, + "spawn_time": current_time + } + print(f"Spawned {agent_id} at ({x_start}, {y_start}) moving at {speed:.2f} m/s") + + self.last_spawn_time = current_time + + # Move pedestrians + for agent_id, state in list(self.pedestrians.items()): + t = current_time - state["spawn_time"] + speed = state["speed"] + + # Change direction periodically + if current_time - state["last_direction_change"] > self.direction_change_interval: + state["direction"] = random.uniform(0, 2 * math.pi) + state["last_direction_change"] = current_time + print(f"{agent_id} changed direction") + + # Compute new position based on speed and direction + x_new = state["x"] + speed * 0.1 * math.cos(state["direction"]) + y_new = state["y"] + speed * 0.1 * math.sin(state["direction"]) + + # Remove pedestrian after 30 seconds + if t > self.despawn_time: + print(f"{agent_id} despawns") + del self.pedestrians[agent_id] + else: + state["x"], state["y"] = x_new, y_new + res[agent_id] = box_to_fake_agent((x_new, y_new, 0, 0)) + + print(f"Currently tracking {len(self.pedestrians)} pedestrians") + return res diff --git a/homework/blink_component.py b/GEMstack/onboard/planning/blink_component.py similarity index 100% rename from homework/blink_component.py rename to GEMstack/onboard/planning/blink_component.py diff --git a/homework/longitudinal_planning.py b/GEMstack/onboard/planning/longitudinal_planning.py similarity index 100% rename from homework/longitudinal_planning.py rename to GEMstack/onboard/planning/longitudinal_planning.py diff --git a/homework/pedestrian_yield_logic.py b/GEMstack/onboard/planning/pedestrian_yield_logic.py similarity index 100% rename from homework/pedestrian_yield_logic.py rename to GEMstack/onboard/planning/pedestrian_yield_logic.py diff --git a/GEMstack/scripts/visualization.py b/GEMstack/scripts/visualization.py new file mode 100644 index 000000000..3ff9dada7 --- /dev/null +++ b/GEMstack/scripts/visualization.py @@ -0,0 +1,191 @@ +import json +import os +import time +from klampt import vis +from klampt.model.trajectory import Trajectory +import matplotlib.pyplot as plt +from ..onboard.visualization.klampt_visualization import KlamptVisualization +from ..onboard.visualization.mpl_visualization import MPLVisualization + +LOG_DIR = "logs" +BEHAVIOR_FILE = "behavior.json" + +def select_log_folder(): + log_folders = [f for f in os.listdir(LOG_DIR) if os.path.isdir(os.path.join(LOG_DIR, f))] + if not log_folders: + print("\033[91mNo log folders found.\033[0m") + return None + + print("\n\033[94mAvailable log folders:\033[0m") + for idx, folder in enumerate(log_folders): + print(f"{idx + 1}. {folder}") + + while True: + try: + choice = int(input("\n\033[93mEnter the number of the log folder to use:\033[0m ")) - 1 + if 0 <= choice < len(log_folders): + return os.path.join(LOG_DIR, log_folders[choice]) + print("\033[91mInvalid selection. Please enter a valid number.\033[0m") + except ValueError: + print("\033[91mPlease enter a number.\033[0m") + +def select_visualizer(): + print("\n\033[94mChoose a visualization method:\033[0m") + print("1. Klampt (3D visualization)") + print("2. MPL (Matplotlib 2D visualization)") + + while True: + try: + choice = int(input("\n\033[93mEnter 1 or 2:\033[0m ")) + if choice in [1, 2]: + return choice + print("\033[91mInvalid selection. Please enter 1 or 2.\033[0m") + except ValueError: + print("\033[91mPlease enter a valid number.\033[0m") + +# Load and extract pedestrian and vehicle data +def extract_behavior_data(log_dir): + behavior_path = os.path.join(log_dir, BEHAVIOR_FILE) + if not os.path.exists(behavior_path): + print(f"\033[91mError: {behavior_path} not found.\033[0m") + return None, None, None, None, None, None, None, None + + with open(behavior_path, "r") as f: + data = [json.loads(line) for line in f] + + pedestrian_paths = {} + pedestrian_times = {} + vehicle_path = [] + vehicle_times = [] + speeds = [] + accelerators = [] + brakes = [] + steering_angles = [] + + for entry in data: + time_stamp = entry.get("time", 0) + + # Extract pedestrian data + if "agents" in entry: + for agent_id, agent in entry["agents"].items(): + agent_data = agent.get("data", {}) + if agent_data.get("type") == 3: # Type 3 = pedestrian + pose = agent_data.get("pose", {}) + x, y = pose.get("x", 0), pose.get("y", 0) + + if agent_id not in pedestrian_paths: + pedestrian_paths[agent_id] = [] + pedestrian_times[agent_id] = [] + + pedestrian_paths[agent_id].append((x, y)) + pedestrian_times[agent_id].append(time_stamp) + + # Extract vehicle data + if "vehicle" in entry: + vehicle_data = entry["vehicle"].get("data", {}) + pose = vehicle_data.get("pose", {}) + x, y = pose.get("x", 0), pose.get("y", 0) + + vehicle_path.append((x, y)) + vehicle_times.append(time_stamp) + speeds.append(vehicle_data.get("v", 0)) + accelerators.append(vehicle_data.get("accelerator_pedal_position", 0)) + brakes.append(vehicle_data.get("brake_pedal_position", 0)) + steering_angles.append(vehicle_data.get("steering_wheel_angle", 0)) + + return pedestrian_paths, pedestrian_times, vehicle_path, vehicle_times, speeds, accelerators, brakes, steering_angles + +# Klampt 3D Visualization +def visualize_with_klampt(pedestrian_paths, pedestrian_times, vehicle_path): + """Uses Klampt to visualize pedestrian and vehicle paths.""" + vis.init("PyQt6") + vis.setWindowTitle("Pedestrian and Vehicle Path Visualization") + + klampt_vis = KlamptVisualization(vehicle_interface=None, rate=20.0) + + for agent_id, path in pedestrian_paths.items(): + if len(path) < 2: + continue + + times = pedestrian_times[agent_id] + path_3d = [[float(x), float(y), 0.0] for x, y in path] + + trajectory = Trajectory(times, path_3d) + vis.add(agent_id, trajectory, color=(0, 1, 0, 1), width=2) + + # if vehicle_path: + # vehicle_x, vehicle_y = zip(*vehicle_path) + # vehicle_tuples = [[float(x), float(y), 0.0] for x, y in zip(vehicle_x, vehicle_y)] + # vis.add("Vehicle Path", vehicle_tuples, color=(1, 0, 0, 1), width=2) + + klampt_vis.initialize() + vis.show() + + while vis.shown(): + time.sleep(0.05) + + klampt_vis.cleanup() + vis.kill() + +# MPL 2D Visualization +def visualize_with_mpl(pedestrian_paths, pedestrian_times, vehicle_path, vehicle_data): + vis = MPLVisualization(rate=10.0) + vis.initialize() + + fig, axs = vis.fig, vis.axs + axs[0].clear() + axs[1].clear() + + # Left Plot: Pedestrian & Vehicle Trajectories + axs[0].set_xlabel("X Position") + axs[0].set_ylabel("Y Position") + axs[0].set_title("Pedestrian & Vehicle Trajectories") + + for agent_id, path in pedestrian_paths.items(): + path_x, path_y = zip(*path) + axs[0].plot(path_x, path_y, linestyle='-', marker='o', label=f"Pedestrian {agent_id}") + + # if vehicle_path: + # vehicle_x, vehicle_y = zip(*vehicle_path) + # axs[0].plot(vehicle_x, vehicle_y, linestyle='-', marker='s', color='red', label="Vehicle Path") + + axs[0].legend() + + # Right Plot: Vehicle Controls Over Time + times, speeds, accelerators, brakes, steering_angles = vehicle_data + axs[1].set_xlabel("Time (s)") + axs[1].set_title("Vehicle Controls Over Time") + + axs[1].plot(times, speeds, label="Speed (m/s)", color="blue") + axs[1].plot(times, accelerators, label="Accelerator (%)", color="green") + axs[1].plot(times, brakes, label="Brake (%)", color="red") + axs[1].plot(times, steering_angles, label="Steering Angle", color="purple") + axs[1].legend() + + fig.canvas.draw_idle() + fig.canvas.flush_events() + plt.show(block=True) + + vis.cleanup() + +# Main Execution +if __name__ == "__main__": + visualizer_choice = select_visualizer() + selected_log_folder = select_log_folder() + + if selected_log_folder: + print(f"\n\033[94mLoading behavior data from:\033[0m {selected_log_folder}") + data = extract_behavior_data(selected_log_folder) + + pedestrian_paths, pedestrian_times, vehicle_path, vehicle_times, speeds, accelerators, brakes, steering_angles = data + vehicle_data = (vehicle_times, speeds, accelerators, brakes, steering_angles) + + if pedestrian_paths or vehicle_path: + if visualizer_choice == 1: + print("\033[92mUsing Klampt for visualization...\033[0m") + visualize_with_klampt(pedestrian_paths, pedestrian_times, vehicle_path) + else: + print("\033[92mUsing MPL (Matplotlib) for visualization...\033[0m") + visualize_with_mpl(pedestrian_paths, pedestrian_times, vehicle_path, vehicle_data) + else: + print("\033[91mNo pedestrian or vehicle data found.\033[0m") diff --git a/homework/pedestrian_detection.py b/homework/pedestrian_detection.py deleted file mode 100644 index 3134b1e35..000000000 --- a/homework/pedestrian_detection.py +++ /dev/null @@ -1,84 +0,0 @@ -from ...state import AllState,VehicleState,ObjectPose,ObjectFrameEnum,AgentState,AgentEnum,AgentActivityEnum -from ..interface.gem import GEMInterface -from ..component import Component -#from ultralytics import YOLO -#import cv2 -from typing import Dict - -def box_to_fake_agent(box): - """Creates a fake agent state from an (x,y,w,h) bounding box. - - The location and size are pretty much meaningless since this is just giving a 2D location. - """ - x,y,w,h = box - pose = ObjectPose(t=0,x=x+w/2,y=y+h/2,z=0,yaw=0,pitch=0,roll=0,frame=ObjectFrameEnum.CURRENT) - dims = (w,h,0) - return AgentState(pose=pose,dimensions=dims,outline=None,type=AgentEnum.PEDESTRIAN,activity=AgentActivityEnum.MOVING,velocity=(0,0,0),yaw_rate=0) - - -class PedestrianDetector2D(Component): - """Detects pedestrians.""" - def __init__(self,vehicle_interface : GEMInterface): - self.vehicle_interface = vehicle_interface - #self.detector = YOLO('../../knowledge/detection/yolov8n.pt') - self.last_person_boxes = [] - - def rate(self): - return 4.0 - - def state_inputs(self): - return ['vehicle'] - - def state_outputs(self): - return ['agents'] - - def initialize(self): - #tell the vehicle to use image_callback whenever 'front_camera' gets a reading, and it expects images of type cv2.Mat - #self.vehicle_interface.subscribe_sensor('front_camera',self.image_callback,cv2.Mat) - pass - - #def image_callback(self, image : cv2.Mat): - # detection_result = self.detector(image) - # self.last_person_boxes = [] - # #uncomment if you want to debug the detector... - # #for bb in self.last_person_boxes: - # # x,y,w,h = bb - # # cv2.rectangle(image, (int(x-w/2), int(y-h/2)), (int(x+w/2), int(y+h/2)), (255, 0, 255), 3) - # #cv2.imwrite("pedestrian_detections.png",image) - - def update(self, vehicle : VehicleState) -> Dict[str,AgentState]: - res = {} - for i,b in enumerate(self.last_person_boxes): - x,y,w,h = b - res['pedestrian'+str(i)] = box_to_fake_agent(b) - if len(res) > 0: - print("Detected",len(res),"pedestrians") - return res - - -class FakePedestrianDetector2D(Component): - """Triggers a pedestrian detection at some random time ranges""" - def __init__(self,vehicle_interface : GEMInterface): - self.vehicle_interface = vehicle_interface - self.times = [(5.0,20.0),(30.0,35.0)] - self.t_start = None - - def rate(self): - return 4.0 - - def state_inputs(self): - return ['vehicle'] - - def state_outputs(self): - return ['agents'] - - def update(self, vehicle : VehicleState) -> Dict[str,AgentState]: - if self.t_start is None: - self.t_start = self.vehicle_interface.time() - t = self.vehicle_interface.time() - self.t_start - res = {} - for times in self.times: - if t >= times[0] and t <= times[1]: - res['pedestrian0'] = box_to_fake_agent((0,0,0,0)) - print("Detected a pedestrian") - return res diff --git a/homework/blink_launch.yaml b/launch/blink_launch.yaml similarity index 100% rename from homework/blink_launch.yaml rename to launch/blink_launch.yaml diff --git a/launch/klampt_visualization.yaml b/launch/klampt_visualization.yaml index 2265debf3..32ab587d8 100644 --- a/launch/klampt_visualization.yaml +++ b/launch/klampt_visualization.yaml @@ -2,6 +2,6 @@ type: klampt_visualization.KlamptVisualization args: rate: 20 #Don't include save_as if you don't want to save the video - save_as: - #save_as: "fixed_route_sim.mp4" + # save_as: + save_as: "fixed_route_sim.mp4" diff --git a/launch/mpl_visualization.yaml b/launch/mpl_visualization.yaml index b9642e4de..07ea90e95 100644 --- a/launch/mpl_visualization.yaml +++ b/launch/mpl_visualization.yaml @@ -2,6 +2,6 @@ type: mpl_visualization.MPLVisualization args: rate: 8 #Don't include save_as if you don't want to save the video - save_as: - #save_as: "fixed_route_sim.mp4" + # save_as: + save_as: "fixed_route_sim.mp4" multiprocess: True diff --git a/homework/pedestrian_detection.yaml b/launch/pedestrian_detection.yaml similarity index 100% rename from homework/pedestrian_detection.yaml rename to launch/pedestrian_detection.yaml