Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,15 @@ endif()

file(GLOB_RECURSE SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/src/*.cpp")

add_library(${PROJECT_NAME} ${SRC_FILES})
add_library(${PROJECT_NAME} ${SRC_FILES}
src/core/usubscription/usubscription.cpp
include/up-cpp/core/usubscription/usubscription.h
src/core/usubscription/subscription_manager.cpp
include/up-cpp/core/usubscription/subscription_manager.h
src/core/usubscription/handlers/subscribe.cpp
include/up-cpp/core/usubscription/handlers/subscribe.h
src/core/usubscription/configuration.cpp
include/up-cpp/core/usubscription/configuration.h)
add_library(up-cpp::${PROJECT_NAME} ALIAS ${PROJECT_NAME})

target_include_directories(${PROJECT_NAME}
Expand Down
22 changes: 22 additions & 0 deletions include/up-cpp/core/usubscription/configuration.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
//
// Created by max on 23.04.25.
//

#ifndef CONFIGURATION_H
#define CONFIGURATION_H
#include <string>

namespace uprotocol::core::usubscription::v3 {
struct USubscriptionConfiguration {

USubscriptionConfiguration create(std::string, size_t, size_t);

private:
std::string authority_name;
size_t subscription_command_buffer_size;
size_t notification_command_buffer_size;
};


} // namespace uprotocol::core::usubscription::v3
#endif //CONFIGURATION_H
17 changes: 17 additions & 0 deletions include/up-cpp/core/usubscription/handlers/subscribe.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#ifndef SUBSCRIBE_H
#define SUBSCRIBE_H
#include <optional>
#include "up-cpp/communication/NotificationSink.h"
#include "up-cpp/datamodel/builder/Payload.h"
#include "uprotocol/v1/umessage.pb.h"
#include "../subscription_manager.h"

// std::function<std::optional<datamodel::builder::Payload>(
// const v1::UMessage&)>;
namespace uprotocol::core::usubscription::handlers{

utils::Expected<datamodel::builder::Payload, v1::UStatus> subscribe(const v1::UMessage& message);

} // namespace uprotocol::core::usubscription::handlers

#endif //SUBSCRIBE_H
29 changes: 29 additions & 0 deletions include/up-cpp/core/usubscription/subscription_manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#ifndef SUBSCRIPTION_MANAGER_H
#define SUBSCRIPTION_MANAGER_H

#include <up-cpp/transport/UTransport.h>
#include <uprotocol/core/usubscription/v3/usubscription.pb.h>
#include "configuration.h"

namespace uprotocol::core::usubscription::v3 {

// Define events
struct AddSubscription {
v1::UUri subscriber;
v1::UUri topic;
// Sender<int> respond_to;
};

struct RemoveSubscription {
v1::UUri subscriber;
v1::UUri topic;
// Sender<int> respond_to;
};

using SubscriptionEvent = std::variant<AddSubscription, RemoveSubscription>;

void handle_message(const USubscriptionConfiguration& configuration, const transport::UTransport& transport, const SubscriptionEvent& command_receiver);

} // namespace uprotocol::core::usubscription::v3

#endif //SUBSCRIPTION_MANAGER_H
30 changes: 30 additions & 0 deletions include/up-cpp/core/usubscription/usubscription.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//
// Created by max on 23.04.25.
//

#ifndef USUBSCRIPTION_H
#define USUBSCRIPTION_H

#include <up-cpp/transport/UTransport.h>
#include <uprotocol/core/usubscription/v3/usubscription.pb.h>

#include "up-cpp/communication/RpcServer.h"

namespace uprotocol::core::usubscription::v3 {

struct USubscriptionStopper;

struct USubscriptionConfig;

struct USubscriptionService : communication::RpcServer{

explicit USubscriptionService(std::shared_ptr<transport::UTransport> transport, USubscriptionConfig config);

//TODO(max) make async
utils::Expected<USubscriptionStopper,v1::UStatus> run();
};


} // namespace uprotocol::core::usubscription::v3

#endif //USUBSCRIPTION_H
2 changes: 1 addition & 1 deletion lint/clang-tidy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ if [ -z "$target_source" ]; then
shopt -s globstar

pushd "$PROJECT_ROOT" > /dev/null
for f in include/**/*.h src/**/*.cpp; do # test/coverage/**/*.cpp test/extra/**/*.cpp test/include/**/*.h; do
for f in include/**/*.h src/**/*.cpp test/coverage/**/*.cpp test/extra/**/*.cpp test/include/**/*.h; do
if [[ ! ("$f" =~ "build/") ]]; then
echo
echo "Checking file '$f'"
Expand Down
1 change: 0 additions & 1 deletion src/communication/RpcClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,6 @@ namespace detail {

using uprotocol::v1::UCode;
using uprotocol::v1::UStatus;
// using namespace std::chrono_literals;
using ListenHandle = uprotocol::transport::UTransport::ListenHandle;

auto PendingRequest::operator>(const PendingRequest& other) const {
Expand Down
5 changes: 5 additions & 0 deletions src/core/usubscription/configuration.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//
// Created by max on 23.04.25.
//

#include "../../../include/up-cpp/core/usubscription/configuration.h"
110 changes: 110 additions & 0 deletions src/core/usubscription/handlers/subscribe.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#include "../../../../include/up-cpp/core/usubscription/handlers/subscribe.h"
#include <uprotocol/v1/ustatus.pb.h>
#include "up-cpp/datamodel/builder/Payload.h"


// Exception Type for ServiceInvocationError
class ServiceInvocationError : public std::runtime_error {
public:
explicit ServiceInvocationError(const std::string& message) : std::runtime_error(message) {}

static ServiceInvocationError InvalidArgument(const std::string& message) {
return ServiceInvocationError("InvalidArgument: " + message);
}
};


// From helpers.rs
// template <typename T>
std::pair<uprotocol::datamodel::builder::Payload, uprotocol::v1::UUri> extract_inputs(
uint16_t expected_resource_id,
uint16_t resource_id,
const uprotocol::v1::UMessage& message) {

auto payload = message.has_payload() ? std::make_optional(message.payload()) : std::nullopt;
auto message_attributes = message.has_attributes() ? std::optional<uprotocol::v1::UAttributes>(message.attributes()) : std::nullopt;

if (resource_id != expected_resource_id) {
throw ServiceInvocationError::InvalidArgument(
"Wrong resource ID (expected " + std::to_string(expected_resource_id) + ", got " + std::to_string(resource_id) + ")");
}

if (!payload) {
throw ServiceInvocationError::InvalidArgument("No request payload");
}

std::optional<uprotocol::datamodel::builder::Payload> request;
try {
// TODO(lennart)
// request = payload.extract_protobuf()
} catch (const std::exception& e) {
throw ServiceInvocationError::InvalidArgument(
"Expected NotificationsRequest payload, error when unpacking: " + std::string(e.what()));
}

const auto& source = message_attributes->source();
if (!message_attributes) {
throw ServiceInvocationError::InvalidArgument("No request source uri");
}

// TODO(lennart) get corresponding request Payload Constructor, in Rust MessageFull is used, that originates from: request = payload.extract_protobuf()...
return {uprotocol::datamodel::builder::Payload(request), uprotocol::v1::UUri(source)};

}


namespace uprotocol::core::usubscription::handlers {

utils::Expected<datamodel::builder::Payload, v1::UStatus> subscribe(const v1::UMessage& message) {
uint16_t expected_resource_id = 0;
uint16_t received_resource_id = 0;

auto extracted_data = extract_inputs(expected_resource_id, received_resource_id, message);
auto subscription_request = extracted_data.first;
auto source = extracted_data.second;

// TODO(lennart)
// if (!subscription_request.topic) {
// throw ServiceInvocationError::InvalidArgument("No topic defined in request");
// }

// Interact with subscription manager backend
// let (respond_to, receive_from) = oneshot::channel::<SubscriptionStatus>();
v3::SubscriptionEvent se = v3::AddSubscription{
source,
source // should be topic, please change as soon it exists
// Missing respond_to
};

try {
// Send subscription with se from v3::SubscriptionEvent with subscription_sender from struct SubscriptionRequestHandler
} catch (const std::exception& e) {
std::cerr << "Error communicating with subscription manager: " << e.what() << std::endl;
return ServiceInvocationError("Error processing request"); // UStatus zurückgeben
}
try {
// Receive asynchronos response, retrieve subscription status (UStatus?)
} catch (const std::exception& e) {
std::cerr << "Error processing request: " << e.what() << std::endl;
return ServiceInvocationError("Error processing request");
}

// Notify update channel
// -> NotificationEvent StateChange/ Use notification_sender from struct SubscriptionRequestHandler

// Build and return result
std::string test_payload_str = "test_payload";
std::optional<uprotocol::datamodel::builder::Payload> response_payload;
try {
uprotocol::datamodel::builder::Payload response_payload(
test_payload_str,
uprotocol::v1::UPayloadFormat::UPAYLOAD_FORMAT_TEXT);
} catch (const std::exception& e) {
std::cerr << "Error building response payload: " << e.what() << std::endl;
return ServiceInvocationError("Error building response payload");
}

return response_payload;
};

} // namespace uprotocol::core::usubscription::handlers
10 changes: 10 additions & 0 deletions src/core/usubscription/subscription_manager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#include "../../../include/up-cpp/core/usubscription/subscription_manager.h"


namespace uprotocol::core::usubscription::v3 {

void handle_message(const USubscriptionConfiguration& configuration, const transport::UTransport& transport, const SubscriptionEvent& command_receiver){

};

}// uprotocol::core::usubscription::v3
15 changes: 15 additions & 0 deletions src/core/usubscription/usubscription.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
//
// Created by max on 23.04.25.
//

#include "../../../include/up-cpp/core/usubscription/usubscription.h"

#include "up-cpp/transport/UTransport.h"

namespace uprotocol::core::usubscription::v3 {

utils::Expected<USubscriptionStopper,v1::UStatus> run() {

};

}// uprotocol::core::usubscription::v3
1 change: 0 additions & 1 deletion src/datamodel/validator/UMessage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
namespace uprotocol::datamodel::validator::message {

using uprotocol::v1::UPRIORITY_CS4;
// using uprotocol::datamodel::validator;

std::string_view message(Reason reason) {
switch (reason) {
Expand Down
Loading
Loading