Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion callback_isolated_executor/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,13 @@ if(BUILD_TESTING)
ament_lint_auto_find_test_dependencies()
endif()

add_executable(component_container_callback_isolated src/component_container_callback_isolated.cpp)
add_executable(component_container_callback_isolated
src/component_container_callback_isolated.cpp
src/multi_threaded_executor_internal.cpp
)
target_include_directories(component_container_callback_isolated PRIVATE
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
)
ament_target_dependencies(component_container_callback_isolated rclcpp rclcpp_components cie_thread_configurator)

add_executable(component_container_single src/component_container_single.cpp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "rclcpp/rclcpp.hpp"
#include "rclcpp_components/component_manager.hpp"

#include "callback_isolated_executor/multi_threaded_executor_internal.hpp"
#include "cie_thread_configurator/cie_thread_configurator.hpp"

namespace rclcpp_components {
Expand All @@ -15,14 +16,13 @@ class ComponentManagerCallbackIsolated
: public rclcpp_components::ComponentManager {

struct ExecutorWrapper {
explicit ExecutorWrapper(
std::shared_ptr<rclcpp::executors::SingleThreadedExecutor> executor)
explicit ExecutorWrapper(std::shared_ptr<rclcpp::Executor> executor)
: executor(executor), thread_initialized(false) {}

ExecutorWrapper(const ExecutorWrapper &) = delete;
ExecutorWrapper &operator=(const ExecutorWrapper &) = delete;

std::shared_ptr<rclcpp::executors::SingleThreadedExecutor> executor;
std::shared_ptr<rclcpp::Executor> executor;
std::thread thread;
std::atomic_bool thread_initialized;
};
Expand Down Expand Up @@ -52,6 +52,7 @@ class ComponentManagerCallbackIsolated
rclcpp::Publisher<cie_config_msgs::msg::CallbackGroupInfo>::SharedPtr
client_publisher_;
std::mutex client_publisher_mutex_;
size_t reentrant_parallelism_{4};
};

ComponentManagerCallbackIsolated::~ComponentManagerCallbackIsolated() {
Expand Down Expand Up @@ -134,27 +135,58 @@ void ComponentManagerCallbackIsolated::add_node_to_executor(uint64_t node_id) {
return;
}

auto executor =
std::make_shared<rclcpp::executors::SingleThreadedExecutor>();
executor->add_callback_group(callback_group, node);

auto it = node_id_to_executor_wrappers_[node_id].begin();
it = node_id_to_executor_wrappers_[node_id].emplace(it, executor);
auto &executor_wrapper = *it;

executor_wrapper.thread =
std::thread([&executor_wrapper, group_id, this]() {
auto tid = syscall(SYS_gettid);

{
std::lock_guard<std::mutex> lock(this->client_publisher_mutex_);
cie_thread_configurator::publish_callback_group_info(
this->client_publisher_, tid, group_id);
}

executor_wrapper.thread_initialized = true;
executor_wrapper.executor->spin();
});
if (callback_group->type() == rclcpp::CallbackGroupType::Reentrant &&
reentrant_parallelism_ >= 2) {
// Reentrant callback group: use MultiThreadedExecutorInternal
auto reentrant_executor = std::make_shared<MultiThreadedExecutorInternal>(
reentrant_parallelism_);
reentrant_executor->add_callback_group(callback_group, node);

auto it = node_id_to_executor_wrappers_[node_id].begin();
it = node_id_to_executor_wrappers_[node_id].emplace(it,
reentrant_executor);
auto &executor_wrapper = *it;

executor_wrapper.thread = std::thread(
[&executor_wrapper, reentrant_executor, group_id, this]() {
reentrant_executor->pre_spin();
auto tids = reentrant_executor->get_thread_ids();

{
std::lock_guard<std::mutex> lock(this->client_publisher_mutex_);
for (auto tid : tids) {
cie_thread_configurator::publish_callback_group_info(
this->client_publisher_, tid, group_id);
}
}

executor_wrapper.thread_initialized = true;
executor_wrapper.executor->spin();
});
} else {
// Mutually exclusive callback group: use SingleThreadedExecutor
auto executor =
std::make_shared<rclcpp::executors::SingleThreadedExecutor>();
executor->add_callback_group(callback_group, node);

auto it = node_id_to_executor_wrappers_[node_id].begin();
it = node_id_to_executor_wrappers_[node_id].emplace(it, executor);
auto &executor_wrapper = *it;

executor_wrapper.thread =
std::thread([&executor_wrapper, group_id, this]() {
auto tid = syscall(SYS_gettid);

{
std::lock_guard<std::mutex> lock(this->client_publisher_mutex_);
cie_thread_configurator::publish_callback_group_info(
this->client_publisher_, tid, group_id);
}

executor_wrapper.thread_initialized = true;
executor_wrapper.executor->spin();
});
}
});
}

Expand Down