From 1af8d6f5df4f1dc9b02055771f542682a363ec2c Mon Sep 17 00:00:00 2001 From: atsushi421 Date: Sun, 25 Jan 2026 17:47:29 +0900 Subject: [PATCH] feat: add reentrant callback group support to component_container_callback_isolated Add support for reentrant callback groups in ComponentManagerCallbackIsolated, matching the functionality already present in CallbackIsolatedExecutor (PR #21). - Use MultiThreadedExecutorInternal for reentrant callback groups with configurable parallelism (default 4 threads) - Use SingleThreadedExecutor for mutually exclusive callback groups - Unify ExecutorWrapper to use std::shared_ptr instead of separate executor types - Update CMakeLists.txt to include multi_threaded_executor_internal.cpp and required include directories --- callback_isolated_executor/CMakeLists.txt | 8 +- .../component_container_callback_isolated.cpp | 80 +++++++++++++------ 2 files changed, 63 insertions(+), 25 deletions(-) diff --git a/callback_isolated_executor/CMakeLists.txt b/callback_isolated_executor/CMakeLists.txt index bf31569..3baf8d5 100644 --- a/callback_isolated_executor/CMakeLists.txt +++ b/callback_isolated_executor/CMakeLists.txt @@ -24,7 +24,13 @@ if(BUILD_TESTING) ament_lint_auto_find_test_dependencies() endif() -add_executable(component_container_callback_isolated src/component_container_callback_isolated.cpp) +add_executable(component_container_callback_isolated + src/component_container_callback_isolated.cpp + src/multi_threaded_executor_internal.cpp +) +target_include_directories(component_container_callback_isolated PRIVATE + $ +) ament_target_dependencies(component_container_callback_isolated rclcpp rclcpp_components cie_thread_configurator) add_executable(component_container_single src/component_container_single.cpp) diff --git a/callback_isolated_executor/src/component_container_callback_isolated.cpp b/callback_isolated_executor/src/component_container_callback_isolated.cpp index 13a9c9d..d603550 100644 --- a/callback_isolated_executor/src/component_container_callback_isolated.cpp +++ b/callback_isolated_executor/src/component_container_callback_isolated.cpp @@ -7,6 +7,7 @@ #include "rclcpp/rclcpp.hpp" #include "rclcpp_components/component_manager.hpp" +#include "callback_isolated_executor/multi_threaded_executor_internal.hpp" #include "cie_thread_configurator/cie_thread_configurator.hpp" namespace rclcpp_components { @@ -15,14 +16,13 @@ class ComponentManagerCallbackIsolated : public rclcpp_components::ComponentManager { struct ExecutorWrapper { - explicit ExecutorWrapper( - std::shared_ptr executor) + explicit ExecutorWrapper(std::shared_ptr executor) : executor(executor), thread_initialized(false) {} ExecutorWrapper(const ExecutorWrapper &) = delete; ExecutorWrapper &operator=(const ExecutorWrapper &) = delete; - std::shared_ptr executor; + std::shared_ptr executor; std::thread thread; std::atomic_bool thread_initialized; }; @@ -52,6 +52,7 @@ class ComponentManagerCallbackIsolated rclcpp::Publisher::SharedPtr client_publisher_; std::mutex client_publisher_mutex_; + size_t reentrant_parallelism_{4}; }; ComponentManagerCallbackIsolated::~ComponentManagerCallbackIsolated() { @@ -134,27 +135,58 @@ void ComponentManagerCallbackIsolated::add_node_to_executor(uint64_t node_id) { return; } - auto executor = - std::make_shared(); - executor->add_callback_group(callback_group, node); - - auto it = node_id_to_executor_wrappers_[node_id].begin(); - it = node_id_to_executor_wrappers_[node_id].emplace(it, executor); - auto &executor_wrapper = *it; - - executor_wrapper.thread = - std::thread([&executor_wrapper, group_id, this]() { - auto tid = syscall(SYS_gettid); - - { - std::lock_guard lock(this->client_publisher_mutex_); - cie_thread_configurator::publish_callback_group_info( - this->client_publisher_, tid, group_id); - } - - executor_wrapper.thread_initialized = true; - executor_wrapper.executor->spin(); - }); + if (callback_group->type() == rclcpp::CallbackGroupType::Reentrant && + reentrant_parallelism_ >= 2) { + // Reentrant callback group: use MultiThreadedExecutorInternal + auto reentrant_executor = std::make_shared( + reentrant_parallelism_); + reentrant_executor->add_callback_group(callback_group, node); + + auto it = node_id_to_executor_wrappers_[node_id].begin(); + it = node_id_to_executor_wrappers_[node_id].emplace(it, + reentrant_executor); + auto &executor_wrapper = *it; + + executor_wrapper.thread = std::thread( + [&executor_wrapper, reentrant_executor, group_id, this]() { + reentrant_executor->pre_spin(); + auto tids = reentrant_executor->get_thread_ids(); + + { + std::lock_guard lock(this->client_publisher_mutex_); + for (auto tid : tids) { + cie_thread_configurator::publish_callback_group_info( + this->client_publisher_, tid, group_id); + } + } + + executor_wrapper.thread_initialized = true; + executor_wrapper.executor->spin(); + }); + } else { + // Mutually exclusive callback group: use SingleThreadedExecutor + auto executor = + std::make_shared(); + executor->add_callback_group(callback_group, node); + + auto it = node_id_to_executor_wrappers_[node_id].begin(); + it = node_id_to_executor_wrappers_[node_id].emplace(it, executor); + auto &executor_wrapper = *it; + + executor_wrapper.thread = + std::thread([&executor_wrapper, group_id, this]() { + auto tid = syscall(SYS_gettid); + + { + std::lock_guard lock(this->client_publisher_mutex_); + cie_thread_configurator::publish_callback_group_info( + this->client_publisher_, tid, group_id); + } + + executor_wrapper.thread_initialized = true; + executor_wrapper.executor->spin(); + }); + } }); }