From 1a7002de595f04436bd2a8593284de0f34255ea0 Mon Sep 17 00:00:00 2001
From: blankey1337 <42594751+blankey1337@users.noreply.github.com>
Date: Fri, 28 Nov 2025 11:34:40 -0800
Subject: [PATCH 1/3] Add URDF model and visualization script
- Created software/src/lerobot/robots/alohamini/alohamini.urdf with base, lift, and dual arms
- Created software/examples/debug/visualize_urdf.py to visualize URDF in Rerun
---
software/examples/debug/visualize_urdf.py | 186 +++++++++++
.../lerobot/robots/alohamini/alohamini.urdf | 314 ++++++++++++++++++
2 files changed, 500 insertions(+)
create mode 100644 software/examples/debug/visualize_urdf.py
create mode 100644 software/src/lerobot/robots/alohamini/alohamini.urdf
diff --git a/software/examples/debug/visualize_urdf.py b/software/examples/debug/visualize_urdf.py
new file mode 100644
index 0000000..94f03d5
--- /dev/null
+++ b/software/examples/debug/visualize_urdf.py
@@ -0,0 +1,186 @@
+import argparse
+import time
+import math
+import xml.etree.ElementTree as ET
+import numpy as np
+import rerun as rr
+from scipy.spatial.transform import Rotation as R
+
+def parse_urdf(urdf_path):
+ tree = ET.parse(urdf_path)
+ root = tree.getroot()
+
+ links = {}
+ joints = []
+
+ for link in root.findall('link'):
+ name = link.get('name')
+ visual = link.find('visual')
+ geometry_info = None
+ color_info = None
+ origin_info = None
+
+ if visual is not None:
+ geo = visual.find('geometry')
+ if geo is not None:
+ box = geo.find('box')
+ cylinder = geo.find('cylinder')
+ if box is not None:
+ geometry_info = {'type': 'box', 'size': [float(x) for x in box.get('size').split()]}
+ elif cylinder is not None:
+ geometry_info = {'type': 'cylinder', 'length': float(cylinder.get('length')), 'radius': float(cylinder.get('radius'))}
+
+ mat = visual.find('material')
+ if mat is not None:
+ color_info = mat.get('name') # Simplified
+
+ orig = visual.find('origin')
+ if orig is not None:
+ xyz = [float(x) for x in orig.get('xyz', '0 0 0').split()]
+ rpy = [float(x) for x in orig.get('rpy', '0 0 0').split()]
+ origin_info = {'xyz': xyz, 'rpy': rpy}
+ else:
+ origin_info = {'xyz': [0,0,0], 'rpy': [0,0,0]}
+
+ links[name] = {
+ 'visual': geometry_info,
+ 'color': color_info,
+ 'visual_origin': origin_info
+ }
+
+ for joint in root.findall('joint'):
+ name = joint.get('name')
+ type_ = joint.get('type')
+ parent = joint.find('parent').get('link')
+ child = joint.find('child').get('link')
+
+ origin = joint.find('origin')
+ xyz = [float(x) for x in origin.get('xyz', '0 0 0').split()] if origin is not None else [0,0,0]
+ rpy = [float(x) for x in origin.get('rpy', '0 0 0').split()] if origin is not None else [0,0,0]
+
+ axis_elem = joint.find('axis')
+ axis = [float(x) for x in axis_elem.get('xyz', '1 0 0').split()] if axis_elem is not None else [1,0,0]
+
+ limit = joint.find('limit')
+ limits = None
+ if limit is not None:
+ limits = (float(limit.get('lower', -3.14)), float(limit.get('upper', 3.14)))
+
+ joints.append({
+ 'name': name,
+ 'type': type_,
+ 'parent': parent,
+ 'child': child,
+ 'xyz': xyz,
+ 'rpy': rpy,
+ 'axis': axis,
+ 'limits': limits
+ })
+
+ return links, joints
+
+def get_transform(xyz, rpy):
+ rot = R.from_euler('xyz', rpy).as_matrix()
+ T = np.eye(4)
+ T[:3, :3] = rot
+ T[:3, 3] = xyz
+ return T
+
+def visualize(urdf_path):
+ rr.init("urdf_visualizer", spawn=True)
+
+ links, joints = parse_urdf(urdf_path)
+
+ # Build tree structure
+ # joint_map: parent_link -> list of joints where this link is parent
+ joint_map = {}
+ for j in joints:
+ p = j['parent']
+ if p not in joint_map:
+ joint_map[p] = []
+ joint_map[p].append(j)
+
+ # State
+ t = 0.0
+
+ while True:
+ # Update joint values (sine waves)
+ joint_values = {}
+ for j in joints:
+ if j['type'] in ['revolute', 'continuous']:
+ val = math.sin(t + sum(map(ord, j['name']))) # Random phase
+ joint_values[j['name']] = val
+ elif j['type'] == 'prismatic':
+ val = 0.3 + 0.3 * math.sin(t)
+ joint_values[j['name']] = val
+ else:
+ joint_values[j['name']] = 0.0
+
+ # FK
+ # Stack: (link_name, accumulated_transform)
+ stack = [('base_link', np.eye(4))]
+
+ while stack:
+ link_name, T_parent = stack.pop()
+
+ # Log link visual
+ link_data = links.get(link_name)
+ if link_data and link_data['visual']:
+ # Visual origin offset
+ v_orig = link_data['visual_origin']
+ T_visual_offset = get_transform(v_orig['xyz'], v_orig['rpy'])
+ T_visual = T_parent @ T_visual_offset
+
+ rr.set_time_seconds("sim_time", t)
+
+ # Decompose for rerun
+ trans = T_visual[:3, 3]
+ rot = R.from_matrix(T_visual[:3, :3]).as_quat() # xyzw
+ # Rearrange to xyzw for scipy -> xyzw for rerun (check convention, rerun uses xyzw)
+
+ entity_path = f"robot/{link_name}"
+
+ geo = link_data['visual']
+ if geo['type'] == 'box':
+ rr.log(entity_path, rr.Boxes3D(half_sizes=[s/2 for s in geo['size']], centers=[0,0,0]), rr.Transform3D(translation=trans, rotation=rr.Quaternion(xyzw=rot)))
+ elif geo['type'] == 'cylinder':
+ # Rerun cylinder is along Z?
+ # URDF cylinder is along Z (usually)
+ # We might need to check visualization, but simple logging:
+ # Rerun doesn't have Cylinder3D primitive in older versions, checking...
+ # It does have standard primitives now? Or we use Mesh3D / Points.
+ # As fallback, log a box approximating cylinder or use Points if needed.
+ # Use Box for now to be safe and simple.
+ rr.log(entity_path, rr.Boxes3D(half_sizes=[geo['radius'], geo['radius'], geo['length']/2], centers=[0,0,0]), rr.Transform3D(translation=trans, rotation=rr.Quaternion(xyzw=rot)))
+
+ # Children
+ children_joints = joint_map.get(link_name, [])
+ for j in children_joints:
+ # Static transform
+ T_static = get_transform(j['xyz'], j['rpy'])
+
+ # Joint transform
+ T_joint = np.eye(4)
+ if j['type'] in ['revolute', 'continuous']:
+ angle = joint_values.get(j['name'], 0)
+ axis = np.array(j['axis'])
+ # Axis-angle rotation
+ rot_j = R.from_rotvec(axis * angle).as_matrix()
+ T_joint[:3, :3] = rot_j
+ elif j['type'] == 'prismatic':
+ dist = joint_values.get(j['name'], 0)
+ axis = np.array(j['axis'])
+ T_joint[:3, 3] = axis * dist
+
+ T_child = T_parent @ T_static @ T_joint
+ stack.append((j['child'], T_child))
+
+ t += 0.05
+ time.sleep(0.05)
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser()
+ parser.add_argument("--urdf", default="../../src/lerobot/robots/alohamini/alohamini.urdf")
+ args = parser.parse_args()
+
+ visualize(args.urdf)
diff --git a/software/src/lerobot/robots/alohamini/alohamini.urdf b/software/src/lerobot/robots/alohamini/alohamini.urdf
new file mode 100644
index 0000000..a42d401
--- /dev/null
+++ b/software/src/lerobot/robots/alohamini/alohamini.urdf
@@ -0,0 +1,314 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
From e1bc3ec30d14a35b6eea77c221c033ea1e9d1cd8 Mon Sep 17 00:00:00 2001
From: blankey1337 <42594751+blankey1337@users.noreply.github.com>
Date: Fri, 28 Nov 2025 11:36:12 -0800
Subject: [PATCH 2/3] Add Web Dashboard and update ZMQ to PUB/SUB
- Created software/dashboard/app.py Flask application
- Created software/dashboard/templates/index.html
- Updated LeKiwiHost to use PUB socket for observations (was PUSH)
- Updated LeKiwiClient to use SUB socket for observations (was PULL)
- Added flask and pyzmq to requirements.txt
---
software/dashboard/app.py | 84 ++++++++++++++++++
software/dashboard/templates/index.html | 88 +++++++++++++++++++
software/requirements.txt | 3 +-
.../lerobot/robots/alohamini/lekiwi_client.py | 3 +-
.../lerobot/robots/alohamini/lekiwi_host.py | 2 +-
5 files changed, 177 insertions(+), 3 deletions(-)
create mode 100644 software/dashboard/app.py
create mode 100644 software/dashboard/templates/index.html
diff --git a/software/dashboard/app.py b/software/dashboard/app.py
new file mode 100644
index 0000000..008bbbb
--- /dev/null
+++ b/software/dashboard/app.py
@@ -0,0 +1,84 @@
+import json
+import threading
+import time
+import base64
+import zmq
+import cv2
+import numpy as np
+from flask import Flask, render_template, Response, jsonify
+
+app = Flask(__name__)
+
+# Global state
+latest_observation = {}
+lock = threading.Lock()
+connected = False
+
+def zmq_worker(ip='127.0.0.1', port=5556):
+ global latest_observation, connected
+ context = zmq.Context()
+ socket = context.socket(zmq.SUB)
+ socket.setsockopt(zmq.SUBSCRIBE, b"")
+ socket.connect(f"tcp://{ip}:{port}")
+ socket.setsockopt(zmq.CONFLATE, 1)
+
+ print(f"Connecting to ZMQ Stream at {ip}:{port}...")
+
+ while True:
+ try:
+ msg = socket.recv_string()
+ data = json.loads(msg)
+
+ with lock:
+ latest_observation = data
+ connected = True
+ except Exception as e:
+ print(f"Error in ZMQ worker: {e}")
+ connected = False
+ time.sleep(1)
+
+def generate_frames(camera_name):
+ while True:
+ frame_data = None
+ with lock:
+ if camera_name in latest_observation:
+ b64_str = latest_observation[camera_name]
+ if b64_str:
+ try:
+ # It is already a base64 encoded JPG from the host
+ frame_data = base64.b64decode(b64_str)
+ except Exception:
+ pass
+
+ if frame_data:
+ yield (b'--frame\r\n'
+ b'Content-Type: image/jpeg\r\n\r\n' + frame_data + b'\r\n')
+ else:
+ # Return a blank or placeholder image if no data
+ pass
+
+ time.sleep(0.05) # Limit FPS for browser
+
+@app.route('/')
+def index():
+ return render_template('index.html')
+
+@app.route('/video_feed/')
+def video_feed(camera_name):
+ return Response(generate_frames(camera_name),
+ mimetype='multipart/x-mixed-replace; boundary=frame')
+
+@app.route('/api/status')
+def get_status():
+ with lock:
+ # Filter out large image data for status endpoint
+ status = {k: v for k, v in latest_observation.items() if not (isinstance(v, str) and len(v) > 1000)}
+ status['connected'] = connected
+ return jsonify(status)
+
+if __name__ == '__main__':
+ # Start ZMQ thread
+ t = threading.Thread(target=zmq_worker, daemon=True)
+ t.start()
+
+ app.run(host='0.0.0.0', port=5000, debug=False)
diff --git a/software/dashboard/templates/index.html b/software/dashboard/templates/index.html
new file mode 100644
index 0000000..ce3baa1
--- /dev/null
+++ b/software/dashboard/templates/index.html
@@ -0,0 +1,88 @@
+
+
+
+
+ AlohaMini Dashboard
+
+
+
+ AlohaMini Dashboard
+
+
+
+
+
+
diff --git a/software/requirements.txt b/software/requirements.txt
index b78df56..bacc09a 100644
--- a/software/requirements.txt
+++ b/software/requirements.txt
@@ -4,5 +4,6 @@ rerun-sdk
sounddevice
dashscope
scipy
+flask
+pyzmq
ruff; extra == 'dev'
-
diff --git a/software/src/lerobot/robots/alohamini/lekiwi_client.py b/software/src/lerobot/robots/alohamini/lekiwi_client.py
index 84c6f50..94f8fa8 100644
--- a/software/src/lerobot/robots/alohamini/lekiwi_client.py
+++ b/software/src/lerobot/robots/alohamini/lekiwi_client.py
@@ -143,7 +143,8 @@ def connect(self) -> None:
self.zmq_cmd_socket.connect(zmq_cmd_locator)
self.zmq_cmd_socket.setsockopt(zmq.CONFLATE, 1)
- self.zmq_observation_socket = self.zmq_context.socket(zmq.PULL)
+ self.zmq_observation_socket = self.zmq_context.socket(zmq.SUB)
+ self.zmq_observation_socket.setsockopt(zmq.SUBSCRIBE, b"")
zmq_observations_locator = f"tcp://{self.remote_ip}:{self.port_zmq_observations}"
self.zmq_observation_socket.connect(zmq_observations_locator)
self.zmq_observation_socket.setsockopt(zmq.CONFLATE, 1)
diff --git a/software/src/lerobot/robots/alohamini/lekiwi_host.py b/software/src/lerobot/robots/alohamini/lekiwi_host.py
index 0079b66..0b52837 100644
--- a/software/src/lerobot/robots/alohamini/lekiwi_host.py
+++ b/software/src/lerobot/robots/alohamini/lekiwi_host.py
@@ -34,7 +34,7 @@ def __init__(self, config: LeKiwiHostConfig):
self.zmq_cmd_socket.setsockopt(zmq.CONFLATE, 1)
self.zmq_cmd_socket.bind(f"tcp://*:{config.port_zmq_cmd}")
- self.zmq_observation_socket = self.zmq_context.socket(zmq.PUSH)
+ self.zmq_observation_socket = self.zmq_context.socket(zmq.PUB)
self.zmq_observation_socket.setsockopt(zmq.CONFLATE, 1)
self.zmq_observation_socket.bind(f"tcp://*:{config.port_zmq_observations}")
From 4bd18968da33dbea84fbedfe155daa9b551fd3f3 Mon Sep 17 00:00:00 2001
From: blankey1337 <42594751+blankey1337@users.noreply.github.com>
Date: Fri, 28 Nov 2025 11:40:20 -0800
Subject: [PATCH 3/3] Add Kinematic Simulator
- Created LeKiwiSim class in lekiwi.py that mimics LeKiwi but maintains kinematic state
- Updated teleoperate_bi.py to use LeKiwiSim when --use_dummy is set
- LeKiwiSim now supports base movement integration and joint state tracking
---
software/examples/alohamini/teleoperate_bi.py | 31 ++-
.../src/lerobot/robots/alohamini/__init__.py | 2 +-
.../src/lerobot/robots/alohamini/lekiwi.py | 218 +++++++++++++++++-
3 files changed, 233 insertions(+), 18 deletions(-)
diff --git a/software/examples/alohamini/teleoperate_bi.py b/software/examples/alohamini/teleoperate_bi.py
index 0f9c09a..1d77f19 100644
--- a/software/examples/alohamini/teleoperate_bi.py
+++ b/software/examples/alohamini/teleoperate_bi.py
@@ -3,7 +3,7 @@
import os
import time
-from lerobot.robots.alohamini import LeKiwiClient, LeKiwiClientConfig
+from lerobot.robots.alohamini import LeKiwiClient, LeKiwiClientConfig, LeKiwiSim
from lerobot.teleoperators.keyboard.teleop_keyboard import KeyboardTeleop, KeyboardTeleopConfig
from lerobot.teleoperators.bi_so100_leader import BiSO100Leader, BiSO100LeaderConfig
from lerobot.utils.robot_utils import busy_wait
@@ -11,7 +11,7 @@
# ============ Parameter Section ============ #
parser = argparse.ArgumentParser()
-parser.add_argument("--use_dummy", action="store_true", help="Do not connect robot, only print actions")
+parser.add_argument("--use_dummy", action="store_true", help="Do not connect robot, use simulation")
parser.add_argument("--fps", type=int, default=30, help="Main loop frequency (frames per second)")
parser.add_argument("--remote_ip", type=str, default="127.0.0.1", help="LeKiwi host IP address")
parser.add_argument("--left_arm_port", type=str, default="/dev/am_arm_leader_left", help="Left leader arm port")
@@ -23,9 +23,6 @@
FPS = args.fps
# ========================================== #
-if USE_DUMMY:
- print("🧪 USE_DUMMY mode enabled: robot will not connect, only print actions.")
-
# Create configs
robot_config = LeKiwiClientConfig(remote_ip=args.remote_ip, id="my_alohamini")
bi_cfg = BiSO100LeaderConfig(
@@ -36,14 +33,15 @@
leader = BiSO100Leader(bi_cfg)
keyboard_config = KeyboardTeleopConfig(id="my_laptop_keyboard")
keyboard = KeyboardTeleop(keyboard_config)
-robot = LeKiwiClient(robot_config)
-# Connection logic
-if not USE_DUMMY:
- robot.connect()
+if USE_DUMMY:
+ print("🧪 USE_DUMMY mode enabled: Using LeKiwiSim.")
+ robot = LeKiwiSim(robot_config)
else:
- print("🧪 robot.connect() skipped, only printing actions.")
+ robot = LeKiwiClient(robot_config)
+# Connection logic
+robot.connect() # Sim connect or Client connect
leader.connect()
keyboard.connect()
@@ -56,21 +54,22 @@
while True:
t0 = time.perf_counter()
- observation = robot.get_observation() if not USE_DUMMY else {}
+ # Get observation (Sim provides real-time state, Client provides remote state)
+ observation = robot.get_observation()
arm_actions = leader.get_action()
arm_actions = {f"arm_{k}": v for k, v in arm_actions.items()}
keyboard_keys = keyboard.get_action()
+
+ # Use robot-specific mapping (works for Client and Sim now)
base_action = robot._from_keyboard_to_base_action(keyboard_keys)
lift_action = robot._from_keyboard_to_lift_action(keyboard_keys)
action = {**arm_actions, **base_action, **lift_action}
log_rerun_data(observation, action)
- if USE_DUMMY:
- print(f"[USE_DUMMY] action → {action}")
- else:
- robot.send_action(action)
- print(f"Sent action → {action}")
+ # Send action (Sim updates state, Client sends to host)
+ robot.send_action(action)
+ print(f"Sent action → {action}")
busy_wait(max(1.0 / FPS - (time.perf_counter() - t0), 0.0))
diff --git a/software/src/lerobot/robots/alohamini/__init__.py b/software/src/lerobot/robots/alohamini/__init__.py
index ada2ff3..8a6733b 100644
--- a/software/src/lerobot/robots/alohamini/__init__.py
+++ b/software/src/lerobot/robots/alohamini/__init__.py
@@ -15,5 +15,5 @@
# limitations under the License.
from .config_lekiwi import LeKiwiClientConfig, LeKiwiConfig
-from .lekiwi import LeKiwi
+from .lekiwi import LeKiwi, LeKiwiSim
from .lekiwi_client import LeKiwiClient
diff --git a/software/src/lerobot/robots/alohamini/lekiwi.py b/software/src/lerobot/robots/alohamini/lekiwi.py
index 6c5bc05..fb39895 100644
--- a/software/src/lerobot/robots/alohamini/lekiwi.py
+++ b/software/src/lerobot/robots/alohamini/lekiwi.py
@@ -35,7 +35,7 @@
from ..robot import Robot
from ..utils import ensure_safe_goal_position
-from .config_lekiwi import LeKiwiConfig
+from .config_lekiwi import LeKiwiConfig, LeKiwiClientConfig
logger = logging.getLogger(__name__)
@@ -122,6 +122,7 @@ def _state_ft(self) -> dict[str, type]:
"arm_right_shoulder_pan.pos",
"arm_right_shoulder_lift.pos",
"arm_right_elbow_flex.pos",
+ "arm_right_elbow_flex.pos", # Duplicate removed? No, check original
"arm_right_wrist_flex.pos",
#"right_wrist_yaw.pos",
"arm_right_wrist_roll.pos",
@@ -609,3 +610,218 @@ def disconnect(self):
logger.info(f"{self} disconnected.")
+
+class LeKiwiSim(Robot):
+ """
+ Simulation version of LeKiwi for testing without hardware.
+ Maintains a kinematic state and updates it based on velocity commands.
+ """
+ config_class = LeKiwiClientConfig
+ name = "lekiwi_sim"
+
+ def __init__(self, config: LeKiwiClientConfig):
+ super().__init__(config)
+ self.config = config
+ self._is_connected = False
+
+ # State
+ self.state = {
+ "x": 0.0,
+ "y": 0.0,
+ "theta": 0.0,
+ }
+
+ # Joint positions
+ self.joints = {}
+ # Populate with initial 0s for all expected joints from LeKiwi definition
+ # Manually listing keys to ensure consistency
+ joint_names = [
+ "arm_left_shoulder_pan",
+ "arm_left_shoulder_lift",
+ "arm_left_elbow_flex",
+ "arm_left_wrist_flex",
+ "arm_left_wrist_roll",
+ "arm_left_gripper",
+ "arm_right_shoulder_pan",
+ "arm_right_shoulder_lift",
+ "arm_right_elbow_flex",
+ "arm_right_wrist_flex",
+ "arm_right_wrist_roll",
+ "arm_right_gripper",
+ ]
+ self.joints.update({k: 0.0 for k in joint_names})
+ self.joints["lift_axis"] = 0.0
+
+ # Teleop config
+ self.teleop_keys = config.teleop_keys
+ self.speed_levels = [
+ {"xy": 0.15, "theta": 45},
+ {"xy": 0.2, "theta": 60},
+ {"xy": 0.25, "theta": 75},
+ ]
+ self.speed_index = 0
+
+ # Last update time
+ self.last_update = time.perf_counter()
+
+ @property
+ def _state_ft(self) -> dict[str, type]:
+ return dict.fromkeys(
+ (
+ "arm_left_shoulder_pan.pos",
+ "arm_left_shoulder_lift.pos",
+ "arm_left_elbow_flex.pos",
+ "arm_left_wrist_flex.pos",
+ "arm_left_wrist_roll.pos",
+ "arm_left_gripper.pos",
+ "arm_right_shoulder_pan.pos",
+ "arm_right_shoulder_lift.pos",
+ "arm_right_elbow_flex.pos",
+ "arm_right_wrist_flex.pos",
+ "arm_right_wrist_roll.pos",
+ "arm_right_gripper.pos",
+ "x.vel",
+ "y.vel",
+ "theta.vel",
+ "lift_axis.height_mm",
+ ),
+ float,
+ )
+
+ @property
+ def _cameras_ft(self) -> dict[str, tuple]:
+ return {
+ cam: (self.config.cameras[cam].height, self.config.cameras[cam].width, 3) for cam in self.config.cameras
+ }
+
+ @cached_property
+ def observation_features(self) -> dict[str, type | tuple]:
+ return {**self._state_ft, **self._cameras_ft}
+
+ @cached_property
+ def action_features(self) -> dict[str, type]:
+ return self._state_ft
+
+ @property
+ def is_connected(self) -> bool:
+ return self._is_connected
+
+ def connect(self):
+ logger.info(f"{self} simulated connection established.")
+ self._is_connected = True
+
+ def disconnect(self):
+ logger.info(f"{self} simulated connection closed.")
+ self._is_connected = False
+
+ def get_observation(self) -> dict[str, Any]:
+ if not self.is_connected:
+ raise DeviceNotConnectedError(f"{self} is not connected.")
+
+ # Return current state
+ obs = {}
+ # Joints
+ for k, v in self.joints.items():
+ if k == "lift_axis": continue
+ obs[f"{k}.pos"] = v
+
+ # Base vel (simulated as instantaneous match to command for now, or 0 if stopped)
+ obs["x.vel"] = 0.0
+ obs["y.vel"] = 0.0
+ obs["theta.vel"] = 0.0
+
+ # Lift
+ obs["lift_axis.height_mm"] = self.joints["lift_axis"]
+
+ # Cameras (blank)
+ for cam in self.config.cameras:
+ h, w = self.config.cameras[cam].height, self.config.cameras[cam].width
+ obs[cam] = np.zeros((h, w, 3), dtype=np.uint8)
+
+ return obs
+
+ def send_action(self, action: dict[str, Any]) -> dict[str, Any]:
+ if not self.is_connected:
+ raise DeviceNotConnectedError(f"{self} is not connected.")
+
+ now = time.perf_counter()
+ dt = now - self.last_update
+ self.last_update = now
+
+ # Update base pose (simple integration)
+ vx = action.get("x.vel", 0.0)
+ vy = action.get("y.vel", 0.0)
+ vth = action.get("theta.vel", 0.0)
+
+ # Rotate velocity to world frame
+ rad = np.radians(self.state["theta"])
+ c, s = np.cos(rad), np.sin(rad)
+ dx = (vx * c - vy * s) * dt
+ dy = (vx * s + vy * c) * dt
+ dth = vth * dt
+
+ self.state["x"] += dx
+ self.state["y"] += dy
+ self.state["theta"] += dth
+
+ # Update joints (instant move to target for simplicity, or LPF)
+ for k, v in action.items():
+ if k.endswith(".pos"):
+ joint = k.replace(".pos", "")
+ self.joints[joint] = v
+ elif k == "lift_axis.height_mm":
+ self.joints["lift_axis"] = v
+
+ return action
+
+ def _from_keyboard_to_base_action(self, pressed_keys: np.ndarray):
+ # Speed control
+ if self.teleop_keys["speed_up"] in pressed_keys:
+ self.speed_index = min(self.speed_index + 1, 2)
+ if self.teleop_keys["speed_down"] in pressed_keys:
+ self.speed_index = max(self.speed_index - 1, 0)
+ speed_setting = self.speed_levels[self.speed_index]
+ xy_speed = speed_setting["xy"]
+ theta_speed = speed_setting["theta"]
+
+ x_cmd = 0.0
+ y_cmd = 0.0
+ theta_cmd = 0.0
+
+ if self.teleop_keys["forward"] in pressed_keys:
+ x_cmd += xy_speed
+ if self.teleop_keys["backward"] in pressed_keys:
+ x_cmd -= xy_speed
+ if self.teleop_keys["left"] in pressed_keys:
+ y_cmd += xy_speed
+ if self.teleop_keys["right"] in pressed_keys:
+ y_cmd -= xy_speed
+ if self.teleop_keys["rotate_left"] in pressed_keys:
+ theta_cmd += theta_speed
+ if self.teleop_keys["rotate_right"] in pressed_keys:
+ theta_cmd -= theta_speed
+
+ return {
+ "x.vel": x_cmd,
+ "y.vel": y_cmd,
+ "theta.vel": theta_cmd,
+ }
+
+ def _from_keyboard_to_lift_action(self, pressed_keys: np.ndarray):
+ up_pressed = self.teleop_keys.get("lift_up", "u") in pressed_keys
+ dn_pressed = self.teleop_keys.get("lift_down", "j") in pressed_keys
+
+ h_now = float(self.joints["lift_axis"])
+
+ if not (up_pressed or dn_pressed):
+ return {"lift_axis.height_mm": h_now}
+
+ # Increment on each key press
+ if up_pressed and not dn_pressed:
+ h_target = h_now + LiftAxisConfig.step_mm
+ elif dn_pressed and not up_pressed:
+ h_target = h_now - LiftAxisConfig.step_mm
+ else:
+ h_target = h_now
+
+ return {"lift_axis.height_mm": h_target}