From adc1c317855af645682eaa2b50ffec5c475ff078 Mon Sep 17 00:00:00 2001 From: Jitpanu Maneeratpongsuk Date: Fri, 2 Jan 2026 16:05:25 +0100 Subject: [PATCH 01/14] feat(node-gateway): Add gateway node type --- CMakeLists.txt | 3 + include/villas/api/requests/gateway.hpp | 34 ++ include/villas/nodes/gateway.hpp | 57 +++ lib/api/CMakeLists.txt | 14 + lib/api/requests/gateway/grpc.cpp | 470 ++++++++++++++++++++++++ lib/nodes/CMakeLists.txt | 1 + lib/nodes/gateway.cpp | 129 +++++++ 7 files changed, 708 insertions(+) create mode 100644 include/villas/api/requests/gateway.hpp create mode 100644 include/villas/nodes/gateway.hpp create mode 100644 lib/api/requests/gateway/grpc.cpp create mode 100644 lib/nodes/gateway.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index ff81c5d46..14049a275 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -80,6 +80,8 @@ find_package(Criterion) find_package(OpalOrchestra) find_package(LibXml2) find_package(OpalAsyncApi) +find_package(Protobuf) +find_package(gRPC) # Check for tools find_program(PROTOBUFC_COMPILER NAMES protoc-c) @@ -214,6 +216,7 @@ cmake_dependent_option(WITH_NODE_ULDAQ "Build with uldaq node-type" cmake_dependent_option(WITH_NODE_WEBRTC "Build with webrtc node-type" "${WITH_DEFAULTS}" "WITH_WEB; LibDataChannel_FOUND" OFF) cmake_dependent_option(WITH_NODE_WEBSOCKET "Build with websocket node-type" "${WITH_DEFAULTS}" "WITH_WEB" OFF) cmake_dependent_option(WITH_NODE_ZEROMQ "Build with zeromq node-type" "${WITH_DEFAULTS}" "LIBZMQ_FOUND; NOT WITHOUT_GPL" OFF) +cmake_dependent_option(WITH_GRPC "Build with grpc api" "${WITH_DEFAULTS}" "gRPC_FOUND" OFF) # Set a default for the build type if("${CMAKE_BUILD_TYPE}" STREQUAL "") diff --git a/include/villas/api/requests/gateway.hpp b/include/villas/api/requests/gateway.hpp new file mode 100644 index 000000000..b3f74af25 --- /dev/null +++ b/include/villas/api/requests/gateway.hpp @@ -0,0 +1,34 @@ +/* Universal Data-exchange API request. + * + * Author: Steffen Vogel + * SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include + +namespace villas { +namespace node { + +// Forward declarations +class Node; + +namespace api { + +class GatewayRequest : public NodeRequest { + +protected: + GatewayNode *gateway_node; + +public: + using NodeRequest::NodeRequest; + + void prepare() override; +}; + +} // namespace api +} // namespace node +} // namespace villas diff --git a/include/villas/nodes/gateway.hpp b/include/villas/nodes/gateway.hpp new file mode 100644 index 000000000..48a4f0daa --- /dev/null +++ b/include/villas/nodes/gateway.hpp @@ -0,0 +1,57 @@ +#pragma once + +#include + +#include +#include + +namespace villas { +namespace node { + +// Forward declarations +struct Sample; + +class GatewayNode : public Node { +protected: + int parse(json_t *json) override; + + int _read(struct Sample *smps[], unsigned cnt) override; + int _write(struct Sample *smps[], unsigned cnt) override; + +public: + GatewayNode(const uuid_t &id = {}, const std::string &name = ""); + enum ApiType { gRPC }; + + struct Direction { + Sample *sample; + pthread_cond_t cv; + pthread_mutex_t mutex; + char* buf; + size_t buflen; + size_t wbytes; + }; + + + Direction read, write; + std::string address; + ApiType type; + + Format *formatter; + + char *buf; + size_t buflen; + size_t wbytes; + + int prepare() override; + + int check() override; + + int start() override; + + int stop() override; + + ~GatewayNode(); +}; + +} // namespace node +} // namespace villas diff --git a/lib/api/CMakeLists.txt b/lib/api/CMakeLists.txt index 70974bdeb..9f5fa73ee 100644 --- a/lib/api/CMakeLists.txt +++ b/lib/api/CMakeLists.txt @@ -42,6 +42,20 @@ if(WITH_GRAPHVIZ) list(APPEND LIBRARIES PkgConfig::CGRAPH PkgConfig::GVC) endif() +if (WITH_GRPC) + find_path(REFLECTION_INCLUDE_DIR + NAMES grpc/reflection/v1alpha/reflection.pb.cc + ) + if (REFLECTION_INCLUDE_DIR) + list(APPEND API_SRC + requests/gateway/grpc.cpp + ${REFLECTION_INCLUDE_DIR}/grpc/reflection/v1alpha/reflection.pb.cc + ${REFLECTION_INCLUDE_DIR}/grpc/reflection/v1alpha/reflection.grpc.pb.cc + ) + list(APPEND LIBRARIES protobuf::libprotobuf gRPC::grpc++) + endif() +endif() + add_library(api STATIC ${API_SRC}) target_include_directories(api PUBLIC ${INCLUDE_DIRS}) target_link_libraries(api PUBLIC ${LIBRARIES}) diff --git a/lib/api/requests/gateway/grpc.cpp b/lib/api/requests/gateway/grpc.cpp new file mode 100644 index 000000000..589c5b7bf --- /dev/null +++ b/lib/api/requests/gateway/grpc.cpp @@ -0,0 +1,470 @@ +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +using namespace google::protobuf; + +class ReflectionClient { +public: + ReflectionClient(std::shared_ptr channel) : stub_(grpc::reflection::v1alpha::ServerReflection::NewStub(channel)) {} + google::protobuf::FileDescriptorSet *GetFileDescriptor(const std::string& symbol) { + grpc::ClientContext context; + auto stream = stub_->ServerReflectionInfo(&context); + grpc::reflection::v1alpha::ServerReflectionRequest request; + request.set_file_containing_symbol(symbol); + bool streamstatus = stream->Write(request); + if (!streamstatus) + std::cout << "Server not allow reflection" << std::endl; + + grpc::reflection::v1alpha::ServerReflectionResponse response; + google::protobuf::FileDescriptorSet *file_descs = new google::protobuf::FileDescriptorSet; + if (stream->Read(&response)) { + if (response.has_file_descriptor_response()) { + for (const auto& bytes : response.file_descriptor_response().file_descriptor_proto()) { + google::protobuf::FileDescriptorProto *file_desc = file_descs->add_file(); + file_desc->ParseFromString(bytes); + } + } else { + return nullptr; + } + } else { + std::cerr << "Reflection failed" << std::endl; + return nullptr; + } + stream->WritesDone(); + grpc::Status status = stream->Finish(); + if (!status.ok()) { + std::cerr << "Reflection failed: " << status.error_message() << std::endl; + } + return file_descs; + }; + +private: + std::unique_ptr stub_; +}; + +namespace villas { +namespace node { +namespace api { + +class grpcRequest : public NodeRequest { +public: + using NodeRequest::NodeRequest; + // using GatewayRequest::GatewayRequest; + + Response *execute() override { + gateway_node = dynamic_cast(node); + + switch (method) { + case Session::Method::GET: + return executeGet(); + case Session::Method::PUT: + case Session::Method::POST: + return executePost(); + default: + throw Error::invalidMethod(this); + } + } + + Response *executeGet() { + std::string address = gateway_node->address; + GatewayNode::ApiType api_type = gateway_node->type; + if (api_type != GatewayNode::ApiType::gRPC) + throw Error(HTTP_STATUS_NOT_FOUND, nullptr, "Api type unavailable"); + + auto gRPC_package = matches[2]; + auto gRPC_service = matches[3]; + auto gRPC_method = matches[4]; + std::string methodFullName = "/" + gRPC_package + "." + gRPC_service + "/" + gRPC_method; + + // Form request + grpc::Slice slice; + pthread_mutex_lock(&gateway_node->write.mutex); + slice = grpc::Slice(gateway_node->write.buf, gateway_node->write.wbytes); + pthread_mutex_unlock(&gateway_node->write.mutex); + grpc::ByteBuffer req_buf(&slice, 1); + + // gRPC call + auto channel = grpc::CreateChannel(address, grpc::InsecureChannelCredentials()); + grpc::GenericStub generic_stub(channel); + grpc::ClientContext context; + grpc::CompletionQueue cq; + grpc::ByteBuffer resp_buf; + auto call = generic_stub.PrepareUnaryCall(&context, methodFullName, req_buf, &cq); + grpc::Status status; + call->StartCall(); + call->Finish(&resp_buf, &status, (void*)1); + void* got_tag; + bool ok = false; + cq.Next(&got_tag, &ok); + + if (!status.ok()) { + cq.Shutdown(); + throw Error(HTTP_STATUS_NOT_FOUND, nullptr, "Cannot connect to gRPC server"); + } + + // Parse response + std::string recv_data; + std::vector slices; + resp_buf.Dump(&slices); + for (const auto& s:slices) + recv_data.append(reinterpret_cast(s.begin()), s.size()); + + json_t *json_response; + json_response = parse_response(resp_buf, recv_data); + + cq.Shutdown(); + return new JsonResponse(session, HTTP_STATUS_OK, json_response); + } + + Response *executePost() { + // Create channel & stub + std::string address = gateway_node->address; + GatewayNode::ApiType api_type = gateway_node->type; + + if (api_type != GatewayNode::ApiType::gRPC) + throw Error(HTTP_STATUS_NOT_FOUND, nullptr, "Api type unavailable"); + + auto gRPC_package = matches[2]; + auto gRPC_service = matches[3]; + auto gRPC_method = matches[4]; + std::string methodFullName = "/" + gRPC_package + "." + gRPC_service + "/" + gRPC_method; + + auto channel = grpc::CreateChannel(address, grpc::InsecureChannelCredentials()); + ReflectionClient refl_client(channel); + + // Form request + // Request file descriptor from gRPC server + grpc::ByteBuffer req_buf; + FileDescriptorSet *protos = refl_client.GetFileDescriptor(gRPC_package + "." + gRPC_service); + bool server_reflection; + + // Check if server reflection complete + auto pool = std::make_unique(); + const Descriptor* resp_desc; + if (!protos) { + server_reflection = false; + req_buf = form_request(); + } else { + server_reflection = true; + const FileDescriptor* file; + for (int i = protos->file_size() - 1; i >= 0; i--) { + const FileDescriptorProto &fd = protos->file(i); + file = pool->BuildFile(fd); + } + const ServiceDescriptor* svc = file->FindServiceByName(gRPC_service); + if (!svc) { + throw Error(HTTP_STATUS_NOT_FOUND, nullptr, "gRPC service not found"); + } + const MethodDescriptor* method_desc = svc->FindMethodByName(gRPC_method); + if (!method_desc) { + throw Error(HTTP_STATUS_NOT_FOUND, nullptr, "gRPC method not found"); + } + const Descriptor* req_desc = method_desc->input_type(); + resp_desc = method_desc->output_type(); + if (req_desc->full_name() == "villas.node.Message") { + req_buf = form_request(); + } else { + req_buf = form_request_reflection(req_desc); + } + } + // gRPC all + grpc::GenericStub generic_stub(channel); + grpc::ClientContext context; + grpc::CompletionQueue cq; + grpc::ByteBuffer resp_buf; + auto call = generic_stub.PrepareUnaryCall(&context, methodFullName, req_buf, &cq); + grpc::Status status; + + call->StartCall(); + call->Finish(&resp_buf, &status, (void*)1); + void* got_tag; + bool ok = false; + cq.Next(&got_tag, &ok); + + if (!status.ok()) { + cq.Shutdown(); + throw Error(HTTP_STATUS_NOT_FOUND, nullptr, "Cannot connect to gRPC server"); + } + + // Parse response + std::string recv_data; + std::vector slices; + resp_buf.Dump(&slices); + for (const auto& s:slices) + recv_data.append(reinterpret_cast(s.begin()), s.size()); + + // Empty gRPC response + if (recv_data.length() <= 0) { + return new JsonResponse(session, HTTP_STATUS_OK, nullptr);; + } + + json_t *json_response; + if (!server_reflection) { + json_response = parse_response(resp_buf, recv_data); + } else { + if (resp_desc->full_name() == "villas.node.Message") { + json_response = parse_response(resp_buf, recv_data); + } else { + json_response = parse_response_reflection(resp_desc, recv_data); + } + } + cq.Shutdown(); + return new JsonResponse(session, HTTP_STATUS_OK, json_response); + } + + grpc::ByteBuffer form_request_reflection(const Descriptor* req_desc) { + grpc::Slice slice; + DynamicMessageFactory dym_factory; + const Message* req_prototype = dym_factory.GetPrototype(req_desc); + std::unique_ptr request(req_prototype->New()); + // Set field + const Reflection* refl = request->GetReflection(); + int field_count = req_desc->field_count(); + for (int i = 0; i < field_count; i++) { + if (const FieldDescriptor* f = req_desc->field(i)) { + json_t* json_val = json_object_get(body, f->name().c_str()); + if (!json_val) { + if (f->is_required()) { + std::cerr << "Missing required field: " << f->name() << std::endl; + } + continue; + } + if (json_val->type == json_type::JSON_ARRAY) { + if (f->is_repeated()) { + size_t ele_count = json_array_size(json_val); + for (size_t j = 0; j < ele_count; j++) { + json_t *json_array = json_array_get(json_val, j); + switch (f->type()) { + case grpc::protobuf::FieldDescriptor::TYPE_STRING: + refl->AddString(request.get(), f, json_string_value(json_array)); + break; + case grpc::protobuf::FieldDescriptor::TYPE_INT32: + refl->AddInt32(request.get(), f, json_integer_value(json_array)); + break; + case grpc::protobuf::FieldDescriptor::TYPE_INT64: + refl->AddInt64(request.get(), f, json_integer_value(json_array)); + break; + case grpc::protobuf::FieldDescriptor::TYPE_DOUBLE: + refl->AddDouble(request.get(), f, json_real_value(json_array)); + break; + case grpc::protobuf::FieldDescriptor::TYPE_BOOL: + refl->AddBool(request.get(), f, json_boolean_value(json_array)); + break; + default: + break; + } + } + } else { + std::cerr << "Json array in non repeat field not support" << std::endl; + continue; + } + } else { + switch (f->type()) { + case grpc::protobuf::FieldDescriptor::TYPE_STRING: + refl->SetString(request.get(), f, json_string_value(json_val)); + break; + case grpc::protobuf::FieldDescriptor::TYPE_INT32: + refl->SetInt32(request.get(), f, json_integer_value(json_val)); + break; + case grpc::protobuf::FieldDescriptor::TYPE_INT64: + refl->SetInt64(request.get(), f, json_integer_value(json_val)); + break; + case grpc::protobuf::FieldDescriptor::TYPE_DOUBLE: + refl->SetDouble(request.get(), f, json_real_value(json_val)); + break; + case grpc::protobuf::FieldDescriptor::TYPE_BOOL: + refl->SetBool(request.get(), f, json_boolean_value(json_val)); + break; + default: + break; + } + } + } + } + // Serialize + std::string serialized; + request->SerializeToString(&serialized); + slice = grpc::Slice(serialized); + grpc::ByteBuffer req_buf(&slice, 1); + return req_buf; + } + + grpc::ByteBuffer form_request() { + grpc::Slice slice; + if (body) { + int ret; + json_error_t err; + double timestamp = 0; + json_t *json_value; + ret = json_unpack_ex(body, &err, 0, "{ s: F, s: o}", + "timestamp", ×tamp, "value", &json_value); + if (ret) { + throw Error::badRequest(nullptr, "Malformed body: {}", err.text); + } + if (!json_is_array(json_value)) { + throw Error::badRequest(nullptr, "Value must be an array"); + } + size_t i; + json_t *json_data; + Sample *sample_dummy = sample_alloc_mem(64); + sample_dummy->flags = (int)SampleFlags::HAS_DATA | (int)SampleFlags::HAS_TS_ORIGIN; + sample_dummy->ts.origin = time_from_double(timestamp); + sample_dummy->length = 0; + auto siglists = std::make_shared(); + json_array_foreach(json_value, i, json_data) { + auto sig = std::make_shared(); + switch (json_data->type) { + case json_type::JSON_INTEGER: + ret = sample_dummy->data[i].parseJson(SignalType::INTEGER, json_data); + sig->type = SignalType::INTEGER; + siglists->push_back(sig); + break; + case json_type::JSON_REAL: + ret = sample_dummy->data[i].parseJson(SignalType::FLOAT, json_data); + sig->type = SignalType::FLOAT; + siglists->push_back(sig); + break; + default: + throw Error::badRequest(nullptr, "Value type unsupported"); + } + if (ret) { + throw Error::badRequest(nullptr, "Value type miss match"); + } + sample_dummy->length++; + } + sample_dummy->signals = siglists; + size_t dummy_buflen = 64 * 1024; + char *dummy_buf = new char[dummy_buflen]; + size_t dummy_wbytes; + ret = gateway_node->formatter->sprint(dummy_buf, dummy_buflen, &dummy_wbytes, sample_dummy); + if (ret < 0) { + logger->warn("Failed to format request payload"); + } + slice = grpc::Slice(dummy_buf, dummy_wbytes); + sample_free(sample_dummy); + delete [] dummy_buf; + } else { + pthread_mutex_lock(&gateway_node->write.mutex); + slice = grpc::Slice(gateway_node->write.buf, gateway_node->write.wbytes); + pthread_mutex_unlock(&gateway_node->write.mutex); + } + grpc::ByteBuffer req_buf(&slice, 1); + return req_buf; + } + + json_t *parse_response_reflection(const google::protobuf::Descriptor *resp_desc, std::string &recv_data) { + google::protobuf::DynamicMessageFactory dym_factory; + json_t *root = json_object(); + const google::protobuf::Message* resp_prototype = dym_factory.GetPrototype(resp_desc); + std::unique_ptr response(resp_prototype->New()); + if (response->ParseFromString(recv_data)) { + const google::protobuf::Reflection* resp_refl = response->GetReflection(); + int resp_field_count = resp_desc->field_count(); + for (int i = 0; i < resp_field_count; i++) { + if (const google::protobuf::FieldDescriptor* f = resp_desc->field(i)) { + if (f->is_repeated()) { + auto *json_arr = json_array(); + for (int j = 0; j < resp_refl->FieldSize(*response, f); j++) { + switch (f->type()) { + case grpc::protobuf::FieldDescriptor::Type::TYPE_STRING: + json_array_append(json_arr, json_string(resp_refl->GetRepeatedString(*response, f, j).c_str())); + break; + case grpc::protobuf::FieldDescriptor::Type::TYPE_INT32: + json_array_append(json_arr, json_integer(resp_refl->GetRepeatedInt32(*response, f, j))); + break; + case grpc::protobuf::FieldDescriptor::Type::TYPE_INT64: + json_array_append(json_arr, json_integer(resp_refl->GetRepeatedInt64(*response, f, j))); + break; + case grpc::protobuf::FieldDescriptor::Type::TYPE_DOUBLE: + json_array_append(json_arr, json_real(resp_refl->GetRepeatedDouble(*response, f, j))); + break; + case grpc::protobuf::FieldDescriptor::Type::TYPE_BOOL: + json_array_append(json_arr, json_boolean(resp_refl->GetRepeatedBool(*response, f, j))); + break; + default: + break; + } + } + json_object_set_new(root, f->name().c_str(), json_arr); + } else { + switch (f->type()) { + case grpc::protobuf::FieldDescriptor::Type::TYPE_STRING: + json_object_set_new(root, f->name().c_str(), json_string(resp_refl->GetString(*response, f).c_str())); + break; + case grpc::protobuf::FieldDescriptor::Type::TYPE_INT32: + json_object_set_new(root, f->name().c_str(), json_integer(resp_refl->GetInt32(*response, f))); + break; + case grpc::protobuf::FieldDescriptor::Type::TYPE_INT64: + json_object_set_new(root, f->name().c_str(), json_integer(resp_refl->GetInt64(*response, f))); + break; + case grpc::protobuf::FieldDescriptor::Type::TYPE_DOUBLE: + json_object_set_new(root, f->name().c_str(), json_real(resp_refl->GetDouble(*response, f))); + break; + case grpc::protobuf::FieldDescriptor::Type::TYPE_BOOL: + json_object_set_new(root, f->name().c_str(), json_boolean(resp_refl->GetBool(*response, f))); + break; + default: + break; + } + } + } + } + } + return root; + } + + json_t *parse_response(grpc::ByteBuffer &resp_buf, std::string &recv_data) { + // auto *json_sigs = json_array(); + json_t *json_sigs; + pthread_mutex_lock(&gateway_node->read.mutex); + // json_t *json_ch; + auto *json_ch = json_array(); + json_error_t err; + size_t rbytes; + + Sample *sample_dummy = sample_alloc_mem(64); + int ret = gateway_node->formatter->sscan(recv_data.c_str(), resp_buf.Length(), &rbytes, sample_dummy); + if (ret < 0) { + std::cerr << "Formatting Failed: " << ret << std::endl; + } + for (unsigned int i = 0; i < sample_dummy->length; i++) { + json_array_append(json_ch, sample_dummy->data[i].toJson(SignalType::FLOAT)); + } + json_sigs = json_pack_ex(&err, 0, "{s: f, s: o}", + "timestamp", time_to_double(&sample_dummy->ts.origin), + "value", json_ch); + + if (method == Session::PUT) { + sample_copy(gateway_node->read.sample, sample_dummy); + gateway_node->read.buf = recv_data.data(); + pthread_cond_signal(&gateway_node->read.cv); + } + + sample_free(sample_dummy); + pthread_mutex_unlock(&gateway_node->read.mutex); + + return json_sigs; + } + +protected: + GatewayNode *gateway_node; +}; + +// Register API requests +static char n[] = "gateway"; +static char r[] = "/gateway/(" RE_NODE_NAME ")/([A-Za-z0-9_-]+)/([A-Za-z0-9_-]+)/([A-Za-z0-9_-]+)"; +static char d[] = "gRPC api"; +static RequestPlugin p; + +} // namespace api +} // namespace node +} // namespace villas + diff --git a/lib/nodes/CMakeLists.txt b/lib/nodes/CMakeLists.txt index 068ad9fba..61609914a 100644 --- a/lib/nodes/CMakeLists.txt +++ b/lib/nodes/CMakeLists.txt @@ -10,6 +10,7 @@ set(NODE_SRC if(WITH_WEB) list(APPEND NODE_SRC api.cpp) + list(APPEND NODE_SRC gateway.cpp) endif() if(LIBNL3_ROUTE_FOUND) diff --git a/lib/nodes/gateway.cpp b/lib/nodes/gateway.cpp new file mode 100644 index 000000000..98fa70fd8 --- /dev/null +++ b/lib/nodes/gateway.cpp @@ -0,0 +1,129 @@ + +#include +#include +#include +#include +#include "villas/sample.hpp" + +using namespace villas; +using namespace villas::node; + +GatewayNode::GatewayNode(const uuid_t &id, const std::string &name) + : Node(id, name), read(), write() { + int ret; + auto dirs = std::vector{&read, &write}; + + for (auto dir : dirs) { + ret = pthread_mutex_init(&dir->mutex, nullptr); + if (ret) + throw RuntimeError("failed to initialize mutex"); + + ret = pthread_cond_init(&dir->cv, nullptr); + if (ret) + throw RuntimeError("failed to initialize mutex"); + } +} + +int GatewayNode::prepare() { + + read.sample = sample_alloc_mem(64); + if (!read.sample) + throw MemoryAllocationError(); + + write.sample = sample_alloc_mem(64); + if (!write.sample) + throw MemoryAllocationError(); + + return Node::prepare(); +} + +int GatewayNode::check() { + return Node::check(); +} + +int GatewayNode::_read(struct Sample *smps[], unsigned cnt) { + assert(cnt == 1); + + pthread_cond_wait(&read.cv, &read.mutex); + sample_copy(smps[0], read.sample); + + return 1; +} + +int GatewayNode::_write(struct Sample *smps[], unsigned cnt) { + assert(cnt == 1); + sample_copy(write.sample, smps[0]); + + int ret = formatter->sprint(write.buf, write.buflen, &write.wbytes, smps, cnt); + if (ret < 0) { + logger->warn("Failed to format payload: reason={}", ret); + return ret; + } + + pthread_cond_signal(&write.cv); + + return 1; +} + +int GatewayNode::parse(json_t *json) { + int ret = Node::parse(json); + if (ret) + return ret; + + json_t *json_format = nullptr; + // json_t *remote = nullptr; + const char *endpoint_address, *gateway_type; + json_error_t err; + ret = json_unpack_ex(json, &err, 0, "{ s?:o, s:s, s:s}", + "format", &json_format, "gateway_type", &gateway_type, "address", &endpoint_address); + if (ret) + throw ConfigError(json, err, "node-config-node-gateway"); + + formatter = json_format ? FormatFactory::make(json_format) + : FormatFactory::make("villas.binary"); + if (!formatter) + throw ConfigError(json_format, "node-config-node-gateway-format", + "Invalid format configuration"); + + + address = endpoint_address; + if (!strcmp(gateway_type, "gRPC")) { + type = ApiType::gRPC; + } else { + throw SystemError("Invalid api type: {}", gateway_type); + } + + return 0; +} + +int GatewayNode::start() { + formatter->start(getInputSignals(false), ~(int)SampleFlags::HAS_OFFSET); + + read.buflen = 64 * 1024; + read.buf = new char[read.buflen]; + write.buflen = 64 * 1024; + write.buf = new char[write.buflen]; + + return Node::start(); +} + +int GatewayNode::stop() { + delete[] read.buf; + delete[] write.buf; + sample_free(read.sample); + sample_free(write.sample); + return 0; +} + +GatewayNode::~GatewayNode() { + +} + +// Register node +static char n[] = "gateway"; +static char d[] = "A node providing a Gateway"; +static NodePlugin + p; From 9ca474a849b502afaf6ed3f300e13a0d5c20ed65 Mon Sep 17 00:00:00 2001 From: Jitpanu Maneeratpongsuk Date: Mon, 5 Jan 2026 19:40:26 +0000 Subject: [PATCH 02/14] fix(node-gateway): Clang format --- include/villas/nodes/gateway.hpp | 12 +- lib/api/requests/gateway/grpc.cpp | 312 ++++++++++++++++++------------ lib/nodes/gateway.cpp | 30 +-- 3 files changed, 210 insertions(+), 144 deletions(-) diff --git a/include/villas/nodes/gateway.hpp b/include/villas/nodes/gateway.hpp index 48a4f0daa..8dfc28d89 100644 --- a/include/villas/nodes/gateway.hpp +++ b/include/villas/nodes/gateway.hpp @@ -1,9 +1,16 @@ +/* Node type for API gateway. + * + * Author: Jitpanu Maneeratpongsuk + * SPDX-FileCopyrightText: 2025 Institute for Automation of Complex Power Systems, RWTH Aachen University + * SPDX-License-Identifier: Apache-2.0 + */ + #pragma once #include -#include #include +#include namespace villas { namespace node { @@ -26,12 +33,11 @@ class GatewayNode : public Node { Sample *sample; pthread_cond_t cv; pthread_mutex_t mutex; - char* buf; + char *buf; size_t buflen; size_t wbytes; }; - Direction read, write; std::string address; ApiType type; diff --git a/lib/api/requests/gateway/grpc.cpp b/lib/api/requests/gateway/grpc.cpp index 589c5b7bf..fa4ce395c 100644 --- a/lib/api/requests/gateway/grpc.cpp +++ b/lib/api/requests/gateway/grpc.cpp @@ -1,20 +1,29 @@ +/* Transform HTTP API to gRPC + * + * Author: Jitpanu Maneeratpongsuk + * SPDX-FileCopyrightText: 2025 Institute for Automation of Complex Power Systems, RWTH Aachen University + * SPDX-License-Identifier: Apache-2.0 + */ + #include -#include -#include #include #include +#include +#include -#include #include #include +#include #include using namespace google::protobuf; class ReflectionClient { public: - ReflectionClient(std::shared_ptr channel) : stub_(grpc::reflection::v1alpha::ServerReflection::NewStub(channel)) {} - google::protobuf::FileDescriptorSet *GetFileDescriptor(const std::string& symbol) { + ReflectionClient(std::shared_ptr channel) + : stub_(grpc::reflection::v1alpha::ServerReflection::NewStub(channel)) {} + google::protobuf::FileDescriptorSet * + GetFileDescriptor(const std::string &symbol) { grpc::ClientContext context; auto stream = stub_->ServerReflectionInfo(&context); grpc::reflection::v1alpha::ServerReflectionRequest request; @@ -24,16 +33,19 @@ class ReflectionClient { std::cout << "Server not allow reflection" << std::endl; grpc::reflection::v1alpha::ServerReflectionResponse response; - google::protobuf::FileDescriptorSet *file_descs = new google::protobuf::FileDescriptorSet; + google::protobuf::FileDescriptorSet *file_descs = + new google::protobuf::FileDescriptorSet; if (stream->Read(&response)) { - if (response.has_file_descriptor_response()) { - for (const auto& bytes : response.file_descriptor_response().file_descriptor_proto()) { - google::protobuf::FileDescriptorProto *file_desc = file_descs->add_file(); - file_desc->ParseFromString(bytes); - } - } else { - return nullptr; + if (response.has_file_descriptor_response()) { + for (const auto &bytes : + response.file_descriptor_response().file_descriptor_proto()) { + google::protobuf::FileDescriptorProto *file_desc = + file_descs->add_file(); + file_desc->ParseFromString(bytes); } + } else { + return nullptr; + } } else { std::cerr << "Reflection failed" << std::endl; return nullptr; @@ -41,7 +53,7 @@ class ReflectionClient { stream->WritesDone(); grpc::Status status = stream->Finish(); if (!status.ok()) { - std::cerr << "Reflection failed: " << status.error_message() << std::endl; + std::cerr << "Reflection failed: " << status.error_message() << std::endl; } return file_descs; }; @@ -82,7 +94,8 @@ class grpcRequest : public NodeRequest { auto gRPC_package = matches[2]; auto gRPC_service = matches[3]; auto gRPC_method = matches[4]; - std::string methodFullName = "/" + gRPC_package + "." + gRPC_service + "/" + gRPC_method; + std::string methodFullName = + "/" + gRPC_package + "." + gRPC_service + "/" + gRPC_method; // Form request grpc::Slice slice; @@ -92,30 +105,33 @@ class grpcRequest : public NodeRequest { grpc::ByteBuffer req_buf(&slice, 1); // gRPC call - auto channel = grpc::CreateChannel(address, grpc::InsecureChannelCredentials()); + auto channel = + grpc::CreateChannel(address, grpc::InsecureChannelCredentials()); grpc::GenericStub generic_stub(channel); grpc::ClientContext context; grpc::CompletionQueue cq; grpc::ByteBuffer resp_buf; - auto call = generic_stub.PrepareUnaryCall(&context, methodFullName, req_buf, &cq); + auto call = + generic_stub.PrepareUnaryCall(&context, methodFullName, req_buf, &cq); grpc::Status status; call->StartCall(); - call->Finish(&resp_buf, &status, (void*)1); - void* got_tag; + call->Finish(&resp_buf, &status, (void *)1); + void *got_tag; bool ok = false; cq.Next(&got_tag, &ok); if (!status.ok()) { cq.Shutdown(); - throw Error(HTTP_STATUS_NOT_FOUND, nullptr, "Cannot connect to gRPC server"); + throw Error(HTTP_STATUS_NOT_FOUND, nullptr, + "Cannot connect to gRPC server"); } // Parse response std::string recv_data; std::vector slices; resp_buf.Dump(&slices); - for (const auto& s:slices) - recv_data.append(reinterpret_cast(s.begin()), s.size()); + for (const auto &s : slices) + recv_data.append(reinterpret_cast(s.begin()), s.size()); json_t *json_response; json_response = parse_response(resp_buf, recv_data); @@ -135,39 +151,42 @@ class grpcRequest : public NodeRequest { auto gRPC_package = matches[2]; auto gRPC_service = matches[3]; auto gRPC_method = matches[4]; - std::string methodFullName = "/" + gRPC_package + "." + gRPC_service + "/" + gRPC_method; + std::string methodFullName = + "/" + gRPC_package + "." + gRPC_service + "/" + gRPC_method; - auto channel = grpc::CreateChannel(address, grpc::InsecureChannelCredentials()); + auto channel = + grpc::CreateChannel(address, grpc::InsecureChannelCredentials()); ReflectionClient refl_client(channel); // Form request // Request file descriptor from gRPC server grpc::ByteBuffer req_buf; - FileDescriptorSet *protos = refl_client.GetFileDescriptor(gRPC_package + "." + gRPC_service); + FileDescriptorSet *protos = + refl_client.GetFileDescriptor(gRPC_package + "." + gRPC_service); bool server_reflection; // Check if server reflection complete auto pool = std::make_unique(); - const Descriptor* resp_desc; + const Descriptor *resp_desc; if (!protos) { server_reflection = false; req_buf = form_request(); } else { server_reflection = true; - const FileDescriptor* file; + const FileDescriptor *file; for (int i = protos->file_size() - 1; i >= 0; i--) { const FileDescriptorProto &fd = protos->file(i); file = pool->BuildFile(fd); } - const ServiceDescriptor* svc = file->FindServiceByName(gRPC_service); + const ServiceDescriptor *svc = file->FindServiceByName(gRPC_service); if (!svc) { throw Error(HTTP_STATUS_NOT_FOUND, nullptr, "gRPC service not found"); } - const MethodDescriptor* method_desc = svc->FindMethodByName(gRPC_method); + const MethodDescriptor *method_desc = svc->FindMethodByName(gRPC_method); if (!method_desc) { throw Error(HTTP_STATUS_NOT_FOUND, nullptr, "gRPC method not found"); } - const Descriptor* req_desc = method_desc->input_type(); + const Descriptor *req_desc = method_desc->input_type(); resp_desc = method_desc->output_type(); if (req_desc->full_name() == "villas.node.Message") { req_buf = form_request(); @@ -180,30 +199,33 @@ class grpcRequest : public NodeRequest { grpc::ClientContext context; grpc::CompletionQueue cq; grpc::ByteBuffer resp_buf; - auto call = generic_stub.PrepareUnaryCall(&context, methodFullName, req_buf, &cq); + auto call = + generic_stub.PrepareUnaryCall(&context, methodFullName, req_buf, &cq); grpc::Status status; call->StartCall(); - call->Finish(&resp_buf, &status, (void*)1); - void* got_tag; + call->Finish(&resp_buf, &status, (void *)1); + void *got_tag; bool ok = false; cq.Next(&got_tag, &ok); if (!status.ok()) { cq.Shutdown(); - throw Error(HTTP_STATUS_NOT_FOUND, nullptr, "Cannot connect to gRPC server"); + throw Error(HTTP_STATUS_NOT_FOUND, nullptr, + "Cannot connect to gRPC server"); } // Parse response std::string recv_data; std::vector slices; resp_buf.Dump(&slices); - for (const auto& s:slices) - recv_data.append(reinterpret_cast(s.begin()), s.size()); + for (const auto &s : slices) + recv_data.append(reinterpret_cast(s.begin()), s.size()); // Empty gRPC response if (recv_data.length() <= 0) { - return new JsonResponse(session, HTTP_STATUS_OK, nullptr);; + return new JsonResponse(session, HTTP_STATUS_OK, nullptr); + ; } json_t *json_response; @@ -220,17 +242,17 @@ class grpcRequest : public NodeRequest { return new JsonResponse(session, HTTP_STATUS_OK, json_response); } - grpc::ByteBuffer form_request_reflection(const Descriptor* req_desc) { + grpc::ByteBuffer form_request_reflection(const Descriptor *req_desc) { grpc::Slice slice; DynamicMessageFactory dym_factory; - const Message* req_prototype = dym_factory.GetPrototype(req_desc); + const Message *req_prototype = dym_factory.GetPrototype(req_desc); std::unique_ptr request(req_prototype->New()); // Set field - const Reflection* refl = request->GetReflection(); + const Reflection *refl = request->GetReflection(); int field_count = req_desc->field_count(); for (int i = 0; i < field_count; i++) { - if (const FieldDescriptor* f = req_desc->field(i)) { - json_t* json_val = json_object_get(body, f->name().c_str()); + if (const FieldDescriptor *f = req_desc->field(i)) { + json_t *json_val = json_object_get(body, f->name().c_str()); if (!json_val) { if (f->is_required()) { std::cerr << "Missing required field: " << f->name() << std::endl; @@ -243,48 +265,52 @@ class grpcRequest : public NodeRequest { for (size_t j = 0; j < ele_count; j++) { json_t *json_array = json_array_get(json_val, j); switch (f->type()) { - case grpc::protobuf::FieldDescriptor::TYPE_STRING: - refl->AddString(request.get(), f, json_string_value(json_array)); - break; - case grpc::protobuf::FieldDescriptor::TYPE_INT32: - refl->AddInt32(request.get(), f, json_integer_value(json_array)); - break; - case grpc::protobuf::FieldDescriptor::TYPE_INT64: - refl->AddInt64(request.get(), f, json_integer_value(json_array)); - break; - case grpc::protobuf::FieldDescriptor::TYPE_DOUBLE: - refl->AddDouble(request.get(), f, json_real_value(json_array)); - break; - case grpc::protobuf::FieldDescriptor::TYPE_BOOL: - refl->AddBool(request.get(), f, json_boolean_value(json_array)); - break; - default: - break; + case grpc::protobuf::FieldDescriptor::TYPE_STRING: + refl->AddString(request.get(), f, + json_string_value(json_array)); + break; + case grpc::protobuf::FieldDescriptor::TYPE_INT32: + refl->AddInt32(request.get(), f, + json_integer_value(json_array)); + break; + case grpc::protobuf::FieldDescriptor::TYPE_INT64: + refl->AddInt64(request.get(), f, + json_integer_value(json_array)); + break; + case grpc::protobuf::FieldDescriptor::TYPE_DOUBLE: + refl->AddDouble(request.get(), f, json_real_value(json_array)); + break; + case grpc::protobuf::FieldDescriptor::TYPE_BOOL: + refl->AddBool(request.get(), f, json_boolean_value(json_array)); + break; + default: + break; } } } else { - std::cerr << "Json array in non repeat field not support" << std::endl; + std::cerr << "Json array in non repeat field not support" + << std::endl; continue; } } else { switch (f->type()) { - case grpc::protobuf::FieldDescriptor::TYPE_STRING: - refl->SetString(request.get(), f, json_string_value(json_val)); - break; - case grpc::protobuf::FieldDescriptor::TYPE_INT32: - refl->SetInt32(request.get(), f, json_integer_value(json_val)); - break; - case grpc::protobuf::FieldDescriptor::TYPE_INT64: - refl->SetInt64(request.get(), f, json_integer_value(json_val)); - break; - case grpc::protobuf::FieldDescriptor::TYPE_DOUBLE: - refl->SetDouble(request.get(), f, json_real_value(json_val)); - break; - case grpc::protobuf::FieldDescriptor::TYPE_BOOL: - refl->SetBool(request.get(), f, json_boolean_value(json_val)); - break; - default: - break; + case grpc::protobuf::FieldDescriptor::TYPE_STRING: + refl->SetString(request.get(), f, json_string_value(json_val)); + break; + case grpc::protobuf::FieldDescriptor::TYPE_INT32: + refl->SetInt32(request.get(), f, json_integer_value(json_val)); + break; + case grpc::protobuf::FieldDescriptor::TYPE_INT64: + refl->SetInt64(request.get(), f, json_integer_value(json_val)); + break; + case grpc::protobuf::FieldDescriptor::TYPE_DOUBLE: + refl->SetDouble(request.get(), f, json_real_value(json_val)); + break; + case grpc::protobuf::FieldDescriptor::TYPE_BOOL: + refl->SetBool(request.get(), f, json_boolean_value(json_val)); + break; + default: + break; } } } @@ -304,8 +330,8 @@ class grpcRequest : public NodeRequest { json_error_t err; double timestamp = 0; json_t *json_value; - ret = json_unpack_ex(body, &err, 0, "{ s: F, s: o}", - "timestamp", ×tamp, "value", &json_value); + ret = json_unpack_ex(body, &err, 0, "{ s: F, s: o}", "timestamp", + ×tamp, "value", &json_value); if (ret) { throw Error::badRequest(nullptr, "Malformed body: {}", err.text); } @@ -315,25 +341,26 @@ class grpcRequest : public NodeRequest { size_t i; json_t *json_data; Sample *sample_dummy = sample_alloc_mem(64); - sample_dummy->flags = (int)SampleFlags::HAS_DATA | (int)SampleFlags::HAS_TS_ORIGIN; + sample_dummy->flags = + (int)SampleFlags::HAS_DATA | (int)SampleFlags::HAS_TS_ORIGIN; sample_dummy->ts.origin = time_from_double(timestamp); sample_dummy->length = 0; - auto siglists = std::make_shared(); + auto siglists = std::make_shared(); json_array_foreach(json_value, i, json_data) { auto sig = std::make_shared(); switch (json_data->type) { - case json_type::JSON_INTEGER: - ret = sample_dummy->data[i].parseJson(SignalType::INTEGER, json_data); - sig->type = SignalType::INTEGER; - siglists->push_back(sig); - break; - case json_type::JSON_REAL: - ret = sample_dummy->data[i].parseJson(SignalType::FLOAT, json_data); - sig->type = SignalType::FLOAT; - siglists->push_back(sig); - break; - default: - throw Error::badRequest(nullptr, "Value type unsupported"); + case json_type::JSON_INTEGER: + ret = sample_dummy->data[i].parseJson(SignalType::INTEGER, json_data); + sig->type = SignalType::INTEGER; + siglists->push_back(sig); + break; + case json_type::JSON_REAL: + ret = sample_dummy->data[i].parseJson(SignalType::FLOAT, json_data); + sig->type = SignalType::FLOAT; + siglists->push_back(sig); + break; + default: + throw Error::badRequest(nullptr, "Value type unsupported"); } if (ret) { throw Error::badRequest(nullptr, "Value type miss match"); @@ -344,13 +371,14 @@ class grpcRequest : public NodeRequest { size_t dummy_buflen = 64 * 1024; char *dummy_buf = new char[dummy_buflen]; size_t dummy_wbytes; - ret = gateway_node->formatter->sprint(dummy_buf, dummy_buflen, &dummy_wbytes, sample_dummy); + ret = gateway_node->formatter->sprint(dummy_buf, dummy_buflen, + &dummy_wbytes, sample_dummy); if (ret < 0) { logger->warn("Failed to format request payload"); } slice = grpc::Slice(dummy_buf, dummy_wbytes); sample_free(sample_dummy); - delete [] dummy_buf; + delete[] dummy_buf; } else { pthread_mutex_lock(&gateway_node->write.mutex); slice = grpc::Slice(gateway_node->write.buf, gateway_node->write.wbytes); @@ -360,34 +388,48 @@ class grpcRequest : public NodeRequest { return req_buf; } - json_t *parse_response_reflection(const google::protobuf::Descriptor *resp_desc, std::string &recv_data) { + json_t * + parse_response_reflection(const google::protobuf::Descriptor *resp_desc, + std::string &recv_data) { google::protobuf::DynamicMessageFactory dym_factory; json_t *root = json_object(); - const google::protobuf::Message* resp_prototype = dym_factory.GetPrototype(resp_desc); + const google::protobuf::Message *resp_prototype = + dym_factory.GetPrototype(resp_desc); std::unique_ptr response(resp_prototype->New()); if (response->ParseFromString(recv_data)) { - const google::protobuf::Reflection* resp_refl = response->GetReflection(); + const google::protobuf::Reflection *resp_refl = response->GetReflection(); int resp_field_count = resp_desc->field_count(); for (int i = 0; i < resp_field_count; i++) { - if (const google::protobuf::FieldDescriptor* f = resp_desc->field(i)) { + if (const google::protobuf::FieldDescriptor *f = resp_desc->field(i)) { if (f->is_repeated()) { auto *json_arr = json_array(); for (int j = 0; j < resp_refl->FieldSize(*response, f); j++) { switch (f->type()) { case grpc::protobuf::FieldDescriptor::Type::TYPE_STRING: - json_array_append(json_arr, json_string(resp_refl->GetRepeatedString(*response, f, j).c_str())); + json_array_append( + json_arr, + json_string( + resp_refl->GetRepeatedString(*response, f, j).c_str())); break; case grpc::protobuf::FieldDescriptor::Type::TYPE_INT32: - json_array_append(json_arr, json_integer(resp_refl->GetRepeatedInt32(*response, f, j))); + json_array_append( + json_arr, + json_integer(resp_refl->GetRepeatedInt32(*response, f, j))); break; case grpc::protobuf::FieldDescriptor::Type::TYPE_INT64: - json_array_append(json_arr, json_integer(resp_refl->GetRepeatedInt64(*response, f, j))); + json_array_append( + json_arr, + json_integer(resp_refl->GetRepeatedInt64(*response, f, j))); break; case grpc::protobuf::FieldDescriptor::Type::TYPE_DOUBLE: - json_array_append(json_arr, json_real(resp_refl->GetRepeatedDouble(*response, f, j))); + json_array_append( + json_arr, + json_real(resp_refl->GetRepeatedDouble(*response, f, j))); break; - case grpc::protobuf::FieldDescriptor::Type::TYPE_BOOL: - json_array_append(json_arr, json_boolean(resp_refl->GetRepeatedBool(*response, f, j))); + case grpc::protobuf::FieldDescriptor::Type::TYPE_BOOL: + json_array_append( + json_arr, + json_boolean(resp_refl->GetRepeatedBool(*response, f, j))); break; default: break; @@ -396,23 +438,33 @@ class grpcRequest : public NodeRequest { json_object_set_new(root, f->name().c_str(), json_arr); } else { switch (f->type()) { - case grpc::protobuf::FieldDescriptor::Type::TYPE_STRING: - json_object_set_new(root, f->name().c_str(), json_string(resp_refl->GetString(*response, f).c_str())); - break; - case grpc::protobuf::FieldDescriptor::Type::TYPE_INT32: - json_object_set_new(root, f->name().c_str(), json_integer(resp_refl->GetInt32(*response, f))); - break; - case grpc::protobuf::FieldDescriptor::Type::TYPE_INT64: - json_object_set_new(root, f->name().c_str(), json_integer(resp_refl->GetInt64(*response, f))); - break; - case grpc::protobuf::FieldDescriptor::Type::TYPE_DOUBLE: - json_object_set_new(root, f->name().c_str(), json_real(resp_refl->GetDouble(*response, f))); - break; - case grpc::protobuf::FieldDescriptor::Type::TYPE_BOOL: - json_object_set_new(root, f->name().c_str(), json_boolean(resp_refl->GetBool(*response, f))); - break; - default: - break; + case grpc::protobuf::FieldDescriptor::Type::TYPE_STRING: + json_object_set_new( + root, f->name().c_str(), + json_string(resp_refl->GetString(*response, f).c_str())); + break; + case grpc::protobuf::FieldDescriptor::Type::TYPE_INT32: + json_object_set_new( + root, f->name().c_str(), + json_integer(resp_refl->GetInt32(*response, f))); + break; + case grpc::protobuf::FieldDescriptor::Type::TYPE_INT64: + json_object_set_new( + root, f->name().c_str(), + json_integer(resp_refl->GetInt64(*response, f))); + break; + case grpc::protobuf::FieldDescriptor::Type::TYPE_DOUBLE: + json_object_set_new( + root, f->name().c_str(), + json_real(resp_refl->GetDouble(*response, f))); + break; + case grpc::protobuf::FieldDescriptor::Type::TYPE_BOOL: + json_object_set_new( + root, f->name().c_str(), + json_boolean(resp_refl->GetBool(*response, f))); + break; + default: + break; } } } @@ -431,16 +483,18 @@ class grpcRequest : public NodeRequest { size_t rbytes; Sample *sample_dummy = sample_alloc_mem(64); - int ret = gateway_node->formatter->sscan(recv_data.c_str(), resp_buf.Length(), &rbytes, sample_dummy); + int ret = gateway_node->formatter->sscan( + recv_data.c_str(), resp_buf.Length(), &rbytes, sample_dummy); if (ret < 0) { std::cerr << "Formatting Failed: " << ret << std::endl; } for (unsigned int i = 0; i < sample_dummy->length; i++) { - json_array_append(json_ch, sample_dummy->data[i].toJson(SignalType::FLOAT)); + json_array_append(json_ch, + sample_dummy->data[i].toJson(SignalType::FLOAT)); } - json_sigs = json_pack_ex(&err, 0, "{s: f, s: o}", - "timestamp", time_to_double(&sample_dummy->ts.origin), - "value", json_ch); + json_sigs = json_pack_ex(&err, 0, "{s: f, s: o}", "timestamp", + time_to_double(&sample_dummy->ts.origin), "value", + json_ch); if (method == Session::PUT) { sample_copy(gateway_node->read.sample, sample_dummy); @@ -460,11 +514,11 @@ class grpcRequest : public NodeRequest { // Register API requests static char n[] = "gateway"; -static char r[] = "/gateway/(" RE_NODE_NAME ")/([A-Za-z0-9_-]+)/([A-Za-z0-9_-]+)/([A-Za-z0-9_-]+)"; +static char r[] = "/gateway/(" RE_NODE_NAME + ")/([A-Za-z0-9_-]+)/([A-Za-z0-9_-]+)/([A-Za-z0-9_-]+)"; static char d[] = "gRPC api"; static RequestPlugin p; } // namespace api } // namespace node } // namespace villas - diff --git a/lib/nodes/gateway.cpp b/lib/nodes/gateway.cpp index 98fa70fd8..9221b8999 100644 --- a/lib/nodes/gateway.cpp +++ b/lib/nodes/gateway.cpp @@ -1,8 +1,17 @@ +/* Node type: API gateway. + * + * Author: Jitpanu Maneeratpongsuk + * SPDX-FileCopyrightText: 2025 Institute for Automation of Complex Power Systems, RWTH Aachen University + * SPDX-License-Identifier: Apache-2.0 + */ #include + #include + #include #include + #include "villas/sample.hpp" using namespace villas; @@ -37,9 +46,7 @@ int GatewayNode::prepare() { return Node::prepare(); } -int GatewayNode::check() { - return Node::check(); -} +int GatewayNode::check() { return Node::check(); } int GatewayNode::_read(struct Sample *smps[], unsigned cnt) { assert(cnt == 1); @@ -54,7 +61,8 @@ int GatewayNode::_write(struct Sample *smps[], unsigned cnt) { assert(cnt == 1); sample_copy(write.sample, smps[0]); - int ret = formatter->sprint(write.buf, write.buflen, &write.wbytes, smps, cnt); + int ret = + formatter->sprint(write.buf, write.buflen, &write.wbytes, smps, cnt); if (ret < 0) { logger->warn("Failed to format payload: reason={}", ret); return ret; @@ -74,18 +82,18 @@ int GatewayNode::parse(json_t *json) { // json_t *remote = nullptr; const char *endpoint_address, *gateway_type; json_error_t err; - ret = json_unpack_ex(json, &err, 0, "{ s?:o, s:s, s:s}", - "format", &json_format, "gateway_type", &gateway_type, "address", &endpoint_address); + ret = json_unpack_ex(json, &err, 0, "{ s?:o, s:s, s:s}", "format", + &json_format, "gateway_type", &gateway_type, "address", + &endpoint_address); if (ret) throw ConfigError(json, err, "node-config-node-gateway"); formatter = json_format ? FormatFactory::make(json_format) - : FormatFactory::make("villas.binary"); + : FormatFactory::make("villas.binary"); if (!formatter) throw ConfigError(json_format, "node-config-node-gateway-format", "Invalid format configuration"); - address = endpoint_address; if (!strcmp(gateway_type, "gRPC")) { type = ApiType::gRPC; @@ -115,15 +123,13 @@ int GatewayNode::stop() { return 0; } -GatewayNode::~GatewayNode() { - -} +GatewayNode::~GatewayNode() {} // Register node static char n[] = "gateway"; static char d[] = "A node providing a Gateway"; static NodePlugin p; From c45c5fa97228020709abe13119ba9ffed0eefdb3 Mon Sep 17 00:00:00 2001 From: Jitpanu Maneeratpongsuk Date: Mon, 5 Jan 2026 19:42:32 +0000 Subject: [PATCH 03/14] fix(node-gateway): Remove unused file --- include/villas/api/requests/gateway.hpp | 34 ------------------------- 1 file changed, 34 deletions(-) delete mode 100644 include/villas/api/requests/gateway.hpp diff --git a/include/villas/api/requests/gateway.hpp b/include/villas/api/requests/gateway.hpp deleted file mode 100644 index b3f74af25..000000000 --- a/include/villas/api/requests/gateway.hpp +++ /dev/null @@ -1,34 +0,0 @@ -/* Universal Data-exchange API request. - * - * Author: Steffen Vogel - * SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University - * SPDX-License-Identifier: Apache-2.0 - */ - -#pragma once - -#include -#include - -namespace villas { -namespace node { - -// Forward declarations -class Node; - -namespace api { - -class GatewayRequest : public NodeRequest { - -protected: - GatewayNode *gateway_node; - -public: - using NodeRequest::NodeRequest; - - void prepare() override; -}; - -} // namespace api -} // namespace node -} // namespace villas From 25dfc5e493ead33d6e7a2a15fd98a6c03f824758 Mon Sep 17 00:00:00 2001 From: Jitpanu Maneeratpongsuk Date: Mon, 5 Jan 2026 19:52:25 +0000 Subject: [PATCH 04/14] feat(node-gateway): Add example configuration --- etc/examples/nodes/gateway.conf | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 etc/examples/nodes/gateway.conf diff --git a/etc/examples/nodes/gateway.conf b/etc/examples/nodes/gateway.conf new file mode 100644 index 000000000..e4af49d4b --- /dev/null +++ b/etc/examples/nodes/gateway.conf @@ -0,0 +1,14 @@ +# SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University +# SPDX-License-Identifier: Apache-2.0 + +nodes = { + gateway_node = { + type = "gateway" + + format = "protobuf" + # API type + gateway_type = "gRPC" + # Address of remote server + address = "localhost:50051" + } +} From c963527fbd0e48de5d5b69c181673a82d5c370d4 Mon Sep 17 00:00:00 2001 From: Jitpanu Maneeratpongsuk Date: Mon, 5 Jan 2026 20:17:27 +0000 Subject: [PATCH 05/14] feat(node-gateway): Add installation of gRPC to docker file Add download and compile of reflection.proto for gRPC server reflection to deps.sh --- packaging/deps.sh | 17 +++++++++++++++++ packaging/docker/Dockerfile.debian | 1 + packaging/docker/Dockerfile.debian-multiarch | 1 + packaging/docker/Dockerfile.fedora | 1 + packaging/docker/Dockerfile.rocky | 1 + packaging/docker/Dockerfile.rocky9 | 1 + packaging/docker/Dockerfile.ubuntu | 1 + 7 files changed, 23 insertions(+) diff --git a/packaging/deps.sh b/packaging/deps.sh index b1b65db09..647b647a2 100644 --- a/packaging/deps.sh +++ b/packaging/deps.sh @@ -602,6 +602,23 @@ if ! cmake --find-package -DNAME=ghc_filesystem -DCOMPILER_ID=GNU -DLANGUAGE=CXX popd fi +# Get and complie gRPC server reflection proto description file +if ! find /usr/{local,}/include -name reflection.pb.cc | grep -q . && + should_build "gRPC reflection" "For API gateway node-type"; then + mkdir -p gRPC_reflection + pushd gRPC_reflection + curl https://raw.githubusercontent.com/grpc/grpc/refs/heads/master/src/proto/grpc/reflection/v1alpha/reflection.proto -o reflection.proto + protoc -I. \ + --cpp_out=. \ + --grpc_out=. \ + --plugin=protoc-gen-grpc=/usr/bin/grpc_cpp_plugin \ + reflection.proto + mkdir -p /usr/local/include/grpc/reflection/v1alpha + cp *.h /usr/local/include/grpc/reflection/v1alpha/ + cp *.cc /usr/local/include/grpc/reflection/v1alpha/ + popd +fi + popd >/dev/null # Update linker cache diff --git a/packaging/docker/Dockerfile.debian b/packaging/docker/Dockerfile.debian index 631e16f86..a52119e58 100644 --- a/packaging/docker/Dockerfile.debian +++ b/packaging/docker/Dockerfile.debian @@ -34,6 +34,7 @@ RUN apt-get update && \ libcurl4-openssl-dev \ libfmt-dev \ libgraphviz-dev \ + libgrpc++-dev \ libhiredis-dev \ libibverbs-dev \ libjansson-dev \ diff --git a/packaging/docker/Dockerfile.debian-multiarch b/packaging/docker/Dockerfile.debian-multiarch index 6007a9efd..b539966ac 100644 --- a/packaging/docker/Dockerfile.debian-multiarch +++ b/packaging/docker/Dockerfile.debian-multiarch @@ -41,6 +41,7 @@ RUN apt-get update && \ libconfig-dev:${ARCH} \ libcurl4-openssl-dev:${ARCH} \ libgraphviz-dev:${ARCH} \ + libgrpc++-dev:${ARCH} \ libhiredis-dev:${ARCH} \ libibverbs-dev:${ARCH} \ libjansson-dev:${ARCH} \ diff --git a/packaging/docker/Dockerfile.fedora b/packaging/docker/Dockerfile.fedora index 2beee634f..4df3ae613 100644 --- a/packaging/docker/Dockerfile.fedora +++ b/packaging/docker/Dockerfile.fedora @@ -43,6 +43,7 @@ RUN pip install \ RUN dnf -y install \ fmt-devel \ graphviz-devel \ + grpc-devel \ hiredis-devel \ jansson-devel \ libconfig-devel \ diff --git a/packaging/docker/Dockerfile.rocky b/packaging/docker/Dockerfile.rocky index 384bf6d68..ed55d68ed 100644 --- a/packaging/docker/Dockerfile.rocky +++ b/packaging/docker/Dockerfile.rocky @@ -31,6 +31,7 @@ RUN dnf --allowerasing -y install \ RUN dnf -y install \ openssl-devel \ graphviz-devel \ + grpc-devel \ protobuf-devel \ protobuf-c-devel \ libuuid-devel \ diff --git a/packaging/docker/Dockerfile.rocky9 b/packaging/docker/Dockerfile.rocky9 index f7dfffd40..2ca22b2a4 100644 --- a/packaging/docker/Dockerfile.rocky9 +++ b/packaging/docker/Dockerfile.rocky9 @@ -31,6 +31,7 @@ RUN dnf --allowerasing -y install \ RUN dnf -y install \ openssl-devel \ graphviz-devel \ + grpc-devel \ protobuf-devel \ protobuf-c-devel \ libuuid-devel \ diff --git a/packaging/docker/Dockerfile.ubuntu b/packaging/docker/Dockerfile.ubuntu index 5a4a0c506..8ab5bec2f 100644 --- a/packaging/docker/Dockerfile.ubuntu +++ b/packaging/docker/Dockerfile.ubuntu @@ -39,6 +39,7 @@ RUN apt-get update && \ libfmt-dev \ libglib2.0-dev \ libgraphviz-dev \ + libgrpc++-dev \ libhiredis-dev \ libibverbs-dev \ libjansson-dev \ From 718e2ec01c981cb87c7ba51d7ba752b0e6f194bb Mon Sep 17 00:00:00 2001 From: Jitpanu Maneeratpongsuk Date: Mon, 5 Jan 2026 21:19:19 +0000 Subject: [PATCH 06/14] fix(node-gateway): Add installation of protobuf gRPC plugin for Ubuntu and Debian --- packaging/docker/Dockerfile.debian | 1 + packaging/docker/Dockerfile.debian-multiarch | 1 + packaging/docker/Dockerfile.ubuntu | 1 + 3 files changed, 3 insertions(+) diff --git a/packaging/docker/Dockerfile.debian b/packaging/docker/Dockerfile.debian index a52119e58..0943512f3 100644 --- a/packaging/docker/Dockerfile.debian +++ b/packaging/docker/Dockerfile.debian @@ -53,6 +53,7 @@ RUN apt-get update && \ libssl-dev \ libusb-1.0-0-dev \ libzmq3-dev \ + protobuf-compiler-grpc \ uuid-dev # Install unpackaged dependencies from source diff --git a/packaging/docker/Dockerfile.debian-multiarch b/packaging/docker/Dockerfile.debian-multiarch index b539966ac..30ccd048c 100644 --- a/packaging/docker/Dockerfile.debian-multiarch +++ b/packaging/docker/Dockerfile.debian-multiarch @@ -59,6 +59,7 @@ RUN apt-get update && \ libssl-dev:${ARCH} \ libusb-1.0-0-dev:${ARCH} \ libzmq3-dev:${ARCH} \ + protobuf-compiler-grpc:${ARCH} \ uuid-dev:${ARCH} ADD cmake/toolchains/debian-${ARCH}.cmake / diff --git a/packaging/docker/Dockerfile.ubuntu b/packaging/docker/Dockerfile.ubuntu index 8ab5bec2f..2b9548207 100644 --- a/packaging/docker/Dockerfile.ubuntu +++ b/packaging/docker/Dockerfile.ubuntu @@ -60,6 +60,7 @@ RUN apt-get update && \ libusb-1.0-0-dev \ libwebsockets-dev \ libzmq3-dev \ + protobuf-compiler-grpc \ uuid-dev # Install unpackaged dependencies from source From 386e44c5542796219985bc7b9d5ce955418e6ebb Mon Sep 17 00:00:00 2001 From: Jitpanu Maneeratpongsuk Date: Tue, 6 Jan 2026 20:45:15 +0000 Subject: [PATCH 07/14] fix(node-gateway): Use descriptor database instead of loop build every descriptor files fix pointer cast --- lib/api/requests/gateway/grpc.cpp | 31 ++++++++++++++----------------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/lib/api/requests/gateway/grpc.cpp b/lib/api/requests/gateway/grpc.cpp index fa4ce395c..e0e66932f 100644 --- a/lib/api/requests/gateway/grpc.cpp +++ b/lib/api/requests/gateway/grpc.cpp @@ -43,12 +43,7 @@ class ReflectionClient { file_descs->add_file(); file_desc->ParseFromString(bytes); } - } else { - return nullptr; } - } else { - std::cerr << "Reflection failed" << std::endl; - return nullptr; } stream->WritesDone(); grpc::Status status = stream->Finish(); @@ -115,7 +110,8 @@ class grpcRequest : public NodeRequest { generic_stub.PrepareUnaryCall(&context, methodFullName, req_buf, &cq); grpc::Status status; call->StartCall(); - call->Finish(&resp_buf, &status, (void *)1); + int tag = 1; + call->Finish(&resp_buf, &status, &tag); void *got_tag; bool ok = false; cq.Next(&got_tag, &ok); @@ -165,20 +161,21 @@ class grpcRequest : public NodeRequest { refl_client.GetFileDescriptor(gRPC_package + "." + gRPC_service); bool server_reflection; - // Check if server reflection complete - auto pool = std::make_unique(); + // Make descriptor database + SimpleDescriptorDatabase db; + for (int i = 0; i < protos->file_size(); i++) { + db.Add(protos->file(i)); + } + auto pool = std::make_unique(&db); const Descriptor *resp_desc; - if (!protos) { + // Check if server reflection complete + if (protos->file_size() == 0) { server_reflection = false; req_buf = form_request(); } else { server_reflection = true; - const FileDescriptor *file; - for (int i = protos->file_size() - 1; i >= 0; i--) { - const FileDescriptorProto &fd = protos->file(i); - file = pool->BuildFile(fd); - } - const ServiceDescriptor *svc = file->FindServiceByName(gRPC_service); + const ServiceDescriptor *svc = + pool->FindServiceByName(gRPC_package + "." + gRPC_service); if (!svc) { throw Error(HTTP_STATUS_NOT_FOUND, nullptr, "gRPC service not found"); } @@ -204,7 +201,8 @@ class grpcRequest : public NodeRequest { grpc::Status status; call->StartCall(); - call->Finish(&resp_buf, &status, (void *)1); + int tag = 1; + call->Finish(&resp_buf, &status, &tag); void *got_tag; bool ok = false; cq.Next(&got_tag, &ok); @@ -225,7 +223,6 @@ class grpcRequest : public NodeRequest { // Empty gRPC response if (recv_data.length() <= 0) { return new JsonResponse(session, HTTP_STATUS_OK, nullptr); - ; } json_t *json_response; From 33843be7fccda1f16ef3f4361c335e33ac75f397 Mon Sep 17 00:00:00 2001 From: Jitpanu Maneeratpongsuk Date: Wed, 7 Jan 2026 19:39:34 +0000 Subject: [PATCH 08/14] fix(node-gateway): Fix uninitialized variable --- include/villas/nodes/gateway.hpp | 6 +----- lib/api/requests/gateway/grpc.cpp | 1 - lib/nodes/gateway.cpp | 5 +++-- 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/include/villas/nodes/gateway.hpp b/include/villas/nodes/gateway.hpp index 8dfc28d89..0000b54b7 100644 --- a/include/villas/nodes/gateway.hpp +++ b/include/villas/nodes/gateway.hpp @@ -42,11 +42,7 @@ class GatewayNode : public Node { std::string address; ApiType type; - Format *formatter; - - char *buf; - size_t buflen; - size_t wbytes; + Format::Ptr formatter; int prepare() override; diff --git a/lib/api/requests/gateway/grpc.cpp b/lib/api/requests/gateway/grpc.cpp index e0e66932f..cbbf30f04 100644 --- a/lib/api/requests/gateway/grpc.cpp +++ b/lib/api/requests/gateway/grpc.cpp @@ -495,7 +495,6 @@ class grpcRequest : public NodeRequest { if (method == Session::PUT) { sample_copy(gateway_node->read.sample, sample_dummy); - gateway_node->read.buf = recv_data.data(); pthread_cond_signal(&gateway_node->read.cv); } diff --git a/lib/nodes/gateway.cpp b/lib/nodes/gateway.cpp index 9221b8999..f9a7e9c32 100644 --- a/lib/nodes/gateway.cpp +++ b/lib/nodes/gateway.cpp @@ -18,7 +18,7 @@ using namespace villas; using namespace villas::node; GatewayNode::GatewayNode(const uuid_t &id, const std::string &name) - : Node(id, name), read(), write() { + : Node(id, name), read(), write(), type(), formatter() { int ret; auto dirs = std::vector{&read, &write}; @@ -88,8 +88,9 @@ int GatewayNode::parse(json_t *json) { if (ret) throw ConfigError(json, err, "node-config-node-gateway"); - formatter = json_format ? FormatFactory::make(json_format) + auto *fmt = json_format ? FormatFactory::make(json_format) : FormatFactory::make("villas.binary"); + formatter = Format::Ptr(fmt); if (!formatter) throw ConfigError(json_format, "node-config-node-gateway-format", "Invalid format configuration"); From 9cc575b8d007c7077d8ccad5701b61b4863853e4 Mon Sep 17 00:00:00 2001 From: Jitpanu Maneeratpongsuk Date: Wed, 7 Jan 2026 19:40:44 +0000 Subject: [PATCH 09/14] fix(node-gateway): Add protobuf generated file to cppcheck supressions --- tools/cppcheck-supressions.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/tools/cppcheck-supressions.txt b/tools/cppcheck-supressions.txt index 7b4f4812d..8ede3d8d1 100644 --- a/tools/cppcheck-supressions.txt +++ b/tools/cppcheck-supressions.txt @@ -3,6 +3,7 @@ *:*/fpga/thirdparty/* *:*/villas.pb-c.h +*:reflection.pb.h noValidConfiguration duplInheritedMember From c1fc66f42834bfba21f12e03feec835256682cdb Mon Sep 17 00:00:00 2001 From: Jitpanu Maneeratpongsuk Date: Fri, 9 Jan 2026 19:54:19 +0000 Subject: [PATCH 10/14] feat(node-gateway): Add installation of gRPC for Nix --- CMakeLists.txt | 10 ++++--- flake.nix | 1 + lib/api/CMakeLists.txt | 2 +- lib/api/requests/gateway/grpc.cpp | 14 ++++----- packaging/nix/grpc_server_reflection.nix | 38 ++++++++++++++++++++++++ packaging/nix/villas.nix | 5 ++++ 6 files changed, 58 insertions(+), 12 deletions(-) create mode 100644 packaging/nix/grpc_server_reflection.nix diff --git a/CMakeLists.txt b/CMakeLists.txt index 14049a275..e44c0f4ac 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -80,8 +80,6 @@ find_package(Criterion) find_package(OpalOrchestra) find_package(LibXml2) find_package(OpalAsyncApi) -find_package(Protobuf) -find_package(gRPC) # Check for tools find_program(PROTOBUFC_COMPILER NAMES protoc-c) @@ -120,7 +118,10 @@ pkg_check_modules(NANOMSG IMPORTED_TARGET nanomsg) if(NOT NANOMSG_FOUND) pkg_check_modules(NANOMSG IMPORTED_TARGET libnanomsg>=1.0.0) endif() - +pkg_check_modules(GRPC IMPORTED_TARGET grpc grpc++) +if (TARGET PkgConfig::GRPC) + set_property(TARGET PkgConfig::GRPC PROPERTY INTERFACE_COMPILE_OPTIONS "") +endif() if (REDISPP_FOUND) file(READ "${REDISPP_INCLUDEDIR}/sw/redis++/tls.h" CONTENTS) @@ -182,6 +183,7 @@ cmake_dependent_option(WITH_SRC "Build executables" cmake_dependent_option(WITH_TESTS "Run tests" "${WITH_DEFAULTS}" "TOPLEVEL_PROJECT" OFF) cmake_dependent_option(WITH_TOOLS "Build auxilary tools" "${WITH_DEFAULTS}" "TOPLEVEL_PROJECT" OFF) cmake_dependent_option(WITH_WEB "Build with internal webserver" "${WITH_DEFAULTS}" "LIBWEBSOCKETS_FOUND" OFF) +cmake_dependent_option(WITH_GRPC "Build with grpc api" "${WITH_DEFAULTS}" "GRPC_FOUND" OFF) cmake_dependent_option(WITH_NODE_AMQP "Build with amqp node-type" "${WITH_DEFAULTS}" "RABBITMQ_C_FOUND" OFF) cmake_dependent_option(WITH_NODE_CAN "Build with can node-type" "${WITH_DEFAULTS}" "" OFF) @@ -216,7 +218,6 @@ cmake_dependent_option(WITH_NODE_ULDAQ "Build with uldaq node-type" cmake_dependent_option(WITH_NODE_WEBRTC "Build with webrtc node-type" "${WITH_DEFAULTS}" "WITH_WEB; LibDataChannel_FOUND" OFF) cmake_dependent_option(WITH_NODE_WEBSOCKET "Build with websocket node-type" "${WITH_DEFAULTS}" "WITH_WEB" OFF) cmake_dependent_option(WITH_NODE_ZEROMQ "Build with zeromq node-type" "${WITH_DEFAULTS}" "LIBZMQ_FOUND; NOT WITHOUT_GPL" OFF) -cmake_dependent_option(WITH_GRPC "Build with grpc api" "${WITH_DEFAULTS}" "gRPC_FOUND" OFF) # Set a default for the build type if("${CMAKE_BUILD_TYPE}" STREQUAL "") @@ -293,6 +294,7 @@ add_feature_info(SRC WITH_SRC "Build execu add_feature_info(TESTS WITH_TESTS "Run tests") add_feature_info(TOOLS WITH_TOOLS "Build auxilary tools") add_feature_info(WEB WITH_WEB "Build with internal webserver") +add_feature_info(GRPC_API WITH_GRPC "Build with gRPC API") add_feature_info(NODE_AMQP WITH_NODE_AMQP "Build with amqp node-type") add_feature_info(NODE_CAN WITH_NODE_CAN "Build with can node-type") diff --git a/flake.nix b/flake.nix index 5f4563838..18dce96a5 100644 --- a/flake.nix +++ b/flake.nix @@ -116,6 +116,7 @@ opendssc = pkgs.callPackage (nixDir + "/opendssc.nix") { }; orchestra = pkgs.callPackage (nixDir + "/orchestra.nix") { }; + grpc-server-reflection = pkgs.callPackage (nixDir + "/opendssc.nix") { }; }; in { diff --git a/lib/api/CMakeLists.txt b/lib/api/CMakeLists.txt index 9f5fa73ee..9373529be 100644 --- a/lib/api/CMakeLists.txt +++ b/lib/api/CMakeLists.txt @@ -52,7 +52,7 @@ if (WITH_GRPC) ${REFLECTION_INCLUDE_DIR}/grpc/reflection/v1alpha/reflection.pb.cc ${REFLECTION_INCLUDE_DIR}/grpc/reflection/v1alpha/reflection.grpc.pb.cc ) - list(APPEND LIBRARIES protobuf::libprotobuf gRPC::grpc++) + list(APPEND LIBRARIES PkgConfig::PROTOBUF PkgConfig::GRPC) endif() endif() diff --git a/lib/api/requests/gateway/grpc.cpp b/lib/api/requests/gateway/grpc.cpp index cbbf30f04..d8e754208 100644 --- a/lib/api/requests/gateway/grpc.cpp +++ b/lib/api/requests/gateway/grpc.cpp @@ -249,7 +249,7 @@ class grpcRequest : public NodeRequest { int field_count = req_desc->field_count(); for (int i = 0; i < field_count; i++) { if (const FieldDescriptor *f = req_desc->field(i)) { - json_t *json_val = json_object_get(body, f->name().c_str()); + json_t *json_val = json_object_get(body, f->name().data()); if (!json_val) { if (f->is_required()) { std::cerr << "Missing required field: " << f->name() << std::endl; @@ -432,32 +432,32 @@ class grpcRequest : public NodeRequest { break; } } - json_object_set_new(root, f->name().c_str(), json_arr); + json_object_set_new(root, f->name().data(), json_arr); } else { switch (f->type()) { case grpc::protobuf::FieldDescriptor::Type::TYPE_STRING: json_object_set_new( - root, f->name().c_str(), + root, f->name().data(), json_string(resp_refl->GetString(*response, f).c_str())); break; case grpc::protobuf::FieldDescriptor::Type::TYPE_INT32: json_object_set_new( - root, f->name().c_str(), + root, f->name().data(), json_integer(resp_refl->GetInt32(*response, f))); break; case grpc::protobuf::FieldDescriptor::Type::TYPE_INT64: json_object_set_new( - root, f->name().c_str(), + root, f->name().data(), json_integer(resp_refl->GetInt64(*response, f))); break; case grpc::protobuf::FieldDescriptor::Type::TYPE_DOUBLE: json_object_set_new( - root, f->name().c_str(), + root, f->name().data(), json_real(resp_refl->GetDouble(*response, f))); break; case grpc::protobuf::FieldDescriptor::Type::TYPE_BOOL: json_object_set_new( - root, f->name().c_str(), + root, f->name().data(), json_boolean(resp_refl->GetBool(*response, f))); break; default: diff --git a/packaging/nix/grpc_server_reflection.nix b/packaging/nix/grpc_server_reflection.nix new file mode 100644 index 000000000..ae53754cf --- /dev/null +++ b/packaging/nix/grpc_server_reflection.nix @@ -0,0 +1,38 @@ +{ + stdenv, + fetchurl, + protobuf, + grpc +}: + +stdenv.mkDerivation { + name = "grpc-server-reflection"; + + dontUnpack = true; + nativeBuildInputs = [ protobuf grpc ]; + + src = fetchurl { + url = "https://raw.githubusercontent.com/grpc/grpc/refs/heads/master/src/proto/grpc/reflection/v1alpha/reflection.proto"; + sha256 = "sha256-c1kQAgFlRRDNFZy7XEaKi+wUtUmXPG+P967poSN67Ec="; + }; + + buildPhase = '' + # Create a temporary build directory + mkdir -p tmp + ln -s $src tmp/reflection.proto + + # Generate code into build_tmp + protoc --cpp_out=tmp \ + --grpc_out=tmp \ + --plugin=protoc-gen-grpc=${grpc}/bin/grpc_cpp_plugin \ + -I tmp \ + tmp/reflection.proto + ''; + + # Install into standard 'include' path so CMake finds it + installPhase = '' + mkdir -p $out/include/grpc/reflection/v1alpha + cp tmp/*.pb.h $out/include/grpc/reflection/v1alpha/ + cp tmp/*.pb.cc $out/include/grpc/reflection/v1alpha/ + ''; +} diff --git a/packaging/nix/villas.nix b/packaging/nix/villas.nix index 246150a67..766650dd7 100644 --- a/packaging/nix/villas.nix +++ b/packaging/nix/villas.nix @@ -21,6 +21,7 @@ withNodeAmqp ? withAllNodes, withNodeComedi ? withAllNodes, withNodeEthercat ? (withAllNodes && system == "x86_64-linux"), + withNodeGateway ? withAllNodes, withNodeIec60870 ? withAllNodes, withNodeIec61850 ? withAllNodes, withNodeInfiniband ? withAllNodes, @@ -57,6 +58,8 @@ cyrus_sasl, ethercat, gnugrep, + grpc, + grpc-server-reflection, jansson, lib60870, libconfig, @@ -164,6 +167,8 @@ stdenv.mkDerivation { ++ lib.optionals withNodeAmqp [ rabbitmq-c ] ++ lib.optionals withNodeComedi [ comedilib ] ++ lib.optionals withNodeEthercat [ ethercat ] + ++ lib.optionals withNodeGateway [ grpc ] + ++ lib.optionals withNodeGateway [ grpc-server-reflection ] ++ lib.optionals withNodeIec60870 [ lib60870 ] ++ lib.optionals withNodeIec61850 [ libiec61850 ] ++ lib.optionals withNodeKafka [ From 587811794085572eaeb83b3b3b0d88f158de1a6d Mon Sep 17 00:00:00 2001 From: Jitpanu Maneeratpongsuk Date: Mon, 12 Jan 2026 18:23:20 +0000 Subject: [PATCH 11/14] fix(node-gateway): Add copyright and licensing information --- packaging/nix/grpc_server_reflection.nix | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packaging/nix/grpc_server_reflection.nix b/packaging/nix/grpc_server_reflection.nix index ae53754cf..1d241328f 100644 --- a/packaging/nix/grpc_server_reflection.nix +++ b/packaging/nix/grpc_server_reflection.nix @@ -1,3 +1,5 @@ +# SPDX-FileCopyrightText: 2025 Institute for Automation of Complex Power Systems, RWTH Aachen University +# SPDX-License-Identifier: Apache-2.0 { stdenv, fetchurl, From 390db4c44b1c8791f8d5e7363d9fd3f5723d37e4 Mon Sep 17 00:00:00 2001 From: Jitpanu Maneeratpongsuk Date: Tue, 13 Jan 2026 18:59:07 +0000 Subject: [PATCH 12/14] fix(node-gateway): Correct added file name in flake.nix --- flake.nix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flake.nix b/flake.nix index 18dce96a5..895e1f761 100644 --- a/flake.nix +++ b/flake.nix @@ -116,7 +116,7 @@ opendssc = pkgs.callPackage (nixDir + "/opendssc.nix") { }; orchestra = pkgs.callPackage (nixDir + "/orchestra.nix") { }; - grpc-server-reflection = pkgs.callPackage (nixDir + "/opendssc.nix") { }; + grpc-server-reflection = pkgs.callPackage (nixDir + "/grpc_server_reflection.nix") { }; }; in { From 8bdc67be9cb013d9bf6da5588e8889e4320efb01 Mon Sep 17 00:00:00 2001 From: Jitpanu Maneeratpongsuk Date: Wed, 14 Jan 2026 20:34:39 +0000 Subject: [PATCH 13/14] fix(node-gateway): fix nix build --- packaging/nix/villas.nix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packaging/nix/villas.nix b/packaging/nix/villas.nix index 766650dd7..1223e25ae 100644 --- a/packaging/nix/villas.nix +++ b/packaging/nix/villas.nix @@ -21,7 +21,7 @@ withNodeAmqp ? withAllNodes, withNodeComedi ? withAllNodes, withNodeEthercat ? (withAllNodes && system == "x86_64-linux"), - withNodeGateway ? withAllNodes, + withNodeGateway ? (withAllNodes && system == "x86_64-linux"), withNodeIec60870 ? withAllNodes, withNodeIec61850 ? withAllNodes, withNodeInfiniband ? withAllNodes, From 0b8c2109a88f74d6321d27c811d60d984aa3a45b Mon Sep 17 00:00:00 2001 From: Alexandra Date: Thu, 15 Jan 2026 13:17:25 +0000 Subject: [PATCH 14/14] fix(node-gateway): find python module Signed-off-by: Alexandra --- .gitlab-ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 0b097746b..4e8a67a9c 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -164,7 +164,7 @@ test:python: - pytest --verbose . - black --line-length=90 --extend-exclude=".*(\\.pyi|_pb2.py)$" --check . - flake8 --max-line-length=90 --extend-exclude="*.pyi,*_pb2.py" . - - mypy . + - mypy --explicit-package-bases villas/ image: ${DOCKER_IMAGE_DEV}:${DOCKER_TAG} needs: - job: "build:source: [fedora]"