Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions include/up-cpp/communication/RpcClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ struct RpcClient {
/// For guidance on the permeission_level and token parameters, see:
/// https://github.com/eclipse-uprotocol/up-spec/blob/main/basics/permissions.adoc
explicit RpcClient(std::shared_ptr<transport::UTransport> transport,
v1::UUri&& method, v1::UPriority priority,
std::chrono::milliseconds ttl,
v1::UPriority priority, std::chrono::milliseconds ttl,
std::optional<v1::UPayloadFormat> payload_format = {},
std::optional<uint32_t> permission_level = {},
std::optional<std::string> token = {});
Expand Down Expand Up @@ -100,6 +99,7 @@ struct RpcClient {

/// @brief Invokes an RPC method by sending a request message.
///
/// @param The method that will be invoked
/// @param A Payload builder containing the payload to be sent with the
/// request.
/// @param A callback that will be called with the result.
Expand All @@ -111,11 +111,13 @@ struct RpcClient {
/// * A UStatus based on the commstatus received in the response
/// message (if not OK).
/// * A UMessage containing the response from the RPC target.
[[nodiscard]] InvokeHandle invokeMethod(datamodel::builder::Payload&&,
[[nodiscard]] InvokeHandle invokeMethod(const v1::UUri&,
datamodel::builder::Payload&&,
Callback&&);

/// @brief Invokes an RPC method by sending a request message.
///
/// @param The method that will be invoked
/// @param A Payload builder containing the payload to be sent with the
/// request.
///
Expand All @@ -128,13 +130,15 @@ struct RpcClient {
/// * A UStatus based on the commstatus received in the response
/// message (if not OK).
/// * A UMessage containing the response from the RPC target.
[[nodiscard]] InvokeFuture invokeMethod(datamodel::builder::Payload&&);
[[nodiscard]] InvokeFuture invokeMethod(const v1::UUri&,
datamodel::builder::Payload&&);

/// @brief Invokes an RPC method by sending a request message.
///
/// Request is sent with an empty payload. Can only be called if no payload
/// format was provided at construction time.
///
/// @param The method that will be invoked
/// @param A callback that will be called with the result.
///
/// @post The provided callback will be called with one of:
Expand All @@ -144,13 +148,14 @@ struct RpcClient {
/// * A UStatus based on the commstatus received in the response
/// message (if not OK).
/// * A UMessage containing the response from the RPC target.
[[nodiscard]] InvokeHandle invokeMethod(Callback&&);
[[nodiscard]] InvokeHandle invokeMethod(const v1::UUri&, Callback&&);

/// @brief Invokes an RPC method by sending a request message.
///
/// Request is sent with an empty payload. Can only be called if no payload
/// format was provided at construction time.
///
/// @param The method that will be invoked
/// @remarks This is a wrapper around the callback form of invokeMethod.
///
/// @returns A promised future that can resolve to one of:
Expand All @@ -160,7 +165,7 @@ struct RpcClient {
/// * A UStatus based on the commstatus received in the response
/// message (if not OK).
/// * A UMessage containing the response from the RPC target.
[[nodiscard]] InvokeFuture invokeMethod();
[[nodiscard]] InvokeFuture invokeMethod(const v1::UUri&);

/// @brief Default move constructor (defined in RpcClient.cpp)
RpcClient(RpcClient&&) noexcept;
Expand Down
15 changes: 15 additions & 0 deletions include/up-cpp/datamodel/builder/UMessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,13 @@
#include <string>
#include <vector>

#include "up-cpp/communication/NotificationSink.h"
#include "up-cpp/datamodel/builder/Uuid.h"

namespace uprotocol::communication {
struct RpcClient;
}

namespace uprotocol::datamodel::builder {

/// @brief Interface for composing UMessage objects
Expand All @@ -38,6 +43,7 @@ namespace uprotocol::datamodel::builder {
/// instance can be held and reused by calling .build(payload)
/// for each new set of message data.
struct UMessageBuilder {
friend struct communication::RpcClient;
/// @brief Pre-populates a message builder with the attributes of a
/// "publish" type message.
///
Expand Down Expand Up @@ -103,6 +109,15 @@ struct UMessageBuilder {
/// @returns UMessageBuilder configured to build a "response" message
static UMessageBuilder response(const v1::UMessage& request);

/// @brief Set the method attribute for built messages.
///
/// @param The method to use when building messages.
///
/// @throws std::out_of_range if the value is not a valid method UUri
///
/// @returns A reference to this UMessageBuilder
UMessageBuilder& withMethod(const v1::UUri&);

/// @brief Set the message priority attribute for built messages.
///
/// If not called, the default value as specified in
Expand Down
16 changes: 8 additions & 8 deletions src/client/usubscription/v3/Consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ v1::UStatus Consumer::subscribe(
v1::UPriority priority, std::chrono::milliseconds subscription_request_ttl,
ListenCallback&& callback) {
rpc_client_ = std::make_unique<communication::RpcClient>(
transport_, uSubscriptionUUriBuilder_.getServiceUriWithResourceId(1),
priority, subscription_request_ttl);
transport_, priority, subscription_request_ttl);

auto on_response = [this](const auto& maybe_response) {
if (maybe_response.has_value() &&
Expand All @@ -110,8 +109,9 @@ v1::UStatus Consumer::subscribe(
SubscriptionRequest const subscription_request = buildSubscriptionRequest();
auto payload = datamodel::builder::Payload(subscription_request);

rpc_handle_ =
rpc_client_->invokeMethod(std::move(payload), std::move(on_response));
rpc_handle_ = rpc_client_->invokeMethod(
uSubscriptionUUriBuilder_.getServiceUriWithResourceId(1),
std::move(payload), std::move(on_response));

// Create a L2 subscription
auto result = communication::Subscriber::subscribe(
Expand All @@ -135,8 +135,7 @@ UnsubscribeRequest Consumer::buildUnsubscriptionRequest() {
void Consumer::unsubscribe(v1::UPriority priority,
std::chrono::milliseconds request_ttl) {
rpc_client_ = std::make_unique<communication::RpcClient>(
transport_, uSubscriptionUUriBuilder_.getServiceUriWithResourceId(2),
priority, request_ttl);
transport_, priority, request_ttl);

auto on_response = [](const auto& maybe_response) {
if (!maybe_response.has_value()) {
Expand All @@ -147,8 +146,9 @@ void Consumer::unsubscribe(v1::UPriority priority,
UnsubscribeRequest const unsubscribe_request = buildUnsubscriptionRequest();
auto payload = datamodel::builder::Payload(unsubscribe_request);

rpc_handle_ =
rpc_client_->invokeMethod(std::move(payload), std::move(on_response));
rpc_handle_ = rpc_client_->invokeMethod(
uSubscriptionUUriBuilder_.getServiceUriWithResourceId(2),
std::move(payload), std::move(on_response));

subscriber_.reset();
}
Expand Down
33 changes: 19 additions & 14 deletions src/communication/RpcClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,17 +99,19 @@ struct RpcClient::ExpireService {

////////////////////////////////////////////////////////////////////////////////
RpcClient::RpcClient(std::shared_ptr<transport::UTransport> transport,
v1::UUri&& method, v1::UPriority priority,
std::chrono::milliseconds ttl,
v1::UPriority priority, std::chrono::milliseconds ttl,
std::optional<v1::UPayloadFormat> payload_format,
std::optional<uint32_t> permission_level,
std::optional<std::string> token)
: transport_(std::move(transport)),
ttl_(ttl),
builder_(datamodel::builder::UMessageBuilder::request(
std::move(method), v1::UUri(transport_->getEntityUri()), priority,
ttl_)),
builder_(datamodel::builder::UMessageBuilder(
v1::UMESSAGE_TYPE_REQUEST, v1::UUri(transport_->getEntityUri()),
v1::UUri{})),
expire_service_(std::make_unique<ExpireService>()) {
builder_.withPriority(priority);
builder_.withTtl(ttl);

if (payload_format) {
builder_.withPayloadFormat(*payload_format);
}
Expand Down Expand Up @@ -204,41 +206,44 @@ RpcClient::InvokeHandle RpcClient::invokeMethod(v1::UMessage&& request,
}

RpcClient::InvokeHandle RpcClient::invokeMethod(
datamodel::builder::Payload&& payload, Callback&& callback) {
return invokeMethod(builder_.build(std::move(payload)),
const v1::UUri& method, datamodel::builder::Payload&& payload,
Callback&& callback) {
return invokeMethod(builder_.withMethod(method).build(std::move(payload)),
std::move(callback));
}

RpcClient::InvokeHandle RpcClient::invokeMethod(Callback&& callback) {
return invokeMethod(builder_.build(), std::move(callback));
RpcClient::InvokeHandle RpcClient::invokeMethod(const v1::UUri& method,
Callback&& callback) {
return invokeMethod(builder_.withMethod(method).build(),
std::move(callback));
}

RpcClient::InvokeFuture RpcClient::invokeMethod(
datamodel::builder::Payload&& payload) {
const v1::UUri& method, datamodel::builder::Payload&& payload) {
// Note: functors need to be copy constructable. We work around this by
// wrapping the promise in a shared_ptr. Unique access to it will be
// assured by the implementation at the core of invokeMethod - it only
// allows exactly one call to the callback via std::call_once.
auto promise = std::make_shared<std::promise<MessageOrStatus>>();
auto future = promise->get_future();
auto handle =
invokeMethod(std::move(payload),
invokeMethod(method, std::move(payload),
[promise](const MessageOrStatus& maybe_message) mutable {
promise->set_value(maybe_message);
});

return {std::move(future), std::move(handle)};
}

RpcClient::InvokeFuture RpcClient::invokeMethod() {
RpcClient::InvokeFuture RpcClient::invokeMethod(const v1::UUri& method) {
// Note: functors need to be copy constructable. We work around this by
// wrapping the promise in a shared_ptr. Unique access to it will be
// assured by the implementation at the core of invokeMethod - it only
// allows exactly one call to the callback via std::call_once.
auto promise = std::make_shared<std::promise<MessageOrStatus>>();
auto future = promise->get_future();
auto handle =
invokeMethod([promise](const MessageOrStatus& maybe_message) mutable {
auto handle = invokeMethod(
method, [promise](const MessageOrStatus& maybe_message) mutable {
promise->set_value(maybe_message);
});

Expand Down
12 changes: 12 additions & 0 deletions src/datamodel/builder/UMessage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,18 @@ UMessageBuilder& UMessageBuilder::withPriority(v1::UPriority priority) {
return *this;
}

UMessageBuilder& UMessageBuilder::withMethod(const v1::UUri& method) {
auto [isValid, reason] = validator::uri::isValidRpcMethod(method);
if (!isValid) {
throw std::invalid_argument(
"The UUri provided is not a valid RpcMethod");
}

*attributes_.mutable_sink() = method;

return *this;
}

UMessageBuilder& UMessageBuilder::withTtl(std::chrono::milliseconds ttl) {
if ((ttl.count() <= 0) ||
(ttl.count() > std::numeric_limits<uint32_t>::max())) {
Expand Down
Loading
Loading