Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions obelisk/python/obelisk_py/core/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ def __init__(self, node_name: str) -> None:
"""Initialize the Obelisk node."""
super().__init__(node_name)
self.declare_parameter("callback_group_settings", "")
# ROS parameter designed to let the user feed a file path for their own code
self.declare_parameter("params_path", "")
self.declare_parameter("params_path", "") # lets the user feed a file path to custom params

# for auto-configuration
self._obk_pub_settings = []
Expand Down
47 changes: 39 additions & 8 deletions obelisk/python/obelisk_py/core/sim/mujoco.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from obelisk_py.core.obelisk_typing import ObeliskSensorMsg, is_in_bound
from obelisk_py.core.robot import ObeliskSimRobot
from obelisk_py.core.utils.msg import np_to_multiarray


class ObeliskMujocoRobot(ObeliskSimRobot):
Expand Down Expand Up @@ -43,6 +44,9 @@ def _get_msg_type_from_mj_sensor_type(self, sensor_type: str) -> Type[ObeliskSen
if sensor_type == "jointpos":
assert is_in_bound(osm.JointEncoders, ObeliskSensorMsg)
return osm.JointEncoders # type: ignore
elif sensor_type == "camera":
assert is_in_bound(osm.ObkImage, ObeliskSensorMsg)
return osm.ObkImage # type: ignore
else:
raise NotImplementedError(f"Sensor type {sensor_type} not supported! Check your spelling or open a PR.")

Expand All @@ -66,15 +70,40 @@ def _create_timer_callback_from_msg_type(
Returns:
The timer callback function.
"""
if msg_type in [osm.JointEncoders]:

def timer_callback() -> None:
"""Timer callback for the sensor."""
msg = msg_type()
y = []
for sensor_name in sensor_names:
y.append(self.mj_data.sensor(sensor_name).data)
msg.y = list(np.concatenate(y)) # assume all ObeliskSensorMsg objs have a y field
pub_sensor.publish(msg)

elif msg_type == osm.ObkImage:

def timer_callback() -> None:
"""Timer callback for the sensor."""
images = []
for cam_name in sensor_names:
try:
cam_id = self.mj_model.cam(cam_name).id
width, height = self.mj_model.cam_resolution[cam_id]
with mujoco.Renderer(self.mj_model, height, width) as renderer:
renderer.update_scene(self.mj_data, camera=cam_name)
image = renderer.render()
images.append(image)
except KeyError as e:
self.get_logger().error(f"Could not find camera {cam_name} in the model!")
raise e

msg = msg_type()
msg.y = np_to_multiarray(np.stack(images)) # (num_images, height, width, channels), dtype: uint8
pub_sensor.publish(msg)

def timer_callback() -> None:
"""Timer callback for the sensor."""
msg = msg_type()
y = []
for sensor_name in sensor_names:
y.append(self.mj_data.sensor(sensor_name).data)
msg.y = list(np.concatenate(y)) # assume all ObeliskSensorMsg objs have a y field
pub_sensor.publish(msg)
else:
raise NotImplementedError(f"Message type {msg_type} not supported! Check your spelling or open a PR.")

return timer_callback

Expand Down Expand Up @@ -106,6 +135,7 @@ def on_configure(self, state: LifecycleState) -> TransitionCallbackReturn: # no
model_xml_path = self.model_xml_path
self.mj_model = MjModel.from_xml_path(model_xml_path)
self.mj_data = MjData(self.mj_model)
mujoco.mj_forward(self.mj_model, self.mj_data) # type: ignore

# set the configuration parameters for non-sensor fields
self.n_u = int(config_dict["n_u"])
Expand Down Expand Up @@ -161,6 +191,7 @@ def on_configure(self, state: LifecycleState) -> TransitionCallbackReturn: # no
callback_group=cbg,
)
timer_sensor.cancel()
self.get_logger().info(f"Created sensor publisher and timer for topic {topic} with dt {dt}.")
self.obk_publishers[f"sensor_group_{i}"] = pub_sensor
self.obk_timers[f"sensor_group_{i}"] = timer_sensor

Expand Down