diff --git a/include/up-cpp/client/usubscription/v3/Consumer.h b/include/up-cpp/client/usubscription/v3/Consumer.h index 2fed6ab4b..b88f2aafc 100644 --- a/include/up-cpp/client/usubscription/v3/Consumer.h +++ b/include/up-cpp/client/usubscription/v3/Consumer.h @@ -16,97 +16,15 @@ #include #include #include -#include #include -#include -#include +#include "RequestBuilder.h" +#include "USubscriptionUUriBuilder.h" namespace uprotocol::client::usubscription::v3 { using uprotocol::core::usubscription::v3::SubscriptionRequest; using uprotocol::core::usubscription::v3::UnsubscribeRequest; using uprotocol::core::usubscription::v3::Update; -using uprotocol::core::usubscription::v3::uSubscription; - -/** - * @struct ConsumerOptions - * @brief Additional details for uSubscription service. - * - * Each member represents an optional parameter for the uSubscription service. - */ -struct ConsumerOptions { - /// Permission level of the subscription request - std::optional permission_level; - /// TAP token for access. - std::optional token; - /// Expiration time of the subscription. - std::optional when_expire; - /// Sample period for the subscription messages in milliseconds. - std::optional sample_period_ms; - /// Details of the subscriber. - std::optional subscriber_details; - /// Details of the subscription. - std::optional subscription_details; -}; - -/// @struct uSubscriptionUUriBuilder -/// @brief Structure to build uSubscription request URIs. -/// -/// This structure is used to build URIs for uSubscription service. It uses the -/// service options from uSubscription proto to set the authority name, ue_id, -/// ue_version_major, and the notification topic resource ID in the URI. -struct USubscriptionUUriBuilder { -private: - /// URI for the uSubscription service - v1::UUri uri_; - /// Resource ID of the notification topic - uint32_t sink_resource_id_; - -public: - /// @brief Constructor for uSubscriptionUUriBuilder. - USubscriptionUUriBuilder() { - // Get the service descriptor - const google::protobuf::ServiceDescriptor* service = - uSubscription::descriptor(); - const auto& service_options = service->options(); - - // Get the service options - const auto& service_name = - service_options.GetExtension(uprotocol::service_name); - const auto& service_version_major = - service_options.GetExtension(uprotocol::service_version_major); - const auto& service_id = - service_options.GetExtension(uprotocol::service_id); - const auto& notification_topic = - service_options.GetExtension(uprotocol::notification_topic, 0); - - // Set the values in the URI - uri_.set_authority_name(service_name); - uri_.set_ue_id(service_id); - uri_.set_ue_version_major(service_version_major); - sink_resource_id_ = notification_topic.id(); - } - - /// @brief Get the URI with a specific resource ID. - /// - /// @param resource_id The resource ID to set in the URI. - /// - /// @return The URI with the specified resource ID. - v1::UUri getServiceUriWithResourceId(uint32_t resource_id) const { - v1::UUri uri = uri_; // Copy the base URI - uri.set_resource_id(resource_id); - return uri; - } - - /// @brief Get the notification URI. - /// - /// @return The notification URI. - v1::UUri getNotificationUri() const { - v1::UUri uri = uri_; // Copy the base URI - uri.set_resource_id(sink_resource_id_); - return uri; - } -}; /// @brief Interface for uEntities to create subscriptions. /// @@ -133,7 +51,7 @@ struct Consumer { const v1::UUri& subscription_topic, ListenCallback&& callback, v1::UPriority priority, std::chrono::milliseconds subscription_request_ttl, - ConsumerOptions consumer_options); + core::usubscription::v3::CallOptions consumer_options); /// @brief Unsubscribe from the topic and call uSubscription service to /// close the subscription. @@ -160,7 +78,7 @@ struct Consumer { /// @param subscriber_details Additional details about the subscriber. Consumer(std::shared_ptr transport, v1::UUri subscription_topic, - ConsumerOptions consumer_options = {}); + core::usubscription::v3::CallOptions consumer_options = {}); private: // Transport @@ -169,10 +87,10 @@ struct Consumer { // Topic to subscribe to const v1::UUri subscription_topic_; // Additional details about uSubscription service - ConsumerOptions consumer_options_; + core::usubscription::v3::CallOptions consumer_options_; // URI info about the uSubscription service - USubscriptionUUriBuilder uSubscriptionUUriBuilder_; + core::usubscription::v3::USubscriptionUUriBuilder uSubscriptionUUriBuilder_; // Subscription updates std::unique_ptr noficationSinkHandle_; @@ -191,10 +109,10 @@ struct Consumer { friend std::unique_ptr std::make_unique, const uprotocol::v1::UUri, - uprotocol::client::usubscription::v3::ConsumerOptions>( + uprotocol::core::usubscription::v3::CallOptions>( std::shared_ptr&&, const uprotocol::v1::UUri&&, - uprotocol::client::usubscription::v3::ConsumerOptions&&); + uprotocol::core::usubscription::v3::CallOptions&&); /// @brief Build SubscriptionRequest for subscription request SubscriptionRequest buildSubscriptionRequest(); diff --git a/include/up-cpp/client/usubscription/v3/RequestBuilder.h b/include/up-cpp/client/usubscription/v3/RequestBuilder.h new file mode 100644 index 000000000..479be152b --- /dev/null +++ b/include/up-cpp/client/usubscription/v3/RequestBuilder.h @@ -0,0 +1,101 @@ +// SPDX-FileCopyrightText: 2025 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: Apache-2.0 + +#ifndef UP_CPP_CLIENT_USUBSCRIPTION_V3_REQUESTBUILDER_H +#define UP_CPP_CLIENT_USUBSCRIPTION_V3_REQUESTBUILDER_H +#include +#include + +#include + +namespace uprotocol::core::usubscription::v3 { + +/// @struct CallOptions +/// @brief Additional details for uSubscription service. +/// +/// Each member represents an optional parameter for the uSubscription service. +struct CallOptions { + /// Permission level of the subscription request + std::optional permission_level; + /// TAP token for access. + std::optional token; + /// Expiration time of the subscription. + std::optional when_expire; + /// Sample period for the subscription messages in milliseconds. + std::optional sample_period_ms; + /// Details of the subscriber. + std::optional subscriber_details; + /// Details of the subscription. + std::optional subscription_details; +}; + +/// @brief Builds different requests using specified options. +/// +/// This struct facilitates the construction of requests based on +/// `USubscriptionOptions`, providing methods to build different requests. +struct RequestBuilder { + /// @brief Builds a subscription request for a given topic. + /// + /// @param topic The `v1::UUri` representing the topic for the subscription. + /// + /// @return A `SubscriptionRequest` configured for the specified topic. + static SubscriptionRequest buildSubscriptionRequest( + const v1::UUri& topic, const CallOptions& options = {}); + + /// @brief Builds an unsubscription request for a given topic. + /// + /// @param topic The `v1::UUri` representing the topic to unsubscribe from. + /// + /// @return An `UnsubscribeRequest` configured for the specified topic. + static UnsubscribeRequest buildUnsubscribeRequest(const v1::UUri& topic); + + /// @brief Build fetch subscritions request for a given topic. + /// + /// @param topic The `v1::UUri` representing the topic to fetch. + /// + /// @return A `FetchSubscriptionsRequest` configured for the specified + /// topic. + static FetchSubscriptionsRequest buildFetchSubscriptionsRequest( + const v1::UUri& topic); + + /// @brief Build fetch subscritions request for a given subscriber. + /// + /// @param subscriber The `SubscriberInfo` representing the subscriber to + /// fetch. + /// + /// @return A `FetchSubscriptionsRequest` configured for the specified + /// subscriber. + static FetchSubscriptionsRequest buildFetchSubscriptionsRequest( + const SubscriberInfo& subscriber); + + /// @brief Build fetch subscribers request for a given topic. + /// + /// @param topic The `v1::UUri` representing the topic to fetch. + /// + /// @return A `FetchSubscribersRequest` configured for the specified topic. + static FetchSubscribersRequest buildFetchSubscribersRequest( + const v1::UUri& topic); + + /// @brief Build a notifications request for a given topic. Subscription + /// change + /// notifications MUST use topic SubscriptionsChange with resource id + /// 0x8000, as per the protobuf definition. + /// + /// @param topic The `v1::UUri` representing the topic to (un)register + /// for/from. + /// + /// @return A `NotificationsRequest` configured for the specified topic. + static NotificationsRequest buildNotificationsRequest( + const v1::UUri& topic); +}; + +} // namespace uprotocol::core::usubscription::v3 +#endif // UP_CPP_CLIENT_USUBSCRIPTION_V3_REQUESTBUILDER_H diff --git a/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h b/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h new file mode 100644 index 000000000..e6e9f4ef5 --- /dev/null +++ b/include/up-cpp/client/usubscription/v3/RpcClientUSubscription.h @@ -0,0 +1,134 @@ +// SPDX-FileCopyrightText: 2025 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: Apache-2.0 + +#ifndef UP_CPP_CLIENT_USUBSCRIPTION_V3_RPCCLIENTUSUBSCRIPTION_H +#define UP_CPP_CLIENT_USUBSCRIPTION_V3_RPCCLIENTUSUBSCRIPTION_H + +#include +#include +#include +#include + +#include "up-cpp/client/usubscription/v3/USubscription.h" +#include "up-cpp/client/usubscription/v3/USubscriptionUUriBuilder.h" + +/// The uEntity (type) identifier of the uSubscription service. +constexpr uint32_t USUBSCRIPTION_TYPE_ID = 0x00000000; +/// The (latest) major version of the uSubscription service. +constexpr uint8_t UE_VERSION_MAJOR = 0x03; +/// The resource identifier of uSubscription's _subscribe_ operation. +constexpr uint16_t RESOURCE_ID_SUBSCRIBE = 0x0001; +/// The resource identifier of uSubscription's _unsubscribe_ operation. +constexpr uint16_t RESOURCE_ID_UNSUBSCRIBE = 0x0002; +/// The resource identifier of uSubscription's _fetch subscriptions_ operation. +constexpr uint16_t RESOURCE_ID_FETCH_SUBSCRIPTIONS = 0x0003; +/// The resource identifier of uSubscription's _register for notifications_ +/// operation. +constexpr uint16_t RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS = 0x0006; +/// The resource identifier of uSubscription's _unregister for notifications_ +/// operation. +constexpr uint16_t RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS = 0x0007; +/// The resource identifier of uSubscription's _fetch subscribers_ operation. +constexpr uint16_t RESOURCE_ID_FETCH_SUBSCRIBERS = 0x0008; + +constexpr auto USUBSCRIPTION_REQUEST_TTL = std::chrono::milliseconds(5000); + +namespace uprotocol::core::usubscription::v3 { +using v3::SubscriptionRequest; +using v3::UnsubscribeRequest; + +struct USubscriptionOptions { + std::string authority_name; + uint16_t instance_id = 0x0000; +}; + +/// @brief Client which implements the USubscription interface +struct RpcClientUSubscription : USubscription { + using RpcClientUSubscriptionOrStatus = + utils::Expected, v1::UStatus>; + using ListenCallback = transport::UTransport::ListenCallback; + using ListenHandle = transport::UTransport::ListenHandle; + + /// @brief Subscribes from a given topic + /// + /// @param subscription_request The request object containing the topic to + /// subscribe to + /// @return Returns a future that reslves to a SubscriptionResponse on + /// success and a UStatus else + communication::RpcClient::InvokeProtoFuture subscribe( + const SubscriptionRequest& subscription_request) override; + + /// @brief Unsubscribes from a given topic + /// + /// @param unsubscribe_request The request object containing the topic to + /// unsubscribe from + /// @return Returns an UnsubscribeResponse on success and a UStatus else + communication::RpcClient::InvokeProtoFuture + unsubscribe(const UnsubscribeRequest& unsubscribe_request) override; + + /// @brief Fetches the list of topics the client is subscribed to + /// + /// @param fetch_subscriptions_request The request object + /// @return Returns a future that reslves to a FetchSubscriptionsResponse on + /// success and a UStatus else + communication::RpcClient::InvokeProtoFuture + fetch_subscriptions( + const FetchSubscriptionsRequest& fetch_subscriptions_request) override; + + /// @brief Fetches the list of subscribers for a given topic + /// + /// @param fetch_subscribers_request The request object containing the topic + /// for which the subscribers are to be fetched + /// @return Returns a FetchSubscribersResponse on success and a UStatus else + communication::RpcClient::InvokeProtoFuture + fetch_subscribers( + const FetchSubscribersRequest& fetch_subscribers_request) override; + + /// @brief Registers to receive notifications + /// + /// @param register_notifications_request The request object containing + /// the details to register for notifications + /// @return Returns a future that resolves to a NotificationResponse on + /// success and a UStatus else + communication::RpcClient::InvokeProtoFuture + register_for_notifications( + const NotificationsRequest& register_notifications_request) override; + + /// @brief Unregisters from receiving notifications. + /// + /// @param unregister_notifications_request The request object containing + /// the details needed to stop receiving notifications. + /// @return Returns future that resolves to a NotificationResponse on + /// success and a UStatus else + communication::RpcClient::InvokeProtoFuture + unregister_for_notifications( + const NotificationsRequest& unregister_notifications_request) override; + + /// @brief Constructor + /// + /// @param transport Transport used to send messages + /// @param options Struct containing all options for the USubscription + /// client + explicit RpcClientUSubscription( + std::shared_ptr transport, + const USubscriptionOptions& options); + + ~RpcClientUSubscription() override = default; + +private: + std::shared_ptr transport_; + std::shared_ptr rpc_client_; + USubscriptionUUriBuilder uuri_builder_; +}; + +} // namespace uprotocol::core::usubscription::v3 + +#endif // UP_CPP_CLIENT_USUBSCRIPTION_V3_RPCCLIENTUSUBSCRIPTION_H diff --git a/include/up-cpp/client/usubscription/v3/USubscription.h b/include/up-cpp/client/usubscription/v3/USubscription.h new file mode 100644 index 000000000..c0b5d48a3 --- /dev/null +++ b/include/up-cpp/client/usubscription/v3/USubscription.h @@ -0,0 +1,92 @@ +// SPDX-FileCopyrightText: 2025 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: Apache-2.0 + +#ifndef UP_CPP_CLIENT_USUBSCRIPTION_V3_USUBSCRIPTION_H +#define UP_CPP_CLIENT_USUBSCRIPTION_V3_USUBSCRIPTION_H +#include +#include + +#include "up-cpp/communication/RpcClient.h" +#include "up-cpp/utils/Expected.h" + +namespace uprotocol::core::usubscription::v3 { + +/// @brief Interface for uEntities to create subscriptions. +/// +/// Like all L3 client APIs, the RpcClientUSubscription is a wrapper on top of +/// the L2 Communication APIs and USubscription service. +struct USubscription { + template + using ResponseOrStatus = utils::Expected; + + virtual ~USubscription() = default; + + /// @brief sends a subscription request to a USubscription backend and a + /// response on success or else a status code + /// + /// @param subscription_request containing a topic to subscribe to + /// @return future that resolves to a SubscriptionReponse on success and + /// UStatus else + virtual communication::RpcClient::InvokeProtoFuture + subscribe(const SubscriptionRequest& subscription_request) = 0; + + /// @brief sends an unsubscribe request to a USubscription backend and a + /// response on success or else a status code + /// + /// @param unsubscribe_request containing a topic to unsubscribe + /// @return future that resolves to UnsubscribeResponse on success and + /// UStatus else + virtual communication::RpcClient::InvokeProtoFuture + unsubscribe(const UnsubscribeRequest& unsubscribe_request) = 0; + + /// @brief fetches all topics the client is subscribed to from the backend + /// + /// @param fetch_subscriptions_request + /// @return future that resolves to FetchSubscriptionsResponse on success + /// and UStatus else + virtual communication::RpcClient::InvokeProtoFuture< + FetchSubscriptionsResponse> + fetch_subscriptions( + const FetchSubscriptionsRequest& fetch_subscriptions_request) = 0; + + /// @brief registers for notifications to a USubscription backend + /// + /// @param register_notifications_request + /// @return future that resolves to NotificationResponse on success and + /// UStatus else + virtual communication::RpcClient::InvokeProtoFuture + register_for_notifications( + const NotificationsRequest& register_notifications_request) = 0; + + /// @brief unregisters for notifications to a USubscription backend + /// + /// @param unregister_notifications_request + /// @return future that resolves to NotificationResponse on success and + /// UStatus else + virtual communication::RpcClient::InvokeProtoFuture + unregister_for_notifications( + const NotificationsRequest& unregister_notifications_request) = 0; + + /// @brief fetches all subscribers for a given topic from the backend + /// + /// @param fetch_subscriptions_request containing the topic for which the + /// subscribers are fetched + /// @return future that resolves to FetchSubscriptionsResponse on success + /// and UStatus else + virtual communication::RpcClient::InvokeProtoFuture< + FetchSubscribersResponse> + fetch_subscribers( + const FetchSubscribersRequest& fetch_subscribers_request) = 0; +}; + +} // namespace uprotocol::core::usubscription::v3 + +#endif // UP_CPP_CLIENT_USUBSCRIPTION_V3_USUBSCRIPTION_H diff --git a/include/up-cpp/client/usubscription/v3/USubscriptionUUriBuilder.h b/include/up-cpp/client/usubscription/v3/USubscriptionUUriBuilder.h new file mode 100644 index 000000000..28f718e38 --- /dev/null +++ b/include/up-cpp/client/usubscription/v3/USubscriptionUUriBuilder.h @@ -0,0 +1,61 @@ +// SPDX-FileCopyrightText: 2025 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: Apache-2.0 + +#ifndef UP_CPP_CLIENT_USUBSCRIPTION_V3_USUBSCRIPTIONUURIBUILDER_H +#define UP_CPP_CLIENT_USUBSCRIPTION_V3_USUBSCRIPTIONUURIBUILDER_H + +#include +#include + +namespace uprotocol::core::usubscription::v3 { +/// @struct uSubscriptionUUriBuilder +/// @brief Structure to build uSubscription request URIs. +/// +/// This structure is used to build URIs for uSubscription service. It uses the +/// service options from uSubscription proto to set the authority name, ue_id, +/// ue_version_major, and the notification topic resource ID in the URI. +struct USubscriptionUUriBuilder { +private: + /// URI for the uSubscription service + v1::UUri base_uri_; + /// Resource ID of the notification topic + uint32_t sink_resource_id_; + +public: + /// @brief Constructor for uSubscriptionUUriBuilder. + USubscriptionUUriBuilder(); + + USubscriptionUUriBuilder& setAuthorityName( + const std::string& authority_name); + + USubscriptionUUriBuilder& setUEntityId(u_int32_t ue_id); + + USubscriptionUUriBuilder& setInstanceId(u_int16_t instance_id); + + USubscriptionUUriBuilder& setServiceId(u_int16_t service_id); + + USubscriptionUUriBuilder& setResourceId(u_int32_t resource_id); + /// @brief Get the URI with a specific resource ID. + /// + /// @param resource_id The resource ID to set in the URI. + /// + /// @return The URI with the specified resource ID. + v1::UUri getServiceUriWithResourceId(uint32_t resource_id) const; + + /// @brief Get the notification URI. + /// + /// @return The notification URI. + v1::UUri getNotificationUri(); +}; + +} // namespace uprotocol::core::usubscription::v3 + +#endif // UP_CPP_CLIENT_USUBSCRIPTION_V3_USUBSCRIPTIONUURIBUILDER_H diff --git a/include/up-cpp/communication/RpcClient.h b/include/up-cpp/communication/RpcClient.h index ce5115ab1..b800ae1c0 100644 --- a/include/up-cpp/communication/RpcClient.h +++ b/include/up-cpp/communication/RpcClient.h @@ -12,11 +12,14 @@ #ifndef UP_CPP_COMMUNICATION_RPCCLIENT_H #define UP_CPP_COMMUNICATION_RPCCLIENT_H +#include #include #include #include #include +#include #include +#include #include #include @@ -25,6 +28,9 @@ #include namespace uprotocol::communication { +template +using ResponseOrStatus = utils::Expected; +using UnexpectedStatus = utils::Unexpected; /// @brief Interface for uEntities to invoke RPC methods. /// @@ -69,17 +75,22 @@ struct RpcClient { /// for the duration of an RPC call. using InvokeHandle = Connection::Handle; - /// @brief Extension to std::future that also holds a callback handle - class InvokeFuture { + /// @brief Extension to std::future with template type that also holds a + /// callback handle + template + class InvokeProtoFuture { InvokeHandle callback_handle_; - std::future future_; + std::future> future_; public: - InvokeFuture(); - InvokeFuture(InvokeFuture&&) noexcept; - InvokeFuture(std::future&&, InvokeHandle&&) noexcept; + InvokeProtoFuture() = default; + InvokeProtoFuture(InvokeProtoFuture&& other) noexcept = default; + InvokeProtoFuture& operator=(InvokeProtoFuture&& other) noexcept = + default; - InvokeFuture& operator=(InvokeFuture&&) noexcept; + InvokeProtoFuture(std::future>&& future, + InvokeHandle&& handle) noexcept + : callback_handle_(std::move(handle)), future_(std::move(future)) {} /// @name Passthroughs for std::future /// @{ @@ -97,6 +108,8 @@ struct RpcClient { /// @} }; + using InvokeFuture = InvokeProtoFuture; + /// @brief Invokes an RPC method by sending a request message. /// /// @param The method that will be invoked @@ -167,6 +180,82 @@ struct RpcClient { /// * A UMessage containing the response from the RPC target. [[nodiscard]] InvokeFuture invokeMethod(const v1::UUri&); + template + InvokeHandle invokeMethodToProto(const v1::UUri& method, + const R& request_message, + Callback&& callback) { + auto payload_or_status = + uprotocol::utils::ProtoConverter::protoToPayload(request_message); + + if (!payload_or_status.has_value()) { + return {}; + } + + datamodel::builder::Payload tmp_payload(payload_or_status.value()); + auto handle = invokeMethod( + builder_.withMethod(method).build(std::move(tmp_payload)), + std::move(callback)); + + return handle; + } + + template + InvokeProtoFuture invokeMethodToProto(const v1::UUri& method, + const R& request_message) { + auto result_promise = + std::make_shared>>(); + auto future = result_promise->get_future(); + auto handle = invokeMethodToProto( + method, request_message, + [result_promise](const MessageOrStatus& message_or_status) { + if (!message_or_status.has_value()) { + result_promise->set_value(ResponseOrStatus( + UnexpectedStatus(message_or_status.error()))); + return; + } + auto response_or_status = + utils::ProtoConverter::extractFromProtobuf( + message_or_status.value()); + + if (!response_or_status.has_value()) { + spdlog::error( + "invokeProtoMethod: Error when extracting response " + "from " + "protobuf."); + result_promise->set_value(response_or_status); + return; + } + + result_promise->set_value( + ResponseOrStatus(response_or_status.value())); + }); + + return {std::move(future), std::move(handle)}; + } + + template + InvokeFuture invokeMethodToProto(const v1::UUri& method, + const R& request_message) { + auto result_promise = + std::make_shared>>(); + auto future = result_promise->get_future(); + + auto handle = invokeMethodToProto( + method, request_message, + [result_promise](const MessageOrStatus& message_or_status) { + if (!message_or_status.has_value()) { + result_promise->set_value(ResponseOrStatus( + UnexpectedStatus(message_or_status.error()))); + return; + } + + result_promise->set_value( + ResponseOrStatus(message_or_status.value())); + }); + + return {std::move(future), std::move(handle)}; + } + /// @brief Default move constructor (defined in RpcClient.cpp) RpcClient(RpcClient&&) noexcept; @@ -190,4 +279,4 @@ struct RpcClient { } // namespace uprotocol::communication -#endif // UP_CPP_COMMUNICATION_RPCCLIENT_H +#endif // UP_CPP_COMMUNICATION_RPCCLIENT_H \ No newline at end of file diff --git a/include/up-cpp/datamodel/builder/UMessage.h b/include/up-cpp/datamodel/builder/UMessage.h index 34863b2e3..408fd1744 100644 --- a/include/up-cpp/datamodel/builder/UMessage.h +++ b/include/up-cpp/datamodel/builder/UMessage.h @@ -234,6 +234,16 @@ struct UMessageBuilder { /// @return A built message with no payload populated. [[nodiscard]] v1::UMessage build() const; + /// @brief Creates a UMessage based on the builder's current state. + /// + /// @param A UUri of the method that should be called + /// + /// @throws UnexpectedFormat if withPayloadFormat() has been previously + /// called. + /// + /// @return A built message with no payload populated. + [[nodiscard]] v1::UMessage build(const v1::UUri&) const; + /// @brief Creates a UMessage with a provided payload based on the /// builder's current state. /// @@ -247,6 +257,21 @@ struct UMessageBuilder { /// @return A built message with the provided payload data embedded. [[nodiscard]] v1::UMessage build(builder::Payload&&) const; + /// @brief Creates a UMessage with a provided payload based on the + /// builder's current state. + /// + /// @param A UUri of the method that should be called + /// + /// @param A Payload builder containing a payload to embed in the message. + /// + /// @note The contents of the payload builder will be moved. + /// + /// @throws UnexpectedFormat if withPayloadFormat() has been previously + /// called and the format in the payload builder does not match. + /// + /// @return A built message with the provided payload data embedded. + [[nodiscard]] v1::UMessage build(const v1::UUri&, builder::Payload&&) const; + /// @brief Access the attributes of the message being built. /// @return A reference to the attributes of the message being built. [[deprecated( diff --git a/include/up-cpp/utils/ProtoConverter.h b/include/up-cpp/utils/ProtoConverter.h index ee4123936..3103f3e77 100644 --- a/include/up-cpp/utils/ProtoConverter.h +++ b/include/up-cpp/utils/ProtoConverter.h @@ -3,11 +3,24 @@ #include #include +#include +#include #include #include +#include "up-cpp/datamodel/builder/Payload.h" +#include "up-cpp/utils/Expected.h" + namespace uprotocol::utils { +template +using TOrStatus = utils::Expected; +using UnexpectedStatus = utils::Unexpected; +using PayloadOrStatus = + utils::Expected; +using core::usubscription::v3::FetchSubscribersRequest; +using core::usubscription::v3::FetchSubscriptionsRequest; +using core::usubscription::v3::NotificationsRequest; using uprotocol::core::usubscription::v3::SubscribeAttributes; using uprotocol::core::usubscription::v3::SubscriberInfo; using uprotocol::core::usubscription::v3::SubscriptionRequest; @@ -53,6 +66,125 @@ struct ProtoConverter { /// @return the built UnsubscribeRequest static UnsubscribeRequest BuildUnSubscribeRequest( const v1::UUri& subscription_topic); + + /// @brief Builds a FetchSubscriptionsRequest from the given topic + /// + /// @param topic the UUri of the topic to fetch subscriptions for + /// @return the built FetchSubscriptionsRequest + static FetchSubscriptionsRequest BuildFetchSubscriptionsRequest( + const v1::UUri& topic); + + /// @brief Builds a FetchSubscriptionsRequest from the given subscriber + /// information + /// + /// @param subscriber the SubscriberInfo containing details of the + /// subscriber + /// @return the built FetchSubscriptionsRequest + static FetchSubscriptionsRequest BuildFetchSubscriptionsRequest( + const SubscriberInfo& subscriber); + + /// @brief Builds a FetchSubscribersRequest from the given topic + /// + /// @param topic the UUri of the topic to fetch subscribers for + /// @return the built FetchSubscribersRequest + static FetchSubscribersRequest BuildFetchSubscribersRequest( + const v1::UUri& topic); + + /// @brief Builds a NotificationsRequest from the given topic + /// + /// @param topic the UUri of the topic to build a notification request for + /// @return the built NotificationsRequest + static NotificationsRequest BuildNotificationsRequest( + const v1::UUri& topic); + + /// @brief Deserializes a protobuf message from a given payload. + /// + /// @tparam T The type to deserialize the message into. + /// @param message The `v1::UMessage` containing the payload. + /// @return `TOrStatus` with the deserialized object or an error status. + template + static TOrStatus extractFromProtobuf(const v1::UMessage& message) { + switch (message.attributes().payload_format()) { + case v1::UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF: { + T response; + if (!response.ParseFromString(message.payload())) { + v1::UStatus status; + status.set_code(v1::UCode::INTERNAL); + status.set_message( + "extractFromProtobuf: Error when parsing payload from " + "protobuf."); + return TOrStatus(UnexpectedStatus(status)); + } + return TOrStatus(response); + } + case v1::UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY: { + google::protobuf::Any any; + if (!any.ParseFromString(message.payload())) { + v1::UStatus status; + status.set_code(v1::UCode::INTERNAL); + status.set_message( + "extractFromProtobuf: Error when parsing payload from " + "protobuf any."); + return TOrStatus(UnexpectedStatus(status)); + } + T response; + if (!any.UnpackTo(&response)) { + v1::UStatus status; + status.set_code(v1::UCode::INTERNAL); + status.set_message( + "extractFromProtobuf: Error when unpacking any."); + return TOrStatus(UnexpectedStatus(status)); + } + return TOrStatus(response); + } + case v1::UPayloadFormat::UPAYLOAD_FORMAT_UNSPECIFIED: + case v1::UPayloadFormat::UPAYLOAD_FORMAT_JSON: + case v1::UPayloadFormat::UPAYLOAD_FORMAT_SOMEIP: + case v1::UPayloadFormat::UPAYLOAD_FORMAT_SOMEIP_TLV: + case v1::UPayloadFormat::UPAYLOAD_FORMAT_RAW: + case v1::UPayloadFormat::UPAYLOAD_FORMAT_TEXT: + case v1::UPayloadFormat::UPAYLOAD_FORMAT_SHM: + case v1::UPayloadFormat:: + UPayloadFormat_INT_MIN_SENTINEL_DO_NOT_USE_: + case v1::UPayloadFormat:: + UPayloadFormat_INT_MAX_SENTINEL_DO_NOT_USE_: { + v1::UStatus status; + status.set_code(v1::UCode::UNIMPLEMENTED); + status.set_message("Unimplemented payload format."); + return TOrStatus(UnexpectedStatus(status)); + } + default: { + v1::UStatus status; + status.set_code(v1::UCode::INVALID_ARGUMENT); + status.set_message( + "Unknown/invalid/unsupported payload format."); + return TOrStatus(UnexpectedStatus(status)); + } + } + } + + /// @brief Serializes a protobuf object into a payload. + /// + /// @tparam T The type of the protobuf object to serialize. + /// @param proto The protobuf object to be converted into a payload. + /// @return `PayloadOrStatus` containing the payload or an error status. + template + static PayloadOrStatus protoToPayload(const T& proto) { + google::protobuf::Any any; + + if (!any.PackFrom(proto)) { + v1::UStatus status; + status.set_code(v1::UCode::INTERNAL); + status.set_message( + "protoToPayload: There was an error when serializing the " + "subscription request."); + return PayloadOrStatus(UnexpectedStatus(status)); + } + + const datamodel::builder::Payload payload(any); + + return PayloadOrStatus(payload); + } }; }; // namespace uprotocol::utils #endif // UP_CPP_UTILS_PROTOCONVERTER_H diff --git a/src/client/usubscription/v3/Consumer.cpp b/src/client/usubscription/v3/Consumer.cpp index 9bc5f8769..83012f7d6 100644 --- a/src/client/usubscription/v3/Consumer.cpp +++ b/src/client/usubscription/v3/Consumer.cpp @@ -13,28 +13,31 @@ #include +#include "up-cpp/client/usubscription/v3/RequestBuilder.h" + namespace uprotocol::client::usubscription::v3 { Consumer::Consumer(std::shared_ptr transport, v1::UUri subscription_topic, - ConsumerOptions consumer_options) + core::usubscription::v3::CallOptions consumer_options) : transport_(std::move(transport)), subscription_topic_(std::move(subscription_topic)), consumer_options_(std::move(consumer_options)), rpc_client_(nullptr) { // Initialize uSubscriptionUUriBuilder_ - uSubscriptionUUriBuilder_ = USubscriptionUUriBuilder(); + uSubscriptionUUriBuilder_ = + core::usubscription::v3::USubscriptionUUriBuilder(); } [[nodiscard]] Consumer::ConsumerOrStatus Consumer::create( std::shared_ptr transport, const v1::UUri& subscription_topic, ListenCallback&& callback, v1::UPriority priority, std::chrono::milliseconds subscription_request_ttl, - ConsumerOptions consumer_options) { + core::usubscription::v3::CallOptions consumer_options) { auto consumer = std::make_unique( std::forward>(transport), std::forward(subscription_topic), - std::forward(consumer_options)); + std::forward(consumer_options)); // Attempt to connect create notification sink for updates. auto status = consumer->createNotificationSink(); diff --git a/src/client/usubscription/v3/RequestBuilder.cpp b/src/client/usubscription/v3/RequestBuilder.cpp new file mode 100644 index 000000000..555f125e3 --- /dev/null +++ b/src/client/usubscription/v3/RequestBuilder.cpp @@ -0,0 +1,52 @@ +// SPDX-FileCopyrightText: 2025 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: Apache-2.0 + +#include "up-cpp/client/usubscription/v3/RequestBuilder.h" + +#include + +namespace uprotocol::core::usubscription::v3 { + +SubscriptionRequest RequestBuilder::buildSubscriptionRequest( + const v1::UUri& topic, const CallOptions& options) { + auto attributes = utils::ProtoConverter::BuildSubscribeAttributes( + options.when_expire, options.subscription_details, + options.sample_period_ms); + + return utils::ProtoConverter::BuildSubscriptionRequest(topic, attributes); +} + +UnsubscribeRequest RequestBuilder::buildUnsubscribeRequest( + const v1::UUri& topic) { + return utils::ProtoConverter::BuildUnSubscribeRequest(topic); +} + +FetchSubscriptionsRequest RequestBuilder::buildFetchSubscriptionsRequest( + const v1::UUri& topic) { + return utils::ProtoConverter::BuildFetchSubscriptionsRequest(topic); +} + +FetchSubscriptionsRequest RequestBuilder::buildFetchSubscriptionsRequest( + const SubscriberInfo& subscriber) { + return utils::ProtoConverter::BuildFetchSubscriptionsRequest(subscriber); +} + +FetchSubscribersRequest RequestBuilder::buildFetchSubscribersRequest( + const v1::UUri& topic) { + return utils::ProtoConverter::BuildFetchSubscribersRequest(topic); +} + +NotificationsRequest RequestBuilder::buildNotificationsRequest( + const v1::UUri& topic) { + return utils::ProtoConverter::BuildNotificationsRequest(topic); +} + +} // namespace uprotocol::core::usubscription::v3 diff --git a/src/client/usubscription/v3/RpcClientUSubscription.cpp b/src/client/usubscription/v3/RpcClientUSubscription.cpp new file mode 100644 index 000000000..d882b8e8b --- /dev/null +++ b/src/client/usubscription/v3/RpcClientUSubscription.cpp @@ -0,0 +1,88 @@ +// SPDX-FileCopyrightText: 2025 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: Apache-2.0 + +#include +#include + +#include + +#include "up-cpp/communication/RpcClient.h" + +auto priority = uprotocol::v1::UPriority::UPRIORITY_CS4; // MUST be >= 4 + +namespace uprotocol::core::usubscription::v3 { + +RpcClientUSubscription::RpcClientUSubscription( + std::shared_ptr transport, + const USubscriptionOptions& options) + : transport_(std::move(transport)) { + uuri_builder_.setAuthorityName(options.authority_name) + .setInstanceId(options.instance_id); + + rpc_client_ = std::make_shared( + transport_, priority, USUBSCRIPTION_REQUEST_TTL); +} + +communication::RpcClient::InvokeProtoFuture +RpcClientUSubscription::subscribe( + const SubscriptionRequest& subscription_request) { + auto method = + uuri_builder_.getServiceUriWithResourceId(RESOURCE_ID_SUBSCRIBE); + return rpc_client_->invokeMethodToProto( + method, subscription_request); +} + +communication::RpcClient::InvokeProtoFuture +RpcClientUSubscription::unsubscribe( + const UnsubscribeRequest& unsubscribe_request) { + auto method = + uuri_builder_.getServiceUriWithResourceId(RESOURCE_ID_UNSUBSCRIBE); + return rpc_client_->invokeMethodToProto( + method, unsubscribe_request); +} + +communication::RpcClient::InvokeProtoFuture +RpcClientUSubscription::fetch_subscriptions( + const FetchSubscriptionsRequest& fetch_subscriptions_request) { + auto method = uuri_builder_.getServiceUriWithResourceId( + RESOURCE_ID_FETCH_SUBSCRIPTIONS); + return rpc_client_->invokeMethodToProto( + method, fetch_subscriptions_request); +} + +communication::RpcClient::InvokeProtoFuture +RpcClientUSubscription::fetch_subscribers( + const FetchSubscribersRequest& fetch_subscribers_request) { + auto method = uuri_builder_.getServiceUriWithResourceId( + RESOURCE_ID_FETCH_SUBSCRIBERS); + return rpc_client_->invokeMethodToProto( + method, fetch_subscribers_request); +} + +communication::RpcClient::InvokeProtoFuture +RpcClientUSubscription::register_for_notifications( + const NotificationsRequest& register_notifications_request) { + auto method = uuri_builder_.getServiceUriWithResourceId( + RESOURCE_ID_REGISTER_FOR_NOTIFICATIONS); + return rpc_client_->invokeMethodToProto( + method, register_notifications_request); +} + +communication::RpcClient::InvokeProtoFuture +RpcClientUSubscription::unregister_for_notifications( + const NotificationsRequest& unregister_notifications_request) { + auto method = uuri_builder_.getServiceUriWithResourceId( + RESOURCE_ID_UNREGISTER_FOR_NOTIFICATIONS); + return rpc_client_->invokeMethodToProto( + method, unregister_notifications_request); +} + +} // namespace uprotocol::core::usubscription::v3 diff --git a/src/client/usubscription/v3/USubscriptionUUriBuilder.cpp b/src/client/usubscription/v3/USubscriptionUUriBuilder.cpp new file mode 100644 index 000000000..63cbd0725 --- /dev/null +++ b/src/client/usubscription/v3/USubscriptionUUriBuilder.cpp @@ -0,0 +1,86 @@ +// SPDX-FileCopyrightText: 2025 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: Apache-2.0 + +#include "up-cpp/client/usubscription/v3/USubscriptionUUriBuilder.h" + +namespace uprotocol::core::usubscription::v3 { + +constexpr uint32_t SERVICE_ID_BITMASK = 0x0000FFFF; +constexpr uint32_t INSTANCE_ID_BITMASK = 0xFFFF0000; + +USubscriptionUUriBuilder::USubscriptionUUriBuilder() { + // Get the service descriptor + const google::protobuf::ServiceDescriptor* service = + uSubscription::descriptor(); + const auto& service_options = service->options(); + + // Get the service options + const auto& service_version_major = + service_options.GetExtension(uprotocol::service_version_major); + const auto& service_id = + service_options.GetExtension(uprotocol::service_id); + const auto& notification_topic = + service_options.GetExtension(uprotocol::notification_topic, 0); + + // Set the values in the URI + base_uri_.set_ue_id(service_id); + base_uri_.set_ue_version_major(service_version_major); + sink_resource_id_ = notification_topic.id(); +} + +USubscriptionUUriBuilder& USubscriptionUUriBuilder::setAuthorityName( + const std::string& authority_name) { + base_uri_.set_authority_name(authority_name); + return *this; +} + +USubscriptionUUriBuilder& USubscriptionUUriBuilder::setUEntityId( + u_int32_t ue_id) { + base_uri_.set_ue_id(ue_id); + return *this; +} + +constexpr uint16_t BITS_UINT_16 = 16; +USubscriptionUUriBuilder& USubscriptionUUriBuilder::setInstanceId( + u_int16_t instance_id) { + auto updated_ue_id = (SERVICE_ID_BITMASK & base_uri_.ue_id()) + + (static_cast(instance_id) << BITS_UINT_16); + base_uri_.set_ue_id(updated_ue_id); + return *this; +} + +USubscriptionUUriBuilder& USubscriptionUUriBuilder::setServiceId( + u_int16_t service_id) { + auto updated_ue_id = (INSTANCE_ID_BITMASK & base_uri_.ue_id()) + service_id; + base_uri_.set_ue_id(updated_ue_id); + return *this; +} + +USubscriptionUUriBuilder& USubscriptionUUriBuilder::setResourceId( + u_int32_t resource_id) { + base_uri_.set_resource_id(resource_id); + return *this; +} + +v1::UUri USubscriptionUUriBuilder::getServiceUriWithResourceId( + uint32_t resource_id) const { + v1::UUri uri = base_uri_; // Copy the base URI + uri.set_resource_id(resource_id); + return uri; +} + +v1::UUri USubscriptionUUriBuilder::getNotificationUri() { + v1::UUri uri = base_uri_; // Copy the base URI + uri.set_resource_id(sink_resource_id_); + return uri; +} + +} // namespace uprotocol::core::usubscription::v3 diff --git a/src/communication/RpcClient.cpp b/src/communication/RpcClient.cpp index 8fd100a48..f4dc0c148 100644 --- a/src/communication/RpcClient.cpp +++ b/src/communication/RpcClient.cpp @@ -208,14 +208,13 @@ RpcClient::InvokeHandle RpcClient::invokeMethod(v1::UMessage&& request, RpcClient::InvokeHandle RpcClient::invokeMethod( const v1::UUri& method, datamodel::builder::Payload&& payload, Callback&& callback) { - return invokeMethod(builder_.withMethod(method).build(std::move(payload)), + return invokeMethod(builder_.build(method, std::move(payload)), std::move(callback)); } RpcClient::InvokeHandle RpcClient::invokeMethod(const v1::UUri& method, Callback&& callback) { - return invokeMethod(builder_.withMethod(method).build(), - std::move(callback)); + return invokeMethod(builder_.build(method), std::move(callback)); } RpcClient::InvokeFuture RpcClient::invokeMethod( @@ -253,15 +252,6 @@ RpcClient::InvokeFuture RpcClient::invokeMethod(const v1::UUri& method) { RpcClient::RpcClient(RpcClient&&) noexcept = default; RpcClient::~RpcClient() = default; -RpcClient::InvokeFuture::InvokeFuture() = default; -RpcClient::InvokeFuture::InvokeFuture(InvokeFuture&& other) noexcept = default; -RpcClient::InvokeFuture& RpcClient::InvokeFuture::operator=( - InvokeFuture&& other) noexcept = default; - -RpcClient::InvokeFuture::InvokeFuture(std::future&& future, - InvokeHandle&& handle) noexcept - : callback_handle_(std::move(handle)), future_(std::move(future)) {} - } // namespace uprotocol::communication /////////////////////////////////////////////////////////////////////////////// diff --git a/src/datamodel/builder/UMessage.cpp b/src/datamodel/builder/UMessage.cpp index c3408ecf7..a27612680 100644 --- a/src/datamodel/builder/UMessage.cpp +++ b/src/datamodel/builder/UMessage.cpp @@ -218,6 +218,21 @@ v1::UMessage UMessageBuilder::build() const { return message; } +v1::UMessage UMessageBuilder::build(const v1::UUri& method) const { + v1::UMessage message; + if (expectedPayloadFormat_.has_value()) { + throw UnexpectedFormat( + "Tried to build with no payload when a payload format has been set " + "using withPayloadFormat()"); + } + + *message.mutable_attributes() = attributes_; + *message.mutable_attributes()->mutable_sink() = method; + *(message.mutable_attributes()->mutable_id()) = uuidBuilder_.build(); + + return message; +} + v1::UMessage UMessageBuilder::build(builder::Payload&& payload) const { v1::UMessage message; @@ -236,6 +251,26 @@ v1::UMessage UMessageBuilder::build(builder::Payload&& payload) const { return message; } +v1::UMessage UMessageBuilder::build(const v1::UUri& method, + builder::Payload&& payload) const { + v1::UMessage message; + + *message.mutable_attributes() = attributes_; + *message.mutable_attributes()->mutable_sink() = method; + *(message.mutable_attributes()->mutable_id()) = uuidBuilder_.build(); + auto [payloadData, payloadFormat] = std::move(payload).buildMove(); + if (expectedPayloadFormat_.has_value()) { + if (payloadFormat != expectedPayloadFormat_) { + throw UnexpectedFormat( + "Payload format does not match the expected format"); + } + } + *message.mutable_payload() = std::move(payloadData); + message.mutable_attributes()->set_payload_format(payloadFormat); + + return message; +} + UMessageBuilder::UMessageBuilder(v1::UMessageType msg_type, v1::UUri&& source, std::optional&& sink, std::optional&& request_id) diff --git a/src/utils/ProtoConverter.cpp b/src/utils/ProtoConverter.cpp index c4159ee9f..b6ee5403a 100644 --- a/src/utils/ProtoConverter.cpp +++ b/src/utils/ProtoConverter.cpp @@ -1,5 +1,13 @@ #include "up-cpp/utils/ProtoConverter.h" +#include +#include + +#include + +#include "up-cpp/datamodel/builder/Payload.h" +#include "up-cpp/utils/Expected.h" + namespace uprotocol::utils { google::protobuf::Timestamp ProtoConverter::ConvertToProtoTimestamp( const std::chrono::system_clock::time_point& tp) { @@ -76,4 +84,36 @@ UnsubscribeRequest ProtoConverter::BuildUnSubscribeRequest( return unsubscribe_request; } +FetchSubscriptionsRequest ProtoConverter::BuildFetchSubscriptionsRequest( + const v1::UUri& topic) { + FetchSubscriptionsRequest fetch_subscriptions_request; + *fetch_subscriptions_request.mutable_topic() = topic; + + return fetch_subscriptions_request; +} + +FetchSubscriptionsRequest ProtoConverter::BuildFetchSubscriptionsRequest( + const SubscriberInfo& subscriber) { + FetchSubscriptionsRequest fetch_subscriptions_request; + *fetch_subscriptions_request.mutable_subscriber() = subscriber; + + return fetch_subscriptions_request; +} + +FetchSubscribersRequest ProtoConverter::BuildFetchSubscribersRequest( + const v1::UUri& topic) { + FetchSubscribersRequest fetch_subscribers_request; + *fetch_subscribers_request.mutable_topic() = topic; + + return fetch_subscribers_request; +} + +NotificationsRequest ProtoConverter::BuildNotificationsRequest( + const v1::UUri& topic) { + NotificationsRequest notifications_request; + *notifications_request.mutable_topic() = topic; + + return notifications_request; +} + } // namespace uprotocol::utils diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 0dd7940e7..1adee2504 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -92,6 +92,11 @@ add_coverage_test("NotificationSourceTest" coverage/communication/NotificationSo # client add_coverage_test("ConsumerTest" coverage/client/usubscription/v3/ConsumerTest.cpp) +add_coverage_test("RequestBuilderTest" coverage/client/usubscription/v3/RequestBuilderTest.cpp) +add_coverage_test("RpcClientUSubscriptionTest" coverage/client/usubscription/v3/RpcClientUSubscriptionTest.cpp) +add_coverage_test("USubscriptionUUriBuilderTest" coverage/client/usubscription/v3/USubscriptionUUriBuilderTest.cpp) + +# core ########################## EXTRAS ############################################# add_extra_test("PublisherSubscriberTest" extra/PublisherSubscriberTest.cpp) diff --git a/test/coverage/client/usubscription/v3/ConsumerTest.cpp b/test/coverage/client/usubscription/v3/ConsumerTest.cpp index 6c6c133e4..78e0ced79 100644 --- a/test/coverage/client/usubscription/v3/ConsumerTest.cpp +++ b/test/coverage/client/usubscription/v3/ConsumerTest.cpp @@ -17,6 +17,7 @@ #include #include "UTransportMock.h" +#include "up-cpp/client/usubscription/v3/RequestBuilder.h" namespace { using MsgDiff = google::protobuf::util::MessageDifferencer; @@ -106,7 +107,7 @@ TEST_F(ConsumerTest, ConstructorTestSuccess) { // NOLINT auto subscribe_request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); auto priority = uprotocol::v1::UPriority::UPRIORITY_CS4; - auto options = uprotocol::client::usubscription::v3::ConsumerOptions(); + auto options = uprotocol::core::usubscription::v3::CallOptions(); auto consumer_or_status = uprotocol::client::usubscription::v3::Consumer::create( @@ -131,7 +132,7 @@ TEST_F(ConsumerTest, SubscribeTestSuccess) { // NOLINT auto subscribe_request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); auto priority = uprotocol::v1::UPriority::UPRIORITY_CS4; - auto options = uprotocol::client::usubscription::v3::ConsumerOptions(); + auto options = uprotocol::core::usubscription::v3::CallOptions(); auto consumer_or_status = uprotocol::client::usubscription::v3::Consumer::create( @@ -177,7 +178,7 @@ TEST_F(ConsumerTest, UnsubscribeTestSuccess) { // NOLINT auto subscribe_request_ttl = std::chrono::milliseconds(REQUEST_TTL_TIME); auto priority = uprotocol::v1::UPriority::UPRIORITY_CS4; - auto options = uprotocol::client::usubscription::v3::ConsumerOptions(); + auto options = uprotocol::core::usubscription::v3::CallOptions(); auto consumer_or_status = uprotocol::client::usubscription::v3::Consumer::create( diff --git a/test/coverage/client/usubscription/v3/RequestBuilderTest.cpp b/test/coverage/client/usubscription/v3/RequestBuilderTest.cpp new file mode 100644 index 000000000..a0faaa4d4 --- /dev/null +++ b/test/coverage/client/usubscription/v3/RequestBuilderTest.cpp @@ -0,0 +1,154 @@ +// SPDX-FileCopyrightText: 2025 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: Apache-2.0 + +#include +#include +#include + +#include +#include + +#include "up-cpp/client/usubscription/v3/RequestBuilder.h" + +constexpr uint32_t SOURCE_UE_ID = 0x00011101; +constexpr uint32_t SOURCE_UE_VERSION_MAJOR = 0xF8; +constexpr uint32_t SOURCE_RESOURCE_ID = 0x8101; + +namespace uprotocol::core::usubscription::v3 { + +class RequestBuilderTest : public ::testing::Test { +private: + v1::UUri source_; + CallOptions options_; + +protected: + const v1::UUri& getSource() const { return source_; } + const CallOptions& getOptions() const { return options_; } + + void SetUp() override { + // Create a UUri object for testing + source_.set_authority_name("10.0.0.1"); + source_.set_ue_id(SOURCE_UE_ID); + source_.set_ue_version_major(SOURCE_UE_VERSION_MAJOR); + source_.set_resource_id(SOURCE_RESOURCE_ID); + + options_.permission_level = 2; + options_.token = "sample_token"; + options_.when_expire = + std::chrono::system_clock::now() + std::chrono::milliseconds(1); + options_.sample_period_ms = std::chrono::seconds(1); + options_.subscriber_details = google::protobuf::Any(); + options_.subscription_details = google::protobuf::Any(); + } + void TearDown() override {} + + // Run once per execution of the test application. + // Used for setup of all tests. Has access to this instance. + RequestBuilderTest() = default; + + // Run once per execution of the test application. + // Used only for global setup outside of tests. + static void SetUpTestSuite() {} + static void TearDownTestSuite() {} + +public: + ~RequestBuilderTest() override = default; +}; + +TEST_F(RequestBuilderTest, BuildSubscriptionRequestWithOptions) { // NOLINT + const v1::UUri topic = getSource(); + + SubscriptionRequest request; + ASSERT_NO_THROW( // NOLINT + request = + RequestBuilder::buildSubscriptionRequest(topic, getOptions())); + + // Verify the attributes in the request + // TODO(max) there should probably be some test that explicitely checks data + // from the options + EXPECT_TRUE(request.has_topic()); + EXPECT_TRUE(request.has_attributes()); + EXPECT_EQ(request.topic().SerializeAsString(), topic.SerializeAsString()); + EXPECT_EQ(request.GetTypeName(), + "uprotocol.core.usubscription.v3.SubscriptionRequest"); +} + +TEST_F(RequestBuilderTest, BuildUnsubscribeRequest) { // NOLINT + const v1::UUri topic = getSource(); + + UnsubscribeRequest request; + ASSERT_NO_THROW( // NOLINT + request = RequestBuilder::buildUnsubscribeRequest(topic)); + + // Verify the attributes in the request + EXPECT_TRUE(request.has_topic()); + EXPECT_EQ(request.topic().SerializeAsString(), topic.SerializeAsString()); + EXPECT_EQ(request.GetTypeName(), + "uprotocol.core.usubscription.v3.UnsubscribeRequest"); +} + +TEST_F(RequestBuilderTest, BuildFetchSubscriptionsRequestWithTopic) { // NOLINT + const v1::UUri topic = getSource(); + + FetchSubscriptionsRequest request; + ASSERT_NO_THROW(request = // NOLINT + RequestBuilder::buildFetchSubscriptionsRequest(topic)); + + // Verify the attributes in the request + EXPECT_TRUE(request.has_topic()); + EXPECT_EQ(request.topic().SerializeAsString(), topic.SerializeAsString()); + EXPECT_EQ(request.GetTypeName(), + "uprotocol.core.usubscription.v3.FetchSubscriptionsRequest"); +} + +TEST_F(RequestBuilderTest, // NOLINT + BuildFetchSubscriptionsRequestWithSubscriberInfo) { + const SubscriberInfo subscriber; + + FetchSubscriptionsRequest request; + ASSERT_NO_THROW( // NOLINT + request = RequestBuilder::buildFetchSubscriptionsRequest(subscriber)); + + // Verify the attributes in the request + EXPECT_FALSE(request.has_topic()); + EXPECT_EQ(request.GetTypeName(), + "uprotocol.core.usubscription.v3.FetchSubscriptionsRequest"); +} + +TEST_F(RequestBuilderTest, BuildFetchSubscribersRequest) { // NOLINT + const v1::UUri topic = getSource(); + + FetchSubscribersRequest request; + ASSERT_NO_THROW(request = // NOLINT + RequestBuilder::buildFetchSubscribersRequest(topic)); + + // Verify the attributes in the request + EXPECT_TRUE(request.has_topic()); + EXPECT_EQ(request.topic().SerializeAsString(), topic.SerializeAsString()); + EXPECT_EQ(request.GetTypeName(), + "uprotocol.core.usubscription.v3.FetchSubscribersRequest"); +} + +TEST_F(RequestBuilderTest, BuildNotificationsRequest) { // NOLINT + const v1::UUri topic = getSource(); + + NotificationsRequest request; + ASSERT_NO_THROW( // NOLINT + request = RequestBuilder::buildNotificationsRequest(topic)); + + // Verify the attributes in the request + EXPECT_TRUE(request.has_topic()); + EXPECT_EQ(request.topic().SerializeAsString(), topic.SerializeAsString()); + EXPECT_EQ(request.GetTypeName(), + "uprotocol.core.usubscription.v3.NotificationsRequest"); +} + +} // namespace uprotocol::core::usubscription::v3 diff --git a/test/coverage/client/usubscription/v3/RpcClientUSubscriptionTest.cpp b/test/coverage/client/usubscription/v3/RpcClientUSubscriptionTest.cpp new file mode 100644 index 000000000..226280a10 --- /dev/null +++ b/test/coverage/client/usubscription/v3/RpcClientUSubscriptionTest.cpp @@ -0,0 +1,903 @@ +// SPDX-FileCopyrightText: 2025 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: Apache-2.0 + +#include +#include + +#include "UTransportMock.h" +#include "up-cpp/client/usubscription/v3/RequestBuilder.h" +#include "up-cpp/client/usubscription/v3/RpcClientUSubscription.h" +#include "up-cpp/communication/RpcServer.h" +#include "up-cpp/utils/ProtoConverter.h" +#include "uprotocol/v1/uri.pb.h" + +using UMessage = uprotocol::v1::UMessage; +using Payload = uprotocol::datamodel::builder::Payload; +using ProtoConverter = uprotocol::utils::ProtoConverter; +using SubscriptionRequest = + uprotocol::core::usubscription::v3::SubscriptionRequest; +using SubscriptionResponse = + uprotocol::core::usubscription::v3::SubscriptionResponse; +using RequestBuilder = uprotocol::core::usubscription::v3::RequestBuilder; + +namespace { + +constexpr uint32_t UE_VERSION_MAJOR = 3; +constexpr uint32_t CLIENT_UE_ID = 23492; + +constexpr int ITERATIONS_TILL_TIMEOUT = 10; +constexpr std::chrono::milliseconds MILLISECONDS_PER_ITERATION = + std::chrono::milliseconds(50); + +class RpcClientUSubscriptionTest : public testing::Test { +protected: + // Run once per TEST_F.s + // Used to set up clean environments per test. + void SetUp() override { + uprotocol::v1::UUri client_uuri; + client_uuri.set_authority_name("client.usubscription"); + client_uuri.set_ue_id(CLIENT_UE_ID); + client_uuri.set_ue_version_major(UE_VERSION_MAJOR); + client_uuri.set_resource_id(0); + + client_transport_ = + std::make_shared(client_uuri); + + uprotocol::v1::UUri server_uuri; + server_uuri.set_authority_name("core.usubscription"); + server_uuri.set_ue_id(1); + server_uuri.set_ue_version_major(UE_VERSION_MAJOR); + server_uuri.set_resource_id(0); + + server_transport_ = + std::make_shared(server_uuri); + + constexpr uint32_t SERVER_RESOURCE_ID = 32600; + server_method_uuri_.set_authority_name("core.usubscription"); + server_method_uuri_.set_ue_id(1); + server_method_uuri_.set_ue_version_major(UE_VERSION_MAJOR); + server_method_uuri_.set_resource_id(SERVER_RESOURCE_ID); + + constexpr uint32_t TOPIC_UE = 2342; + constexpr uint32_t TOPIC_RESOURCE_ID = 12340; + subscription_topic_.set_authority_name("topic.usubscription"); + subscription_topic_.set_ue_id(TOPIC_UE); + subscription_topic_.set_ue_version_major(UE_VERSION_MAJOR); + subscription_topic_.set_resource_id(TOPIC_RESOURCE_ID); + } + + void TearDown() override {} + + // Run once per execution of the test application. + // Used for setup of all tests. Has access to this instance. + RpcClientUSubscriptionTest() = default; + + // Run once per execution of the test application. + // Used only for global setup outside of tests. + static void SetUpTestSuite() {} + static void TearDownTestSuite() {} + + std::shared_ptr getClientTransport() { + return client_transport_; + } + + std::shared_ptr getServerTransport() { + return server_transport_; + } + + uprotocol::v1::UUri getServerMethodUuri() { return server_method_uuri_; } + + uprotocol::v1::UUri getSubscriptionTopic() { return subscription_topic_; } + + uprotocol::core::usubscription::v3::USubscriptionOptions + getUSubscriptionOptions() { + return options_; + } + +private: + std::shared_ptr client_transport_; + std::shared_ptr server_transport_; + uprotocol::v1::UUri server_method_uuri_; + uprotocol::v1::UUri subscription_topic_; + uprotocol::core::usubscription::v3::USubscriptionOptions options_ = { + "core.usubscription", + 0x0000, + }; + +public: + ~RpcClientUSubscriptionTest() override = default; +}; + +// +// Tests for subscribe method +// + +TEST_F(RpcClientUSubscriptionTest, // NOLINT + SubscribeRoundtripWithValidProtoPayload) { + bool server_callback_executed = false; + SubscriptionRequest server_capture; + SubscriptionResponse server_response; + *server_response.mutable_topic() = getSubscriptionTopic(); + auto server_or_status = uprotocol::communication::RpcServer::create( + getServerTransport(), getServerMethodUuri(), + [&server_callback_executed, &server_capture, + &server_response](const UMessage& message) -> std::optional { + server_callback_executed = true; + auto request_or_status = + ProtoConverter::extractFromProtobuf( + message); + if (!request_or_status.has_value()) { + return std::nullopt; + } + server_capture = request_or_status.value(); + Payload response_payload(server_response); + return response_payload; + }, + uprotocol::v1::UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF); + + ASSERT_TRUE(server_or_status.has_value()); + ASSERT_NE(server_or_status.value(), nullptr); + EXPECT_TRUE(getServerTransport()->getListener()); + + auto client = uprotocol::core::usubscription::v3::RpcClientUSubscription( + getClientTransport(), getUSubscriptionOptions()); + + const auto subscription_request = + RequestBuilder::buildSubscriptionRequest(getSubscriptionTopic()); + + auto response_or_status_future = client.subscribe(subscription_request); + + // wait to give the client time to send the request. Otherwise this would + // cause a race condition + int counter = ITERATIONS_TILL_TIMEOUT; + while (counter > 0 && getClientTransport()->getSendCount() == 0) { + counter--; + std::this_thread::sleep_for(MILLISECONDS_PER_ITERATION); + } + ASSERT_EQ(getClientTransport()->getSendCount(), 1); + EXPECT_TRUE(getClientTransport()->getListener()); + + (*getServerTransport()->getListener())(getClientTransport()->getMessage()); + EXPECT_TRUE(server_callback_executed); + EXPECT_EQ(server_capture.SerializeAsString(), + subscription_request.SerializeAsString()); + + getClientTransport()->mockMessage(getServerTransport()->getMessage()); + EXPECT_TRUE(getClientTransport()->getListener()); + EXPECT_EQ(getClientTransport()->getSendCount(), 1); + auto response_or_status = response_or_status_future.get(); + ASSERT_TRUE(response_or_status.has_value()); + EXPECT_EQ(response_or_status.value().SerializeAsString(), + server_response.SerializeAsString()); +} + +TEST_F(RpcClientUSubscriptionTest, // NOLINT + SubscribeRoundtripWithValidProtoAnyPayload) { + bool server_callback_executed = false; + SubscriptionRequest server_capture; + SubscriptionResponse server_response; + *server_response.mutable_topic() = getSubscriptionTopic(); + auto server_or_status = uprotocol::communication::RpcServer::create( + getServerTransport(), getServerMethodUuri(), + [&server_callback_executed, &server_capture, + &server_response](const UMessage& message) -> std::optional { + server_callback_executed = true; + auto request_or_status = + ProtoConverter::extractFromProtobuf( + message); + if (!request_or_status.has_value()) { + return std::nullopt; + } + server_capture = request_or_status.value(); + google::protobuf::Any any; + if (!any.PackFrom(server_response)) { + return std::nullopt; + } + Payload response_payload(any); + return response_payload; + }, + uprotocol::v1::UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY); + + ASSERT_TRUE(server_or_status.has_value()); + ASSERT_NE(server_or_status.value(), nullptr); + EXPECT_TRUE(getServerTransport()->getListener()); + + auto client = uprotocol::core::usubscription::v3::RpcClientUSubscription( + getClientTransport(), getUSubscriptionOptions()); + + const auto subscription_request = + RequestBuilder::buildSubscriptionRequest(getSubscriptionTopic()); + + auto response_or_status_future = client.subscribe(subscription_request); + + // wait to give the client time to send the request. Otherwise this would + // cause a race condition + int counter = ITERATIONS_TILL_TIMEOUT; + while (counter > 0 && getClientTransport()->getSendCount() == 0) { + counter--; + std::this_thread::sleep_for(MILLISECONDS_PER_ITERATION); + } + ASSERT_EQ(getClientTransport()->getSendCount(), 1); + EXPECT_TRUE(getClientTransport()->getListener()); + + (*getServerTransport()->getListener())(getClientTransport()->getMessage()); + EXPECT_TRUE(server_callback_executed); + EXPECT_EQ(server_capture.SerializeAsString(), + subscription_request.SerializeAsString()); + + getClientTransport()->mockMessage(getServerTransport()->getMessage()); + EXPECT_TRUE(getClientTransport()->getListener()); + EXPECT_EQ(getClientTransport()->getSendCount(), 1); + auto response_or_status = response_or_status_future.get(); + ASSERT_TRUE(response_or_status.has_value()); + EXPECT_EQ(response_or_status.value().SerializeAsString(), + server_response.SerializeAsString()); +} + +//////////////////////////////// +// Tests for unsubscribe method// +//////////////////////////////// + +using UnsubscibeRequest = + uprotocol::core::usubscription::v3::UnsubscribeRequest; +using UnsubscribeResponse = + uprotocol::core::usubscription::v3::UnsubscribeResponse; + +TEST_F(RpcClientUSubscriptionTest, // NOLINT + UnsubscribeRoundtripWithValidProtoPayload) { + bool server_callback_executed = false; + UnsubscibeRequest server_capture; + UnsubscribeResponse server_response; + auto server_or_status = uprotocol::communication::RpcServer::create( + getServerTransport(), getServerMethodUuri(), + [&server_callback_executed, &server_capture, + &server_response](const UMessage& message) -> std::optional { + server_callback_executed = true; + auto request_or_status = + ProtoConverter::extractFromProtobuf(message); + if (!request_or_status.has_value()) { + return std::nullopt; + } + server_capture = request_or_status.value(); + Payload response_payload(server_response); + return response_payload; + }, + uprotocol::v1::UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF); + + ASSERT_TRUE(server_or_status.has_value()); + ASSERT_NE(server_or_status.value(), nullptr); + EXPECT_TRUE(getServerTransport()->getListener()); + + auto client = uprotocol::core::usubscription::v3::RpcClientUSubscription( + getClientTransport(), getUSubscriptionOptions()); + + const auto unsubscribe_request = + RequestBuilder::buildUnsubscribeRequest(getSubscriptionTopic()); + + auto response_or_status_future = client.unsubscribe(unsubscribe_request); + + // wait to give the client time to send the request. Otherwise this would + // cause a race condition + int counter = ITERATIONS_TILL_TIMEOUT; + while (counter > 0 && getClientTransport()->getSendCount() == 0) { + counter--; + std::this_thread::sleep_for(MILLISECONDS_PER_ITERATION); + } + ASSERT_EQ(getClientTransport()->getSendCount(), 1); + EXPECT_TRUE(getClientTransport()->getListener()); + + (*getServerTransport()->getListener())(getClientTransport()->getMessage()); + EXPECT_TRUE(server_callback_executed); + EXPECT_EQ(server_capture.SerializeAsString(), + unsubscribe_request.SerializeAsString()); + + getClientTransport()->mockMessage(getServerTransport()->getMessage()); + EXPECT_TRUE(getClientTransport()->getListener()); + EXPECT_EQ(getClientTransport()->getSendCount(), 1); + auto response_or_status = response_or_status_future.get(); + ASSERT_TRUE(response_or_status.has_value()); + EXPECT_EQ(response_or_status.value().SerializeAsString(), + server_response.SerializeAsString()); +} + +TEST_F(RpcClientUSubscriptionTest, // NOLINT + UnsubscribeRoundtripWithValidProtoAnyPayload) { + bool server_callback_executed = false; + UnsubscibeRequest server_capture; + UnsubscribeResponse server_response; + auto server_or_status = uprotocol::communication::RpcServer::create( + getServerTransport(), getServerMethodUuri(), + [&server_callback_executed, &server_capture, + &server_response](const UMessage& message) -> std::optional { + server_callback_executed = true; + auto request_or_status = + ProtoConverter::extractFromProtobuf(message); + if (!request_or_status.has_value()) { + return std::nullopt; + } + server_capture = request_or_status.value(); + google::protobuf::Any any; + if (!any.PackFrom(server_response)) { + return std::nullopt; + } + Payload response_payload(any); + return response_payload; + }, + uprotocol::v1::UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY); + + ASSERT_TRUE(server_or_status.has_value()); + ASSERT_NE(server_or_status.value(), nullptr); + EXPECT_TRUE(getServerTransport()->getListener()); + + auto client = uprotocol::core::usubscription::v3::RpcClientUSubscription( + getClientTransport(), getUSubscriptionOptions()); + + const auto unsubscribe_request = + RequestBuilder::buildUnsubscribeRequest(getSubscriptionTopic()); + + auto response_or_status_future = client.unsubscribe(unsubscribe_request); + + // wait to give the client time to send the request. Otherwise this would + // cause a race condition + int counter = ITERATIONS_TILL_TIMEOUT; + while (counter > 0 && getClientTransport()->getSendCount() == 0) { + counter--; + std::this_thread::sleep_for(MILLISECONDS_PER_ITERATION); + } + ASSERT_EQ(getClientTransport()->getSendCount(), 1); + EXPECT_TRUE(getClientTransport()->getListener()); + + (*getServerTransport()->getListener())(getClientTransport()->getMessage()); + EXPECT_TRUE(server_callback_executed); + EXPECT_EQ(server_capture.SerializeAsString(), + unsubscribe_request.SerializeAsString()); + + getClientTransport()->mockMessage(getServerTransport()->getMessage()); + EXPECT_TRUE(getClientTransport()->getListener()); + EXPECT_EQ(getClientTransport()->getSendCount(), 1); + auto response_or_status = response_or_status_future.get(); + ASSERT_TRUE(response_or_status.has_value()); + EXPECT_EQ(response_or_status.value().SerializeAsString(), + server_response.SerializeAsString()); +} + +//////////////////////////////// +// Tests for fetch_subscribers method// +//////////////////////////////// + +using FetchSubscribersRequest = + uprotocol::core::usubscription::v3::FetchSubscribersRequest; +using FetchSubscribersResponse = + uprotocol::core::usubscription::v3::FetchSubscribersResponse; + +TEST_F(RpcClientUSubscriptionTest, // NOLINT + fetchSubscriberRoundtripWithValidProtoPayload) { + bool server_callback_executed = false; + FetchSubscribersRequest server_capture; + FetchSubscribersResponse server_response; + auto server_or_status = uprotocol::communication::RpcServer::create( + getServerTransport(), getServerMethodUuri(), + [&server_callback_executed, &server_capture, + &server_response](const UMessage& message) -> std::optional { + server_callback_executed = true; + auto request_or_status = + ProtoConverter::extractFromProtobuf( + message); + if (!request_or_status.has_value()) { + return std::nullopt; + } + server_capture = request_or_status.value(); + Payload response_payload(server_response); + return response_payload; + }, + uprotocol::v1::UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF); + + ASSERT_TRUE(server_or_status.has_value()); + ASSERT_NE(server_or_status.value(), nullptr); + EXPECT_TRUE(getServerTransport()->getListener()); + + auto client = uprotocol::core::usubscription::v3::RpcClientUSubscription( + getClientTransport(), getUSubscriptionOptions()); + + const auto fetch_subscribers_request = + RequestBuilder::buildFetchSubscribersRequest(getSubscriptionTopic()); + + auto response_or_status_future = + client.fetch_subscribers(fetch_subscribers_request); + + // wait to give the client time to send the request. Otherwise this would + // cause a race condition + int counter = ITERATIONS_TILL_TIMEOUT; + while (counter > 0 && getClientTransport()->getSendCount() == 0) { + counter--; + std::this_thread::sleep_for(MILLISECONDS_PER_ITERATION); + } + ASSERT_EQ(getClientTransport()->getSendCount(), 1); + EXPECT_TRUE(getClientTransport()->getListener()); + + (*getServerTransport()->getListener())(getClientTransport()->getMessage()); + EXPECT_TRUE(server_callback_executed); + EXPECT_EQ(server_capture.SerializeAsString(), + fetch_subscribers_request.SerializeAsString()); + + getClientTransport()->mockMessage(getServerTransport()->getMessage()); + EXPECT_TRUE(getClientTransport()->getListener()); + EXPECT_EQ(getClientTransport()->getSendCount(), 1); + auto response_or_status = response_or_status_future.get(); + ASSERT_TRUE(response_or_status.has_value()); + EXPECT_EQ(response_or_status.value().SerializeAsString(), + server_response.SerializeAsString()); +} + +TEST_F(RpcClientUSubscriptionTest, // NOLINT + FetchSubscriberRoundtripWithValidProtoAnyPayload) { + bool server_callback_executed = false; + FetchSubscribersRequest server_capture; + FetchSubscribersResponse server_response; + auto server_or_status = uprotocol::communication::RpcServer::create( + getServerTransport(), getServerMethodUuri(), + [&server_callback_executed, &server_capture, + &server_response](const UMessage& message) -> std::optional { + server_callback_executed = true; + auto request_or_status = + ProtoConverter::extractFromProtobuf( + message); + if (!request_or_status.has_value()) { + return std::nullopt; + } + server_capture = request_or_status.value(); + google::protobuf::Any any; + if (!any.PackFrom(server_response)) { + return std::nullopt; + } + Payload response_payload(any); + return response_payload; + }, + uprotocol::v1::UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY); + + ASSERT_TRUE(server_or_status.has_value()); + ASSERT_NE(server_or_status.value(), nullptr); + EXPECT_TRUE(getServerTransport()->getListener()); + + auto client = uprotocol::core::usubscription::v3::RpcClientUSubscription( + getClientTransport(), getUSubscriptionOptions()); + + const auto fetch_subscribers_request = + RequestBuilder::buildFetchSubscribersRequest(getSubscriptionTopic()); + + auto response_or_status_future = + client.fetch_subscribers(fetch_subscribers_request); + + // wait to give the client time to send the request. Otherwise this would + // cause a race condition + int counter = ITERATIONS_TILL_TIMEOUT; + while (counter > 0 && getClientTransport()->getSendCount() == 0) { + counter--; + std::this_thread::sleep_for(MILLISECONDS_PER_ITERATION); + } + ASSERT_EQ(getClientTransport()->getSendCount(), 1); + EXPECT_TRUE(getClientTransport()->getListener()); + + (*getServerTransport()->getListener())(getClientTransport()->getMessage()); + EXPECT_TRUE(server_callback_executed); + EXPECT_EQ(server_capture.SerializeAsString(), + fetch_subscribers_request.SerializeAsString()); + + getClientTransport()->mockMessage(getServerTransport()->getMessage()); + EXPECT_TRUE(getClientTransport()->getListener()); + EXPECT_EQ(getClientTransport()->getSendCount(), 1); + auto response_or_status = response_or_status_future.get(); + ASSERT_TRUE(response_or_status.has_value()); + EXPECT_EQ(response_or_status.value().SerializeAsString(), + server_response.SerializeAsString()); +} + +//////////////////////////////// +// Tests for fetch_subscriptions method// +//////////////////////////////// + +using FetchSubscriptionsRequest = + uprotocol::core::usubscription::v3::FetchSubscriptionsRequest; +using FetchSubscriptionsResponse = + uprotocol::core::usubscription::v3::FetchSubscriptionsResponse; + +TEST_F(RpcClientUSubscriptionTest, // NOLINT + fetchSubscriptionsRoundtripWithValidProtoPayload) { + bool server_callback_executed = false; + FetchSubscriptionsRequest server_capture; + FetchSubscriptionsResponse server_response; + auto server_or_status = uprotocol::communication::RpcServer::create( + getServerTransport(), getServerMethodUuri(), + [&server_callback_executed, &server_capture, + &server_response](const UMessage& message) -> std::optional { + server_callback_executed = true; + auto request_or_status = + ProtoConverter::extractFromProtobuf( + message); + if (!request_or_status.has_value()) { + return std::nullopt; + } + server_capture = request_or_status.value(); + Payload response_payload(server_response); + return response_payload; + }, + uprotocol::v1::UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF); + + ASSERT_TRUE(server_or_status.has_value()); + ASSERT_NE(server_or_status.value(), nullptr); + EXPECT_TRUE(getServerTransport()->getListener()); + + auto client = uprotocol::core::usubscription::v3::RpcClientUSubscription( + getClientTransport(), getUSubscriptionOptions()); + + const uprotocol::core::usubscription::v3::SubscriberInfo subscriber_info; + const auto fetch_subscriptions_request = + RequestBuilder::buildFetchSubscriptionsRequest(subscriber_info); + + auto response_or_status_future = + client.fetch_subscriptions(fetch_subscriptions_request); + + // wait to give the client time to send the request. Otherwise this would + // cause a race condition + int counter = ITERATIONS_TILL_TIMEOUT; + while (counter > 0 && getClientTransport()->getSendCount() == 0) { + counter--; + std::this_thread::sleep_for(MILLISECONDS_PER_ITERATION); + } + ASSERT_EQ(getClientTransport()->getSendCount(), 1); + EXPECT_TRUE(getClientTransport()->getListener()); + + (*getServerTransport()->getListener())(getClientTransport()->getMessage()); + EXPECT_TRUE(server_callback_executed); + EXPECT_EQ(server_capture.SerializeAsString(), + fetch_subscriptions_request.SerializeAsString()); + + getClientTransport()->mockMessage(getServerTransport()->getMessage()); + EXPECT_TRUE(getClientTransport()->getListener()); + EXPECT_EQ(getClientTransport()->getSendCount(), 1); + auto response_or_status = response_or_status_future.get(); + ASSERT_TRUE(response_or_status.has_value()); + EXPECT_EQ(response_or_status.value().SerializeAsString(), + server_response.SerializeAsString()); +} + +TEST_F(RpcClientUSubscriptionTest, // NOLINT + fetchSubscriptionRoundtripWithValidProtoAnyPayload) { + bool server_callback_executed = false; + FetchSubscriptionsRequest server_capture; + FetchSubscriptionsResponse server_response; + auto server_or_status = uprotocol::communication::RpcServer::create( + getServerTransport(), getServerMethodUuri(), + [&server_callback_executed, &server_capture, + &server_response](const UMessage& message) -> std::optional { + server_callback_executed = true; + auto request_or_status = + ProtoConverter::extractFromProtobuf( + message); + if (!request_or_status.has_value()) { + return std::nullopt; + } + server_capture = request_or_status.value(); + google::protobuf::Any any; + if (!any.PackFrom(server_response)) { + return std::nullopt; + } + Payload response_payload(any); + return response_payload; + }, + uprotocol::v1::UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY); + + ASSERT_TRUE(server_or_status.has_value()); + ASSERT_NE(server_or_status.value(), nullptr); + EXPECT_TRUE(getServerTransport()->getListener()); + + auto client = uprotocol::core::usubscription::v3::RpcClientUSubscription( + getClientTransport(), getUSubscriptionOptions()); + + const uprotocol::core::usubscription::v3::SubscriberInfo subscriber_info; + const auto fetch_subscribers_request = + RequestBuilder::buildFetchSubscriptionsRequest(subscriber_info); + + auto response_or_status_future = + client.fetch_subscriptions(fetch_subscribers_request); + + // wait to give the client time to send the request. Otherwise this would + // cause a race condition + int counter = ITERATIONS_TILL_TIMEOUT; + while (counter > 0 && getClientTransport()->getSendCount() == 0) { + counter--; + std::this_thread::sleep_for(MILLISECONDS_PER_ITERATION); + } + ASSERT_EQ(getClientTransport()->getSendCount(), 1); + EXPECT_TRUE(getClientTransport()->getListener()); + + (*getServerTransport()->getListener())(getClientTransport()->getMessage()); + EXPECT_TRUE(server_callback_executed); + EXPECT_EQ(server_capture.SerializeAsString(), + fetch_subscribers_request.SerializeAsString()); + + getClientTransport()->mockMessage(getServerTransport()->getMessage()); + EXPECT_TRUE(getClientTransport()->getListener()); + EXPECT_EQ(getClientTransport()->getSendCount(), 1); + auto response_or_status = response_or_status_future.get(); + ASSERT_TRUE(response_or_status.has_value()); + EXPECT_EQ(response_or_status.value().SerializeAsString(), + server_response.SerializeAsString()); +} + +//////////////////////////////// +// Tests for register_for_notification method// +//////////////////////////////// + +using NotificationsRequest = + uprotocol::core::usubscription::v3::NotificationsRequest; +using NotificationsResponse = + uprotocol::core::usubscription::v3::NotificationsResponse; + +TEST_F(RpcClientUSubscriptionTest, // NOLINT + registerNotificationRoundtripWithValidProtoPayload) { + bool server_callback_executed = false; + NotificationsRequest server_capture; + NotificationsResponse server_response; + auto server_or_status = uprotocol::communication::RpcServer::create( + getServerTransport(), getServerMethodUuri(), + [&server_callback_executed, &server_capture, + &server_response](const UMessage& message) -> std::optional { + server_callback_executed = true; + auto request_or_status = + ProtoConverter::extractFromProtobuf( + message); + if (!request_or_status.has_value()) { + return std::nullopt; + } + server_capture = request_or_status.value(); + Payload response_payload(server_response); + return response_payload; + }, + uprotocol::v1::UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF); + + ASSERT_TRUE(server_or_status.has_value()); + ASSERT_NE(server_or_status.value(), nullptr); + EXPECT_TRUE(getServerTransport()->getListener()); + + auto client = uprotocol::core::usubscription::v3::RpcClientUSubscription( + getClientTransport(), getUSubscriptionOptions()); + + const auto notifications_request = + RequestBuilder::buildNotificationsRequest(getSubscriptionTopic()); + + auto response_or_status_future = + client.register_for_notifications(notifications_request); + + // wait to give the client time to send the request. Otherwise this would + // cause a race condition + int counter = ITERATIONS_TILL_TIMEOUT; + while (counter > 0 && getClientTransport()->getSendCount() == 0) { + counter--; + std::this_thread::sleep_for(MILLISECONDS_PER_ITERATION); + } + ASSERT_EQ(getClientTransport()->getSendCount(), 1); + EXPECT_TRUE(getClientTransport()->getListener()); + + (*getServerTransport()->getListener())(getClientTransport()->getMessage()); + EXPECT_TRUE(server_callback_executed); + EXPECT_EQ(server_capture.SerializeAsString(), + notifications_request.SerializeAsString()); + + getClientTransport()->mockMessage(getServerTransport()->getMessage()); + EXPECT_TRUE(getClientTransport()->getListener()); + EXPECT_EQ(getClientTransport()->getSendCount(), 1); + auto response_or_status = response_or_status_future.get(); + ASSERT_TRUE(response_or_status.has_value()); + EXPECT_EQ(response_or_status.value().SerializeAsString(), + server_response.SerializeAsString()); +} + +TEST_F(RpcClientUSubscriptionTest, // NOLINT + registerNotificationRoundtripWithValidProtoAnyPayload) { + bool server_callback_executed = false; + NotificationsRequest server_capture; + NotificationsResponse server_response; + auto server_or_status = uprotocol::communication::RpcServer::create( + getServerTransport(), getServerMethodUuri(), + [&server_callback_executed, &server_capture, + &server_response](const UMessage& message) -> std::optional { + server_callback_executed = true; + auto request_or_status = + ProtoConverter::extractFromProtobuf( + message); + if (!request_or_status.has_value()) { + return std::nullopt; + } + server_capture = request_or_status.value(); + google::protobuf::Any any; + if (!any.PackFrom(server_response)) { + return std::nullopt; + } + Payload response_payload(any); + return response_payload; + }, + uprotocol::v1::UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY); + + ASSERT_TRUE(server_or_status.has_value()); + ASSERT_NE(server_or_status.value(), nullptr); + EXPECT_TRUE(getServerTransport()->getListener()); + + auto client = uprotocol::core::usubscription::v3::RpcClientUSubscription( + getClientTransport(), getUSubscriptionOptions()); + + const auto notifications_request = + RequestBuilder::buildNotificationsRequest(getSubscriptionTopic()); + + auto response_or_status_future = + client.register_for_notifications(notifications_request); + + // wait to give the client time to send the request. Otherwise this would + // cause a race condition + int counter = ITERATIONS_TILL_TIMEOUT; + while (counter > 0 && getClientTransport()->getSendCount() == 0) { + counter--; + std::this_thread::sleep_for(MILLISECONDS_PER_ITERATION); + } + ASSERT_EQ(getClientTransport()->getSendCount(), 1); + EXPECT_TRUE(getClientTransport()->getListener()); + + (*getServerTransport()->getListener())(getClientTransport()->getMessage()); + EXPECT_TRUE(server_callback_executed); + EXPECT_EQ(server_capture.SerializeAsString(), + notifications_request.SerializeAsString()); + + getClientTransport()->mockMessage(getServerTransport()->getMessage()); + EXPECT_TRUE(getClientTransport()->getListener()); + EXPECT_EQ(getClientTransport()->getSendCount(), 1); + auto response_or_status = response_or_status_future.get(); + ASSERT_TRUE(response_or_status.has_value()); + EXPECT_EQ(response_or_status.value().SerializeAsString(), + server_response.SerializeAsString()); +} + +//////////////////////////////// +// Tests for unregister_for_notification method// +//////////////////////////////// + +using NotificationsRequest = + uprotocol::core::usubscription::v3::NotificationsRequest; +using NotificationsResponse = + uprotocol::core::usubscription::v3::NotificationsResponse; + +TEST_F(RpcClientUSubscriptionTest, // NOLINT + unregisterNotificationRoundtripWithValidProtoPayload) { + bool server_callback_executed = false; + NotificationsRequest server_capture; + NotificationsResponse server_response; + auto server_or_status = uprotocol::communication::RpcServer::create( + getServerTransport(), getServerMethodUuri(), + [&server_callback_executed, &server_capture, + &server_response](const UMessage& message) -> std::optional { + server_callback_executed = true; + auto request_or_status = + ProtoConverter::extractFromProtobuf( + message); + if (!request_or_status.has_value()) { + return std::nullopt; + } + server_capture = request_or_status.value(); + Payload response_payload(server_response); + return response_payload; + }, + uprotocol::v1::UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF); + + ASSERT_TRUE(server_or_status.has_value()); + ASSERT_NE(server_or_status.value(), nullptr); + EXPECT_TRUE(getServerTransport()->getListener()); + + auto client = uprotocol::core::usubscription::v3::RpcClientUSubscription( + getClientTransport(), getUSubscriptionOptions()); + + const auto notifications_request = + RequestBuilder::buildNotificationsRequest(getSubscriptionTopic()); + + auto response_or_status_future = + client.unregister_for_notifications(notifications_request); + + // wait to give the client time to send the request. Otherwise this would + // cause a race condition + int counter = ITERATIONS_TILL_TIMEOUT; + while (counter > 0 && getClientTransport()->getSendCount() == 0) { + counter--; + std::this_thread::sleep_for(MILLISECONDS_PER_ITERATION); + } + ASSERT_EQ(getClientTransport()->getSendCount(), 1); + EXPECT_TRUE(getClientTransport()->getListener()); + + (*getServerTransport()->getListener())(getClientTransport()->getMessage()); + EXPECT_TRUE(server_callback_executed); + EXPECT_EQ(server_capture.SerializeAsString(), + notifications_request.SerializeAsString()); + + getClientTransport()->mockMessage(getServerTransport()->getMessage()); + EXPECT_TRUE(getClientTransport()->getListener()); + EXPECT_EQ(getClientTransport()->getSendCount(), 1); + auto response_or_status = response_or_status_future.get(); + if (!response_or_status.has_value()) { + std::cout << response_or_status.error().DebugString() << std::endl; + } + ASSERT_TRUE(response_or_status.has_value()); + EXPECT_EQ(response_or_status.value().SerializeAsString(), + server_response.SerializeAsString()); +} + +TEST_F(RpcClientUSubscriptionTest, // NOLINT + unregisterNotificationRoundtripWithValidProtoAnyPayload) { + bool server_callback_executed = false; + NotificationsRequest server_capture; + NotificationsResponse server_response; + auto server_or_status = uprotocol::communication::RpcServer::create( + getServerTransport(), getServerMethodUuri(), + [&server_callback_executed, &server_capture, + &server_response](const UMessage& message) -> std::optional { + server_callback_executed = true; + auto request_or_status = + ProtoConverter::extractFromProtobuf( + message); + if (!request_or_status.has_value()) { + return std::nullopt; + } + server_capture = request_or_status.value(); + google::protobuf::Any any; + if (!any.PackFrom(server_response)) { + return std::nullopt; + } + Payload response_payload(any); + return response_payload; + }, + uprotocol::v1::UPayloadFormat::UPAYLOAD_FORMAT_PROTOBUF_WRAPPED_IN_ANY); + + ASSERT_TRUE(server_or_status.has_value()); + ASSERT_NE(server_or_status.value(), nullptr); + EXPECT_TRUE(getServerTransport()->getListener()); + + auto test = getUSubscriptionOptions().authority_name; + std::cout << "the authority name is " << test << std::endl; + auto client = uprotocol::core::usubscription::v3::RpcClientUSubscription( + getClientTransport(), getUSubscriptionOptions()); + + const auto notifications_request = + RequestBuilder::buildNotificationsRequest(getSubscriptionTopic()); + + auto response_or_status_future = + client.unregister_for_notifications(notifications_request); + + // wait to give the client time to send the request. Otherwise this would + // cause a race condition + int counter = ITERATIONS_TILL_TIMEOUT; + while (counter > 0 && getClientTransport()->getSendCount() == 0) { + counter--; + std::this_thread::sleep_for(MILLISECONDS_PER_ITERATION); + } + ASSERT_EQ(getClientTransport()->getSendCount(), 1); + EXPECT_TRUE(getClientTransport()->getListener()); + + (*getServerTransport()->getListener())(getClientTransport()->getMessage()); + EXPECT_TRUE(server_callback_executed); + EXPECT_EQ(server_capture.SerializeAsString(), + notifications_request.SerializeAsString()); + + getClientTransport()->mockMessage(getServerTransport()->getMessage()); + EXPECT_TRUE(getClientTransport()->getListener()); + EXPECT_EQ(getClientTransport()->getSendCount(), 1); + auto response_or_status = response_or_status_future.get(); + ASSERT_TRUE(response_or_status.has_value()); + EXPECT_EQ(response_or_status.value().SerializeAsString(), + server_response.SerializeAsString()); +} + +}; // namespace diff --git a/test/coverage/client/usubscription/v3/USubscriptionUUriBuilderTest.cpp b/test/coverage/client/usubscription/v3/USubscriptionUUriBuilderTest.cpp new file mode 100644 index 000000000..2cd17be40 --- /dev/null +++ b/test/coverage/client/usubscription/v3/USubscriptionUUriBuilderTest.cpp @@ -0,0 +1,68 @@ +// SPDX-FileCopyrightText: 2025 Contributors to the Eclipse Foundation +// +// See the NOTICE file(s) distributed with this work for additional +// information regarding copyright ownership. +// +// This program and the accompanying materials are made available under the +// terms of the Apache License Version 2.0 which is available at +// https://www.apache.org/licenses/LICENSE-2.0 +// +// SPDX-License-Identifier: Apache-2.0 + +#include +#include + +#include "up-cpp/client/usubscription/v3/USubscriptionUUriBuilder.h" + +constexpr uint16_t RESOURCE_ID_TEST = 0x0001; +constexpr uint16_t RESOURCE_ID_NOTIFICATION_ID = 0x8000; +// test ue id with instance id 1 and service id 2 +constexpr uint32_t TEST_UE_ID = 0x00010002; + +namespace uprotocol::core::usubscription::v3 { +class USubscriptionUUriBuilderTest : public ::testing::Test { +private: + v1::UUri expected_uri_; + +protected: + v1::UUri getExpectedUri() const { return expected_uri_; } + + void SetUp() override { + expected_uri_.set_authority_name("core.usubscription"); + expected_uri_.set_ue_id(TEST_UE_ID); + expected_uri_.set_ue_version_major(3); + } + + void TearDown() override {} +}; + +TEST_F(USubscriptionUUriBuilderTest, GetServiceUriWithResourceId) { // NOLINT + // Example test case for building a subscription UUri + auto expected_uri = getExpectedUri(); + expected_uri.set_resource_id(RESOURCE_ID_TEST); + USubscriptionUUriBuilder builder; + builder.setAuthorityName("core.usubscription") + .setInstanceId(1) + .setServiceId(2); + const v1::UUri actual_uri = + builder.getServiceUriWithResourceId(RESOURCE_ID_TEST); + + EXPECT_TRUE(actual_uri.IsInitialized()); + EXPECT_EQ(actual_uri.GetTypeName(), "uprotocol.v1.UUri"); + EXPECT_EQ(actual_uri.SerializeAsString(), expected_uri.SerializeAsString()); +} + +TEST_F(USubscriptionUUriBuilderTest, GetNotificationUri) { // NOLINT + auto expected_uri = getExpectedUri(); + expected_uri.set_resource_id(RESOURCE_ID_NOTIFICATION_ID); + USubscriptionUUriBuilder builder; + builder.setAuthorityName("core.usubscription") + .setInstanceId(1) + .setServiceId(2); + v1::UUri actual_uri = builder.getNotificationUri(); + EXPECT_TRUE(actual_uri.IsInitialized()); + EXPECT_EQ(actual_uri.GetTypeName(), "uprotocol.v1.UUri"); + EXPECT_EQ(actual_uri.SerializeAsString(), expected_uri.SerializeAsString()); +} + +} // namespace uprotocol::core::usubscription::v3