Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions rclrs/src/dynamic_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,10 @@ impl DynamicMessageMetadata {
pub fn structure(&self) -> &MessageStructure {
&self.structure
}

pub(crate) fn type_support_ptr(&self) -> *const rosidl_message_type_support_t {
self.type_support_ptr
}
}

// ========================= impl for DynamicMessage =========================
Expand Down
8 changes: 7 additions & 1 deletion rclrs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ mod parameter;
mod publisher;
mod qos;
mod service;
mod serialized_message;
mod serialized_publisher;
mod serialized_subscription;
mod subscription;
mod time;
mod time_source;
Expand All @@ -204,7 +207,7 @@ mod worker;
#[cfg(test)]
mod test_helpers;

Copy link

Copilot AI Jan 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lib.rs has #![warn(missing_docs)], and CI runs cargo rustdoc -- -D warnings / cargo clippy ... -D warnings. Making rcl_bindings public without a doc comment will emit a missing_docs warning for this module item and fail CI. Add a /// doc comment for pub mod rcl_bindings; (or annotate this item with #[allow(missing_docs)] / #[doc(hidden)] if you don't want it documented).

Suggested change
/// Low-level bindings to the underlying ROS 2 `rcl` C API.

Copilot uses AI. Check for mistakes.
mod rcl_bindings;
pub mod rcl_bindings;

pub use action::*;
pub use arguments::*;
Expand All @@ -220,6 +223,9 @@ pub use node::*;
pub use parameter::*;
pub use publisher::*;
pub use qos::*;
pub use serialized_message::*;
pub use serialized_publisher::*;
pub use serialized_subscription::*;
pub use rcl_bindings::rmw_request_id_t;
pub use service::*;
pub use subscription::*;
Expand Down
80 changes: 79 additions & 1 deletion rclrs/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,12 @@ use crate::{
IntoNodeTimerOneshotCallback, IntoNodeTimerRepeatingCallback, IntoTimerOptions, LogParams,
Logger, MessageInfo, ParameterBuilder, ParameterInterface, ParameterVariant, Parameters,
Promise, Publisher, PublisherOptions, PublisherState, RclrsError, RequestedGoal, Service,
ServiceOptions, ServiceState, Subscription, SubscriptionOptions, SubscriptionState,
ServiceOptions, ServiceState, SerializedPublisher, SerializedSubscription, Subscription,
SubscriptionOptions, SubscriptionState,
TerminatedGoal, TimeSource, Timer, TimerState, ToLogParams, Worker, WorkerOptions, WorkerState,
ENTITY_LIFECYCLE_MUTEX,
};
use crate::ToResult;

/// A processing unit that can communicate with other nodes. See the API of
/// [`NodeState`] to find out what methods you can call on a [`Node`].
Expand Down Expand Up @@ -1493,6 +1495,82 @@ impl NodeState {
pub(crate) fn handle(&self) -> &Arc<NodeHandle> {
&self.handle
}

/// Creates a serialized subscription.
///
/// This receives raw serialized (CDR) bytes, using `rcl_take_serialized_message`.
pub fn create_serialized_subscription<'a>(
&self,
topic_type: MessageTypeName,
options: impl Into<SubscriptionOptions<'a>>,
) -> Result<SerializedSubscription, RclrsError> {
Comment on lines +1502 to +1506
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API is a bit odd for a subscription, instead of specifying a callback we return a subscription and then leave it to the user to call SerializedSubscription::take whenever they can to see whether a message is available or not.
If I understand correctly, this would be the equivalent of rclcpp create_generic_subscription? In that case should we try to get a consistent API with other subscriptions that are callback based except this callback will have a serialized message as a parameter?
Following up on @mxgrey's note, I also wonder if we can merge the implementation of generic + dynamic pub/sub. It seems to me that both need to look for the message types at runtime and the only difference is that the serializes pub/sub have a callback with raw bytes, while the dynamic pub/sub has a callback with (DynamicMessage, MessageInfo).
If my understanding is correct, how about reusing the dynamic subscription API and just make the callback generic so we can get raw bytes if needed?

let SubscriptionOptions { topic, qos } = options.into();

// Use the same typesupport resolution as dynamic messages.
let metadata = crate::dynamic_message::DynamicMessageMetadata::new(topic_type)?;

let mut sub = unsafe { rcl_get_zero_initialized_subscription() };
let topic_c = std::ffi::CString::new(topic).unwrap();

let _context_lock = self.handle.context_handle.rcl_context.lock().unwrap();
let node = self.handle.rcl_node.lock().unwrap();
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();

unsafe {
let mut opts = rcl_subscription_get_default_options();
opts.qos = qos.into();
rcl_subscription_init(
&mut sub,
&*node,
metadata.type_support_ptr(),
topic_c.as_ptr(),
&opts,
)
.ok()?;
}

Ok(SerializedSubscription {
handle: Arc::clone(&self.handle),
sub,
})
}

/// Creates a serialized publisher.
///
/// This publishes raw serialized (CDR) bytes, using `rcl_publish_serialized_message`.
pub fn create_serialized_publisher<'a>(
&self,
topic_type: MessageTypeName,
options: impl Into<crate::PublisherOptions<'a>>,
) -> Result<SerializedPublisher, RclrsError> {
Comment on lines +1541 to +1545
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar comment to the dynamic subscription on whether we can streamline this with the dynamic message implementation.
From my understanding, the only difference here is that the publish API takes in a &SerializedMessage rather than a DynamicMessage. What if we shared the implementation by doing something like:

  • Remove this function.
  • Have the DynamicPublisher::publish function take in a generic argument for the message that implements a trait with a function to "call rcl and publish this item".
  • We implement the trait for both DynamicMessage for dynamic message, with the current implementation.
  • We also implement the trait for SerializedMessage, where it just calls rcl_publish_serialized_message as you did.
  • Users can now publish either type of message because the publish signature is some sort of msg: impl GenericPublishable or whatever bikeshedding.

Does this make sense?

let crate::PublisherOptions { topic, qos } = options.into();

let metadata = crate::dynamic_message::DynamicMessageMetadata::new(topic_type)?;
let mut pub_ = unsafe { rcl_get_zero_initialized_publisher() };
let topic_c = std::ffi::CString::new(topic).unwrap();

let _context_lock = self.handle.context_handle.rcl_context.lock().unwrap();
let node = self.handle.rcl_node.lock().unwrap();
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();

unsafe {
let mut opts = rcl_publisher_get_default_options();
opts.qos = qos.into();
rcl_publisher_init(
&mut pub_,
&*node,
metadata.type_support_ptr(),
topic_c.as_ptr(),
&opts,
)
.ok()?;
}

Ok(SerializedPublisher {
handle: Arc::clone(&self.handle),
pub_,
})
}
}

impl<'a> ToLogParams<'a> for &'a NodeState {
Expand Down
40 changes: 40 additions & 0 deletions rclrs/src/serialized_message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use crate::{rcl_bindings::*, RclrsError, ToResult};

/// A growable serialized message buffer.
///
/// This wraps `rcl_serialized_message_t` (aka `rmw_serialized_message_t`).
pub struct SerializedMessage {
pub(crate) msg: rcl_serialized_message_t,
}

unsafe impl Send for SerializedMessage {}

impl SerializedMessage {
/// Create a new serialized message buffer with the given capacity in bytes.
pub fn new(capacity: usize) -> Result<Self, RclrsError> {
unsafe {
let mut msg = rcutils_get_zero_initialized_uint8_array();
let allocator = rcutils_get_default_allocator();
rcutils_uint8_array_init(&mut msg, capacity, &allocator).ok()?;
Ok(Self { msg })
}
}

/// Return the current serialized payload.
pub fn as_bytes(&self) -> &[u8] {
unsafe { std::slice::from_raw_parts(self.msg.buffer, self.msg.buffer_length) }
}

/// Reset the length to 0 without changing capacity.
pub fn clear(&mut self) {
self.msg.buffer_length = 0;
}
}

impl Drop for SerializedMessage {
fn drop(&mut self) {
unsafe {
let _ = rcutils_uint8_array_fini(&mut self.msg);
}
}
}
34 changes: 34 additions & 0 deletions rclrs/src/serialized_publisher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use crate::{node::NodeHandle, rcl_bindings::*, RclrsError, ToResult, ENTITY_LIFECYCLE_MUTEX};
use std::{ptr, sync::Arc};

use crate::serialized_message::SerializedMessage;

/// A publisher which publishes serialized ROS messages.
pub struct SerializedPublisher {
pub(crate) handle: Arc<NodeHandle>,
pub(crate) pub_: rcl_publisher_t,
}

unsafe impl Send for SerializedPublisher {}
unsafe impl Sync for SerializedPublisher {}

impl Drop for SerializedPublisher {
fn drop(&mut self) {
let _context_lock = self.handle.context_handle.rcl_context.lock().unwrap();
let mut node = self.handle.rcl_node.lock().unwrap();
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
unsafe {
let _ = rcl_publisher_fini(&mut self.pub_, &mut *node);
}
}
}

impl SerializedPublisher {
/// Publish a serialized (CDR) message.
pub fn publish(&self, msg: &SerializedMessage) -> Result<(), RclrsError> {
unsafe {
rcl_publish_serialized_message(&self.pub_, &msg.msg, ptr::null_mut()).ok()?;
}
Ok(())
}
}
42 changes: 42 additions & 0 deletions rclrs/src/serialized_subscription.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use crate::{node::NodeHandle, rcl_bindings::*, MessageInfo, RclrsError, ENTITY_LIFECYCLE_MUTEX};
use std::{ptr, sync::Arc};

use crate::serialized_message::SerializedMessage;

/// A subscription which receives serialized ROS messages.
pub struct SerializedSubscription {
pub(crate) handle: Arc<NodeHandle>,
pub(crate) sub: rcl_subscription_t,
}

unsafe impl Send for SerializedSubscription {}
unsafe impl Sync for SerializedSubscription {}

impl Drop for SerializedSubscription {
fn drop(&mut self) {
let _context_lock = self.handle.context_handle.rcl_context.lock().unwrap();
let mut node = self.handle.rcl_node.lock().unwrap();
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
unsafe {
let _ = rcl_subscription_fini(&mut self.sub, &mut *node);
}
}
}

impl SerializedSubscription {
/// Take a serialized (CDR) message.
///
/// Returns `Ok(None)` when no message is available.
pub fn take(&self, buf: &mut SerializedMessage) -> Result<Option<MessageInfo>, RclrsError> {
unsafe {
let mut info: rmw_message_info_t = std::mem::zeroed();
let rc =
rcl_take_serialized_message(&self.sub, &mut buf.msg, &mut info, ptr::null_mut());
if rc != 0 {
// No message available or error. The rmw/rcl API uses negative codes for "take failed".
return Ok(None);
}
Ok(Some(MessageInfo::from_rmw_message_info(&info)))
}
}
}
Loading