Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "safe_drive"
version = "0.4.4"
version = "0.4.5"
edition = "2021"
authors = ["Yuuki Takano <yuuki.takano@tier4.jp>, TIER IV, Inc.", "Seio Inoue"]
description = "safe_drive: Formally Specified Rust Bindings for ROS2"
Expand Down
16 changes: 16 additions & 0 deletions src/rcl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -982,6 +982,22 @@ impl MTSafeFn {
})
}

pub fn rcl_publish_serialized_message(
publisher: *const rcl_publisher_t,
data: &[u8],
allocation: *mut rmw_publisher_allocation_t,
) -> RCLResult<()> {
let ros_message = rcl_serialized_message_t {
buffer: data.as_ptr() as *mut u8,
buffer_length: data.len(),
buffer_capacity: data.len(),
allocator: unsafe { self::rcutils_get_default_allocator() }
};
ret_val_to_err(unsafe {
self::rcl_publish_serialized_message(publisher, (&ros_message) as *const _, allocation)
})
}

pub fn rmw_get_default_publisher_options() -> rmw_publisher_options_t {
unsafe { self::rmw_get_default_publisher_options() }
}
Expand Down
4 changes: 3 additions & 1 deletion src/selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1118,7 +1118,9 @@ impl Selector {

// set services
for (_, h) in self.services.iter() {
let service = h.event.lock();
let Some(service) = h.event.try_lock() else {
continue;
};
guard.rcl_wait_set_add_service(&mut self.wait_set, &service.service, null_mut())?;
}

Expand Down
42 changes: 22 additions & 20 deletions src/selector/async_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,34 +167,36 @@ fn select(
}
}

if let Err(_e) = selector.wait() {
if signal_handler::is_halt() {
for (_, h) in selector.subscriptions.iter_mut() {
if let Some(handler) = &mut h.handler {
(*handler)();
}
if selector
.wait()
.is_err()
&& signal_handler::is_halt()
{
for (_, h) in selector.subscriptions.iter_mut() {
if let Some(handler) = &mut h.handler {
(*handler)();
}
}

for (_, h) in selector.services.iter_mut() {
if let Some(handler) = &mut h.handler {
(*handler)();
}
for (_, h) in selector.services.iter_mut() {
if let Some(handler) = &mut h.handler {
(*handler)();
}
}

for (_, h) in selector.clients.iter_mut() {
if let Some(handler) = &mut h.handler {
(*handler)();
}
for (_, h) in selector.clients.iter_mut() {
if let Some(handler) = &mut h.handler {
(*handler)();
}
}

for (_, h) in selector.cond.iter_mut() {
if let Some(handler) = &mut h.handler {
(*handler)();
}
for (_, h) in selector.cond.iter_mut() {
if let Some(handler) = &mut h.handler {
(*handler)();
}

return Ok(());
}

return Ok(());
}
}
}
29 changes: 29 additions & 0 deletions src/topic/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,35 @@ impl<T: TypeSupport> Publisher<T> {
Ok(())
}

/// Send a raw message.
///
/// # Safety
///
/// This function is marked unsafe as the user is reponsable for CDR serialization
///
pub unsafe fn send_raw(&self, msg: &[u8]) -> Result<(), DynError> {
if crate::is_halt() {
return Err(Signaled.into());
}
#[cfg(feature = "rcl_stat")]
let start = std::time::SystemTime::now();

if let Err(e) =
rcl::MTSafeFn::rcl_publish_serialized_message(self.publisher.as_ref(), msg, null_mut())
{
return Err(e.into());
}

#[cfg(feature = "rcl_stat")]
{
if let Ok(dur) = start.elapsed() {
let mut guard = self.latency_publish.lock();
guard.add(dur);
}
}

Ok(())
}
/// Get latency statistics information of `rcl_publish()`.
#[cfg(feature = "rcl_stat")]
pub fn statistics(&self) -> SerializableTimeStat {
Expand Down