diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 0b097746b..4e8a67a9c 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -164,7 +164,7 @@ test:python: - pytest --verbose . - black --line-length=90 --extend-exclude=".*(\\.pyi|_pb2.py)$" --check . - flake8 --max-line-length=90 --extend-exclude="*.pyi,*_pb2.py" . - - mypy . + - mypy --explicit-package-bases villas/ image: ${DOCKER_IMAGE_DEV}:${DOCKER_TAG} needs: - job: "build:source: [fedora]" diff --git a/CMakeLists.txt b/CMakeLists.txt index ff81c5d46..e44c0f4ac 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -118,7 +118,10 @@ pkg_check_modules(NANOMSG IMPORTED_TARGET nanomsg) if(NOT NANOMSG_FOUND) pkg_check_modules(NANOMSG IMPORTED_TARGET libnanomsg>=1.0.0) endif() - +pkg_check_modules(GRPC IMPORTED_TARGET grpc grpc++) +if (TARGET PkgConfig::GRPC) + set_property(TARGET PkgConfig::GRPC PROPERTY INTERFACE_COMPILE_OPTIONS "") +endif() if (REDISPP_FOUND) file(READ "${REDISPP_INCLUDEDIR}/sw/redis++/tls.h" CONTENTS) @@ -180,6 +183,7 @@ cmake_dependent_option(WITH_SRC "Build executables" cmake_dependent_option(WITH_TESTS "Run tests" "${WITH_DEFAULTS}" "TOPLEVEL_PROJECT" OFF) cmake_dependent_option(WITH_TOOLS "Build auxilary tools" "${WITH_DEFAULTS}" "TOPLEVEL_PROJECT" OFF) cmake_dependent_option(WITH_WEB "Build with internal webserver" "${WITH_DEFAULTS}" "LIBWEBSOCKETS_FOUND" OFF) +cmake_dependent_option(WITH_GRPC "Build with grpc api" "${WITH_DEFAULTS}" "GRPC_FOUND" OFF) cmake_dependent_option(WITH_NODE_AMQP "Build with amqp node-type" "${WITH_DEFAULTS}" "RABBITMQ_C_FOUND" OFF) cmake_dependent_option(WITH_NODE_CAN "Build with can node-type" "${WITH_DEFAULTS}" "" OFF) @@ -290,6 +294,7 @@ add_feature_info(SRC WITH_SRC "Build execu add_feature_info(TESTS WITH_TESTS "Run tests") add_feature_info(TOOLS WITH_TOOLS "Build auxilary tools") add_feature_info(WEB WITH_WEB "Build with internal webserver") +add_feature_info(GRPC_API WITH_GRPC "Build with gRPC API") add_feature_info(NODE_AMQP WITH_NODE_AMQP "Build with amqp node-type") add_feature_info(NODE_CAN WITH_NODE_CAN "Build with can node-type") diff --git a/etc/examples/nodes/gateway.conf b/etc/examples/nodes/gateway.conf new file mode 100644 index 000000000..e4af49d4b --- /dev/null +++ b/etc/examples/nodes/gateway.conf @@ -0,0 +1,14 @@ +# SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University +# SPDX-License-Identifier: Apache-2.0 + +nodes = { + gateway_node = { + type = "gateway" + + format = "protobuf" + # API type + gateway_type = "gRPC" + # Address of remote server + address = "localhost:50051" + } +} diff --git a/flake.nix b/flake.nix index 5f4563838..895e1f761 100644 --- a/flake.nix +++ b/flake.nix @@ -116,6 +116,7 @@ opendssc = pkgs.callPackage (nixDir + "/opendssc.nix") { }; orchestra = pkgs.callPackage (nixDir + "/orchestra.nix") { }; + grpc-server-reflection = pkgs.callPackage (nixDir + "/grpc_server_reflection.nix") { }; }; in { diff --git a/include/villas/nodes/gateway.hpp b/include/villas/nodes/gateway.hpp new file mode 100644 index 000000000..0000b54b7 --- /dev/null +++ b/include/villas/nodes/gateway.hpp @@ -0,0 +1,59 @@ +/* Node type for API gateway. + * + * Author: Jitpanu Maneeratpongsuk + * SPDX-FileCopyrightText: 2025 Institute for Automation of Complex Power Systems, RWTH Aachen University + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +#include +#include + +namespace villas { +namespace node { + +// Forward declarations +struct Sample; + +class GatewayNode : public Node { +protected: + int parse(json_t *json) override; + + int _read(struct Sample *smps[], unsigned cnt) override; + int _write(struct Sample *smps[], unsigned cnt) override; + +public: + GatewayNode(const uuid_t &id = {}, const std::string &name = ""); + enum ApiType { gRPC }; + + struct Direction { + Sample *sample; + pthread_cond_t cv; + pthread_mutex_t mutex; + char *buf; + size_t buflen; + size_t wbytes; + }; + + Direction read, write; + std::string address; + ApiType type; + + Format::Ptr formatter; + + int prepare() override; + + int check() override; + + int start() override; + + int stop() override; + + ~GatewayNode(); +}; + +} // namespace node +} // namespace villas diff --git a/lib/api/CMakeLists.txt b/lib/api/CMakeLists.txt index 70974bdeb..9373529be 100644 --- a/lib/api/CMakeLists.txt +++ b/lib/api/CMakeLists.txt @@ -42,6 +42,20 @@ if(WITH_GRAPHVIZ) list(APPEND LIBRARIES PkgConfig::CGRAPH PkgConfig::GVC) endif() +if (WITH_GRPC) + find_path(REFLECTION_INCLUDE_DIR + NAMES grpc/reflection/v1alpha/reflection.pb.cc + ) + if (REFLECTION_INCLUDE_DIR) + list(APPEND API_SRC + requests/gateway/grpc.cpp + ${REFLECTION_INCLUDE_DIR}/grpc/reflection/v1alpha/reflection.pb.cc + ${REFLECTION_INCLUDE_DIR}/grpc/reflection/v1alpha/reflection.grpc.pb.cc + ) + list(APPEND LIBRARIES PkgConfig::PROTOBUF PkgConfig::GRPC) + endif() +endif() + add_library(api STATIC ${API_SRC}) target_include_directories(api PUBLIC ${INCLUDE_DIRS}) target_link_libraries(api PUBLIC ${LIBRARIES}) diff --git a/lib/api/requests/gateway/grpc.cpp b/lib/api/requests/gateway/grpc.cpp new file mode 100644 index 000000000..d8e754208 --- /dev/null +++ b/lib/api/requests/gateway/grpc.cpp @@ -0,0 +1,520 @@ +/* Transform HTTP API to gRPC + * + * Author: Jitpanu Maneeratpongsuk + * SPDX-FileCopyrightText: 2025 Institute for Automation of Complex Power Systems, RWTH Aachen University + * SPDX-License-Identifier: Apache-2.0 + */ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +using namespace google::protobuf; + +class ReflectionClient { +public: + ReflectionClient(std::shared_ptr channel) + : stub_(grpc::reflection::v1alpha::ServerReflection::NewStub(channel)) {} + google::protobuf::FileDescriptorSet * + GetFileDescriptor(const std::string &symbol) { + grpc::ClientContext context; + auto stream = stub_->ServerReflectionInfo(&context); + grpc::reflection::v1alpha::ServerReflectionRequest request; + request.set_file_containing_symbol(symbol); + bool streamstatus = stream->Write(request); + if (!streamstatus) + std::cout << "Server not allow reflection" << std::endl; + + grpc::reflection::v1alpha::ServerReflectionResponse response; + google::protobuf::FileDescriptorSet *file_descs = + new google::protobuf::FileDescriptorSet; + if (stream->Read(&response)) { + if (response.has_file_descriptor_response()) { + for (const auto &bytes : + response.file_descriptor_response().file_descriptor_proto()) { + google::protobuf::FileDescriptorProto *file_desc = + file_descs->add_file(); + file_desc->ParseFromString(bytes); + } + } + } + stream->WritesDone(); + grpc::Status status = stream->Finish(); + if (!status.ok()) { + std::cerr << "Reflection failed: " << status.error_message() << std::endl; + } + return file_descs; + }; + +private: + std::unique_ptr stub_; +}; + +namespace villas { +namespace node { +namespace api { + +class grpcRequest : public NodeRequest { +public: + using NodeRequest::NodeRequest; + // using GatewayRequest::GatewayRequest; + + Response *execute() override { + gateway_node = dynamic_cast(node); + + switch (method) { + case Session::Method::GET: + return executeGet(); + case Session::Method::PUT: + case Session::Method::POST: + return executePost(); + default: + throw Error::invalidMethod(this); + } + } + + Response *executeGet() { + std::string address = gateway_node->address; + GatewayNode::ApiType api_type = gateway_node->type; + if (api_type != GatewayNode::ApiType::gRPC) + throw Error(HTTP_STATUS_NOT_FOUND, nullptr, "Api type unavailable"); + + auto gRPC_package = matches[2]; + auto gRPC_service = matches[3]; + auto gRPC_method = matches[4]; + std::string methodFullName = + "/" + gRPC_package + "." + gRPC_service + "/" + gRPC_method; + + // Form request + grpc::Slice slice; + pthread_mutex_lock(&gateway_node->write.mutex); + slice = grpc::Slice(gateway_node->write.buf, gateway_node->write.wbytes); + pthread_mutex_unlock(&gateway_node->write.mutex); + grpc::ByteBuffer req_buf(&slice, 1); + + // gRPC call + auto channel = + grpc::CreateChannel(address, grpc::InsecureChannelCredentials()); + grpc::GenericStub generic_stub(channel); + grpc::ClientContext context; + grpc::CompletionQueue cq; + grpc::ByteBuffer resp_buf; + auto call = + generic_stub.PrepareUnaryCall(&context, methodFullName, req_buf, &cq); + grpc::Status status; + call->StartCall(); + int tag = 1; + call->Finish(&resp_buf, &status, &tag); + void *got_tag; + bool ok = false; + cq.Next(&got_tag, &ok); + + if (!status.ok()) { + cq.Shutdown(); + throw Error(HTTP_STATUS_NOT_FOUND, nullptr, + "Cannot connect to gRPC server"); + } + + // Parse response + std::string recv_data; + std::vector slices; + resp_buf.Dump(&slices); + for (const auto &s : slices) + recv_data.append(reinterpret_cast(s.begin()), s.size()); + + json_t *json_response; + json_response = parse_response(resp_buf, recv_data); + + cq.Shutdown(); + return new JsonResponse(session, HTTP_STATUS_OK, json_response); + } + + Response *executePost() { + // Create channel & stub + std::string address = gateway_node->address; + GatewayNode::ApiType api_type = gateway_node->type; + + if (api_type != GatewayNode::ApiType::gRPC) + throw Error(HTTP_STATUS_NOT_FOUND, nullptr, "Api type unavailable"); + + auto gRPC_package = matches[2]; + auto gRPC_service = matches[3]; + auto gRPC_method = matches[4]; + std::string methodFullName = + "/" + gRPC_package + "." + gRPC_service + "/" + gRPC_method; + + auto channel = + grpc::CreateChannel(address, grpc::InsecureChannelCredentials()); + ReflectionClient refl_client(channel); + + // Form request + // Request file descriptor from gRPC server + grpc::ByteBuffer req_buf; + FileDescriptorSet *protos = + refl_client.GetFileDescriptor(gRPC_package + "." + gRPC_service); + bool server_reflection; + + // Make descriptor database + SimpleDescriptorDatabase db; + for (int i = 0; i < protos->file_size(); i++) { + db.Add(protos->file(i)); + } + auto pool = std::make_unique(&db); + const Descriptor *resp_desc; + // Check if server reflection complete + if (protos->file_size() == 0) { + server_reflection = false; + req_buf = form_request(); + } else { + server_reflection = true; + const ServiceDescriptor *svc = + pool->FindServiceByName(gRPC_package + "." + gRPC_service); + if (!svc) { + throw Error(HTTP_STATUS_NOT_FOUND, nullptr, "gRPC service not found"); + } + const MethodDescriptor *method_desc = svc->FindMethodByName(gRPC_method); + if (!method_desc) { + throw Error(HTTP_STATUS_NOT_FOUND, nullptr, "gRPC method not found"); + } + const Descriptor *req_desc = method_desc->input_type(); + resp_desc = method_desc->output_type(); + if (req_desc->full_name() == "villas.node.Message") { + req_buf = form_request(); + } else { + req_buf = form_request_reflection(req_desc); + } + } + // gRPC all + grpc::GenericStub generic_stub(channel); + grpc::ClientContext context; + grpc::CompletionQueue cq; + grpc::ByteBuffer resp_buf; + auto call = + generic_stub.PrepareUnaryCall(&context, methodFullName, req_buf, &cq); + grpc::Status status; + + call->StartCall(); + int tag = 1; + call->Finish(&resp_buf, &status, &tag); + void *got_tag; + bool ok = false; + cq.Next(&got_tag, &ok); + + if (!status.ok()) { + cq.Shutdown(); + throw Error(HTTP_STATUS_NOT_FOUND, nullptr, + "Cannot connect to gRPC server"); + } + + // Parse response + std::string recv_data; + std::vector slices; + resp_buf.Dump(&slices); + for (const auto &s : slices) + recv_data.append(reinterpret_cast(s.begin()), s.size()); + + // Empty gRPC response + if (recv_data.length() <= 0) { + return new JsonResponse(session, HTTP_STATUS_OK, nullptr); + } + + json_t *json_response; + if (!server_reflection) { + json_response = parse_response(resp_buf, recv_data); + } else { + if (resp_desc->full_name() == "villas.node.Message") { + json_response = parse_response(resp_buf, recv_data); + } else { + json_response = parse_response_reflection(resp_desc, recv_data); + } + } + cq.Shutdown(); + return new JsonResponse(session, HTTP_STATUS_OK, json_response); + } + + grpc::ByteBuffer form_request_reflection(const Descriptor *req_desc) { + grpc::Slice slice; + DynamicMessageFactory dym_factory; + const Message *req_prototype = dym_factory.GetPrototype(req_desc); + std::unique_ptr request(req_prototype->New()); + // Set field + const Reflection *refl = request->GetReflection(); + int field_count = req_desc->field_count(); + for (int i = 0; i < field_count; i++) { + if (const FieldDescriptor *f = req_desc->field(i)) { + json_t *json_val = json_object_get(body, f->name().data()); + if (!json_val) { + if (f->is_required()) { + std::cerr << "Missing required field: " << f->name() << std::endl; + } + continue; + } + if (json_val->type == json_type::JSON_ARRAY) { + if (f->is_repeated()) { + size_t ele_count = json_array_size(json_val); + for (size_t j = 0; j < ele_count; j++) { + json_t *json_array = json_array_get(json_val, j); + switch (f->type()) { + case grpc::protobuf::FieldDescriptor::TYPE_STRING: + refl->AddString(request.get(), f, + json_string_value(json_array)); + break; + case grpc::protobuf::FieldDescriptor::TYPE_INT32: + refl->AddInt32(request.get(), f, + json_integer_value(json_array)); + break; + case grpc::protobuf::FieldDescriptor::TYPE_INT64: + refl->AddInt64(request.get(), f, + json_integer_value(json_array)); + break; + case grpc::protobuf::FieldDescriptor::TYPE_DOUBLE: + refl->AddDouble(request.get(), f, json_real_value(json_array)); + break; + case grpc::protobuf::FieldDescriptor::TYPE_BOOL: + refl->AddBool(request.get(), f, json_boolean_value(json_array)); + break; + default: + break; + } + } + } else { + std::cerr << "Json array in non repeat field not support" + << std::endl; + continue; + } + } else { + switch (f->type()) { + case grpc::protobuf::FieldDescriptor::TYPE_STRING: + refl->SetString(request.get(), f, json_string_value(json_val)); + break; + case grpc::protobuf::FieldDescriptor::TYPE_INT32: + refl->SetInt32(request.get(), f, json_integer_value(json_val)); + break; + case grpc::protobuf::FieldDescriptor::TYPE_INT64: + refl->SetInt64(request.get(), f, json_integer_value(json_val)); + break; + case grpc::protobuf::FieldDescriptor::TYPE_DOUBLE: + refl->SetDouble(request.get(), f, json_real_value(json_val)); + break; + case grpc::protobuf::FieldDescriptor::TYPE_BOOL: + refl->SetBool(request.get(), f, json_boolean_value(json_val)); + break; + default: + break; + } + } + } + } + // Serialize + std::string serialized; + request->SerializeToString(&serialized); + slice = grpc::Slice(serialized); + grpc::ByteBuffer req_buf(&slice, 1); + return req_buf; + } + + grpc::ByteBuffer form_request() { + grpc::Slice slice; + if (body) { + int ret; + json_error_t err; + double timestamp = 0; + json_t *json_value; + ret = json_unpack_ex(body, &err, 0, "{ s: F, s: o}", "timestamp", + ×tamp, "value", &json_value); + if (ret) { + throw Error::badRequest(nullptr, "Malformed body: {}", err.text); + } + if (!json_is_array(json_value)) { + throw Error::badRequest(nullptr, "Value must be an array"); + } + size_t i; + json_t *json_data; + Sample *sample_dummy = sample_alloc_mem(64); + sample_dummy->flags = + (int)SampleFlags::HAS_DATA | (int)SampleFlags::HAS_TS_ORIGIN; + sample_dummy->ts.origin = time_from_double(timestamp); + sample_dummy->length = 0; + auto siglists = std::make_shared(); + json_array_foreach(json_value, i, json_data) { + auto sig = std::make_shared(); + switch (json_data->type) { + case json_type::JSON_INTEGER: + ret = sample_dummy->data[i].parseJson(SignalType::INTEGER, json_data); + sig->type = SignalType::INTEGER; + siglists->push_back(sig); + break; + case json_type::JSON_REAL: + ret = sample_dummy->data[i].parseJson(SignalType::FLOAT, json_data); + sig->type = SignalType::FLOAT; + siglists->push_back(sig); + break; + default: + throw Error::badRequest(nullptr, "Value type unsupported"); + } + if (ret) { + throw Error::badRequest(nullptr, "Value type miss match"); + } + sample_dummy->length++; + } + sample_dummy->signals = siglists; + size_t dummy_buflen = 64 * 1024; + char *dummy_buf = new char[dummy_buflen]; + size_t dummy_wbytes; + ret = gateway_node->formatter->sprint(dummy_buf, dummy_buflen, + &dummy_wbytes, sample_dummy); + if (ret < 0) { + logger->warn("Failed to format request payload"); + } + slice = grpc::Slice(dummy_buf, dummy_wbytes); + sample_free(sample_dummy); + delete[] dummy_buf; + } else { + pthread_mutex_lock(&gateway_node->write.mutex); + slice = grpc::Slice(gateway_node->write.buf, gateway_node->write.wbytes); + pthread_mutex_unlock(&gateway_node->write.mutex); + } + grpc::ByteBuffer req_buf(&slice, 1); + return req_buf; + } + + json_t * + parse_response_reflection(const google::protobuf::Descriptor *resp_desc, + std::string &recv_data) { + google::protobuf::DynamicMessageFactory dym_factory; + json_t *root = json_object(); + const google::protobuf::Message *resp_prototype = + dym_factory.GetPrototype(resp_desc); + std::unique_ptr response(resp_prototype->New()); + if (response->ParseFromString(recv_data)) { + const google::protobuf::Reflection *resp_refl = response->GetReflection(); + int resp_field_count = resp_desc->field_count(); + for (int i = 0; i < resp_field_count; i++) { + if (const google::protobuf::FieldDescriptor *f = resp_desc->field(i)) { + if (f->is_repeated()) { + auto *json_arr = json_array(); + for (int j = 0; j < resp_refl->FieldSize(*response, f); j++) { + switch (f->type()) { + case grpc::protobuf::FieldDescriptor::Type::TYPE_STRING: + json_array_append( + json_arr, + json_string( + resp_refl->GetRepeatedString(*response, f, j).c_str())); + break; + case grpc::protobuf::FieldDescriptor::Type::TYPE_INT32: + json_array_append( + json_arr, + json_integer(resp_refl->GetRepeatedInt32(*response, f, j))); + break; + case grpc::protobuf::FieldDescriptor::Type::TYPE_INT64: + json_array_append( + json_arr, + json_integer(resp_refl->GetRepeatedInt64(*response, f, j))); + break; + case grpc::protobuf::FieldDescriptor::Type::TYPE_DOUBLE: + json_array_append( + json_arr, + json_real(resp_refl->GetRepeatedDouble(*response, f, j))); + break; + case grpc::protobuf::FieldDescriptor::Type::TYPE_BOOL: + json_array_append( + json_arr, + json_boolean(resp_refl->GetRepeatedBool(*response, f, j))); + break; + default: + break; + } + } + json_object_set_new(root, f->name().data(), json_arr); + } else { + switch (f->type()) { + case grpc::protobuf::FieldDescriptor::Type::TYPE_STRING: + json_object_set_new( + root, f->name().data(), + json_string(resp_refl->GetString(*response, f).c_str())); + break; + case grpc::protobuf::FieldDescriptor::Type::TYPE_INT32: + json_object_set_new( + root, f->name().data(), + json_integer(resp_refl->GetInt32(*response, f))); + break; + case grpc::protobuf::FieldDescriptor::Type::TYPE_INT64: + json_object_set_new( + root, f->name().data(), + json_integer(resp_refl->GetInt64(*response, f))); + break; + case grpc::protobuf::FieldDescriptor::Type::TYPE_DOUBLE: + json_object_set_new( + root, f->name().data(), + json_real(resp_refl->GetDouble(*response, f))); + break; + case grpc::protobuf::FieldDescriptor::Type::TYPE_BOOL: + json_object_set_new( + root, f->name().data(), + json_boolean(resp_refl->GetBool(*response, f))); + break; + default: + break; + } + } + } + } + } + return root; + } + + json_t *parse_response(grpc::ByteBuffer &resp_buf, std::string &recv_data) { + // auto *json_sigs = json_array(); + json_t *json_sigs; + pthread_mutex_lock(&gateway_node->read.mutex); + // json_t *json_ch; + auto *json_ch = json_array(); + json_error_t err; + size_t rbytes; + + Sample *sample_dummy = sample_alloc_mem(64); + int ret = gateway_node->formatter->sscan( + recv_data.c_str(), resp_buf.Length(), &rbytes, sample_dummy); + if (ret < 0) { + std::cerr << "Formatting Failed: " << ret << std::endl; + } + for (unsigned int i = 0; i < sample_dummy->length; i++) { + json_array_append(json_ch, + sample_dummy->data[i].toJson(SignalType::FLOAT)); + } + json_sigs = json_pack_ex(&err, 0, "{s: f, s: o}", "timestamp", + time_to_double(&sample_dummy->ts.origin), "value", + json_ch); + + if (method == Session::PUT) { + sample_copy(gateway_node->read.sample, sample_dummy); + pthread_cond_signal(&gateway_node->read.cv); + } + + sample_free(sample_dummy); + pthread_mutex_unlock(&gateway_node->read.mutex); + + return json_sigs; + } + +protected: + GatewayNode *gateway_node; +}; + +// Register API requests +static char n[] = "gateway"; +static char r[] = "/gateway/(" RE_NODE_NAME + ")/([A-Za-z0-9_-]+)/([A-Za-z0-9_-]+)/([A-Za-z0-9_-]+)"; +static char d[] = "gRPC api"; +static RequestPlugin p; + +} // namespace api +} // namespace node +} // namespace villas diff --git a/lib/nodes/CMakeLists.txt b/lib/nodes/CMakeLists.txt index 068ad9fba..61609914a 100644 --- a/lib/nodes/CMakeLists.txt +++ b/lib/nodes/CMakeLists.txt @@ -10,6 +10,7 @@ set(NODE_SRC if(WITH_WEB) list(APPEND NODE_SRC api.cpp) + list(APPEND NODE_SRC gateway.cpp) endif() if(LIBNL3_ROUTE_FOUND) diff --git a/lib/nodes/gateway.cpp b/lib/nodes/gateway.cpp new file mode 100644 index 000000000..f9a7e9c32 --- /dev/null +++ b/lib/nodes/gateway.cpp @@ -0,0 +1,136 @@ +/* Node type: API gateway. + * + * Author: Jitpanu Maneeratpongsuk + * SPDX-FileCopyrightText: 2025 Institute for Automation of Complex Power Systems, RWTH Aachen University + * SPDX-License-Identifier: Apache-2.0 + */ + +#include + +#include + +#include +#include + +#include "villas/sample.hpp" + +using namespace villas; +using namespace villas::node; + +GatewayNode::GatewayNode(const uuid_t &id, const std::string &name) + : Node(id, name), read(), write(), type(), formatter() { + int ret; + auto dirs = std::vector{&read, &write}; + + for (auto dir : dirs) { + ret = pthread_mutex_init(&dir->mutex, nullptr); + if (ret) + throw RuntimeError("failed to initialize mutex"); + + ret = pthread_cond_init(&dir->cv, nullptr); + if (ret) + throw RuntimeError("failed to initialize mutex"); + } +} + +int GatewayNode::prepare() { + + read.sample = sample_alloc_mem(64); + if (!read.sample) + throw MemoryAllocationError(); + + write.sample = sample_alloc_mem(64); + if (!write.sample) + throw MemoryAllocationError(); + + return Node::prepare(); +} + +int GatewayNode::check() { return Node::check(); } + +int GatewayNode::_read(struct Sample *smps[], unsigned cnt) { + assert(cnt == 1); + + pthread_cond_wait(&read.cv, &read.mutex); + sample_copy(smps[0], read.sample); + + return 1; +} + +int GatewayNode::_write(struct Sample *smps[], unsigned cnt) { + assert(cnt == 1); + sample_copy(write.sample, smps[0]); + + int ret = + formatter->sprint(write.buf, write.buflen, &write.wbytes, smps, cnt); + if (ret < 0) { + logger->warn("Failed to format payload: reason={}", ret); + return ret; + } + + pthread_cond_signal(&write.cv); + + return 1; +} + +int GatewayNode::parse(json_t *json) { + int ret = Node::parse(json); + if (ret) + return ret; + + json_t *json_format = nullptr; + // json_t *remote = nullptr; + const char *endpoint_address, *gateway_type; + json_error_t err; + ret = json_unpack_ex(json, &err, 0, "{ s?:o, s:s, s:s}", "format", + &json_format, "gateway_type", &gateway_type, "address", + &endpoint_address); + if (ret) + throw ConfigError(json, err, "node-config-node-gateway"); + + auto *fmt = json_format ? FormatFactory::make(json_format) + : FormatFactory::make("villas.binary"); + formatter = Format::Ptr(fmt); + if (!formatter) + throw ConfigError(json_format, "node-config-node-gateway-format", + "Invalid format configuration"); + + address = endpoint_address; + if (!strcmp(gateway_type, "gRPC")) { + type = ApiType::gRPC; + } else { + throw SystemError("Invalid api type: {}", gateway_type); + } + + return 0; +} + +int GatewayNode::start() { + formatter->start(getInputSignals(false), ~(int)SampleFlags::HAS_OFFSET); + + read.buflen = 64 * 1024; + read.buf = new char[read.buflen]; + write.buflen = 64 * 1024; + write.buf = new char[write.buflen]; + + return Node::start(); +} + +int GatewayNode::stop() { + delete[] read.buf; + delete[] write.buf; + sample_free(read.sample); + sample_free(write.sample); + return 0; +} + +GatewayNode::~GatewayNode() {} + +// Register node +static char n[] = "gateway"; +static char d[] = "A node providing a Gateway"; +static NodePlugin + p; diff --git a/packaging/deps.sh b/packaging/deps.sh index b1b65db09..647b647a2 100644 --- a/packaging/deps.sh +++ b/packaging/deps.sh @@ -602,6 +602,23 @@ if ! cmake --find-package -DNAME=ghc_filesystem -DCOMPILER_ID=GNU -DLANGUAGE=CXX popd fi +# Get and complie gRPC server reflection proto description file +if ! find /usr/{local,}/include -name reflection.pb.cc | grep -q . && + should_build "gRPC reflection" "For API gateway node-type"; then + mkdir -p gRPC_reflection + pushd gRPC_reflection + curl https://raw.githubusercontent.com/grpc/grpc/refs/heads/master/src/proto/grpc/reflection/v1alpha/reflection.proto -o reflection.proto + protoc -I. \ + --cpp_out=. \ + --grpc_out=. \ + --plugin=protoc-gen-grpc=/usr/bin/grpc_cpp_plugin \ + reflection.proto + mkdir -p /usr/local/include/grpc/reflection/v1alpha + cp *.h /usr/local/include/grpc/reflection/v1alpha/ + cp *.cc /usr/local/include/grpc/reflection/v1alpha/ + popd +fi + popd >/dev/null # Update linker cache diff --git a/packaging/docker/Dockerfile.debian b/packaging/docker/Dockerfile.debian index 631e16f86..0943512f3 100644 --- a/packaging/docker/Dockerfile.debian +++ b/packaging/docker/Dockerfile.debian @@ -34,6 +34,7 @@ RUN apt-get update && \ libcurl4-openssl-dev \ libfmt-dev \ libgraphviz-dev \ + libgrpc++-dev \ libhiredis-dev \ libibverbs-dev \ libjansson-dev \ @@ -52,6 +53,7 @@ RUN apt-get update && \ libssl-dev \ libusb-1.0-0-dev \ libzmq3-dev \ + protobuf-compiler-grpc \ uuid-dev # Install unpackaged dependencies from source diff --git a/packaging/docker/Dockerfile.debian-multiarch b/packaging/docker/Dockerfile.debian-multiarch index 6007a9efd..30ccd048c 100644 --- a/packaging/docker/Dockerfile.debian-multiarch +++ b/packaging/docker/Dockerfile.debian-multiarch @@ -41,6 +41,7 @@ RUN apt-get update && \ libconfig-dev:${ARCH} \ libcurl4-openssl-dev:${ARCH} \ libgraphviz-dev:${ARCH} \ + libgrpc++-dev:${ARCH} \ libhiredis-dev:${ARCH} \ libibverbs-dev:${ARCH} \ libjansson-dev:${ARCH} \ @@ -58,6 +59,7 @@ RUN apt-get update && \ libssl-dev:${ARCH} \ libusb-1.0-0-dev:${ARCH} \ libzmq3-dev:${ARCH} \ + protobuf-compiler-grpc:${ARCH} \ uuid-dev:${ARCH} ADD cmake/toolchains/debian-${ARCH}.cmake / diff --git a/packaging/docker/Dockerfile.fedora b/packaging/docker/Dockerfile.fedora index 2beee634f..4df3ae613 100644 --- a/packaging/docker/Dockerfile.fedora +++ b/packaging/docker/Dockerfile.fedora @@ -43,6 +43,7 @@ RUN pip install \ RUN dnf -y install \ fmt-devel \ graphviz-devel \ + grpc-devel \ hiredis-devel \ jansson-devel \ libconfig-devel \ diff --git a/packaging/docker/Dockerfile.rocky b/packaging/docker/Dockerfile.rocky index 384bf6d68..ed55d68ed 100644 --- a/packaging/docker/Dockerfile.rocky +++ b/packaging/docker/Dockerfile.rocky @@ -31,6 +31,7 @@ RUN dnf --allowerasing -y install \ RUN dnf -y install \ openssl-devel \ graphviz-devel \ + grpc-devel \ protobuf-devel \ protobuf-c-devel \ libuuid-devel \ diff --git a/packaging/docker/Dockerfile.rocky9 b/packaging/docker/Dockerfile.rocky9 index f7dfffd40..2ca22b2a4 100644 --- a/packaging/docker/Dockerfile.rocky9 +++ b/packaging/docker/Dockerfile.rocky9 @@ -31,6 +31,7 @@ RUN dnf --allowerasing -y install \ RUN dnf -y install \ openssl-devel \ graphviz-devel \ + grpc-devel \ protobuf-devel \ protobuf-c-devel \ libuuid-devel \ diff --git a/packaging/docker/Dockerfile.ubuntu b/packaging/docker/Dockerfile.ubuntu index 5a4a0c506..2b9548207 100644 --- a/packaging/docker/Dockerfile.ubuntu +++ b/packaging/docker/Dockerfile.ubuntu @@ -39,6 +39,7 @@ RUN apt-get update && \ libfmt-dev \ libglib2.0-dev \ libgraphviz-dev \ + libgrpc++-dev \ libhiredis-dev \ libibverbs-dev \ libjansson-dev \ @@ -59,6 +60,7 @@ RUN apt-get update && \ libusb-1.0-0-dev \ libwebsockets-dev \ libzmq3-dev \ + protobuf-compiler-grpc \ uuid-dev # Install unpackaged dependencies from source diff --git a/packaging/nix/grpc_server_reflection.nix b/packaging/nix/grpc_server_reflection.nix new file mode 100644 index 000000000..1d241328f --- /dev/null +++ b/packaging/nix/grpc_server_reflection.nix @@ -0,0 +1,40 @@ +# SPDX-FileCopyrightText: 2025 Institute for Automation of Complex Power Systems, RWTH Aachen University +# SPDX-License-Identifier: Apache-2.0 +{ + stdenv, + fetchurl, + protobuf, + grpc +}: + +stdenv.mkDerivation { + name = "grpc-server-reflection"; + + dontUnpack = true; + nativeBuildInputs = [ protobuf grpc ]; + + src = fetchurl { + url = "https://raw.githubusercontent.com/grpc/grpc/refs/heads/master/src/proto/grpc/reflection/v1alpha/reflection.proto"; + sha256 = "sha256-c1kQAgFlRRDNFZy7XEaKi+wUtUmXPG+P967poSN67Ec="; + }; + + buildPhase = '' + # Create a temporary build directory + mkdir -p tmp + ln -s $src tmp/reflection.proto + + # Generate code into build_tmp + protoc --cpp_out=tmp \ + --grpc_out=tmp \ + --plugin=protoc-gen-grpc=${grpc}/bin/grpc_cpp_plugin \ + -I tmp \ + tmp/reflection.proto + ''; + + # Install into standard 'include' path so CMake finds it + installPhase = '' + mkdir -p $out/include/grpc/reflection/v1alpha + cp tmp/*.pb.h $out/include/grpc/reflection/v1alpha/ + cp tmp/*.pb.cc $out/include/grpc/reflection/v1alpha/ + ''; +} diff --git a/packaging/nix/villas.nix b/packaging/nix/villas.nix index 246150a67..1223e25ae 100644 --- a/packaging/nix/villas.nix +++ b/packaging/nix/villas.nix @@ -21,6 +21,7 @@ withNodeAmqp ? withAllNodes, withNodeComedi ? withAllNodes, withNodeEthercat ? (withAllNodes && system == "x86_64-linux"), + withNodeGateway ? (withAllNodes && system == "x86_64-linux"), withNodeIec60870 ? withAllNodes, withNodeIec61850 ? withAllNodes, withNodeInfiniband ? withAllNodes, @@ -57,6 +58,8 @@ cyrus_sasl, ethercat, gnugrep, + grpc, + grpc-server-reflection, jansson, lib60870, libconfig, @@ -164,6 +167,8 @@ stdenv.mkDerivation { ++ lib.optionals withNodeAmqp [ rabbitmq-c ] ++ lib.optionals withNodeComedi [ comedilib ] ++ lib.optionals withNodeEthercat [ ethercat ] + ++ lib.optionals withNodeGateway [ grpc ] + ++ lib.optionals withNodeGateway [ grpc-server-reflection ] ++ lib.optionals withNodeIec60870 [ lib60870 ] ++ lib.optionals withNodeIec61850 [ libiec61850 ] ++ lib.optionals withNodeKafka [ diff --git a/tools/cppcheck-supressions.txt b/tools/cppcheck-supressions.txt index 7b4f4812d..8ede3d8d1 100644 --- a/tools/cppcheck-supressions.txt +++ b/tools/cppcheck-supressions.txt @@ -3,6 +3,7 @@ *:*/fpga/thirdparty/* *:*/villas.pb-c.h +*:reflection.pb.h noValidConfiguration duplInheritedMember