From 8e9502b8e551d0bec15b9cd61d14577e44a32de7 Mon Sep 17 00:00:00 2001 From: Nadhmi JAZI Date: Thu, 23 Oct 2025 20:05:50 +0200 Subject: [PATCH 1/2] feat: add a send_raw for publisher Signed-off-by: Nadhmi JAZI --- Cargo.toml | 2 +- src/rcl.rs | 16 ++++++++++++++++ src/topic/publisher.rs | 29 +++++++++++++++++++++++++++++ 3 files changed, 46 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 8952eaba..40ad47cd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "safe_drive" -version = "0.4.4" +version = "0.4.5" edition = "2021" authors = ["Yuuki Takano , TIER IV, Inc.", "Seio Inoue"] description = "safe_drive: Formally Specified Rust Bindings for ROS2" diff --git a/src/rcl.rs b/src/rcl.rs index fdaee453..8fe3b3c0 100644 --- a/src/rcl.rs +++ b/src/rcl.rs @@ -982,6 +982,22 @@ impl MTSafeFn { }) } + pub fn rcl_publish_serialized_message( + publisher: *const rcl_publisher_t, + data: &[u8], + allocation: *mut rmw_publisher_allocation_t, + ) -> RCLResult<()> { + let ros_message = rcl_serialized_message_t { + buffer: data.as_ptr() as *mut u8, + buffer_length: data.len(), + buffer_capacity: data.len(), + allocator: unsafe { self::rcutils_get_default_allocator() } + }; + ret_val_to_err(unsafe { + self::rcl_publish_serialized_message(publisher, (&ros_message) as *const _, allocation) + }) + } + pub fn rmw_get_default_publisher_options() -> rmw_publisher_options_t { unsafe { self::rmw_get_default_publisher_options() } } diff --git a/src/topic/publisher.rs b/src/topic/publisher.rs index 24c899a8..9b6fcb03 100644 --- a/src/topic/publisher.rs +++ b/src/topic/publisher.rs @@ -254,6 +254,35 @@ impl Publisher { Ok(()) } + /// Send a raw message. + /// + /// # Safety + /// + /// This function is marked unsafe as the user is reponsable for CDR serialization + /// + pub unsafe fn send_raw(&self, msg: &[u8]) -> Result<(), DynError> { + if crate::is_halt() { + return Err(Signaled.into()); + } + #[cfg(feature = "rcl_stat")] + let start = std::time::SystemTime::now(); + + if let Err(e) = + rcl::MTSafeFn::rcl_publish_serialized_message(self.publisher.as_ref(), msg, null_mut()) + { + return Err(e.into()); + } + + #[cfg(feature = "rcl_stat")] + { + if let Ok(dur) = start.elapsed() { + let mut guard = self.latency_publish.lock(); + guard.add(dur); + } + } + + Ok(()) + } /// Get latency statistics information of `rcl_publish()`. #[cfg(feature = "rcl_stat")] pub fn statistics(&self) -> SerializableTimeStat { From 9da560f687b128ef4959f7c9341f19889bfa3f5d Mon Sep 17 00:00:00 2001 From: Nadhmi JAZI Date: Sun, 26 Oct 2025 15:25:50 +0100 Subject: [PATCH 2/2] fix: dead lock potential fix --- src/selector.rs | 4 +++- src/selector/async_selector.rs | 42 ++++++++++++++++++---------------- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/src/selector.rs b/src/selector.rs index 8a823847..c8cf95ca 100644 --- a/src/selector.rs +++ b/src/selector.rs @@ -1118,7 +1118,9 @@ impl Selector { // set services for (_, h) in self.services.iter() { - let service = h.event.lock(); + let Some(service) = h.event.try_lock() else { + continue; + }; guard.rcl_wait_set_add_service(&mut self.wait_set, &service.service, null_mut())?; } diff --git a/src/selector/async_selector.rs b/src/selector/async_selector.rs index a6078042..ad975576 100644 --- a/src/selector/async_selector.rs +++ b/src/selector/async_selector.rs @@ -167,34 +167,36 @@ fn select( } } - if let Err(_e) = selector.wait() { - if signal_handler::is_halt() { - for (_, h) in selector.subscriptions.iter_mut() { - if let Some(handler) = &mut h.handler { - (*handler)(); - } + if selector + .wait() + .is_err() + && signal_handler::is_halt() + { + for (_, h) in selector.subscriptions.iter_mut() { + if let Some(handler) = &mut h.handler { + (*handler)(); } + } - for (_, h) in selector.services.iter_mut() { - if let Some(handler) = &mut h.handler { - (*handler)(); - } + for (_, h) in selector.services.iter_mut() { + if let Some(handler) = &mut h.handler { + (*handler)(); } + } - for (_, h) in selector.clients.iter_mut() { - if let Some(handler) = &mut h.handler { - (*handler)(); - } + for (_, h) in selector.clients.iter_mut() { + if let Some(handler) = &mut h.handler { + (*handler)(); } + } - for (_, h) in selector.cond.iter_mut() { - if let Some(handler) = &mut h.handler { - (*handler)(); - } + for (_, h) in selector.cond.iter_mut() { + if let Some(handler) = &mut h.handler { + (*handler)(); } - - return Ok(()); } + + return Ok(()); } } }