Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <chrono>

#include "cie_config_msgs/msg/callback_group_info.hpp"
#include "rclcpp/rclcpp.hpp"

Expand All @@ -9,7 +11,12 @@ class CallbackIsolatedExecutor : public rclcpp::Executor {
std::mutex client_publisher_mutex_;
rclcpp::Publisher<cie_config_msgs::msg::CallbackGroupInfo>::SharedPtr
client_publisher_;
size_t reentrant_parallelism_{4};

// Parameters for the MultiThreadedExecutorInternal used for reentrant
// callback groups
size_t reentrant_parallelism_ = 4;
bool yield_before_execute_ = false;
std::chrono::nanoseconds next_exec_timeout_ = std::chrono::nanoseconds(-1);

// Nodes associated with this CallbackIsolatedExecutor, appended by add_node()
// and removed by remove_node()
Expand Down Expand Up @@ -44,7 +51,10 @@ class CallbackIsolatedExecutor : public rclcpp::Executor {
public:
RCLCPP_PUBLIC
explicit CallbackIsolatedExecutor(
const rclcpp::ExecutorOptions &options = rclcpp::ExecutorOptions());
const rclcpp::ExecutorOptions &options = rclcpp::ExecutorOptions(),
size_t reentrant_parallelism = 4, bool yield_before_execute = false,
std::chrono::nanoseconds next_exec_timeout =
std::chrono::nanoseconds(-1));

RCLCPP_PUBLIC
void spin() override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ class MultiThreadedExecutorInternal : public rclcpp::Executor {
void run();

public:
explicit MultiThreadedExecutorInternal(size_t number_of_threads)
explicit MultiThreadedExecutorInternal(
size_t number_of_threads, bool yield_before_execute = false,
std::chrono::nanoseconds next_exec_timeout = std::chrono::nanoseconds(-1))
: rclcpp::Executor(rclcpp::ExecutorOptions()),
number_of_threads_(number_of_threads) {
// hardcode for now
yield_before_execute_ = false;
next_exec_timeout_ = std::chrono::nanoseconds(-1);
}
number_of_threads_(number_of_threads),
yield_before_execute_(yield_before_execute),
next_exec_timeout_(next_exec_timeout) {}

void pre_spin();

Expand Down
11 changes: 7 additions & 4 deletions callback_isolated_executor/src/callback_isolated_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@
#include "callback_isolated_executor/multi_threaded_executor_internal.hpp"

CallbackIsolatedExecutor::CallbackIsolatedExecutor(
const rclcpp::ExecutorOptions &options)
: rclcpp::Executor(options) {
const rclcpp::ExecutorOptions &options, size_t reentrant_parallelism,
bool yield_before_execute, std::chrono::nanoseconds next_exec_timeout)
: rclcpp::Executor(options), reentrant_parallelism_(reentrant_parallelism),
yield_before_execute_(yield_before_execute),
next_exec_timeout_(next_exec_timeout) {
client_publisher_ = cie_thread_configurator::create_client_publisher();
}

Expand Down Expand Up @@ -94,8 +97,8 @@ void CallbackIsolatedExecutor::spin_mutually_exclusive_callback_group(
void CallbackIsolatedExecutor::spin_reentrant_callback_group(
rclcpp::CallbackGroup::SharedPtr group,
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node) {
auto executor =
std::make_shared<MultiThreadedExecutorInternal>(reentrant_parallelism_);
auto executor = std::make_shared<MultiThreadedExecutorInternal>(
reentrant_parallelism_, yield_before_execute_, next_exec_timeout_);
executor->add_callback_group(group, node);
auto callback_group_id =
cie_thread_configurator::create_callback_group_id(group, node);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,20 @@ class ComponentManagerCallbackIsolated
create_publisher<cie_config_msgs::msg::CallbackGroupInfo>(
"/cie_thread_configurator/callback_group_info",
rclcpp::QoS(1000).keep_all());

// Declare and get parameters for MultiThreadedExecutorInternal
auto reentrant_parallelism_param =
declare_parameter("reentrant_parallelism", 4);
if (reentrant_parallelism_param < 0) {
RCLCPP_WARN(get_logger(),
"reentrant_parallelism must be non-negative, using default "
"value 4");
reentrant_parallelism_param = 4;
}
reentrant_parallelism_ = static_cast<size_t>(reentrant_parallelism_param);
yield_before_execute_ = declare_parameter("yield_before_execute", false);
auto timeout_ns = declare_parameter("next_exec_timeout_ns", -1L);
next_exec_timeout_ = std::chrono::nanoseconds(timeout_ns);
}

~ComponentManagerCallbackIsolated();
Expand All @@ -52,7 +66,12 @@ class ComponentManagerCallbackIsolated
rclcpp::Publisher<cie_config_msgs::msg::CallbackGroupInfo>::SharedPtr
client_publisher_;
std::mutex client_publisher_mutex_;

// Parameters for the MultiThreadedExecutorInternal used for reentrant
// callback groups
size_t reentrant_parallelism_{4};
bool yield_before_execute_{false};
std::chrono::nanoseconds next_exec_timeout_{std::chrono::nanoseconds(-1)};
};

ComponentManagerCallbackIsolated::~ComponentManagerCallbackIsolated() {
Expand Down Expand Up @@ -139,7 +158,7 @@ void ComponentManagerCallbackIsolated::add_node_to_executor(uint64_t node_id) {
reentrant_parallelism_ >= 2) {
// Reentrant callback group: use MultiThreadedExecutorInternal
auto reentrant_executor = std::make_shared<MultiThreadedExecutorInternal>(
reentrant_parallelism_);
reentrant_parallelism_, yield_before_execute_, next_exec_timeout_);
reentrant_executor->add_callback_group(callback_group, node);

auto it = node_id_to_executor_wrappers_[node_id].begin();
Expand Down
6 changes: 4 additions & 2 deletions cie_sample_application/src/reentrant_node_main.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#include <chrono>

#include "callback_isolated_executor/callback_isolated_executor.hpp"
#include "cie_sample_application/reentrant_node.hpp"

int main(int argc, char *argv[]) {
rclcpp::init(argc, argv);

auto node = std::make_shared<ReentrantNode>();
auto executor = std::make_shared<CallbackIsolatedExecutor>();
// executor->set_reentrant_parallelism(4);
auto executor = std::make_shared<CallbackIsolatedExecutor>(
rclcpp::ExecutorOptions(), 4, false, std::chrono::nanoseconds(-1));

executor->add_node(node);
executor->spin();
Expand Down