diff --git a/callback_isolated_executor/include/callback_isolated_executor/callback_isolated_executor.hpp b/callback_isolated_executor/include/callback_isolated_executor/callback_isolated_executor.hpp index 226b601..9e4b806 100644 --- a/callback_isolated_executor/include/callback_isolated_executor/callback_isolated_executor.hpp +++ b/callback_isolated_executor/include/callback_isolated_executor/callback_isolated_executor.hpp @@ -1,5 +1,7 @@ #pragma once +#include + #include "cie_config_msgs/msg/callback_group_info.hpp" #include "rclcpp/rclcpp.hpp" @@ -9,7 +11,12 @@ class CallbackIsolatedExecutor : public rclcpp::Executor { std::mutex client_publisher_mutex_; rclcpp::Publisher::SharedPtr client_publisher_; - size_t reentrant_parallelism_{4}; + + // Parameters for the MultiThreadedExecutorInternal used for reentrant + // callback groups + size_t reentrant_parallelism_ = 4; + bool yield_before_execute_ = false; + std::chrono::nanoseconds next_exec_timeout_ = std::chrono::nanoseconds(-1); // Nodes associated with this CallbackIsolatedExecutor, appended by add_node() // and removed by remove_node() @@ -44,7 +51,10 @@ class CallbackIsolatedExecutor : public rclcpp::Executor { public: RCLCPP_PUBLIC explicit CallbackIsolatedExecutor( - const rclcpp::ExecutorOptions &options = rclcpp::ExecutorOptions()); + const rclcpp::ExecutorOptions &options = rclcpp::ExecutorOptions(), + size_t reentrant_parallelism = 4, bool yield_before_execute = false, + std::chrono::nanoseconds next_exec_timeout = + std::chrono::nanoseconds(-1)); RCLCPP_PUBLIC void spin() override; diff --git a/callback_isolated_executor/include/callback_isolated_executor/multi_threaded_executor_internal.hpp b/callback_isolated_executor/include/callback_isolated_executor/multi_threaded_executor_internal.hpp index dfefae6..1021537 100644 --- a/callback_isolated_executor/include/callback_isolated_executor/multi_threaded_executor_internal.hpp +++ b/callback_isolated_executor/include/callback_isolated_executor/multi_threaded_executor_internal.hpp @@ -29,13 +29,13 @@ class MultiThreadedExecutorInternal : public rclcpp::Executor { void run(); public: - explicit MultiThreadedExecutorInternal(size_t number_of_threads) + explicit MultiThreadedExecutorInternal( + size_t number_of_threads, bool yield_before_execute = false, + std::chrono::nanoseconds next_exec_timeout = std::chrono::nanoseconds(-1)) : rclcpp::Executor(rclcpp::ExecutorOptions()), - number_of_threads_(number_of_threads) { - // hardcode for now - yield_before_execute_ = false; - next_exec_timeout_ = std::chrono::nanoseconds(-1); - } + number_of_threads_(number_of_threads), + yield_before_execute_(yield_before_execute), + next_exec_timeout_(next_exec_timeout) {} void pre_spin(); diff --git a/callback_isolated_executor/src/callback_isolated_executor.cpp b/callback_isolated_executor/src/callback_isolated_executor.cpp index 0cd31c5..6aa08f6 100644 --- a/callback_isolated_executor/src/callback_isolated_executor.cpp +++ b/callback_isolated_executor/src/callback_isolated_executor.cpp @@ -11,8 +11,11 @@ #include "callback_isolated_executor/multi_threaded_executor_internal.hpp" CallbackIsolatedExecutor::CallbackIsolatedExecutor( - const rclcpp::ExecutorOptions &options) - : rclcpp::Executor(options) { + const rclcpp::ExecutorOptions &options, size_t reentrant_parallelism, + bool yield_before_execute, std::chrono::nanoseconds next_exec_timeout) + : rclcpp::Executor(options), reentrant_parallelism_(reentrant_parallelism), + yield_before_execute_(yield_before_execute), + next_exec_timeout_(next_exec_timeout) { client_publisher_ = cie_thread_configurator::create_client_publisher(); } @@ -94,8 +97,8 @@ void CallbackIsolatedExecutor::spin_mutually_exclusive_callback_group( void CallbackIsolatedExecutor::spin_reentrant_callback_group( rclcpp::CallbackGroup::SharedPtr group, rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node) { - auto executor = - std::make_shared(reentrant_parallelism_); + auto executor = std::make_shared( + reentrant_parallelism_, yield_before_execute_, next_exec_timeout_); executor->add_callback_group(group, node); auto callback_group_id = cie_thread_configurator::create_callback_group_id(group, node); diff --git a/callback_isolated_executor/src/component_container_callback_isolated.cpp b/callback_isolated_executor/src/component_container_callback_isolated.cpp index d603550..fa81687 100644 --- a/callback_isolated_executor/src/component_container_callback_isolated.cpp +++ b/callback_isolated_executor/src/component_container_callback_isolated.cpp @@ -35,6 +35,20 @@ class ComponentManagerCallbackIsolated create_publisher( "/cie_thread_configurator/callback_group_info", rclcpp::QoS(1000).keep_all()); + + // Declare and get parameters for MultiThreadedExecutorInternal + auto reentrant_parallelism_param = + declare_parameter("reentrant_parallelism", 4); + if (reentrant_parallelism_param < 0) { + RCLCPP_WARN(get_logger(), + "reentrant_parallelism must be non-negative, using default " + "value 4"); + reentrant_parallelism_param = 4; + } + reentrant_parallelism_ = static_cast(reentrant_parallelism_param); + yield_before_execute_ = declare_parameter("yield_before_execute", false); + auto timeout_ns = declare_parameter("next_exec_timeout_ns", -1L); + next_exec_timeout_ = std::chrono::nanoseconds(timeout_ns); } ~ComponentManagerCallbackIsolated(); @@ -52,7 +66,12 @@ class ComponentManagerCallbackIsolated rclcpp::Publisher::SharedPtr client_publisher_; std::mutex client_publisher_mutex_; + + // Parameters for the MultiThreadedExecutorInternal used for reentrant + // callback groups size_t reentrant_parallelism_{4}; + bool yield_before_execute_{false}; + std::chrono::nanoseconds next_exec_timeout_{std::chrono::nanoseconds(-1)}; }; ComponentManagerCallbackIsolated::~ComponentManagerCallbackIsolated() { @@ -139,7 +158,7 @@ void ComponentManagerCallbackIsolated::add_node_to_executor(uint64_t node_id) { reentrant_parallelism_ >= 2) { // Reentrant callback group: use MultiThreadedExecutorInternal auto reentrant_executor = std::make_shared( - reentrant_parallelism_); + reentrant_parallelism_, yield_before_execute_, next_exec_timeout_); reentrant_executor->add_callback_group(callback_group, node); auto it = node_id_to_executor_wrappers_[node_id].begin(); diff --git a/cie_sample_application/src/reentrant_node_main.cpp b/cie_sample_application/src/reentrant_node_main.cpp index 13863b3..af18f85 100644 --- a/cie_sample_application/src/reentrant_node_main.cpp +++ b/cie_sample_application/src/reentrant_node_main.cpp @@ -1,3 +1,5 @@ +#include + #include "callback_isolated_executor/callback_isolated_executor.hpp" #include "cie_sample_application/reentrant_node.hpp" @@ -5,8 +7,8 @@ int main(int argc, char *argv[]) { rclcpp::init(argc, argv); auto node = std::make_shared(); - auto executor = std::make_shared(); - // executor->set_reentrant_parallelism(4); + auto executor = std::make_shared( + rclcpp::ExecutorOptions(), 4, false, std::chrono::nanoseconds(-1)); executor->add_node(node); executor->spin();