From 9b3ede7dbf97ab8b110122aa0bd74fb3c1ea70be Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Fri, 27 Nov 2020 16:52:53 +0800 Subject: [PATCH 01/14] to support a feature of content filtered topic Signed-off-by: Chen Lihui --- rclcpp/include/rclcpp/subscription_base.hpp | 35 +++++++ .../include/rclcpp/subscription_options.hpp | 70 +++++++++++++ rclcpp/src/rclcpp/subscription_base.cpp | 98 +++++++++++++++++++ 3 files changed, 203 insertions(+) diff --git a/rclcpp/include/rclcpp/subscription_base.hpp b/rclcpp/include/rclcpp/subscription_base.hpp index 72ddc23be2..0af206aad9 100644 --- a/rclcpp/include/rclcpp/subscription_base.hpp +++ b/rclcpp/include/rclcpp/subscription_base.hpp @@ -283,6 +283,41 @@ class SubscriptionBase : public std::enable_shared_from_this std::vector get_network_flow_endpoints() const; + /// Check if subscription instance can support content filter topic feature. + /** + * Depending on the middleware and the message type, this will return true if the middleware + * can support content filter topic feature. + * + * \return boolean flag indicating if middleware can support content filter topic feature. + */ + RCLCPP_PUBLIC + bool + is_cft_supported() const; + + /// Set the filter expression and expression parameters for the subscription. + /** + * \param[in] filter_expression An filter expression to set. + * \param[in] expression_parameters Array of expression parameters to set. + */ + RCLCPP_PUBLIC + virtual + void + set_cft_expression_parameters( + const std::string & filter_expression, + const std::vector & expression_parameters); + + /// Get the filter expression and expression parameters for the subscription. + /** + * \param[out] filter_expression An filter expression to get. + * \param[out] expression_parameters Array of expression parameters to get. + */ + RCLCPP_PUBLIC + virtual + void + get_cft_expression_parameters( + std::string & filter_expression, + std::vector & expression_parameters) const; + protected: template void diff --git a/rclcpp/include/rclcpp/subscription_options.hpp b/rclcpp/include/rclcpp/subscription_options.hpp index d6d2d4c60d..7a78b55135 100644 --- a/rclcpp/include/rclcpp/subscription_options.hpp +++ b/rclcpp/include/rclcpp/subscription_options.hpp @@ -21,6 +21,9 @@ #include #include +#include "rcutils/strdup.h" +#include "rcutils/types/string_array.h" + #include "rclcpp/callback_group.hpp" #include "rclcpp/detail/rmw_implementation_specific_subscription_payload.hpp" #include "rclcpp/intra_process_buffer_type.hpp" @@ -81,6 +84,17 @@ struct SubscriptionOptionsBase TopicStatisticsOptions topic_stats_options; QosOverridingOptions qos_overriding_options; + + // Options to configure content filtered topic in the subscription. + struct ContentFilterOptions + { + // Filter expression like SQL statement + std::string filter_expression; + // Expression parameters for filter expression + std::vector expression_parameters; + }; + + ContentFilterOptions content_filter_options; }; /// Structure containing optional configuration for Subscriptions. @@ -123,6 +137,62 @@ struct SubscriptionOptionsWithAllocator : public SubscriptionOptionsBase rmw_implementation_payload->modify_rmw_subscription_options(result.rmw_subscription_options); } + auto fail_clean = [&result]() + { + rcl_ret_t ret = rcl_subscription_options_fini(&result); + if (RCL_RET_OK != ret) { + RCLCPP_ERROR( + rclcpp::get_logger("rclcpp"), + "Failed to fini subscription option: %s", + rcl_get_error_string().str); + rcl_reset_error(); + } + }; + + // utils, string -> char * + // utils, vector -> rcutils_string_array_t * + + // Copy content_filter_options into rcl_subscription_options if necessary. + if (!content_filter_options.filter_expression.empty()) { + char * expression = + rcutils_strdup(content_filter_options.filter_expression.c_str(), allocator); + if (!expression) { + throw std::runtime_error("failed to allocate memory for filter expression"); + } + result.rmw_subscription_options.filter_expression = expression; + } + + if (!content_filter_options.expression_parameters.empty()) { + rcutils_string_array_t * parameters = + static_cast(allocator.allocate( + sizeof(rcutils_string_array_t), + allocator.state)); + if (!parameters) { + fail_clean(); + throw std::runtime_error("failed to allocate memory for expression parameters"); + } + rcutils_ret_t ret = rcutils_string_array_init( + parameters, content_filter_options.expression_parameters.size(), &allocator); + if (RCUTILS_RET_OK != ret) { + fail_clean(); + rclcpp::exceptions::throw_from_rcl_error( + RCL_RET_ERROR, + "failed to initialize string array for expression parameters"); + } + + for (size_t i = 0; i < content_filter_options.expression_parameters.size(); ++i) { + char * parameter = + rcutils_strdup(content_filter_options.expression_parameters[i].c_str(), allocator); + if (!parameter) { + fail_clean(); + throw std::runtime_error("failed to allocate memory for expression parameters"); + } + parameters->data[i] = parameter; + } + + result.rmw_subscription_options.expression_parameters = parameters; + } + return result; } diff --git a/rclcpp/src/rclcpp/subscription_base.cpp b/rclcpp/src/rclcpp/subscription_base.cpp index 472034f362..f182e674ae 100644 --- a/rclcpp/src/rclcpp/subscription_base.cpp +++ b/rclcpp/src/rclcpp/subscription_base.cpp @@ -19,12 +19,16 @@ #include #include +#include "rcutils/strdup.h" +#include "rcutils/types/string_array.h" + #include "rclcpp/exceptions.hpp" #include "rclcpp/expand_topic_or_service_name.hpp" #include "rclcpp/experimental/intra_process_manager.hpp" #include "rclcpp/logging.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/qos_event.hpp" +#include "rclcpp/scope_exit.hpp" #include "rmw/error_handling.h" #include "rmw/rmw.h" @@ -44,6 +48,20 @@ SubscriptionBase::SubscriptionBase( type_support_(type_support_handle), is_serialized_(is_serialized) { + // finalize subscription_options + RCLCPP_SCOPE_EXIT( + { + rcl_ret_t ret = rcl_subscription_options_fini( + const_cast(&subscription_options)); + if (RCL_RET_OK != ret) { + RCLCPP_ERROR( + rclcpp::get_node_logger(node_handle_.get()).get_child("rclcpp"), + "Failed to fini subscription option: %s", + rcl_get_error_string().str); + rcl_reset_error(); + } + }); + auto custom_deletor = [node_handle = this->node_handle_](rcl_subscription_t * rcl_subs) { if (rcl_subscription_fini(rcl_subs, node_handle.get()) != RCL_RET_OK) { @@ -326,3 +344,83 @@ std::vector SubscriptionBase::get_network_flow_endp return network_flow_endpoint_vector; } + +bool +SubscriptionBase::is_cft_supported() const +{ + return rcl_subscription_is_cft_supported(subscription_handle_.get()); +} + +void +SubscriptionBase::set_cft_expression_parameters( + const std::string & filter_expression, + const std::vector & expression_parameters) +{ + rcl_allocator_t allocator = rcl_get_default_allocator(); + rcutils_string_array_t parameters; + parameters = rcutils_get_zero_initialized_string_array(); + rcutils_ret_t rcutils_ret = rcutils_string_array_init( + ¶meters, expression_parameters.size(), &allocator); + if (RCUTILS_RET_OK != rcutils_ret) { + rclcpp::exceptions::throw_from_rcl_error( + RCL_RET_ERROR, + "failed to initialize string array for expression parameters"); + } + RCLCPP_SCOPE_EXIT( + { + rcutils_ret_t rcutils_ret = rcutils_string_array_fini(¶meters); + if (RCUTILS_RET_OK != rcutils_ret) { + RCLCPP_ERROR( + rclcpp::get_node_logger(node_handle_.get()).get_child("rclcpp"), + "failed to finalize parameters: %s", + rcl_get_error_string().str); + rcl_reset_error(); + } + }); + + for (size_t i = 0; i < expression_parameters.size(); ++i) { + parameters.data[i] = rcutils_strdup(expression_parameters[i].c_str(), allocator); + } + rcl_ret_t ret = rcl_subscription_set_cft_expression_parameters( + subscription_handle_.get(), + filter_expression.c_str(), + ¶meters); + + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error(ret, "failed to set cft expression parameters"); + } +} + +void +SubscriptionBase::get_cft_expression_parameters( + std::string & filter_expression, + std::vector & expression_parameters) const +{ + char * expression = NULL; + rcutils_string_array_t parameters; + parameters = rcutils_get_zero_initialized_string_array(); + rcl_ret_t ret = rcl_subscription_get_cft_expression_parameters( + subscription_handle_.get(), + &expression, + ¶meters); + + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error(ret, "failed to get cft expression parameters"); + } + + filter_expression = expression; + for (size_t i = 0; i < parameters.size; ++i) { + expression_parameters.push_back(parameters.data[i]); + } + + rcutils_allocator_t allocator = rcutils_get_default_allocator(); + allocator.deallocate(expression, allocator.state); + rcutils_ret_t rcutils_ret = rcutils_string_array_fini(¶meters); + if (RCUTILS_RET_OK != rcutils_ret) { + RCLCPP_ERROR( + rclcpp::get_node_logger(node_handle_.get()).get_child("rclcpp"), + "failed to finalize parameters: %s", + rcl_get_error_string().str); + rcl_reset_error(); + } +} From 786a467d9aaa9d6596e5c846b0c921f5cd5d6af1 Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Thu, 3 Dec 2020 16:59:12 +0800 Subject: [PATCH 02/14] update for checking memory allocated Signed-off-by: Chen Lihui --- rclcpp/src/rclcpp/subscription_base.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/rclcpp/src/rclcpp/subscription_base.cpp b/rclcpp/src/rclcpp/subscription_base.cpp index f182e674ae..fb27f4ecfc 100644 --- a/rclcpp/src/rclcpp/subscription_base.cpp +++ b/rclcpp/src/rclcpp/subscription_base.cpp @@ -380,6 +380,9 @@ SubscriptionBase::set_cft_expression_parameters( for (size_t i = 0; i < expression_parameters.size(); ++i) { parameters.data[i] = rcutils_strdup(expression_parameters[i].c_str(), allocator); + if (!parameters.data[i]) { + throw std::runtime_error("failed to allocate memory for expression parameters"); + } } rcl_ret_t ret = rcl_subscription_set_cft_expression_parameters( subscription_handle_.get(), From 27aaa76c1abc3c2e7289900f49265ccb7009a81e Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Mon, 22 Feb 2021 15:47:30 +0800 Subject: [PATCH 03/14] set expression parameter only if filter is valid Signed-off-by: Chen Lihui --- .../include/rclcpp/subscription_options.hpp | 52 +++++++++---------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/rclcpp/include/rclcpp/subscription_options.hpp b/rclcpp/include/rclcpp/subscription_options.hpp index 7a78b55135..45f5bcce6c 100644 --- a/rclcpp/include/rclcpp/subscription_options.hpp +++ b/rclcpp/include/rclcpp/subscription_options.hpp @@ -160,37 +160,37 @@ struct SubscriptionOptionsWithAllocator : public SubscriptionOptionsBase throw std::runtime_error("failed to allocate memory for filter expression"); } result.rmw_subscription_options.filter_expression = expression; - } - if (!content_filter_options.expression_parameters.empty()) { - rcutils_string_array_t * parameters = - static_cast(allocator.allocate( - sizeof(rcutils_string_array_t), - allocator.state)); - if (!parameters) { - fail_clean(); - throw std::runtime_error("failed to allocate memory for expression parameters"); - } - rcutils_ret_t ret = rcutils_string_array_init( - parameters, content_filter_options.expression_parameters.size(), &allocator); - if (RCUTILS_RET_OK != ret) { - fail_clean(); - rclcpp::exceptions::throw_from_rcl_error( - RCL_RET_ERROR, - "failed to initialize string array for expression parameters"); - } - - for (size_t i = 0; i < content_filter_options.expression_parameters.size(); ++i) { - char * parameter = - rcutils_strdup(content_filter_options.expression_parameters[i].c_str(), allocator); - if (!parameter) { + if (!content_filter_options.expression_parameters.empty()) { + rcutils_string_array_t * parameters = + static_cast(allocator.allocate( + sizeof(rcutils_string_array_t), + allocator.state)); + if (!parameters) { fail_clean(); throw std::runtime_error("failed to allocate memory for expression parameters"); } - parameters->data[i] = parameter; - } + rcutils_ret_t ret = rcutils_string_array_init( + parameters, content_filter_options.expression_parameters.size(), &allocator); + if (RCUTILS_RET_OK != ret) { + fail_clean(); + rclcpp::exceptions::throw_from_rcl_error( + RCL_RET_ERROR, + "failed to initialize string array for expression parameters"); + } - result.rmw_subscription_options.expression_parameters = parameters; + for (size_t i = 0; i < content_filter_options.expression_parameters.size(); ++i) { + char * parameter = + rcutils_strdup(content_filter_options.expression_parameters[i].c_str(), allocator); + if (!parameter) { + fail_clean(); + throw std::runtime_error("failed to allocate memory for expression parameters"); + } + parameters->data[i] = parameter; + } + + result.rmw_subscription_options.expression_parameters = parameters; + } } return result; From 0402da8787be1e764b1ca16ab8d6af66a4df4113 Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Tue, 23 Feb 2021 16:52:08 +0800 Subject: [PATCH 04/14] add test Signed-off-by: Chen Lihui --- .../include/rclcpp/subscription_options.hpp | 6 +- rclcpp/package.xml | 1 + rclcpp/src/rclcpp/subscription_base.cpp | 14 ---- rclcpp/test/rclcpp/CMakeLists.txt | 1 + rclcpp/test/rclcpp/test_subscription.cpp | 75 +++++++++++++++++++ 5 files changed, 80 insertions(+), 17 deletions(-) diff --git a/rclcpp/include/rclcpp/subscription_options.hpp b/rclcpp/include/rclcpp/subscription_options.hpp index 45f5bcce6c..94b1969ffc 100644 --- a/rclcpp/include/rclcpp/subscription_options.hpp +++ b/rclcpp/include/rclcpp/subscription_options.hpp @@ -163,13 +163,15 @@ struct SubscriptionOptionsWithAllocator : public SubscriptionOptionsBase if (!content_filter_options.expression_parameters.empty()) { rcutils_string_array_t * parameters = - static_cast(allocator.allocate( + static_cast(allocator.zero_allocate( + 1, sizeof(rcutils_string_array_t), allocator.state)); if (!parameters) { fail_clean(); throw std::runtime_error("failed to allocate memory for expression parameters"); } + result.rmw_subscription_options.expression_parameters = parameters; rcutils_ret_t ret = rcutils_string_array_init( parameters, content_filter_options.expression_parameters.size(), &allocator); if (RCUTILS_RET_OK != ret) { @@ -188,8 +190,6 @@ struct SubscriptionOptionsWithAllocator : public SubscriptionOptionsBase } parameters->data[i] = parameter; } - - result.rmw_subscription_options.expression_parameters = parameters; } } diff --git a/rclcpp/package.xml b/rclcpp/package.xml index 99783768e9..b551ca3fa1 100644 --- a/rclcpp/package.xml +++ b/rclcpp/package.xml @@ -46,6 +46,7 @@ rmw rmw_implementation_cmake rosidl_default_generators + std_msgs test_msgs diff --git a/rclcpp/src/rclcpp/subscription_base.cpp b/rclcpp/src/rclcpp/subscription_base.cpp index fb27f4ecfc..d84d2df6c4 100644 --- a/rclcpp/src/rclcpp/subscription_base.cpp +++ b/rclcpp/src/rclcpp/subscription_base.cpp @@ -48,20 +48,6 @@ SubscriptionBase::SubscriptionBase( type_support_(type_support_handle), is_serialized_(is_serialized) { - // finalize subscription_options - RCLCPP_SCOPE_EXIT( - { - rcl_ret_t ret = rcl_subscription_options_fini( - const_cast(&subscription_options)); - if (RCL_RET_OK != ret) { - RCLCPP_ERROR( - rclcpp::get_node_logger(node_handle_.get()).get_child("rclcpp"), - "Failed to fini subscription option: %s", - rcl_get_error_string().str); - rcl_reset_error(); - } - }); - auto custom_deletor = [node_handle = this->node_handle_](rcl_subscription_t * rcl_subs) { if (rcl_subscription_fini(rcl_subs, node_handle.get()) != RCL_RET_OK) { diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index d78a1adef5..a2f7c67072 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -507,6 +507,7 @@ if(TARGET test_subscription) "rmw" "rosidl_runtime_cpp" "rosidl_typesupport_cpp" + "std_msgs" "test_msgs" ) target_link_libraries(test_subscription ${PROJECT_NAME} mimick) diff --git a/rclcpp/test/rclcpp/test_subscription.cpp b/rclcpp/test/rclcpp/test_subscription.cpp index 5812985272..43b5858a3e 100644 --- a/rclcpp/test/rclcpp/test_subscription.cpp +++ b/rclcpp/test/rclcpp/test_subscription.cpp @@ -28,6 +28,7 @@ #include "../utils/rclcpp_gtest_macros.hpp" #include "test_msgs/msg/empty.hpp" +#include "std_msgs/msg/string.hpp" using namespace std::chrono_literals; @@ -144,6 +145,44 @@ class SubscriptionClass } }; +class TestCftSubscription : public ::testing::Test +{ +public: + static void SetUpTestCase() + { + rclcpp::init(0, nullptr); + } + + static void TearDownTestCase() + { + rclcpp::shutdown(); + } + + void SetUp() + { + node = std::make_shared("test_cft_subscription", "/ns"); + rclcpp::SubscriptionOptionsBase options_base; + options_base.content_filter_options.filter_expression = + "data MATCH 'Hello World: 5' or data MATCH %0"; + options_base.content_filter_options.expression_parameters = {"'Hello World: 10'"}; + rclcpp::SubscriptionOptionsWithAllocator> subscription_options( + options_base); + auto callback = [](std::shared_ptr) {}; + sub = node->create_subscription( + "topic", 10, callback, subscription_options); + } + + void TearDown() + { + sub.reset(); + node.reset(); + } + +protected: + rclcpp::Node::SharedPtr node; + rclcpp::Subscription::SharedPtr sub; +}; + /* Testing subscription construction and destruction. */ @@ -440,6 +479,42 @@ TEST_F(TestSubscription, handle_loaned_message) { EXPECT_NO_THROW(sub->handle_loaned_message(&msg, message_info)); } +TEST_F(TestCftSubscription, is_cft_supported) { + { + auto mock = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_subscription_is_cft_supported, false); + EXPECT_FALSE(sub->is_cft_supported()); + } + + { + auto mock = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_subscription_is_cft_supported, true); + EXPECT_TRUE(sub->is_cft_supported()); + } +} + +TEST_F(TestCftSubscription, get_cft_expression_parameters_error) { + auto mock = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_subscription_get_cft_expression_parameters, RCL_RET_ERROR); + + std::string filter_expression; + std::vector expression_parameters; + EXPECT_THROW( + sub->get_cft_expression_parameters(filter_expression, expression_parameters), + rclcpp::exceptions::RCLError); +} + +TEST_F(TestCftSubscription, set_cft_expression_parameters_error) { + auto mock = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_subscription_set_cft_expression_parameters, RCL_RET_ERROR); + + std::string filter_expression = "data MATCH %0"; + std::string expression_parameter = "'Hello World: 8'"; + EXPECT_THROW( + sub->set_cft_expression_parameters(filter_expression, {expression_parameter}), + rclcpp::exceptions::RCLError); +} + /* Testing subscription with intraprocess enabled and invalid QoS */ From 38fff61733b73d1a708bdf7f90342f4ee3e688a5 Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Fri, 12 Mar 2021 13:18:05 +0800 Subject: [PATCH 05/14] use a default empty value for expresion parameters Signed-off-by: Chen Lihui --- rclcpp/include/rclcpp/subscription_base.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclcpp/include/rclcpp/subscription_base.hpp b/rclcpp/include/rclcpp/subscription_base.hpp index 0af206aad9..10e186d400 100644 --- a/rclcpp/include/rclcpp/subscription_base.hpp +++ b/rclcpp/include/rclcpp/subscription_base.hpp @@ -304,7 +304,7 @@ class SubscriptionBase : public std::enable_shared_from_this void set_cft_expression_parameters( const std::string & filter_expression, - const std::vector & expression_parameters); + const std::vector & expression_parameters = {}); /// Get the filter expression and expression parameters for the subscription. /** From abd254c09d6cdf31413128d5555924558e7c4191 Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Fri, 12 Mar 2021 13:39:13 +0800 Subject: [PATCH 06/14] remove std_msgs dependency Signed-off-by: Chen Lihui --- rclcpp/package.xml | 1 - rclcpp/test/rclcpp/CMakeLists.txt | 1 - rclcpp/test/rclcpp/test_subscription.cpp | 16 ++++++++-------- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/rclcpp/package.xml b/rclcpp/package.xml index b551ca3fa1..99783768e9 100644 --- a/rclcpp/package.xml +++ b/rclcpp/package.xml @@ -46,7 +46,6 @@ rmw rmw_implementation_cmake rosidl_default_generators - std_msgs test_msgs diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index a2f7c67072..d78a1adef5 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -507,7 +507,6 @@ if(TARGET test_subscription) "rmw" "rosidl_runtime_cpp" "rosidl_typesupport_cpp" - "std_msgs" "test_msgs" ) target_link_libraries(test_subscription ${PROJECT_NAME} mimick) diff --git a/rclcpp/test/rclcpp/test_subscription.cpp b/rclcpp/test/rclcpp/test_subscription.cpp index 43b5858a3e..426188e548 100644 --- a/rclcpp/test/rclcpp/test_subscription.cpp +++ b/rclcpp/test/rclcpp/test_subscription.cpp @@ -27,8 +27,8 @@ #include "../mocking_utils/patch.hpp" #include "../utils/rclcpp_gtest_macros.hpp" +#include "test_msgs/msg/basic_types.hpp" #include "test_msgs/msg/empty.hpp" -#include "std_msgs/msg/string.hpp" using namespace std::chrono_literals; @@ -163,12 +163,12 @@ class TestCftSubscription : public ::testing::Test node = std::make_shared("test_cft_subscription", "/ns"); rclcpp::SubscriptionOptionsBase options_base; options_base.content_filter_options.filter_expression = - "data MATCH 'Hello World: 5' or data MATCH %0"; - options_base.content_filter_options.expression_parameters = {"'Hello World: 10'"}; + "int32_value = %0"; + options_base.content_filter_options.expression_parameters = {"10"}; rclcpp::SubscriptionOptionsWithAllocator> subscription_options( options_base); - auto callback = [](std::shared_ptr) {}; - sub = node->create_subscription( + auto callback = [](std::shared_ptr) {}; + sub = node->create_subscription( "topic", 10, callback, subscription_options); } @@ -180,7 +180,7 @@ class TestCftSubscription : public ::testing::Test protected: rclcpp::Node::SharedPtr node; - rclcpp::Subscription::SharedPtr sub; + rclcpp::Subscription::SharedPtr sub; }; /* @@ -508,8 +508,8 @@ TEST_F(TestCftSubscription, set_cft_expression_parameters_error) { auto mock = mocking_utils::patch_and_return( "lib:rclcpp", rcl_subscription_set_cft_expression_parameters, RCL_RET_ERROR); - std::string filter_expression = "data MATCH %0"; - std::string expression_parameter = "'Hello World: 8'"; + std::string filter_expression = "int32_value = %0"; + std::string expression_parameter = "100"; EXPECT_THROW( sub->set_cft_expression_parameters(filter_expression, {expression_parameter}), rclcpp::exceptions::RCLError); From f0214729e364c5262f9aeb97424c01ae2bb90f6d Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Wed, 13 Oct 2021 13:50:34 +0800 Subject: [PATCH 07/14] use new rcl interface Signed-off-by: Chen Lihui --- rclcpp/include/rclcpp/subscription_base.hpp | 13 ++- .../include/rclcpp/subscription_options.hpp | 67 +++-------- rclcpp/include/rclcpp/utilities.hpp | 9 ++ rclcpp/src/rclcpp/subscription_base.cpp | 92 ++++++++-------- rclcpp/src/rclcpp/utilities.cpp | 13 +++ rclcpp/test/rclcpp/CMakeLists.txt | 12 ++ rclcpp/test/rclcpp/test_subscription.cpp | 75 ------------- ...st_subscription_content_filtered_topic.cpp | 104 ++++++++++++++++++ 8 files changed, 207 insertions(+), 178 deletions(-) create mode 100644 rclcpp/test/rclcpp/test_subscription_content_filtered_topic.cpp diff --git a/rclcpp/include/rclcpp/subscription_base.hpp b/rclcpp/include/rclcpp/subscription_base.hpp index 10e186d400..895c056f51 100644 --- a/rclcpp/include/rclcpp/subscription_base.hpp +++ b/rclcpp/include/rclcpp/subscription_base.hpp @@ -283,21 +283,20 @@ class SubscriptionBase : public std::enable_shared_from_this std::vector get_network_flow_endpoints() const; - /// Check if subscription instance can support content filter topic feature. + /// Check if content filtered topic feature of the subscription instance is enabled. /** - * Depending on the middleware and the message type, this will return true if the middleware - * can support content filter topic feature. - * - * \return boolean flag indicating if middleware can support content filter topic feature. + * \return boolean flag indicating if middleware can support content filtered topic feature. */ RCLCPP_PUBLIC bool - is_cft_supported() const; + is_cft_enabled() const; /// Set the filter expression and expression parameters for the subscription. /** * \param[in] filter_expression An filter expression to set. * \param[in] expression_parameters Array of expression parameters to set. + * \throws RCLBadAlloc if memory cannot be allocated + * \throws RCLError if an unexpect error occurs */ RCLCPP_PUBLIC virtual @@ -310,6 +309,8 @@ class SubscriptionBase : public std::enable_shared_from_this /** * \param[out] filter_expression An filter expression to get. * \param[out] expression_parameters Array of expression parameters to get. + * \throws RCLBadAlloc if memory cannot be allocated + * \throws RCLError if an unexpect error occurs */ RCLCPP_PUBLIC virtual diff --git a/rclcpp/include/rclcpp/subscription_options.hpp b/rclcpp/include/rclcpp/subscription_options.hpp index 94b1969ffc..77ad131e49 100644 --- a/rclcpp/include/rclcpp/subscription_options.hpp +++ b/rclcpp/include/rclcpp/subscription_options.hpp @@ -88,9 +88,9 @@ struct SubscriptionOptionsBase // Options to configure content filtered topic in the subscription. struct ContentFilterOptions { - // Filter expression like SQL statement + // Filter expression like the 'where' part of SQL statement std::string filter_expression; - // Expression parameters for filter expression + // Expression parameters if there is placeholder '%n' in the filter expression std::vector expression_parameters; }; @@ -137,59 +137,18 @@ struct SubscriptionOptionsWithAllocator : public SubscriptionOptionsBase rmw_implementation_payload->modify_rmw_subscription_options(result.rmw_subscription_options); } - auto fail_clean = [&result]() - { - rcl_ret_t ret = rcl_subscription_options_fini(&result); - if (RCL_RET_OK != ret) { - RCLCPP_ERROR( - rclcpp::get_logger("rclcpp"), - "Failed to fini subscription option: %s", - rcl_get_error_string().str); - rcl_reset_error(); - } - }; - - // utils, string -> char * - // utils, vector -> rcutils_string_array_t * - - // Copy content_filter_options into rcl_subscription_options if necessary. + // Copy content_filter_options into rcl_subscription_options. if (!content_filter_options.filter_expression.empty()) { - char * expression = - rcutils_strdup(content_filter_options.filter_expression.c_str(), allocator); - if (!expression) { - throw std::runtime_error("failed to allocate memory for filter expression"); - } - result.rmw_subscription_options.filter_expression = expression; - - if (!content_filter_options.expression_parameters.empty()) { - rcutils_string_array_t * parameters = - static_cast(allocator.zero_allocate( - 1, - sizeof(rcutils_string_array_t), - allocator.state)); - if (!parameters) { - fail_clean(); - throw std::runtime_error("failed to allocate memory for expression parameters"); - } - result.rmw_subscription_options.expression_parameters = parameters; - rcutils_ret_t ret = rcutils_string_array_init( - parameters, content_filter_options.expression_parameters.size(), &allocator); - if (RCUTILS_RET_OK != ret) { - fail_clean(); - rclcpp::exceptions::throw_from_rcl_error( - RCL_RET_ERROR, - "failed to initialize string array for expression parameters"); - } - - for (size_t i = 0; i < content_filter_options.expression_parameters.size(); ++i) { - char * parameter = - rcutils_strdup(content_filter_options.expression_parameters[i].c_str(), allocator); - if (!parameter) { - fail_clean(); - throw std::runtime_error("failed to allocate memory for expression parameters"); - } - parameters->data[i] = parameter; - } + std::vector cstrings = + get_c_vector_string(content_filter_options.expression_parameters); + rcl_ret_t ret = rcl_subscription_options_set_content_filtered_topic_options( + get_c_string(content_filter_options.filter_expression), + cstrings.size(), + cstrings.data(), + &result); + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error( + ret, "failed to set content_filtered_topic_options"); } } diff --git a/rclcpp/include/rclcpp/utilities.hpp b/rclcpp/include/rclcpp/utilities.hpp index 5011a12c6a..0a659f9a26 100644 --- a/rclcpp/include/rclcpp/utilities.hpp +++ b/rclcpp/include/rclcpp/utilities.hpp @@ -289,6 +289,15 @@ RCLCPP_PUBLIC const char * get_c_string(const std::string & string_in); +/// Return the std::vector of C string from the given std::vector. +/** + * \param[in] string_in is a std::string + * \return the std::vector of C string from the std::vector + */ +RCLCPP_PUBLIC +std::vector +get_c_vector_string(const std::vector & strings); + } // namespace rclcpp #endif // RCLCPP__UTILITIES_HPP_ diff --git a/rclcpp/src/rclcpp/subscription_base.cpp b/rclcpp/src/rclcpp/subscription_base.cpp index d84d2df6c4..581b520cc8 100644 --- a/rclcpp/src/rclcpp/subscription_base.cpp +++ b/rclcpp/src/rclcpp/subscription_base.cpp @@ -19,6 +19,8 @@ #include #include +#include "rcpputils/scope_exit.hpp" + #include "rcutils/strdup.h" #include "rcutils/types/string_array.h" @@ -332,9 +334,9 @@ std::vector SubscriptionBase::get_network_flow_endp } bool -SubscriptionBase::is_cft_supported() const +SubscriptionBase::is_cft_enabled() const { - return rcl_subscription_is_cft_supported(subscription_handle_.get()); + return rcl_subscription_is_cft_enabled(subscription_handle_.get()); } void @@ -342,38 +344,34 @@ SubscriptionBase::set_cft_expression_parameters( const std::string & filter_expression, const std::vector & expression_parameters) { - rcl_allocator_t allocator = rcl_get_default_allocator(); - rcutils_string_array_t parameters; - parameters = rcutils_get_zero_initialized_string_array(); - rcutils_ret_t rcutils_ret = rcutils_string_array_init( - ¶meters, expression_parameters.size(), &allocator); - if (RCUTILS_RET_OK != rcutils_ret) { + rcl_subscription_content_filtered_topic_options_t options = + rcl_subscription_get_default_content_filtered_topic_options(); + + std::vector cstrings = + get_c_vector_string(expression_parameters); + rcl_ret_t ret = rcl_subscription_content_filtered_topic_options_init( + get_c_string(filter_expression), + cstrings.size(), + cstrings.data(), + &options); + if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error( - RCL_RET_ERROR, - "failed to initialize string array for expression parameters"); + ret, "failed to init subscription content_filtered_topic option"); } - RCLCPP_SCOPE_EXIT( - { - rcutils_ret_t rcutils_ret = rcutils_string_array_fini(¶meters); - if (RCUTILS_RET_OK != rcutils_ret) { + RCPPUTILS_SCOPE_EXIT( + rcl_ret_t ret = rcl_subscription_content_filtered_topic_options_fini(&options); + if (RCL_RET_OK != ret) { RCLCPP_ERROR( - rclcpp::get_node_logger(node_handle_.get()).get_child("rclcpp"), - "failed to finalize parameters: %s", + rclcpp::get_logger("rclcpp"), + "Failed to fini subscription content_filtered_topic option: %s", rcl_get_error_string().str); rcl_reset_error(); } - }); + ); - for (size_t i = 0; i < expression_parameters.size(); ++i) { - parameters.data[i] = rcutils_strdup(expression_parameters[i].c_str(), allocator); - if (!parameters.data[i]) { - throw std::runtime_error("failed to allocate memory for expression parameters"); - } - } - rcl_ret_t ret = rcl_subscription_set_cft_expression_parameters( + ret = rcl_subscription_set_cft_expression_parameters( subscription_handle_.get(), - filter_expression.c_str(), - ¶meters); + &options); if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error(ret, "failed to set cft expression parameters"); @@ -385,31 +383,39 @@ SubscriptionBase::get_cft_expression_parameters( std::string & filter_expression, std::vector & expression_parameters) const { - char * expression = NULL; - rcutils_string_array_t parameters; - parameters = rcutils_get_zero_initialized_string_array(); + rcl_subscription_content_filtered_topic_options_t options = + rcl_subscription_get_default_content_filtered_topic_options(); + + // use rcl_content_filtered_topic_options_t rcl_ret_t ret = rcl_subscription_get_cft_expression_parameters( subscription_handle_.get(), - &expression, - ¶meters); + &options); if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error(ret, "failed to get cft expression parameters"); } - filter_expression = expression; - for (size_t i = 0; i < parameters.size; ++i) { - expression_parameters.push_back(parameters.data[i]); + RCLCPP_SCOPE_EXIT( + rcl_ret_t ret = rcl_subscription_content_filtered_topic_options_fini(&options); + if (RCL_RET_OK != ret) { + RCLCPP_ERROR( + rclcpp::get_logger("rclcpp"), + "Failed to fini subscription content_filtered_topic option: %s", + rcl_get_error_string().str); + rcl_reset_error(); + } + ); + + rmw_subscription_content_filtered_topic_options_t * content_filtered_topic_options = + options.rmw_subscription_content_filtered_topic_options; + if (content_filtered_topic_options->filter_expression) { + filter_expression = content_filtered_topic_options->filter_expression; } - rcutils_allocator_t allocator = rcutils_get_default_allocator(); - allocator.deallocate(expression, allocator.state); - rcutils_ret_t rcutils_ret = rcutils_string_array_fini(¶meters); - if (RCUTILS_RET_OK != rcutils_ret) { - RCLCPP_ERROR( - rclcpp::get_node_logger(node_handle_.get()).get_child("rclcpp"), - "failed to finalize parameters: %s", - rcl_get_error_string().str); - rcl_reset_error(); + if (content_filtered_topic_options->expression_parameters) { + for (size_t i = 0; i < content_filtered_topic_options->expression_parameters->size; ++i) { + expression_parameters.push_back( + content_filtered_topic_options->expression_parameters->data[i]); + } } } diff --git a/rclcpp/src/rclcpp/utilities.cpp b/rclcpp/src/rclcpp/utilities.cpp index c3e3915d79..8195cc5a03 100644 --- a/rclcpp/src/rclcpp/utilities.cpp +++ b/rclcpp/src/rclcpp/utilities.cpp @@ -203,4 +203,17 @@ get_c_string(const std::string & string_in) return string_in.c_str(); } +std::vector +get_c_vector_string(const std::vector & strings) +{ + std::vector cstrings; + cstrings.reserve(strings.size()); + + for(size_t i = 0; i < strings.size(); ++i) { + cstrings.push_back(strings[i].c_str()); + } + + return cstrings; +} + } // namespace rclcpp diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index d78a1adef5..ef5476a0f8 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -739,3 +739,15 @@ ament_add_gtest(test_graph_listener test_graph_listener.cpp) if(TARGET test_graph_listener) target_link_libraries(test_graph_listener ${PROJECT_NAME} mimick) endif() + +ament_add_gtest(test_subscription_content_filtered_topic test_subscription_content_filtered_topic.cpp) +if(TARGET test_subscription_content_filtered_topic) + ament_target_dependencies(test_subscription_content_filtered_topic + "rcl_interfaces" + "rmw" + "rosidl_runtime_cpp" + "rosidl_typesupport_cpp" + "test_msgs" + ) + target_link_libraries(test_subscription_content_filtered_topic ${PROJECT_NAME} mimick) +endif() diff --git a/rclcpp/test/rclcpp/test_subscription.cpp b/rclcpp/test/rclcpp/test_subscription.cpp index 426188e548..5812985272 100644 --- a/rclcpp/test/rclcpp/test_subscription.cpp +++ b/rclcpp/test/rclcpp/test_subscription.cpp @@ -27,7 +27,6 @@ #include "../mocking_utils/patch.hpp" #include "../utils/rclcpp_gtest_macros.hpp" -#include "test_msgs/msg/basic_types.hpp" #include "test_msgs/msg/empty.hpp" using namespace std::chrono_literals; @@ -145,44 +144,6 @@ class SubscriptionClass } }; -class TestCftSubscription : public ::testing::Test -{ -public: - static void SetUpTestCase() - { - rclcpp::init(0, nullptr); - } - - static void TearDownTestCase() - { - rclcpp::shutdown(); - } - - void SetUp() - { - node = std::make_shared("test_cft_subscription", "/ns"); - rclcpp::SubscriptionOptionsBase options_base; - options_base.content_filter_options.filter_expression = - "int32_value = %0"; - options_base.content_filter_options.expression_parameters = {"10"}; - rclcpp::SubscriptionOptionsWithAllocator> subscription_options( - options_base); - auto callback = [](std::shared_ptr) {}; - sub = node->create_subscription( - "topic", 10, callback, subscription_options); - } - - void TearDown() - { - sub.reset(); - node.reset(); - } - -protected: - rclcpp::Node::SharedPtr node; - rclcpp::Subscription::SharedPtr sub; -}; - /* Testing subscription construction and destruction. */ @@ -479,42 +440,6 @@ TEST_F(TestSubscription, handle_loaned_message) { EXPECT_NO_THROW(sub->handle_loaned_message(&msg, message_info)); } -TEST_F(TestCftSubscription, is_cft_supported) { - { - auto mock = mocking_utils::patch_and_return( - "lib:rclcpp", rcl_subscription_is_cft_supported, false); - EXPECT_FALSE(sub->is_cft_supported()); - } - - { - auto mock = mocking_utils::patch_and_return( - "lib:rclcpp", rcl_subscription_is_cft_supported, true); - EXPECT_TRUE(sub->is_cft_supported()); - } -} - -TEST_F(TestCftSubscription, get_cft_expression_parameters_error) { - auto mock = mocking_utils::patch_and_return( - "lib:rclcpp", rcl_subscription_get_cft_expression_parameters, RCL_RET_ERROR); - - std::string filter_expression; - std::vector expression_parameters; - EXPECT_THROW( - sub->get_cft_expression_parameters(filter_expression, expression_parameters), - rclcpp::exceptions::RCLError); -} - -TEST_F(TestCftSubscription, set_cft_expression_parameters_error) { - auto mock = mocking_utils::patch_and_return( - "lib:rclcpp", rcl_subscription_set_cft_expression_parameters, RCL_RET_ERROR); - - std::string filter_expression = "int32_value = %0"; - std::string expression_parameter = "100"; - EXPECT_THROW( - sub->set_cft_expression_parameters(filter_expression, {expression_parameter}), - rclcpp::exceptions::RCLError); -} - /* Testing subscription with intraprocess enabled and invalid QoS */ diff --git a/rclcpp/test/rclcpp/test_subscription_content_filtered_topic.cpp b/rclcpp/test/rclcpp/test_subscription_content_filtered_topic.cpp new file mode 100644 index 0000000000..3ed2df96a6 --- /dev/null +++ b/rclcpp/test/rclcpp/test_subscription_content_filtered_topic.cpp @@ -0,0 +1,104 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include +#include +#include +#include + +#include "rclcpp/exceptions.hpp" +#include "rclcpp/rclcpp.hpp" + +#include "../mocking_utils/patch.hpp" +#include "../utils/rclcpp_gtest_macros.hpp" + +#include "test_msgs/msg/basic_types.hpp" +#include "test_msgs/msg/empty.hpp" + +class TestCftSubscription : public ::testing::Test +{ +public: + static void SetUpTestCase() + { + rclcpp::init(0, nullptr); + } + + static void TearDownTestCase() + { + rclcpp::shutdown(); + } + + void SetUp() + { + node = std::make_shared("test_cft_subscription", "/ns"); + + auto options = rclcpp::SubscriptionOptions(); + options.content_filter_options.filter_expression = "int32_value = %0"; + options.content_filter_options.expression_parameters = {"10"}; + + auto callback = [](std::shared_ptr) {}; + sub = node->create_subscription( + "topic", 10, callback, options); + } + + void TearDown() + { + sub.reset(); + node.reset(); + } + +protected: + rclcpp::Node::SharedPtr node; + rclcpp::Subscription::SharedPtr sub; +}; + +TEST_F(TestCftSubscription, is_cft_enabled) { + { + auto mock = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_subscription_is_cft_enabled, false); + EXPECT_FALSE(sub->is_cft_enabled()); + } + + { + auto mock = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_subscription_is_cft_enabled, true); + EXPECT_TRUE(sub->is_cft_enabled()); + } +} + +TEST_F(TestCftSubscription, get_cft_expression_parameters_error) { + auto mock = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_subscription_get_cft_expression_parameters, RCL_RET_ERROR); + + std::string filter_expression; + std::vector expression_parameters; + EXPECT_THROW( + sub->get_cft_expression_parameters(filter_expression, expression_parameters), + rclcpp::exceptions::RCLError); +} + +TEST_F(TestCftSubscription, set_cft_expression_parameters_error) { + auto mock = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_subscription_set_cft_expression_parameters, RCL_RET_ERROR); + + std::string filter_expression = "int32_value = %0"; + std::string expression_parameter = "100"; + EXPECT_THROW( + sub->set_cft_expression_parameters(filter_expression, {expression_parameter}), + rclcpp::exceptions::RCLError); +} From f3df09c3fe92800b02a672cee88d1265f967d106 Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Mon, 25 Oct 2021 15:11:27 +0800 Subject: [PATCH 08/14] remove unused include header files and fix unscrutify Signed-off-by: Chen Lihui --- rclcpp/include/rclcpp/subscription_options.hpp | 3 --- rclcpp/src/rclcpp/subscription_base.cpp | 14 ++++++-------- rclcpp/src/rclcpp/utilities.cpp | 6 +++--- 3 files changed, 9 insertions(+), 14 deletions(-) diff --git a/rclcpp/include/rclcpp/subscription_options.hpp b/rclcpp/include/rclcpp/subscription_options.hpp index 77ad131e49..b1b2ca8a6e 100644 --- a/rclcpp/include/rclcpp/subscription_options.hpp +++ b/rclcpp/include/rclcpp/subscription_options.hpp @@ -21,9 +21,6 @@ #include #include -#include "rcutils/strdup.h" -#include "rcutils/types/string_array.h" - #include "rclcpp/callback_group.hpp" #include "rclcpp/detail/rmw_implementation_specific_subscription_payload.hpp" #include "rclcpp/intra_process_buffer_type.hpp" diff --git a/rclcpp/src/rclcpp/subscription_base.cpp b/rclcpp/src/rclcpp/subscription_base.cpp index 581b520cc8..5b12bd50fe 100644 --- a/rclcpp/src/rclcpp/subscription_base.cpp +++ b/rclcpp/src/rclcpp/subscription_base.cpp @@ -21,16 +21,12 @@ #include "rcpputils/scope_exit.hpp" -#include "rcutils/strdup.h" -#include "rcutils/types/string_array.h" - #include "rclcpp/exceptions.hpp" #include "rclcpp/expand_topic_or_service_name.hpp" #include "rclcpp/experimental/intra_process_manager.hpp" #include "rclcpp/logging.hpp" #include "rclcpp/node_interfaces/node_base_interface.hpp" #include "rclcpp/qos_event.hpp" -#include "rclcpp/scope_exit.hpp" #include "rmw/error_handling.h" #include "rmw/rmw.h" @@ -359,6 +355,7 @@ SubscriptionBase::set_cft_expression_parameters( ret, "failed to init subscription content_filtered_topic option"); } RCPPUTILS_SCOPE_EXIT( + { rcl_ret_t ret = rcl_subscription_content_filtered_topic_options_fini(&options); if (RCL_RET_OK != ret) { RCLCPP_ERROR( @@ -367,7 +364,7 @@ SubscriptionBase::set_cft_expression_parameters( rcl_get_error_string().str); rcl_reset_error(); } - ); + }); ret = rcl_subscription_set_cft_expression_parameters( subscription_handle_.get(), @@ -386,7 +383,7 @@ SubscriptionBase::get_cft_expression_parameters( rcl_subscription_content_filtered_topic_options_t options = rcl_subscription_get_default_content_filtered_topic_options(); - // use rcl_content_filtered_topic_options_t + // use rcl_content_filtered_topic_options_t instead of char * and rcl_ret_t ret = rcl_subscription_get_cft_expression_parameters( subscription_handle_.get(), &options); @@ -395,7 +392,8 @@ SubscriptionBase::get_cft_expression_parameters( rclcpp::exceptions::throw_from_rcl_error(ret, "failed to get cft expression parameters"); } - RCLCPP_SCOPE_EXIT( + RCPPUTILS_SCOPE_EXIT( + { rcl_ret_t ret = rcl_subscription_content_filtered_topic_options_fini(&options); if (RCL_RET_OK != ret) { RCLCPP_ERROR( @@ -404,7 +402,7 @@ SubscriptionBase::get_cft_expression_parameters( rcl_get_error_string().str); rcl_reset_error(); } - ); + }); rmw_subscription_content_filtered_topic_options_t * content_filtered_topic_options = options.rmw_subscription_content_filtered_topic_options; diff --git a/rclcpp/src/rclcpp/utilities.cpp b/rclcpp/src/rclcpp/utilities.cpp index 8195cc5a03..55a45d62d2 100644 --- a/rclcpp/src/rclcpp/utilities.cpp +++ b/rclcpp/src/rclcpp/utilities.cpp @@ -206,11 +206,11 @@ get_c_string(const std::string & string_in) std::vector get_c_vector_string(const std::vector & strings) { - std::vector cstrings; + std::vector cstrings; cstrings.reserve(strings.size()); - for(size_t i = 0; i < strings.size(); ++i) { - cstrings.push_back(strings[i].c_str()); + for (size_t i = 0; i < strings.size(); ++i) { + cstrings.push_back(strings[i].c_str()); } return cstrings; From 2dba94052bbe538144b1226dab061447891cfa14 Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Mon, 25 Oct 2021 15:21:58 +0800 Subject: [PATCH 09/14] update comment Signed-off-by: Chen Lihui --- rclcpp/include/rclcpp/subscription_base.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rclcpp/include/rclcpp/subscription_base.hpp b/rclcpp/include/rclcpp/subscription_base.hpp index 895c056f51..6b16ddff06 100644 --- a/rclcpp/include/rclcpp/subscription_base.hpp +++ b/rclcpp/include/rclcpp/subscription_base.hpp @@ -285,7 +285,7 @@ class SubscriptionBase : public std::enable_shared_from_this /// Check if content filtered topic feature of the subscription instance is enabled. /** - * \return boolean flag indicating if middleware can support content filtered topic feature. + * \return boolean flag indicating if the content filtered topic of this subscription is enabled. */ RCLCPP_PUBLIC bool From 7c75adc00c71fcb3a09293aef1326961b608eda9 Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Tue, 26 Oct 2021 13:37:06 +0800 Subject: [PATCH 10/14] update comment Signed-off-by: Chen Lihui --- rclcpp/include/rclcpp/subscription_base.hpp | 2 ++ rclcpp/include/rclcpp/subscription_options.hpp | 9 ++++++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/rclcpp/include/rclcpp/subscription_base.hpp b/rclcpp/include/rclcpp/subscription_base.hpp index 6b16ddff06..e2ec5fde7d 100644 --- a/rclcpp/include/rclcpp/subscription_base.hpp +++ b/rclcpp/include/rclcpp/subscription_base.hpp @@ -294,7 +294,9 @@ class SubscriptionBase : public std::enable_shared_from_this /// Set the filter expression and expression parameters for the subscription. /** * \param[in] filter_expression An filter expression to set. + * \sa SubscriptionOptionsBase::ContentFilterOptions::filter_expression * \param[in] expression_parameters Array of expression parameters to set. + * \sa SubscriptionOptionsBase::ContentFilterOptions::expression_parameters * \throws RCLBadAlloc if memory cannot be allocated * \throws RCLError if an unexpect error occurs */ diff --git a/rclcpp/include/rclcpp/subscription_options.hpp b/rclcpp/include/rclcpp/subscription_options.hpp index b1b2ca8a6e..8c23aacc99 100644 --- a/rclcpp/include/rclcpp/subscription_options.hpp +++ b/rclcpp/include/rclcpp/subscription_options.hpp @@ -82,12 +82,15 @@ struct SubscriptionOptionsBase QosOverridingOptions qos_overriding_options; - // Options to configure content filtered topic in the subscription. + /// Options to configure content filtered topic in the subscription. struct ContentFilterOptions { - // Filter expression like the 'where' part of SQL statement + /// Filter expression is similar to the WHERE part of an SQL clause std::string filter_expression; - // Expression parameters if there is placeholder '%n' in the filter expression + /** + * Expression parameters is the tokens placeholder ‘parameters’ (i.e., "%n" tokens begin from 0) + * in the filter_expression + */ std::vector expression_parameters; }; From 614e6f669aebe449c2379adf8b3d015621e4b073 Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Tue, 26 Oct 2021 14:24:59 +0800 Subject: [PATCH 11/14] rename Signed-off-by: Chen Lihui --- rclcpp/include/rclcpp/subscription_base.hpp | 4 +- .../include/rclcpp/subscription_options.hpp | 4 +- rclcpp/src/rclcpp/subscription_base.cpp | 38 +++++++++---------- rclcpp/test/rclcpp/CMakeLists.txt | 8 ++-- ...p => test_subscription_content_filter.cpp} | 12 +++--- 5 files changed, 33 insertions(+), 33 deletions(-) rename rclcpp/test/rclcpp/{test_subscription_content_filtered_topic.cpp => test_subscription_content_filter.cpp} (84%) diff --git a/rclcpp/include/rclcpp/subscription_base.hpp b/rclcpp/include/rclcpp/subscription_base.hpp index e2ec5fde7d..a83f1f3334 100644 --- a/rclcpp/include/rclcpp/subscription_base.hpp +++ b/rclcpp/include/rclcpp/subscription_base.hpp @@ -303,7 +303,7 @@ class SubscriptionBase : public std::enable_shared_from_this RCLCPP_PUBLIC virtual void - set_cft_expression_parameters( + set_content_filter( const std::string & filter_expression, const std::vector & expression_parameters = {}); @@ -317,7 +317,7 @@ class SubscriptionBase : public std::enable_shared_from_this RCLCPP_PUBLIC virtual void - get_cft_expression_parameters( + get_content_filter( std::string & filter_expression, std::vector & expression_parameters) const; diff --git a/rclcpp/include/rclcpp/subscription_options.hpp b/rclcpp/include/rclcpp/subscription_options.hpp index 8c23aacc99..e44704c7b0 100644 --- a/rclcpp/include/rclcpp/subscription_options.hpp +++ b/rclcpp/include/rclcpp/subscription_options.hpp @@ -141,14 +141,14 @@ struct SubscriptionOptionsWithAllocator : public SubscriptionOptionsBase if (!content_filter_options.filter_expression.empty()) { std::vector cstrings = get_c_vector_string(content_filter_options.expression_parameters); - rcl_ret_t ret = rcl_subscription_options_set_content_filtered_topic_options( + rcl_ret_t ret = rcl_subscription_options_set_content_filter_options( get_c_string(content_filter_options.filter_expression), cstrings.size(), cstrings.data(), &result); if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error( - ret, "failed to set content_filtered_topic_options"); + ret, "failed to set content_filter_options"); } } diff --git a/rclcpp/src/rclcpp/subscription_base.cpp b/rclcpp/src/rclcpp/subscription_base.cpp index 5b12bd50fe..29554fcd43 100644 --- a/rclcpp/src/rclcpp/subscription_base.cpp +++ b/rclcpp/src/rclcpp/subscription_base.cpp @@ -336,16 +336,16 @@ SubscriptionBase::is_cft_enabled() const } void -SubscriptionBase::set_cft_expression_parameters( +SubscriptionBase::set_content_filter( const std::string & filter_expression, const std::vector & expression_parameters) { - rcl_subscription_content_filtered_topic_options_t options = - rcl_subscription_get_default_content_filtered_topic_options(); + rcl_subscription_content_filter_options_t options = + rcl_subscription_get_default_content_filter_options(); std::vector cstrings = get_c_vector_string(expression_parameters); - rcl_ret_t ret = rcl_subscription_content_filtered_topic_options_init( + rcl_ret_t ret = rcl_subscription_content_filter_options_init( get_c_string(filter_expression), cstrings.size(), cstrings.data(), @@ -356,7 +356,7 @@ SubscriptionBase::set_cft_expression_parameters( } RCPPUTILS_SCOPE_EXIT( { - rcl_ret_t ret = rcl_subscription_content_filtered_topic_options_fini(&options); + rcl_ret_t ret = rcl_subscription_content_filter_options_fini(&options); if (RCL_RET_OK != ret) { RCLCPP_ERROR( rclcpp::get_logger("rclcpp"), @@ -366,7 +366,7 @@ SubscriptionBase::set_cft_expression_parameters( } }); - ret = rcl_subscription_set_cft_expression_parameters( + ret = rcl_subscription_set_content_filter( subscription_handle_.get(), &options); @@ -376,15 +376,15 @@ SubscriptionBase::set_cft_expression_parameters( } void -SubscriptionBase::get_cft_expression_parameters( +SubscriptionBase::get_content_filter( std::string & filter_expression, std::vector & expression_parameters) const { - rcl_subscription_content_filtered_topic_options_t options = - rcl_subscription_get_default_content_filtered_topic_options(); + rcl_subscription_content_filter_options_t options = + rcl_subscription_get_default_content_filter_options(); - // use rcl_content_filtered_topic_options_t instead of char * and - rcl_ret_t ret = rcl_subscription_get_cft_expression_parameters( + // use rcl_content_filter_options_t instead of char * and + rcl_ret_t ret = rcl_subscription_get_content_filter( subscription_handle_.get(), &options); @@ -394,7 +394,7 @@ SubscriptionBase::get_cft_expression_parameters( RCPPUTILS_SCOPE_EXIT( { - rcl_ret_t ret = rcl_subscription_content_filtered_topic_options_fini(&options); + rcl_ret_t ret = rcl_subscription_content_filter_options_fini(&options); if (RCL_RET_OK != ret) { RCLCPP_ERROR( rclcpp::get_logger("rclcpp"), @@ -404,16 +404,16 @@ SubscriptionBase::get_cft_expression_parameters( } }); - rmw_subscription_content_filtered_topic_options_t * content_filtered_topic_options = - options.rmw_subscription_content_filtered_topic_options; - if (content_filtered_topic_options->filter_expression) { - filter_expression = content_filtered_topic_options->filter_expression; + rmw_subscription_content_filter_options_t * content_filter_options = + options.rmw_subscription_content_filter_options; + if (content_filter_options->filter_expression) { + filter_expression = content_filter_options->filter_expression; } - if (content_filtered_topic_options->expression_parameters) { - for (size_t i = 0; i < content_filtered_topic_options->expression_parameters->size; ++i) { + if (content_filter_options->expression_parameters) { + for (size_t i = 0; i < content_filter_options->expression_parameters->size; ++i) { expression_parameters.push_back( - content_filtered_topic_options->expression_parameters->data[i]); + content_filter_options->expression_parameters->data[i]); } } } diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index ef5476a0f8..d00b5f3ab2 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -740,14 +740,14 @@ if(TARGET test_graph_listener) target_link_libraries(test_graph_listener ${PROJECT_NAME} mimick) endif() -ament_add_gtest(test_subscription_content_filtered_topic test_subscription_content_filtered_topic.cpp) -if(TARGET test_subscription_content_filtered_topic) - ament_target_dependencies(test_subscription_content_filtered_topic +ament_add_gtest(test_subscription_content_filter test_subscription_content_filter.cpp) +if(TARGET test_subscription_content_filter) + ament_target_dependencies(test_subscription_content_filter "rcl_interfaces" "rmw" "rosidl_runtime_cpp" "rosidl_typesupport_cpp" "test_msgs" ) - target_link_libraries(test_subscription_content_filtered_topic ${PROJECT_NAME} mimick) + target_link_libraries(test_subscription_content_filter ${PROJECT_NAME} mimick) endif() diff --git a/rclcpp/test/rclcpp/test_subscription_content_filtered_topic.cpp b/rclcpp/test/rclcpp/test_subscription_content_filter.cpp similarity index 84% rename from rclcpp/test/rclcpp/test_subscription_content_filtered_topic.cpp rename to rclcpp/test/rclcpp/test_subscription_content_filter.cpp index 3ed2df96a6..01da0a8f2d 100644 --- a/rclcpp/test/rclcpp/test_subscription_content_filtered_topic.cpp +++ b/rclcpp/test/rclcpp/test_subscription_content_filter.cpp @@ -81,24 +81,24 @@ TEST_F(TestCftSubscription, is_cft_enabled) { } } -TEST_F(TestCftSubscription, get_cft_expression_parameters_error) { +TEST_F(TestCftSubscription, get_content_filter_error) { auto mock = mocking_utils::patch_and_return( - "lib:rclcpp", rcl_subscription_get_cft_expression_parameters, RCL_RET_ERROR); + "lib:rclcpp", rcl_subscription_get_content_filter, RCL_RET_ERROR); std::string filter_expression; std::vector expression_parameters; EXPECT_THROW( - sub->get_cft_expression_parameters(filter_expression, expression_parameters), + sub->get_content_filter(filter_expression, expression_parameters), rclcpp::exceptions::RCLError); } -TEST_F(TestCftSubscription, set_cft_expression_parameters_error) { +TEST_F(TestCftSubscription, set_content_filter_error) { auto mock = mocking_utils::patch_and_return( - "lib:rclcpp", rcl_subscription_set_cft_expression_parameters, RCL_RET_ERROR); + "lib:rclcpp", rcl_subscription_set_content_filter, RCL_RET_ERROR); std::string filter_expression = "int32_value = %0"; std::string expression_parameter = "100"; EXPECT_THROW( - sub->set_cft_expression_parameters(filter_expression, {expression_parameter}), + sub->set_content_filter(filter_expression, {expression_parameter}), rclcpp::exceptions::RCLError); } From f509a3469b790fd80311a89e2931b420e7ca9a16 Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Fri, 14 Jan 2022 15:38:26 +0800 Subject: [PATCH 12/14] refactor test Signed-off-by: Chen Lihui --- rclcpp/include/rclcpp/wait_for_message.hpp | 3 + rclcpp/test/rclcpp/CMakeLists.txt | 25 +- .../test_subscription_content_filter.cpp | 221 +++++++++++++++++- 3 files changed, 231 insertions(+), 18 deletions(-) diff --git a/rclcpp/include/rclcpp/wait_for_message.hpp b/rclcpp/include/rclcpp/wait_for_message.hpp index 20c9df4db6..352fae022a 100644 --- a/rclcpp/include/rclcpp/wait_for_message.hpp +++ b/rclcpp/include/rclcpp/wait_for_message.hpp @@ -56,6 +56,9 @@ bool wait_for_message( wait_set.add_subscription(subscription); wait_set.add_guard_condition(gc); auto ret = wait_set.wait(time_to_wait); + // if it is fixed in other PR, remove them and rebase. + wait_set.remove_subscription(subscription); + wait_set.remove_guard_condition(gc); if (ret.kind() != rclcpp::WaitResultKind::Ready) { return false; } diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index d00b5f3ab2..619867f8b2 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -740,14 +740,19 @@ if(TARGET test_graph_listener) target_link_libraries(test_graph_listener ${PROJECT_NAME} mimick) endif() -ament_add_gtest(test_subscription_content_filter test_subscription_content_filter.cpp) -if(TARGET test_subscription_content_filter) - ament_target_dependencies(test_subscription_content_filter - "rcl_interfaces" - "rmw" - "rosidl_runtime_cpp" - "rosidl_typesupport_cpp" - "test_msgs" +function(test_subscription_content_filter_for_rmw_implementation) + set(rmw_implementation_env_var RMW_IMPLEMENTATION=${rmw_implementation}) + ament_add_gmock(test_subscription_content_filter${target_suffix} + test_subscription_content_filter.cpp + ENV ${rmw_implementation_env_var} ) - target_link_libraries(test_subscription_content_filter ${PROJECT_NAME} mimick) -endif() + if(TARGET test_subscription_content_filter${target_suffix}) + target_link_libraries(test_subscription_content_filter${target_suffix} ${PROJECT_NAME} mimick) + ament_target_dependencies(test_subscription_content_filter${target_suffix} + "rcpputils" + "rosidl_typesupport_cpp" + "test_msgs" + ) + endif() +endfunction() +call_for_each_rmw_implementation(test_subscription_content_filter_for_rmw_implementation) diff --git a/rclcpp/test/rclcpp/test_subscription_content_filter.cpp b/rclcpp/test/rclcpp/test_subscription_content_filter.cpp index 01da0a8f2d..ab6998517c 100644 --- a/rclcpp/test/rclcpp/test_subscription_content_filter.cpp +++ b/rclcpp/test/rclcpp/test_subscription_content_filter.cpp @@ -23,6 +23,7 @@ #include "rclcpp/exceptions.hpp" #include "rclcpp/rclcpp.hpp" +#include "rclcpp/wait_for_message.hpp" #include "../mocking_utils/patch.hpp" #include "../utils/rclcpp_gtest_macros.hpp" @@ -30,7 +31,14 @@ #include "test_msgs/msg/basic_types.hpp" #include "test_msgs/msg/empty.hpp" -class TestCftSubscription : public ::testing::Test +#ifdef RMW_IMPLEMENTATION +# define CLASSNAME_(NAME, SUFFIX) NAME ## __ ## SUFFIX +# define CLASSNAME(NAME, SUFFIX) CLASSNAME_(NAME, SUFFIX) +#else +# define CLASSNAME(NAME, SUFFIX) NAME +#endif + +class CLASSNAME (TestContentFilterSubscription, RMW_IMPLEMENTATION) : public ::testing::Test { public: static void SetUpTestCase() @@ -45,15 +53,17 @@ class TestCftSubscription : public ::testing::Test void SetUp() { - node = std::make_shared("test_cft_subscription", "/ns"); + node = std::make_shared("test_content_filter_node", "/ns"); + context = node->get_node_options().context(); + qos.reliable().transient_local(); auto options = rclcpp::SubscriptionOptions(); - options.content_filter_options.filter_expression = "int32_value = %0"; - options.content_filter_options.expression_parameters = {"10"}; + options.content_filter_options.filter_expression = filter_expression_init; + options.content_filter_options.expression_parameters = expression_parameters_1; auto callback = [](std::shared_ptr) {}; sub = node->create_subscription( - "topic", 10, callback, options); + "content_filter_topic", qos, callback, options); } void TearDown() @@ -62,12 +72,49 @@ class TestCftSubscription : public ::testing::Test node.reset(); } + template + bool wait_for(const Condition & condition, const Duration & timeout) + { + using clock = std::chrono::system_clock; + auto start = clock::now(); + while (!condition()) { + if ((clock::now() - start) > timeout) { + return false; + } + rclcpp::spin_some(node); + } + return true; + } + protected: rclcpp::Node::SharedPtr node; + rclcpp::Context::SharedPtr context; + rclcpp::QoS qos{rclcpp::KeepLast{10}}; rclcpp::Subscription::SharedPtr sub; + + std::string filter_expression_init = "int32_value = %0"; + std::vector expression_parameters_1 = {"3"}; + std::vector expression_parameters_2 = {"4"}; }; -TEST_F(TestCftSubscription, is_cft_enabled) { +bool operator==(const test_msgs::msg::BasicTypes & m1, const test_msgs::msg::BasicTypes & m2) +{ + return m1.bool_value == m2.bool_value && + m1.byte_value == m2.byte_value && + m1.char_value == m2.char_value && + m1.float32_value == m2.float32_value && + m1.float64_value == m2.float64_value && + m1.int8_value == m2.int8_value && + m1.uint8_value == m2.uint8_value && + m1.int16_value == m2.int16_value && + m1.uint16_value == m2.uint16_value && + m1.int32_value == m2.int32_value && + m1.uint32_value == m2.uint32_value && + m1.int64_value == m2.int64_value && + m1.uint64_value == m2.uint64_value; +} + +TEST_F(CLASSNAME(TestContentFilterSubscription, RMW_IMPLEMENTATION), is_cft_enabled) { { auto mock = mocking_utils::patch_and_return( "lib:rclcpp", rcl_subscription_is_cft_enabled, false); @@ -81,7 +128,7 @@ TEST_F(TestCftSubscription, is_cft_enabled) { } } -TEST_F(TestCftSubscription, get_content_filter_error) { +TEST_F(CLASSNAME(TestContentFilterSubscription, RMW_IMPLEMENTATION), get_content_filter_error) { auto mock = mocking_utils::patch_and_return( "lib:rclcpp", rcl_subscription_get_content_filter, RCL_RET_ERROR); @@ -92,7 +139,7 @@ TEST_F(TestCftSubscription, get_content_filter_error) { rclcpp::exceptions::RCLError); } -TEST_F(TestCftSubscription, set_content_filter_error) { +TEST_F(CLASSNAME(TestContentFilterSubscription, RMW_IMPLEMENTATION), set_content_filter_error) { auto mock = mocking_utils::patch_and_return( "lib:rclcpp", rcl_subscription_set_content_filter, RCL_RET_ERROR); @@ -102,3 +149,161 @@ TEST_F(TestCftSubscription, set_content_filter_error) { sub->set_content_filter(filter_expression, {expression_parameter}), rclcpp::exceptions::RCLError); } + +TEST_F(CLASSNAME(TestContentFilterSubscription, RMW_IMPLEMENTATION), get_content_filter) { + std::string filter_expression; + std::vector expression_parameters; + + if (sub->is_cft_enabled()) { + EXPECT_NO_THROW( + sub->get_content_filter(filter_expression, expression_parameters)); + + EXPECT_EQ(filter_expression, filter_expression_init); + EXPECT_EQ(expression_parameters, expression_parameters_1); + } else { + EXPECT_THROW( + sub->get_content_filter(filter_expression, expression_parameters), + rclcpp::exceptions::RCLError); + } +} + +TEST_F(CLASSNAME(TestContentFilterSubscription, RMW_IMPLEMENTATION), set_content_filter) { + if (sub->is_cft_enabled()) { + EXPECT_NO_THROW( + sub->set_content_filter(filter_expression_init, expression_parameters_2)); + } else { + EXPECT_THROW( + sub->set_content_filter(filter_expression_init, expression_parameters_2), + rclcpp::exceptions::RCLError); + } +} + +TEST_F(CLASSNAME(TestContentFilterSubscription, RMW_IMPLEMENTATION), content_filter_get_begin) { + using namespace std::chrono_literals; + { + test_msgs::msg::BasicTypes msg; + rclcpp::MessageInfo msg_info; + EXPECT_FALSE(sub->take(msg, msg_info)); + } + { + rclcpp::PublisherOptions po; + auto pub = node->create_publisher("content_filter_topic", qos, po); + + auto connected = [pub, sub = this->sub]() -> bool { + return pub->get_subscription_count() && sub->get_publisher_count(); + }; + ASSERT_TRUE(wait_for(connected, 5s)); + + test_msgs::msg::BasicTypes original_message; + original_message.int32_value = 3; + pub->publish(original_message); + + test_msgs::msg::BasicTypes output_message; + bool receive = wait_for_message(output_message, sub, context, 5s); + EXPECT_TRUE(receive); + EXPECT_EQ(original_message, output_message); + + if (sub->is_cft_enabled()) { + EXPECT_NO_THROW( + sub->set_content_filter(filter_expression_init, expression_parameters_2)); + + test_msgs::msg::BasicTypes original_message; + original_message.int32_value = 3; + pub->publish(original_message); + + test_msgs::msg::BasicTypes output_message; + bool receive = wait_for_message(output_message, sub, context, 5s); + EXPECT_FALSE(receive); + } + } +} + +TEST_F(CLASSNAME(TestContentFilterSubscription, RMW_IMPLEMENTATION), content_filter_get_later) { + using namespace std::chrono_literals; + { + test_msgs::msg::BasicTypes msg; + rclcpp::MessageInfo msg_info; + EXPECT_FALSE(sub->take(msg, msg_info)); + } + { + rclcpp::PublisherOptions po; + auto pub = node->create_publisher("content_filter_topic", qos, po); + + auto connected = [pub, sub = this->sub]() -> bool { + return pub->get_subscription_count() && sub->get_publisher_count(); + }; + ASSERT_TRUE(wait_for(connected, 5s)); + + test_msgs::msg::BasicTypes original_message; + original_message.int32_value = 4; + pub->publish(original_message); + + test_msgs::msg::BasicTypes output_message; + bool receive = wait_for_message(output_message, sub, context, 5s); + if (sub->is_cft_enabled()) { + EXPECT_FALSE(receive); + } else { + EXPECT_TRUE(receive); + EXPECT_EQ(original_message, output_message); + } + + if (sub->is_cft_enabled()) { + EXPECT_NO_THROW( + sub->set_content_filter(filter_expression_init, expression_parameters_2)); + + test_msgs::msg::BasicTypes original_message; + original_message.int32_value = 4; + pub->publish(original_message); + + test_msgs::msg::BasicTypes output_message; + bool receive = wait_for_message(output_message, sub, context, 5s); + EXPECT_TRUE(receive); + EXPECT_EQ(original_message, output_message); + } + } +} + +TEST_F(CLASSNAME(TestContentFilterSubscription, RMW_IMPLEMENTATION), content_filter_reset) { + using namespace std::chrono_literals; + { + test_msgs::msg::BasicTypes msg; + rclcpp::MessageInfo msg_info; + EXPECT_FALSE(sub->take(msg, msg_info)); + } + { + rclcpp::PublisherOptions po; + auto pub = node->create_publisher("content_filter_topic", qos, po); + + auto connected = [pub, sub = this->sub]() -> bool { + return pub->get_subscription_count() && sub->get_publisher_count(); + }; + ASSERT_TRUE(wait_for(connected, 5s)); + + test_msgs::msg::BasicTypes original_message; + original_message.int32_value = 4; + pub->publish(original_message); + + test_msgs::msg::BasicTypes output_message; + bool receive = wait_for_message(output_message, sub, context, 5s); + if (sub->is_cft_enabled()) { + EXPECT_FALSE(receive); + } else { + EXPECT_TRUE(receive); + EXPECT_EQ(original_message, output_message); + } + + if (sub->is_cft_enabled()) { + EXPECT_NO_THROW( + sub->set_content_filter("")); + + test_msgs::msg::BasicTypes original_message; + original_message.int32_value = 4; + pub->publish(original_message); + + test_msgs::msg::BasicTypes output_message; + bool receive = wait_for_message(output_message, sub, context, 5s); + EXPECT_TRUE(receive); + EXPECT_EQ(original_message, output_message); + } + } +} From 8c66c4172db19e15b769cb2d6ebf1955e521fb39 Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Mon, 17 Jan 2022 15:04:26 +0800 Subject: [PATCH 13/14] add content filter for parameter event subscription Signed-off-by: Chen Lihui --- rclcpp/src/rclcpp/time_source.cpp | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/rclcpp/src/rclcpp/time_source.cpp b/rclcpp/src/rclcpp/time_source.cpp index 140b5531d2..1e19c96e5b 100644 --- a/rclcpp/src/rclcpp/time_source.cpp +++ b/rclcpp/src/rclcpp/time_source.cpp @@ -126,10 +126,19 @@ void TimeSource::attachNode( return result; }); + const rclcpp::QoS qos = ( + rclcpp::QoS(rclcpp::QoSInitialization::from_rmw(rmw_qos_profile_parameter_events)) + ); + rclcpp::SubscriptionOptions options; + options.content_filter_options.filter_expression = + std::string("node = '") + node_base_->get_fully_qualified_name() + "'"; + // TODO(tfoote) use parameters interface not subscribe to events via topic ticketed #609 parameter_subscription_ = rclcpp::AsyncParametersClient::on_parameter_event( node_topics_, - std::bind(&TimeSource::on_parameter_event, this, std::placeholders::_1)); + std::bind(&TimeSource::on_parameter_event, this, std::placeholders::_1), + qos, + options); } void TimeSource::detachNode() @@ -299,6 +308,11 @@ void TimeSource::on_parameter_event( { // Filter out events on 'use_sim_time' parameter instances in other nodes. if (event->node != node_base_->get_fully_qualified_name()) { + if (parameter_subscription_->is_cft_enabled()) { + RCLCPP_ERROR( + logger_, "this node '%s' should not get parameter event of another node '%s'", + node_base_->get_fully_qualified_name(), event->node.c_str()); + } return; } // Filter for only 'use_sim_time' being added or changed. From 50aeb0aea489781f10b8575fbe3724a9653714e3 Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Mon, 17 Jan 2022 15:54:45 +0800 Subject: [PATCH 14/14] add/remove guid to action client while request/handle_message to support content filter for action client feedback subscription Signed-off-by: Chen Lihui --- .../include/rclcpp_action/client.hpp | 21 +++++++++ rclcpp_action/src/client.cpp | 46 +++++++++++++++++++ 2 files changed, 67 insertions(+) diff --git a/rclcpp_action/include/rclcpp_action/client.hpp b/rclcpp_action/include/rclcpp_action/client.hpp index 8294f2be68..7887a5338e 100644 --- a/rclcpp_action/include/rclcpp_action/client.hpp +++ b/rclcpp_action/include/rclcpp_action/client.hpp @@ -158,6 +158,18 @@ class ClientBase : public rclcpp::Waitable GoalUUID generate_goal_id(); + /// \internal + RCLCPP_ACTION_PUBLIC + virtual + bool + add_goal_uuid(const GoalUUID & goal_uuid); + + /// \internal + RCLCPP_ACTION_PUBLIC + virtual + bool + remove_goal_uuid(const GoalUUID & goal_uuid); + /// \internal RCLCPP_ACTION_PUBLIC virtual @@ -715,6 +727,15 @@ class Client : public ClientBase continue; } goal_handle->set_status(status.status); + switch (status.status) { + case GoalStatus::STATUS_SUCCEEDED: + case GoalStatus::STATUS_ABORTED: + case GoalStatus::STATUS_CANCELED: + static_cast(remove_goal_uuid(goal_id)); + break; + default: + break; + } } } diff --git a/rclcpp_action/src/client.cpp b/rclcpp_action/src/client.cpp index 5839a4f932..b1482d4900 100644 --- a/rclcpp_action/src/client.cpp +++ b/rclcpp_action/src/client.cpp @@ -120,6 +120,8 @@ class ClientBaseImpl std::independent_bits_engine< std::default_random_engine, 8, unsigned int> random_bytes_generator; + + std::mutex goal_uuids_mutex; }; ClientBase::ClientBase( @@ -302,9 +304,26 @@ ClientBase::send_goal_request(std::shared_ptr request, ResponseCallback ca { std::unique_lock guard(pimpl_->goal_requests_mutex); int64_t sequence_number; + // goal_id, which type is unique_identifier_msgs::msg::UUID, + // is the first member in ActionT::Impl::SendGoalService::Request + auto goal_id = std::static_pointer_cast(request); + if (!add_goal_uuid(goal_id->uuid)) { + RCLCPP_DEBUG( + get_logger(), + "failed to add goal uuid for setting content filtered topic for action subscriptions: %s", + rcl_get_error_string().str); + rcl_reset_error(); + } rcl_ret_t ret = rcl_action_send_goal_request( pimpl_->client_handle.get(), request.get(), &sequence_number); if (RCL_RET_OK != ret) { + if (!remove_goal_uuid(goal_id->uuid)) { + RCLCPP_DEBUG( + get_logger(), + "failed to remove goal uuid: %s", + rcl_get_error_string().str); + rcl_reset_error(); + } rclcpp::exceptions::throw_from_rcl_error(ret, "failed to send goal request"); } assert(pimpl_->pending_goal_responses.count(sequence_number) == 0); @@ -383,6 +402,33 @@ ClientBase::generate_goal_id() return goal_id; } +bool +ClientBase::add_goal_uuid(const GoalUUID & goal_uuid) +{ + std::lock_guard guard(pimpl_->goal_uuids_mutex); + + rcl_ret_t ret = rcl_action_add_goal_uuid( + pimpl_->client_handle.get(), + static_cast(goal_uuid.data())); + if (RCL_RET_OK != ret) { + return false; + } + return true; +} + +bool +ClientBase::remove_goal_uuid(const GoalUUID & goal_uuid) +{ + std::lock_guard guard(pimpl_->goal_uuids_mutex); + rcl_ret_t ret = rcl_action_remove_goal_uuid( + pimpl_->client_handle.get(), + static_cast(goal_uuid.data())); + if (RCL_RET_OK != ret) { + return false; + } + return true; +} + std::shared_ptr ClientBase::take_data() {