diff --git a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp index 17dec389..2deeb24b 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp @@ -44,7 +44,8 @@ rcutils_ret_t rcutils_uint8_array_copy( rcutils_uint8_array_t * const dst, - const rcutils_uint8_array_t * const src); + const rcutils_uint8_array_t * const src, + const bool realloc_if_needed = true); rmw_qos_policy_kind_t dds_qos_policy_to_rmw_qos_policy(const DDS_QosPolicyId_t last_policy_id); diff --git a/rmw_connextdds_common/include/rmw_connextdds/type_support.hpp b/rmw_connextdds_common/include/rmw_connextdds/type_support.hpp index 0af69c85..dcd16b32 100644 --- a/rmw_connextdds_common/include/rmw_connextdds/type_support.hpp +++ b/rmw_connextdds_common/include/rmw_connextdds/type_support.hpp @@ -15,6 +15,7 @@ #ifndef RMW_CONNEXTDDS__TYPE_SUPPORT_HPP_ #define RMW_CONNEXTDDS__TYPE_SUPPORT_HPP_ +#include #include #include @@ -56,10 +57,16 @@ struct RMW_Connext_RequestReplyMessage class RMW_Connext_MessageTypeSupport { + const rosidl_message_type_support_t * _message_type_support; const rosidl_message_type_support_t * _type_support_fastrtps; + message_type_support_key_callbacks_t _key_callbacks; bool _unbounded; bool _empty; + bool _keyed; + bool _unbounded_key; + bool _is_cpp; uint32_t _serialized_size_max; + uint32_t _key_serialized_size_max; std::string _type_name; RMW_Connext_MessageType _message_type; rmw_context_impl_t * const _ctx; @@ -79,6 +86,11 @@ class RMW_Connext_MessageTypeSupport this->_type_support_fastrtps->data); } + const rosidl_message_type_support_t * message_type_support() const + { + return this->_message_type_support; + } + rmw_context_impl_t * ctx() const { return this->_ctx; @@ -99,6 +111,14 @@ class RMW_Connext_MessageTypeSupport return this->_serialized_size_max; } + uint32_t type_key_serialized_size_max() const + { + if (!this->_keyed) { + return 0; + } + return this->_key_serialized_size_max; + } + bool unbounded() const { return this->_unbounded; @@ -109,6 +129,21 @@ class RMW_Connext_MessageTypeSupport return this->_empty; } + bool keyed() const + { + return this->_keyed; + } + + bool unbounded_key() const + { + return this->_unbounded_key; + } + + bool is_cpp() const + { + return this->_is_cpp; + } + RMW_Connext_MessageType message_type() const { return this->_message_type; @@ -139,14 +174,32 @@ class RMW_Connext_MessageTypeSupport rmw_ret_t serialize( const void * const ros_msg, - rcutils_uint8_array_t * const to_buffer); + rcutils_uint8_array_t * const to_buffer, + const bool include_encapsulation = true); rmw_ret_t deserialize( void * const ros_msg, const rcutils_uint8_array_t * const from_buffer, - size_t & size_out, const bool header_only = false); + rmw_ret_t serialize_key( + const void * const ros_msg, + rcutils_uint8_array_t * const to_buffer, + RTIEncapsulationId encapsulation_id, + const bool include_encapsulation = true); + + // Not available in message_type_support_key_callbacks_t yet + // + // Because deserialize_key is not supported yet there will be errors receiving dispose samples + // that have the serialized key as payload and the sample will be reported as lost. + // + // By default, dispose samples do not contain the serialized key, so this should only be a + // problem if the DataWriterQos.protocol.serialize_key_with_dispose is set to true. + // + // rmw_ret_t deserialize_key( + // void * const ros_msg, + // const rcutils_uint8_array_t * const from_buffer); + static RMW_Connext_MessageTypeSupport * register_type_support( @@ -174,7 +227,11 @@ class RMW_Connext_MessageTypeSupport const rosidl_message_type_support_t * const type_support, uint32_t & serialized_size_max, bool & unbounded, - bool & empty); + bool & empty, + bool & keyed, + bool & unbounded_key, + message_type_support_key_callbacks_t & key_callbacks, + uint32_t & key_serialized_size_max); }; struct RMW_Connext_Message diff --git a/rmw_connextdds_common/src/common/rmw_impl.cpp b/rmw_connextdds_common/src/common/rmw_impl.cpp index ba5329f7..d0e5a456 100644 --- a/rmw_connextdds_common/src/common/rmw_impl.cpp +++ b/rmw_connextdds_common/src/common/rmw_impl.cpp @@ -69,10 +69,15 @@ rmw_connextdds_create_topic_name( rcutils_ret_t rcutils_uint8_array_copy( rcutils_uint8_array_t * const dst, - const rcutils_uint8_array_t * const src) + const rcutils_uint8_array_t * const src, + const bool realloc_if_needed) { if (src->buffer_length > 0) { if (src->buffer_length > dst->buffer_capacity) { + if (!realloc_if_needed) { + return RCUTILS_RET_ERROR; + } + rcutils_ret_t rc = rcutils_uint8_array_resize(dst, src->buffer_length); @@ -1844,12 +1849,9 @@ RMW_Connext_Subscriber::take_next( // request header. if (this->type_support->type_requestreply()) { if (this->ctx->request_reply_mapping == RMW_Connext_RequestReplyMapping::Basic) { - size_t deserialized_size = 0; - UNUSED_ARG(deserialized_size); - if (RMW_RET_OK != this->type_support->deserialize( - ros_message, &msg->data_buffer, deserialized_size, true /* header_only */)) + ros_message, &msg->data_buffer, true /* header_only */)) { RMW_CONNEXT_LOG_ERROR_SET("failed to deserialize taken sample") rc_exit = RMW_RET_ERROR; @@ -1895,11 +1897,8 @@ RMW_Connext_Subscriber::take_next( continue; } } else { - size_t deserialized_size = 0; - if (RMW_RET_OK != - this->type_support->deserialize( - ros_message, &msg->data_buffer, deserialized_size)) + this->type_support->deserialize(ros_message, &msg->data_buffer)) { RMW_CONNEXT_LOG_ERROR_SET( "failed to deserialize taken sample") diff --git a/rmw_connextdds_common/src/common/rmw_type_support.cpp b/rmw_connextdds_common/src/common/rmw_type_support.cpp index f20ae70e..8891fa89 100644 --- a/rmw_connextdds_common/src/common/rmw_type_support.cpp +++ b/rmw_connextdds_common/src/common/rmw_type_support.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include @@ -224,12 +225,17 @@ RMW_Connext_MessageTypeSupport::RMW_Connext_MessageTypeSupport( const rosidl_message_type_support_t * const type_supports, const char * const type_name, rmw_context_impl_t * const ctx) -: _type_support_fastrtps( +: _message_type_support(nullptr), + _type_support_fastrtps( RMW_Connext_MessageTypeSupport::get_type_support_fastrtps( type_supports)), _unbounded(false), _empty(false), + _keyed(false), + _unbounded_key(false), + _is_cpp(false), _serialized_size_max(0), + _key_serialized_size_max(0), _type_name(), _message_type(message_type), _ctx(ctx) @@ -261,7 +267,16 @@ RMW_Connext_MessageTypeSupport::RMW_Connext_MessageTypeSupport( this->_type_support_fastrtps, this->_serialized_size_max, this->_unbounded, - this->_empty); + this->_empty, + this->_keyed, + this->_unbounded_key, + this->_key_callbacks, + this->_key_serialized_size_max); + + if (keyed()) { + _message_type_support = RMW_Connext_MessageTypeSupport::get_type_support_intro( + type_supports, _is_cpp); + } if (this->type_requestreply() && this->_ctx->request_reply_mapping == RMW_Connext_RequestReplyMapping::Basic) @@ -270,22 +285,34 @@ RMW_Connext_MessageTypeSupport::RMW_Connext_MessageTypeSupport( RMW_Connext_RequestReplyMapping_Basic_serialized_size_max(this); } + // TODO(fgallegosalido) preallocate serialized key buffer + // if (this->keyed() && !this->unbounded_key()) { + // // ... + // } + RMW_CONNEXT_LOG_DEBUG_A( "[type support] new %s: " "type.unbounded=%d, " "type.empty=%d, " + "type.keyed=%d," + "type.unbounded_key=%d," "type.serialized_size_max=%u, " + "type.key_serialized_size_max=%u, " "type.reqreply=%d", this->type_name(), this->_unbounded, this->_empty, + this->_keyed, + this->_unbounded_key, this->_serialized_size_max, + this->_key_serialized_size_max, this->type_requestreply()) } rmw_ret_t RMW_Connext_MessageTypeSupport::serialize( const void * const ros_msg, - rcutils_uint8_array_t * const to_buffer) + rcutils_uint8_array_t * const to_buffer, + const bool include_encapsulation) { auto callbacks = static_cast( @@ -316,7 +343,9 @@ rmw_ret_t RMW_Connext_MessageTypeSupport::serialize( const void * payload = ros_msg; try { - cdr_stream.serialize_encapsulation(); + if (include_encapsulation) { + cdr_stream.serialize_encapsulation(); + } if (this->type_requestreply()) { const RMW_Connext_RequestReplyMessage * const rr_msg = @@ -379,7 +408,6 @@ rmw_ret_t RMW_Connext_MessageTypeSupport::deserialize( void * const ros_msg, const rcutils_uint8_array_t * const from_buffer, - size_t & size_out, const bool header_only) { auto callbacks = @@ -475,17 +503,106 @@ RMW_Connext_MessageTypeSupport::deserialize( return RMW_RET_ERROR; } - size_out = cdr_stream.get_serialized_data_length(); + return RMW_RET_OK; +} + +rmw_ret_t +RMW_Connext_MessageTypeSupport::serialize_key( + const void * const ros_msg, + rcutils_uint8_array_t * const to_buffer, + RTIEncapsulationId encapsulation_id, + const bool include_encapsulation) +{ + if (!this->keyed()) { + return RMW_RET_ERROR; + } + + eprosima::fastcdr::FastBuffer cdr_buffer( + reinterpret_cast(to_buffer->buffer), + to_buffer->buffer_capacity); + eprosima::fastcdr::Cdr cdr_stream( + cdr_buffer, + (RTICdrEncapsulation_isBigEndianCdrEncapsulationId(encapsulation_id)) ? + eprosima::fastcdr::Cdr::BIG_ENDIANNESS : + eprosima::fastcdr::Cdr::LITTLE_ENDIANNESS, + eprosima::fastcdr::CdrVersion::XCDRv1); + cdr_stream.set_encoding_flag( + eprosima::fastcdr::EncodingAlgorithmFlag::PLAIN_CDR); + + try { + if (include_encapsulation) { + cdr_stream.serialize_encapsulation(); + } + if (!this->_key_callbacks.cdr_serialize_key(ros_msg, cdr_stream)) { + return RMW_RET_ERROR; + } + } catch(const eprosima::fastcdr::exception::NotEnoughMemoryException & exc) { + // In Release mode, the logging is disabled at compile time, so we mark this + // variable as unused to avoid a warning. + UNUSED_ARG(exc); + // RMW_Connext_TypePlugin_serialize_key is called twice in + // RMW_Connext_TypePlugin_instance_to_key_hash. It can fail the first time + // it is called due to not enough memory, and then allocate the necessary + // memory, so we should not print an error message here. + RMW_CONNEXT_LOG_DEBUG_A( + "Failed to serialize key for %s sample: %s", this->type_name(), exc.what()); + return RMW_RET_ERROR; + } catch (const std::exception & exc) { + RMW_CONNEXT_LOG_ERROR_A_SET( + "Failed to serialize key for %s sample: %s", this->type_name(), exc.what()) + return RMW_RET_ERROR; + } catch (...) { + RMW_CONNEXT_LOG_ERROR_A_SET("Failed to serialize key for %s sample", this->type_name()) + return RMW_RET_ERROR; + } + + to_buffer->buffer_length = cdr_stream.get_serialized_data_length(); RMW_CONNEXT_LOG_DEBUG_A( - "[type support] %s deserialized: " - "consumed=%lu", + "[type support] %s serialized key: " + "buffer.length=%lu", this->_type_name.c_str(), - size_out) + to_buffer->buffer_length); return RMW_RET_OK; } +// TODO(fgallegosalido): Revisit key deserialization in the future. +// rmw_ret_t +// RMW_Connext_MessageTypeSupport::deserialize_key( +// void * const ros_msg, +// const rcutils_uint8_array_t * const from_buffer) +// { +// if (!this->keyed()) { +// return RMW_RET_ERROR; +// } + +// eprosima::fastcdr::FastBuffer cdr_buffer( +// reinterpret_cast(from_buffer->buffer), +// from_buffer->buffer_length); +// eprosima::fastcdr::Cdr cdr_stream( +// cdr_buffer, +// eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, +// eprosima::fastcdr::CdrVersion::XCDRv1); + +// void * payload = ros_msg; + +// try { +// cdr_stream.read_encapsulation(); +// if (!this->_key_callbacks.cdr_deserialize_key(cdr_stream, payload)) { +// return RMW_RET_ERROR; +// } +// } catch (const std::exception & exc) { +// RMW_CONNEXT_LOG_ERROR_A_SET( +// "Failed to deserialize key for %s sample: %s", this->type_name(), exc.what()) +// return RMW_RET_ERROR; +// } catch (...) { +// RMW_CONNEXT_LOG_ERROR_A_SET("Failed to deserialize key for %s sample", this->type_name()) +// return RMW_RET_ERROR; +// } + +// return RMW_RET_OK; +// } uint32_t RMW_Connext_MessageTypeSupport::serialized_size_max( const void * const ros_msg, @@ -664,14 +781,21 @@ void RMW_Connext_MessageTypeSupport::type_info( const rosidl_message_type_support_t * const type_support, uint32_t & serialized_size_max, bool & unbounded, - bool & empty) + bool & empty, + bool & keyed, + bool & unbounded_key, + message_type_support_key_callbacks_t & key_callbacks, + uint32_t & key_serialized_size_max) { serialized_size_max = 0; unbounded = false; empty = false; + keyed = false; + unbounded_key = false; auto callbacks = static_cast(type_support->data); + keyed = callbacks->key_callbacks != nullptr; /* The fastrtps type support sets full_bounded to false if unbounded, but assumes full_bounded == true by default */ @@ -687,17 +811,33 @@ void RMW_Connext_MessageTypeSupport::type_info( static_cast(callbacks->max_serialized_size(full_bounded)); #endif - unbounded = !full_bounded; - if (full_bounded && serialized_size_max == 0) { /* Empty message */ empty = true; serialized_size_max = 1; } + unbounded = !full_bounded; + /* add encapsulation size to static serialized_size_max */ - serialized_size_max += - RMW_Connext_MessageTypeSupport::ENCAPSULATION_HEADER_SIZE; + if (!unbounded) { + serialized_size_max += + RMW_Connext_MessageTypeSupport::ENCAPSULATION_HEADER_SIZE; + } else { + serialized_size_max = RTI_CDR_MAX_SERIALIZED_SIZE; + } + + if (keyed) { + key_callbacks = *callbacks->key_callbacks; + key_serialized_size_max = key_callbacks.max_serialized_size_key(unbounded_key); + /* add encapsulation size to static serialized_size_max */ + if (!unbounded_key) { + key_serialized_size_max += + RMW_Connext_MessageTypeSupport::ENCAPSULATION_HEADER_SIZE; + } else { + key_serialized_size_max = RTI_CDR_MAX_SERIALIZED_SIZE; + } + } } diff --git a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp index d9fcd91d..e1d91c20 100644 --- a/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp +++ b/rmw_connextdds_common/src/ndds/dds_api_ndds.cpp @@ -796,27 +796,25 @@ rmw_connextdds_take_samples( DDS_Long data_len = 0; void ** data_buffer = nullptr; + // TODO(fgallegosalido): Use DDS_DataReader_read_or_take_instance_untypedI + // when ROS2 support for instances is added. DDS_ReturnCode_t rc = - DDS_DataReader_read_or_take_instance_untypedI( - sub->reader(), - &is_loan, - &data_buffer, - &data_len, - sub->info_seq(), - 0 /* data_seq_len */, - 0 /* data_seq_max_len */, - DDS_BOOLEAN_TRUE /* data_seq_has_ownership */, - NULL /* data_seq_contiguous_buffer_for_copy */, - 1 /* data_size -- ignored because loaning*/, - DDS_LENGTH_UNLIMITED /* max_samples */, - &DDS_HANDLE_NIL /* a_handle */, -#if !RMW_CONNEXT_DDS_API_PRO_LEGACY - NULL /* topic_query_guid */, -#endif /* RMW_CONNEXT_DDS_API_PRO_LEGACY */ - DDS_ANY_SAMPLE_STATE, - DDS_ANY_VIEW_STATE, - DDS_ANY_INSTANCE_STATE, - DDS_BOOLEAN_TRUE /* take */); + DDS_DataReader_read_or_take_untypedI( + sub->reader(), + &is_loan, + &data_buffer, + &data_len, + sub->info_seq(), + 0 /* data_seq_len */, + 0 /* data_seq_max_len */, + DDS_BOOLEAN_TRUE /* data_seq_has_ownership */, + NULL /* data_seq_contiguous_buffer_for_copy */, + 1 /* data_size -- ignored because loaning*/, + DDS_LENGTH_UNLIMITED /* max_samples */, + DDS_ANY_SAMPLE_STATE, + DDS_ANY_VIEW_STATE, + DDS_ANY_INSTANCE_STATE, + DDS_BOOLEAN_TRUE /* take */); if (DDS_RETCODE_OK != rc) { if (DDS_RETCODE_NO_DATA == rc) { return RMW_RET_OK; @@ -824,7 +822,8 @@ rmw_connextdds_take_samples( RMW_CONNEXT_LOG_ERROR_SET("failed to take data from DDS reader") return RMW_RET_ERROR; } - RMW_CONNEXT_ASSERT(data_len > 0)(void) RMW_Connext_MessagePtrSeq_loan_contiguous( + RMW_CONNEXT_ASSERT(data_len > 0); + (void) RMW_Connext_MessagePtrSeq_loan_contiguous( sub->data_seq(), reinterpret_cast(data_buffer), data_len, @@ -882,7 +881,9 @@ rmw_connextdds_count_unread_samples( unread_count = 0; DDS_ReturnCode_t rc = DDS_RETCODE_ERROR; do { - rc = DDS_DataReader_read_or_take_instance_untypedI( + // TODO(fgallegosalido): Use DDS_DataReader_read_or_take_instance_untypedI + // when ROS 2 support for instances is added. + rc = DDS_DataReader_read_or_take_untypedI( sub->reader(), &is_loan, &data_buffer, @@ -894,10 +895,6 @@ rmw_connextdds_count_unread_samples( NULL /* data_seq_contiguous_buffer_for_copy */, 1 /* data_size -- ignored because loaning*/, DDS_LENGTH_UNLIMITED /* max_samples */, - &DDS_HANDLE_NIL /* a_handle */, - #if !RMW_CONNEXT_DDS_API_PRO_LEGACY - NULL /* topic_query_guid */, - #endif /* RMW_CONNEXT_DDS_API_PRO_LEGACY */ DDS_NOT_READ_SAMPLE_STATE, DDS_ANY_VIEW_STATE, DDS_ANY_INSTANCE_STATE, diff --git a/rmw_connextdds_common/src/ndds/rmw_type_support_ndds.cpp b/rmw_connextdds_common/src/ndds/rmw_type_support_ndds.cpp index ddf172f2..f2535f71 100644 --- a/rmw_connextdds_common/src/ndds/rmw_type_support_ndds.cpp +++ b/rmw_connextdds_common/src/ndds/rmw_type_support_ndds.cpp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include "rcpputils/scope_exit.hpp" @@ -180,6 +181,14 @@ RMW_Connext_TypePlugin_get_serialized_sample_size( unsigned int current_alignment, const void * sample); +static +unsigned int +RMW_Connext_TypePlugin_get_serialized_key_max_size( + PRESTypePluginEndpointData endpoint_data, + RTIBool include_encapsulation, + RTIEncapsulationId encapsulation_id, + unsigned int size); + RTIBool RMW_Connext_TypePlugin_get_buffer( PRESTypePluginEndpointData endpointData, @@ -268,6 +277,44 @@ RMW_Connext_TypePlugin_destroy_data(void ** sample, void * user_data) delete msg; } +static +RTIBool +RMW_Connext_TypePlugin_create_key(void ** key, void * user_data) +{ + RMW_Connext_MessageTypeSupport * const type_support = + reinterpret_cast(user_data); + + const auto buffer_size = (type_support->unbounded_key()) ? + 0 : type_support->type_key_serialized_size_max(); + + RMW_Connext_Message * msg = new (std::nothrow) RMW_Connext_Message(); + if (nullptr == msg) { + return RTI_FALSE; + } + if (RMW_RET_OK != RMW_Connext_Message_initialize(msg, type_support, buffer_size)) { + delete msg; + return RTI_FALSE; + } + *key = msg; + + return RTI_TRUE; +} + +static +void +RMW_Connext_TypePlugin_destroy_key(void ** key, void * user_data) +{ + RMW_Connext_MessageTypeSupport * const type_support = + reinterpret_cast(user_data); + + UNUSED_ARG(type_support); + + RMW_Connext_Message * msg = reinterpret_cast(*key); + RMW_Connext_Message_finalize(msg); + delete msg; +} + + /* ---------------------------------------------------------------------------- Callback functions: * ---------------------------------------------------------------------------- */ @@ -332,6 +379,8 @@ RMW_Connext_TypePlugin_on_endpoint_attached( RMW_Connext_NddsParticipantData * const pdata = reinterpret_cast(participant_data); + const bool keyed = pdata->type_plugin->wrapper->keyed(); + PRESTypePluginDefaultEndpointData * const epd = reinterpret_cast( PRESTypePluginDefaultEndpointData_newWithNotification( @@ -343,8 +392,12 @@ RMW_Connext_TypePlugin_on_endpoint_attached( (REDAFastBufferPoolBufferFinalizeFunction) RMW_Connext_TypePlugin_destroy_data, pdata->type_plugin->wrapper, - NULL, NULL, - NULL, NULL)); + (REDAFastBufferPoolBufferInitializeFunction) + (keyed ? RMW_Connext_TypePlugin_create_key : NULL), + (keyed ? pdata->type_plugin->wrapper : NULL), + (REDAFastBufferPoolBufferFinalizeFunction) + (keyed ? RMW_Connext_TypePlugin_destroy_key : NULL), + (keyed ? pdata->type_plugin->wrapper : NULL))); if (epd == NULL) { return NULL; @@ -352,6 +405,20 @@ RMW_Connext_TypePlugin_on_endpoint_attached( epd->userData = pdata->type_plugin->wrapper; + if (keyed) { + const unsigned int serialized_key_max_size = + RMW_Connext_TypePlugin_get_serialized_key_max_size( + epd, RTI_FALSE, RTI_CDR_ENCAPSULATION_ID_CDR_BE, 0); + + if (!PRESTypePluginDefaultEndpointData_createMD5StreamWithInfo( + epd, endpoint_info, serialized_key_max_size, 0)) + { + RMW_CONNEXT_LOG_ERROR_SET("failed to allocate keyed endpoint stream") + PRESTypePluginDefaultEndpointData_delete(epd); + return NULL; + } + } + if (endpoint_info->endpointKind == PRES_TYPEPLUGIN_ENDPOINT_WRITER) { const unsigned int serialized_sample_max_size = RMW_Connext_TypePlugin_get_serialized_sample_max_size( @@ -383,7 +450,9 @@ void RMW_Connext_TypePlugin_on_endpoint_detached( PRESTypePluginEndpointData endpoint_data) { - PRESTypePluginDefaultEndpointData_delete(endpoint_data); + if (nullptr != endpoint_data) { + PRESTypePluginDefaultEndpointData_delete(endpoint_data); + } } static @@ -441,13 +510,9 @@ RMW_Connext_TypePlugin_serialize( UNUSED_ARG(endpoint_plugin_qos); UNUSED_ARG(encapsulation_id); - if (!serialize_encapsulation) { - // currently not supported - return RTI_FALSE; - } - if (!serialize_sample) { - // currently not supported + // Not currently supported. As a result, users cannot enable batching by + // setting writer_qos.batch.enabled to TRUE. return RTI_FALSE; } @@ -470,7 +535,8 @@ RMW_Connext_TypePlugin_serialize( data_buffer.buffer_capacity = data_buffer.buffer_length; if (!msg->serialized) { - rmw_ret_t rc = msg->type_support->serialize(msg->user_data, &data_buffer); + rmw_ret_t rc = msg->type_support->serialize( + msg->user_data, &data_buffer, serialize_encapsulation); if (RMW_RET_OK != rc) { return RTI_FALSE; } @@ -480,8 +546,9 @@ RMW_Connext_TypePlugin_serialize( } else { const rcutils_uint8_array_t * const user_buffer = reinterpret_cast(msg->user_data); + // prevent rcutils_uint8_array_copy from trying to reallocate the target buffer. if (RCUTILS_RET_OK != - rcutils_uint8_array_copy(&data_buffer, user_buffer)) + rcutils_uint8_array_copy(&data_buffer, user_buffer, false /* realloc_if_needed */)) { return RTI_FALSE; } @@ -557,8 +624,10 @@ RMW_Connext_TypePlugin_get_serialized_sample_max_size( RMW_Connext_MessageTypeSupport * const type_support = reinterpret_cast(epd->userData); - // For unbounded types this function will only return the maximum size of - // the "bounded" part of the message. + if (type_support->unbounded()) { + return RTI_CDR_MAX_SERIALIZED_SIZE; + } + current_alignment += type_support->type_serialized_size_max(); if (!include_encapsulation) { @@ -577,6 +646,15 @@ RMW_Connext_TypePlugin_get_serialized_sample_min_size( RTIEncapsulationId encapsulation_id, unsigned int current_alignment) { + // The serialized sample min size is not currently available. As a workaround, + // we set it equal to the serialized sample max size. + // + // This effectively limits the number of samples in a batch to one when + // batching is constrained by max_data_bytes + // (writer_qos.batching.max_data_bytes). + // + // To allow multiple samples per batch, the user must configure batching based + // on the number of samples instead (writer_qos.batching.max_samples). return RMW_Connext_TypePlugin_get_serialized_sample_max_size( endpoint_data, include_encapsulation, @@ -611,9 +689,14 @@ RMW_Connext_TypePlugin_get_serialized_sample_size( // user_data pointer. The only samples which do not use that pointers are // the ones created internally in the DataReader queue, and it would be // unexpected for them to be passed to this function. - RMW_CONNEXT_ASSERT(nullptr != msg->user_data) + RMW_CONNEXT_ASSERT( + nullptr != msg->user_data || + (nullptr != msg->data_buffer.buffer && msg->data_buffer.buffer_length > 0)); - if (msg->serialized) { + if (nullptr == msg->user_data) { + RMW_CONNEXT_ASSERT(msg->data_buffer.buffer_length <= UINT32_MAX - current_alignment); + current_alignment += static_cast(msg->data_buffer.buffer_length); + } else if (msg->serialized) { const rcutils_uint8_array_t * const serialized_msg = reinterpret_cast(msg->user_data); RMW_CONNEXT_ASSERT(serialized_msg->buffer_length <= UINT32_MAX - current_alignment) @@ -638,11 +721,349 @@ Key Management functions: * -------------------------------------------------------------------------------------- */ static PRESTypePluginKeyKind -RMW_Connext_TypePlugin_get_key_kind(void) +RMW_Connext_TypePlugin_get_key_kind_unkeyed(void) { return PRES_TYPEPLUGIN_NO_KEY; } +static +PRESTypePluginKeyKind +RMW_Connext_TypePlugin_get_key_kind_keyed(void) +{ + return PRES_TYPEPLUGIN_USER_KEY; +} + +static +RTIBool +RMW_Connext_TypePlugin_serialize_key( + PRESTypePluginEndpointData endpoint_data, + const void *key, + struct RTICdrStream *stream, + RTIBool serialize_encapsulation, + RTIEncapsulationId encapsulation_id, + RTIBool serialize_key, + void *endpoint_plugin_qos) +{ + UNUSED_ARG(endpoint_data); + UNUSED_ARG(endpoint_plugin_qos); + + if (!serialize_key) { + // Not currently supported. As a result, users cannot enable batching by + // setting writer_qos.batch.enabled to TRUE. + return RTI_FALSE; + } + + // key contains the serialized data of the sample, so we need to deserialize + // it because the TypePlugin deserialization just copies the serialized + // data to the sample buffer. The actual deserialization in a ROS2 message + // is done in upper layers. + const RMW_Connext_Message * const msg = + reinterpret_cast(key); + + if (nullptr != msg->user_data) { + // Writer codepath + rcutils_uint8_array_t data_buffer; + data_buffer.allocator = rcutils_get_default_allocator(); + data_buffer.buffer = + reinterpret_cast(RTICdrStream_getCurrentPosition(stream)); + data_buffer.buffer_length = RTICdrStream_getRemainder(stream); + data_buffer.buffer_capacity = data_buffer.buffer_length; + + if (!msg->serialized) { + rmw_ret_t rc = msg->type_support->serialize_key( + msg->user_data, &data_buffer, encapsulation_id, serialize_encapsulation); + if (RMW_RET_OK != rc) { + return RTI_FALSE; + } + RMW_CONNEXT_LOG_DEBUG_A( + "serialized key: msg=%p, size=%lu", + msg->user_data, data_buffer.buffer_length) + } else { + const rcutils_uint8_array_t * const user_buffer = + reinterpret_cast(msg->user_data); + // prevent rcutils_uint8_array_copy from trying to reallocate the target buffer. + if (RCUTILS_RET_OK != + rcutils_uint8_array_copy(&data_buffer, user_buffer, false /* realloc_if_needed */)) + { + return RTI_FALSE; + } + } + + RTICdrStream_setCurrentPosition( + stream, + reinterpret_cast(data_buffer.buffer) + data_buffer.buffer_length); + } else { + // Reader codepath + const auto allocator = rcutils_get_default_allocator(); + void * deserialized_message = nullptr; + + if (msg->type_support->is_cpp()) { + const auto message_type_support = + static_cast( + msg->type_support->message_type_support()->data); + deserialized_message = allocator.allocate( + message_type_support->size_of_, + nullptr); + if (nullptr == deserialized_message) { + return RTI_FALSE; + } + message_type_support->init_function( + deserialized_message, + rosidl_runtime_cpp::MessageInitialization::SKIP); + } else { + const auto message_type_support = + static_cast( + msg->type_support->message_type_support()->data); + deserialized_message = allocator.allocate( + message_type_support->size_of_, + nullptr); + if (nullptr == deserialized_message) { + return RTI_FALSE; + } + message_type_support->init_function( + deserialized_message, + ROSIDL_RUNTIME_C_MSG_INIT_SKIP); + } + auto retcode = msg->type_support->deserialize( + deserialized_message, + &msg->data_buffer); + if (RMW_RET_OK != retcode) { + allocator.deallocate(deserialized_message, nullptr); + return RTI_FALSE; + } + + rcutils_uint8_array_t data_buffer; + data_buffer.allocator = allocator; + data_buffer.buffer = + reinterpret_cast(RTICdrStream_getCurrentPosition(stream)); + data_buffer.buffer_length = RTICdrStream_getRemainder(stream); + data_buffer.buffer_capacity = data_buffer.buffer_length; + + retcode = msg->type_support->serialize_key( + deserialized_message, + &data_buffer, + encapsulation_id, + serialize_encapsulation); + allocator.deallocate(deserialized_message, nullptr); + if (RMW_RET_OK != retcode) { + return RTI_FALSE; + } + + RTICdrStream_setCurrentPosition( + stream, + reinterpret_cast(data_buffer.buffer) + data_buffer.buffer_length); + } + + return RTI_TRUE; +} + +static +RTIBool +RMW_Connext_TypePlugin_deserialize_key( + PRESTypePluginEndpointData endpoint_data, + void ** key, + RTIBool * drop_key, + struct RTICdrStream *stream, + RTIBool deserialize_encapsulation, + RTIBool deserialize_key, + void *endpoint_plugin_qos) +{ + UNUSED_ARG(endpoint_data); + UNUSED_ARG(drop_key); + UNUSED_ARG(endpoint_plugin_qos); + + if (!deserialize_encapsulation) { + // Currently not supported + return RTI_FALSE; + } + if (!deserialize_key) { + // Currently not supported + return RTI_FALSE; + } + + RMW_Connext_Message * const msg = + reinterpret_cast(*key); + + const size_t deserialize_size = RTICdrStream_getRemainder(stream); + void * const src_ptr = RTICdrStream_getCurrentPosition(stream); + + if (msg->data_buffer.buffer_capacity < deserialize_size) { + if (RCUTILS_RET_OK != + rcutils_uint8_array_resize(&msg->data_buffer, deserialize_size)) + { + return RTI_FALSE; + } + } + + memcpy(msg->data_buffer.buffer, src_ptr, deserialize_size); + msg->data_buffer.buffer_length = deserialize_size; + + RTICdrStream_setCurrentPosition(stream, reinterpret_cast(src_ptr) + deserialize_size); + + return RTI_TRUE; +} + +static +unsigned int +RMW_Connext_TypePlugin_get_serialized_key_max_size( + PRESTypePluginEndpointData endpoint_data, + RTIBool include_encapsulation, + RTIEncapsulationId encapsulation_id, + unsigned int size) +{ + unsigned int initial_alignment = size; + + UNUSED_ARG(encapsulation_id); + + PRESTypePluginDefaultEndpointData * const epd = + reinterpret_cast(endpoint_data); + RMW_Connext_MessageTypeSupport * const type_support = + reinterpret_cast(epd->userData); + + if (type_support->unbounded_key()) { + return RTI_CDR_MAX_SERIALIZED_SIZE; + } + + size += type_support->type_key_serialized_size_max(); + + if (!include_encapsulation) { + size -= + RMW_Connext_MessageTypeSupport::ENCAPSULATION_HEADER_SIZE; + } + + return size - initial_alignment; +} + +static +RTIBool +RMW_Connext_TypePlugin_instance_to_key_hash( + PRESTypePluginEndpointData endpoint_data, + MIGRtpsKeyHash *key_hash, + const void *sample, + RTIEncapsulationId encapsulation_id) +{ + UNUSED_ARG(encapsulation_id); + + PRESTypePluginDefaultEndpointData * const epd = + reinterpret_cast(endpoint_data); + RMW_Connext_MessageTypeSupport * const type_support = + reinterpret_cast(epd->userData); + + char * buffer = NULL; + RTICdrStreamState cdr_state; + RTICdrStreamState_init(&cdr_state); + + RTICdrStream * const md5_stream = + PRESTypePluginDefaultEndpointData_getMD5Stream(endpoint_data); + if (nullptr == md5_stream) { + RMW_CONNEXT_LOG_ERROR_SET("failed to look up md5 stream for endpoint") + return RTI_FALSE; + } + RTICdrStream_resetPosition(md5_stream); + RTICdrStream_setDirtyBit(md5_stream, RTI_TRUE); + + if (!RMW_Connext_TypePlugin_serialize_key( + endpoint_data, + sample, + md5_stream, + RTI_FALSE /* serialize_encapsulation */, + RTI_CDR_ENCAPSULATION_ID_CDR_BE, + RTI_TRUE /* serialize_key */, + NULL /* endpoint_plugin_qos */)) + { + RTICdrStream_pushState(md5_stream, &cdr_state, -1); + + int size = RMW_Connext_TypePlugin_get_serialized_sample_size( + endpoint_data, + RTI_FALSE /* include_encapsulation */, + RTI_CDR_ENCAPSULATION_ID_CDR_BE, + 0 /* current_alignment */, + sample); + + if (size <= RTICdrStream_getBufferLength(md5_stream)) { + RTICdrStream_popState(md5_stream, &cdr_state); + return RTI_FALSE; + } + + RTIOsapiHeap_allocateBuffer(&buffer, size, 0); + + if (nullptr == buffer) { + RTICdrStream_popState(md5_stream, &cdr_state); + return RTI_FALSE; + } + + RTICdrStream_set(md5_stream, buffer, size); + RTIOsapiMemory_zero( + RTICdrStream_getBuffer(md5_stream), + RTICdrStream_getBufferLength(md5_stream)); + RTICdrStream_resetPosition(md5_stream); + RTICdrStream_setDirtyBit(md5_stream, RTI_TRUE); + + if (!RMW_Connext_TypePlugin_serialize_key( + endpoint_data, + sample, + md5_stream, + RTI_FALSE /* serialize_encapsulation */, + RTI_CDR_ENCAPSULATION_ID_CDR_BE, + RTI_TRUE /* serialize_key */, + NULL /* endpoint_plugin_qos */)) + { + RTICdrStream_popState(md5_stream, &cdr_state); + RTIOsapiHeap_freeBuffer(buffer); + return RTI_FALSE; + } + } + + if (MIG_RTPS_KEY_HASH_MAX_LENGTH < type_support->type_key_serialized_size_max() || + PRESTypePluginDefaultEndpointData_forceMD5KeyHash(endpoint_data)) + { + RTICdrStream_computeMD5(md5_stream, key_hash->value); + } else { + RTIOsapiMemory_zero(key_hash->value, MIG_RTPS_KEY_HASH_MAX_LENGTH); + RTIOsapiMemory_copy( + key_hash->value, + RTICdrStream_getBuffer(md5_stream), + RTICdrStream_getCurrentPositionOffset(md5_stream)); + } + + key_hash->length = MIG_RTPS_KEY_HASH_MAX_LENGTH; + + if (NULL != buffer) { + RTICdrStream_popState(md5_stream, &cdr_state); + RTIOsapiHeap_freeBuffer(buffer); + } + + return RTI_TRUE; +} + +// type_plugin->serializedSampleToKeyHashFnc +RTIBool +RMW_Connext_TypePlugin_serialized_sample_to_key_hash( + PRESTypePluginEndpointData endpointData, + struct RTICdrStream *stream, + struct MIGRtpsKeyHash *keyHash, + RTIBool deserializeEncapsulation, + void *endpointPluginQos) +{ + return RTI_FALSE; +} + +// This function is not currently supported. The function is needed when +// dispose messages are sent with a serialized key and without a key hash. +// This can happen if the writer configures writer_qos.protocol.serialize_key_with_dispose +// to TRUE and set the property data_writer.protocol.disable_inline_keyhash_in_dispose +// to TRUE. +// RTIBool +// RMW_Connext_TypePlugin_serialized_key_to_key_hash( +// PRESTypePluginEndpointData endpointData, +// struct RTICdrStream *stream, +// struct MIGRtpsKeyHash *keyHash, +// RTIBool deserializeEncapsulation, +// void *endpointPluginQos) +// { +// return RTI_FALSE; +// } + /* ------------------------------------------------------------------------ * Plug-in Life-cycle * ------------------------------------------------------------------------ */ @@ -651,7 +1072,8 @@ void RMW_Connext_TypePlugin_initialize( struct PRESTypePlugin * const plugin, struct RTICdrTypeCode * const type_code, - const char * const type_name) + const char * const type_name, + const bool keyed) { const struct PRESTypePluginVersion PLUGIN_VERSION = PRES_TYPE_PLUGIN_VERSION_2_0; @@ -662,88 +1084,74 @@ RMW_Connext_TypePlugin_initialize( plugin->endpointTypeName = type_name; plugin->typeCodeName = type_name; - /* Partcipant/endpoint events */ - plugin->onParticipantAttached = - (PRESTypePluginOnParticipantAttachedCallback) - RMW_Connext_TypePlugin_on_participant_attached; - plugin->onParticipantDetached = - (PRESTypePluginOnParticipantDetachedCallback) - RMW_Connext_TypePlugin_on_participant_detached; - plugin->onEndpointAttached = - (PRESTypePluginOnEndpointAttachedCallback) - RMW_Connext_TypePlugin_on_endpoint_attached; - plugin->onEndpointDetached = - (PRESTypePluginOnEndpointDetachedCallback) - RMW_Connext_TypePlugin_on_endpoint_detached; + /* Participant/endpoint events */ + plugin->onParticipantAttached = RMW_Connext_TypePlugin_on_participant_attached; + plugin->onParticipantDetached = RMW_Connext_TypePlugin_on_participant_detached; + plugin->onEndpointAttached = RMW_Connext_TypePlugin_on_endpoint_attached; + plugin->onEndpointDetached = RMW_Connext_TypePlugin_on_endpoint_detached; /* Sample management */ - plugin->copySampleFnc = - (PRESTypePluginCopySampleFunction) - RMW_Connext_TypePlugin_copy_sample; - plugin->createSampleFnc = - (PRESTypePluginCreateSampleFunction) - RMW_Connext_TypePlugin_create_sample; - plugin->destroySampleFnc = - (PRESTypePluginDestroySampleFunction) - RMW_Connext_TypePlugin_destroy_sample; - plugin->getSampleFnc = - (PRESTypePluginGetSampleFunction) - RMW_Connext_TypePlugin_get_sample; - plugin->returnSampleFnc = - (PRESTypePluginReturnSampleFunction) - RMW_Connext_TypePlugin_return_sample; - plugin->finalizeOptionalMembersFnc = - (PRESTypePluginFinalizeOptionalMembersFunction) - NULL /* TODO(asorbini): implement? */; + plugin->copySampleFnc = RMW_Connext_TypePlugin_copy_sample; + plugin->createSampleFnc = RMW_Connext_TypePlugin_create_sample; + plugin->destroySampleFnc = RMW_Connext_TypePlugin_destroy_sample; + plugin->getSampleFnc = RMW_Connext_TypePlugin_get_sample; + plugin->returnSampleFnc = RMW_Connext_TypePlugin_return_sample; + plugin->finalizeOptionalMembersFnc = nullptr /* TODO(asorbini): implement? */; /* Serialization/deserialization */ - plugin->serializeFnc = - (PRESTypePluginSerializeFunction) - RMW_Connext_TypePlugin_serialize; - plugin->deserializeFnc = - (PRESTypePluginDeserializeFunction) - RMW_Connext_TypePlugin_deserialize; - plugin->getSerializedSampleMaxSizeFnc = - (PRESTypePluginGetSerializedSampleMaxSizeFunction) - RMW_Connext_TypePlugin_get_serialized_sample_max_size; - plugin->getSerializedSampleMinSizeFnc = - (PRESTypePluginGetSerializedSampleMinSizeFunction) - RMW_Connext_TypePlugin_get_serialized_sample_min_size; - plugin->getBuffer = - (PRESTypePluginGetBufferFunction) - RMW_Connext_TypePlugin_get_buffer; - plugin->returnBuffer = - (PRESTypePluginReturnBufferFunction) - RMW_Connext_TypePlugin_return_buffer; - plugin->getSerializedSampleSizeFnc = - (PRESTypePluginGetSerializedSampleSizeFunction) - RMW_Connext_TypePlugin_get_serialized_sample_size; + plugin->serializeFnc = RMW_Connext_TypePlugin_serialize; + plugin->deserializeFnc = RMW_Connext_TypePlugin_deserialize; + plugin->getSerializedSampleMaxSizeFnc = RMW_Connext_TypePlugin_get_serialized_sample_max_size; + plugin->getSerializedSampleMinSizeFnc = RMW_Connext_TypePlugin_get_serialized_sample_min_size; + plugin->getBuffer = RMW_Connext_TypePlugin_get_buffer; + plugin->returnBuffer = RMW_Connext_TypePlugin_return_buffer; + plugin->getSerializedSampleSizeFnc = RMW_Connext_TypePlugin_get_serialized_sample_size; + + /* Unused callbacks*/ + plugin->getDeserializedSampleMaxSizeFnc = nullptr; + + /* Deprecated callbacks */ + plugin->getKeyFnc = nullptr; // Deprecated + plugin->returnKeyFnc = nullptr; // Deprecated + plugin->instanceToKeyFnc = nullptr; // Deprecated + plugin->keyToInstanceFnc = nullptr; // Deprecated /* Key management */ - plugin->getKeyKindFnc = - (PRESTypePluginGetKeyKindFunction) - RMW_Connext_TypePlugin_get_key_kind; - plugin->serializeKeyFnc = NULL; - plugin->deserializeKeyFnc = NULL; - plugin->getKeyFnc = NULL; - plugin->returnKeyFnc = NULL; - plugin->instanceToKeyFnc = NULL; - plugin->keyToInstanceFnc = NULL; - plugin->getSerializedKeyMaxSizeFnc = NULL; - plugin->instanceToKeyHashFnc = NULL; - plugin->serializedSampleToKeyHashFnc = NULL; - plugin->serializedKeyToKeyHashFnc = NULL; + if (!keyed) { + plugin->getKeyKindFnc = RMW_Connext_TypePlugin_get_key_kind_unkeyed; + plugin->serializeKeyFnc = nullptr; + plugin->deserializeKeyFnc = nullptr; + plugin->getSerializedKeyMaxSizeFnc = nullptr; + plugin->instanceToKeyHashFnc = nullptr; + plugin->serializedSampleToKeyHashFnc = nullptr; + plugin->serializedKeyToKeyHashFnc = nullptr; + plugin->deserializeKeySampleFnc = nullptr; + } else { + plugin->getKeyKindFnc = RMW_Connext_TypePlugin_get_key_kind_keyed; + plugin->serializeKeyFnc = RMW_Connext_TypePlugin_serialize_key; + // This callback is not supported yet, but the PRES TypePlugin requires it to be non-NULL. For + // that reason we are leaving it like this, as it is not going to be called anyway. + plugin->deserializeKeyFnc = RMW_Connext_TypePlugin_deserialize_key; + plugin->getSerializedKeyMaxSizeFnc = RMW_Connext_TypePlugin_get_serialized_key_max_size; + plugin->instanceToKeyHashFnc = RMW_Connext_TypePlugin_instance_to_key_hash; + // TODO(fgallegosalido): Not implemented yet + plugin->serializedSampleToKeyHashFnc = RMW_Connext_TypePlugin_serialized_sample_to_key_hash; + // TODO(fgallegosalido): Not supported yet + plugin->serializedKeyToKeyHashFnc = nullptr; + // TODO(fgallegosalido) Consider in the future over deserializeKeyFnc + plugin->deserializeKeySampleFnc = nullptr; + } #if !RMW_CONNEXT_DDS_API_PRO_LEGACY /* This functions are not part of the pre-6.x type plugin */ plugin->isMetpType = RTI_FALSE; - plugin->getWriterLoanedSampleFnc = NULL; - plugin->returnWriterLoanedSampleFnc = NULL; - plugin->returnWriterLoanedSampleFromCookieFnc = NULL; - plugin->validateWriterLoanedSampleFnc = NULL; - plugin->setWriterLoanedSampleSerializedStateFnc = NULL; - plugin->getBufferWithParams = NULL; - plugin->returnBufferWithParams = NULL; + plugin->getWriterLoanedSampleFnc = nullptr; + plugin->returnWriterLoanedSampleFnc = nullptr; + plugin->returnWriterLoanedSampleFromCookieFnc = nullptr; + plugin->validateWriterLoanedSampleFnc = nullptr; + plugin->setWriterLoanedSampleSerializedStateFnc = nullptr; + plugin->getBufferWithParams = nullptr; + plugin->returnBufferWithParams = nullptr; #endif /* RMW_CONNEXT_DDS_API_PRO_LEGACY */ } @@ -880,7 +1288,8 @@ rmw_connextdds_register_type_support( RMW_Connext_TypePlugin_initialize( &type_plugin->base, &type_plugin->type_code.base, - type_support->type_name()); + type_support->type_name(), + type_support->keyed()); if (DDS_RETCODE_OK != DDS_DomainParticipant_register_type( diff --git a/rmw_connextdds_common/src/ndds/rmw_typecode.cpp b/rmw_connextdds_common/src/ndds/rmw_typecode.cpp index fd85856a..3d731071 100644 --- a/rmw_connextdds_common/src/ndds/rmw_typecode.cpp +++ b/rmw_connextdds_common/src/ndds/rmw_typecode.cpp @@ -1057,6 +1057,8 @@ rmw_connextdds_convert_type_members( if (nullptr == tc_member->type) { return RMW_RET_ERROR; } + + tc_member->is_key = (member->is_key_) ? RTI_TRUE : RTI_FALSE; } return RMW_RET_OK; diff --git a/rmw_connextdds_common/src/rtime/rmw_type_support_rtime.cpp b/rmw_connextdds_common/src/rtime/rmw_type_support_rtime.cpp index 57086bb6..1016c6c0 100644 --- a/rmw_connextdds_common/src/rtime/rmw_type_support_rtime.cpp +++ b/rmw_connextdds_common/src/rtime/rmw_type_support_rtime.cpp @@ -203,7 +203,7 @@ RMW_Connext_EncapsulationPlugin_serialize( } } else { if (RCUTILS_RET_OK != - rcutils_uint8_array_copy(&data_buffer, user_buffer)) + rcutils_uint8_array_copy(&data_buffer, user_buffer, false /* realloc_if_needed */)) { return RTI_FALSE; }