Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 60 additions & 52 deletions GEMstack/offboard/calibration/capture_lidar_zed.py
Original file line number Diff line number Diff line change
@@ -1,99 +1,107 @@
# ROS Headers
#!/usr/bin/env python3
"""
Capture ROS messages and save each run in its own time-stamped folder.
"""

import rospy
from sensor_msgs.msg import Image,PointCloud2
from sensor_msgs.msg import Image, PointCloud2
import sensor_msgs.point_cloud2 as pc2
import ctypes
import struct

# OpenCV and cv2 bridge
import cv2
from cv_bridge import CvBridge
import numpy as np
import os
import datetime

lidar_points = None
camera_image = None
depth_image = None
bridge = CvBridge()

def lidar_callback(lidar : PointCloud2):
def lidar_callback(lidar: PointCloud2):
global lidar_points
lidar_points = lidar

def camera_callback(img : Image):
def camera_callback(img: Image):
global camera_image
camera_image = img

def depth_callback(img : Image):
def depth_callback(img: Image):
global depth_image
depth_image = img

def pc2_to_numpy(pc2_msg, want_rgb = False):
def pc2_to_numpy(pc2_msg, want_rgb=False):
gen = pc2.read_points(pc2_msg, skip_nans=True)
if want_rgb:
xyzpack = np.array(list(gen),dtype=np.float32)
xyzpack = np.array(list(gen), dtype=np.float32)
if xyzpack.shape[1] != 4:
raise ValueError("PointCloud2 does not have points")
xyzrgb = np.empty((xyzpack.shape[0],6))
xyzrgb[:,:3] = xyzpack[:,:3]
for i,x in enumerate(xyzpack):
rgb = x[3]
# cast float32 to int so that bitwise operations are possible
s = struct.pack('>f' ,rgb)
i = struct.unpack('>l',s)[0]
# you can get back the float value by the inverse operations
pack = ctypes.c_uint32(i).value
r = (pack & 0x00FF0000)>> 16
g = (pack & 0x0000FF00)>> 8
xyzrgb = np.empty((xyzpack.shape[0], 6))
xyzrgb[:, :3] = xyzpack[:, :3]
for i, x in enumerate(xyzpack):
rgb = x[3]
s = struct.pack('>f', rgb)
i_val = struct.unpack('>l', s)[0]
pack = ctypes.c_uint32(i_val).value
r = (pack & 0x00FF0000) >> 16
g = (pack & 0x0000FF00) >> 8
b = (pack & 0x000000FF)
#r,g,b values in the 0-255 range
xyzrgb[i,3:] = (r,g,b)
xyzrgb[i, 3:] = (r, g, b)
return xyzrgb
else:
return np.array(list(gen),dtype=np.float32)[:,:3]
return np.array(list(gen), dtype=np.float32)[:, :3]

def save_scan(lidar_fn,color_fn,depth_fn):
print("Saving scan to",lidar_fn,color_fn,depth_fn)
pc = pc2_to_numpy(lidar_points,want_rgb=False)
np.savez(lidar_fn,pc)
cv2.imwrite(color_fn,bridge.imgmsg_to_cv2(camera_image))
def save_scan(lidar_fn, color_fn, depth_fn):
print("Saving scan to", lidar_fn, color_fn, depth_fn)
pc = pc2_to_numpy(lidar_points, want_rgb=False)
np.savez(lidar_fn, pc)
cv2.imwrite(color_fn, bridge.imgmsg_to_cv2(camera_image))
dimage = bridge.imgmsg_to_cv2(depth_image)
dimage_non_nan = dimage[np.isfinite(dimage)]
print("Depth range",np.min(dimage_non_nan),np.max(dimage_non_nan))
dimage = np.nan_to_num(dimage,nan=0,posinf=0,neginf=0)
dimage = (dimage/4000*0xffff)
print("Depth pixel range",np.min(dimage),np.max(dimage))
print("Depth range", np.min(dimage_non_nan), np.max(dimage_non_nan))
dimage = np.nan_to_num(dimage, nan=0, posinf=0, neginf=0)
dimage = (dimage / 4000 * 0xffff)
print("Depth pixel range", np.min(dimage), np.max(dimage))
dimage = dimage.astype(np.uint16)
cv2.imwrite(depth_fn,dimage)
cv2.imwrite(depth_fn, dimage)

def main(base_folder='data', start_index=1):
# unique folder for this capture run based on the current date/time.
run_timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
run_folder = os.path.join(base_folder, run_timestamp)
os.makedirs(run_folder, exist_ok=True)
print("Capture run folder:", run_folder)

rospy.init_node("capture_lidar_zed", disable_signals=True)
rospy.Subscriber("/lidar1/velodyne_points", PointCloud2, lidar_callback)
rospy.Subscriber("/zed2/zed_node/rgb/image_rect_color", Image, camera_callback)
rospy.Subscriber("/zed2/zed_node/depth/depth_registered", Image, depth_callback)

def main(folder='data',start_index=1):
rospy.init_node("capture_lidar_zed",disable_signals=True)
lidar_sub = rospy.Subscriber("/lidar1/velodyne_points", PointCloud2, lidar_callback)
camera_sub = rospy.Subscriber("/zed2/zed_node/rgb/image_rect_color", Image, camera_callback)
depth_sub = rospy.Subscriber("/zed2/zed_node/depth/depth_registered", Image, depth_callback)
index = start_index
print("Press any key to:")
print(" store lidar point clouds as npz")
print(" store color images as png")
print(" store depth images (m scaled by 0xffff/4000) as 16-bit tif")
print(" - store lidar point clouds as npz")
print(" - store color images as png")
print(" - store depth images (m scaled by 0xffff/4000) as 16-bit tif")
print("Press Escape or Ctrl+C to quit")
while True:
if camera_image:
cv2.imshow("result",bridge.imgmsg_to_cv2(camera_image))

while not rospy.is_shutdown():
if camera_image is not None:
cv2.imshow("result", bridge.imgmsg_to_cv2(camera_image))
key = cv2.waitKey(30)
if key == -1:
#nothing
pass
pass # No key pressed.
elif key == 27:
#escape
break
break # Escape key pressed.
else:
if lidar_points is None or camera_image is None or depth_image is None:
print("Missing some messages?")
else:
files = [os.path.join(folder,'lidar{}.npz'.format(index)),
os.path.join(folder,'color{}.png'.format(index)),
os.path.join(folder,'depth{}.tif'.format(index))]
files = [
os.path.join(run_folder, 'lidar{}.npz'.format(index)),
os.path.join(run_folder, 'color{}.png'.format(index)),
os.path.join(run_folder, 'depth{}.tif'.format(index))
]
save_scan(*files)
index += 1

Expand All @@ -105,4 +113,4 @@ def main(folder='data',start_index=1):
folder = sys.argv[1]
if len(sys.argv) >= 3:
start_index = int(sys.argv[2])
main(folder,start_index)
main(folder, start_index)
187 changes: 187 additions & 0 deletions GEMstack/offboard/log_management/s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
#!/usr/bin/env python3
"""
This client interacts with S3 to upload (push) or download (pull) a specific folder.

Before running this client make sure you have defined .env in root with following values:
AWS_ACCESS_KEY_ID=example
AWS_SECRET_ACCESS_KEY=example
AWS_DEFAULT_REGION=us-east-1

Requires:
pip install boto3 python-dotenv

Example Usage:

# Push a folder named "2025-02-12_15-30-00" inside "data/" to S3:
python3 GEMStack/offboard/log_management/s3.py \
--action push \
--folder 2025-02-12_15-30-00 \
--bucket cs588 \
--s3-prefix captures

# Pull (download) that same folder from S3 into local "download/" directory:
python3 GEMStack/offboard/log_management/s3.py \
--action pull \
--folder 2025-02-12_15-30-00 \
--bucket cs588 \
--s3-prefix captures \
--dest-dir download
"""

import argparse
import boto3
import os
import sys
from dotenv import load_dotenv

def get_s3_client():
"""
Initializes the S3 client using AWS credentials from environment variables.
Expects:
- AWS_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY
- AWS_DEFAULT_REGION
Exits if any of these are missing.
"""
# load environment variables from .env file
# override in case local config exists for AWS
load_dotenv(override=True)

access_key = os.environ.get('AWS_ACCESS_KEY_ID')
secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
region = os.environ.get('AWS_DEFAULT_REGION')

if not access_key or not secret_key or not region:
sys.exit(
"Error: AWS credentials not set. Please set AWS_ACCESS_KEY_ID, "
"AWS_SECRET_ACCESS_KEY, and AWS_DEFAULT_REGION environment variables (in .env)."
)

print(access_key, secret_key, region)
return boto3.client(
's3',
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
region_name=region
)

def check_s3_connection(s3_client, bucket):
"""
Verifies that we can connect to S3 and access the specified bucket.
"""
try:
s3_client.head_bucket(Bucket=bucket)
print(f"Connection check: Successfully accessed bucket '{bucket}'")
except Exception as e:
sys.exit(f"Error: Could not connect to S3 bucket '{bucket}': {e}")

def push_folder_to_s3(folder_path, bucket, s3_prefix):
"""
Walks through the folder and uploads each file to S3 under the given prefix.
Files are stored under the key: s3_prefix/folder_name/<relative_file_path>.
"""
s3_client = get_s3_client()
check_s3_connection(s3_client, bucket)

folder_name = os.path.basename(folder_path)
files_uploaded = 0

for root, _, files in os.walk(folder_path):
for file in files:
local_path = os.path.join(root, file)
# determine the file's path relative to the folder being pushed.
relative_path = os.path.relpath(local_path, folder_path)
s3_key = os.path.join(s3_prefix, folder_name, relative_path)
try:
print(f"Uploading {local_path} to s3://{bucket}/{s3_key}")
s3_client.upload_file(local_path, bucket, s3_key)
files_uploaded += 1
except Exception as e:
print(f"Error uploading {local_path}: {e}")

if files_uploaded == 0:
sys.exit(f"Error: No files were uploaded from folder: {folder_path}")

def pull_folder_from_s3(bucket, s3_prefix, folder_name, dest_dir):
"""
Downloads all objects from S3 whose key begins with s3_prefix/folder_name.
The folder will be recreated under `dest_dir/folder_name`.
"""
s3_client = get_s3_client()
check_s3_connection(s3_client, bucket)

# this prefix is what we'll look for in the bucket.
prefix = os.path.join(s3_prefix, folder_name)

paginator = s3_client.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=bucket, Prefix=prefix)

found_files = False
for page in pages:
if 'Contents' not in page:
continue
for obj in page['Contents']:
key = obj['Key']
# if the object key is exactly the "folder" (a directory placeholder), skip it.
# e.g. sometimes you can have an empty key or just a trailing slash.
if key.endswith("/"):
continue

found_files = True
# figure out the relative path to recreate the same structure locally.
relative_path = os.path.relpath(key, prefix)
local_path = os.path.join(dest_dir, folder_name, relative_path)

# ensure the local directory exists.
os.makedirs(os.path.dirname(local_path), exist_ok=True)

print(f"Downloading s3://{bucket}/{key} to {local_path}")
try:
s3_client.download_file(bucket, key, local_path)
except Exception as e:
print(f"Error downloading s3://{bucket}/{key}: {e}")

if not found_files:
sys.exit(f"Error: No files found in bucket '{bucket}' with prefix '{prefix}'")

def main():
parser = argparse.ArgumentParser(
description="Push or pull a specific folder to/from an S3 bucket."
)
parser.add_argument("--action", choices=["push", "pull"], required=True,
help="Choose whether to push (upload) or pull (download) a folder.")
parser.add_argument("--folder", required=True,
help="Folder name (e.g. 2025-02-12_15-30-00)")
parser.add_argument("--base-dir", default="data",
help="Local base directory where capture runs are stored (for push).")
parser.add_argument("--dest-dir", default="download",
help="Local directory to place the downloaded folder (for pull).")
parser.add_argument("--bucket", required=True,
help="S3 bucket name.")
parser.add_argument("--s3-prefix", default="captures",
help="S3 prefix (folder) where data is stored or will be uploaded.")
args = parser.parse_args()

if args.action == "push":
# pushing from local 'base_dir/folder' to S3
folder_path = os.path.join(args.base_dir, args.folder)
if not os.path.exists(folder_path):
sys.exit(f"Error: Folder does not exist: {folder_path}")

# check if the folder is empty.
folder_empty = True
for _, _, files in os.walk(folder_path):
if files:
folder_empty = False
break
if folder_empty:
sys.exit(f"Error: Folder is empty: {folder_path}")

push_folder_to_s3(folder_path, args.bucket, args.s3_prefix)

elif args.action == "pull":
# pulling from S3 (prefix/folder) to local 'dest_dir/folder'
pull_folder_from_s3(args.bucket, args.s3_prefix, args.folder, args.dest_dir)

if __name__ == '__main__':
main()
Loading
Loading