diff --git a/include/aca_async_grpc_subscribe_server.h b/include/aca_async_grpc_subscribe_server.h new file mode 100644 index 00000000..13406eab --- /dev/null +++ b/include/aca_async_grpc_subscribe_server.h @@ -0,0 +1,58 @@ +// MIT License +// Copyright(c) 2020 Futurewei Cloud +// +// Permission is hereby granted, +// free of charge, to any person obtaining a copy of this software and associated documentation files(the "Software"), to deal in the Software without restriction, +// including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and / or sell copies of the Software, and to permit persons +// to whom the Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +// WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +#include + +#include +#include +#include "subscribeinfoprovisioner.grpc.pb.h" + +using grpc::Server; +using grpc::ServerAsyncResponseWriter; +using grpc::ServerBuilder; +using grpc::ServerCompletionQueue; +using grpc::ServerContext; +using grpc::Status; + +class Aca_Async_GRPC_Subscribe_Server final { + public: + ~Aca_Async_GRPC_Subscribe_Server(); + Aca_Async_GRPC_Subscribe_Server(); + void Run(); + void StopServer(); + + private: + class CallData { +public: + CallData(alcor::schema::SubscribeInfoProvisioner::AsyncService *service, + ServerCompletionQueue *cq); + void Proceed(); + +private: + alcor::schema::SubscribeInfoProvisioner::AsyncService *service_; + ServerCompletionQueue *cq_; + ServerContext ctx_; + alcor::schema::GoalState request_; + alcor::schema::GoalStateOperationReply reply_; + ServerAsyncResponseWriter responder_; + + enum CallStatus { CREATE, PROCESS, FINISH }; + CallStatus status_; + }; + + void HandleRpcs(); + std::unique_ptr cq_; + alcor::schema::GoalStateProvisioner::AsyncService service_; + std::unique_ptr server_; +}; \ No newline at end of file diff --git a/include/aca_comm_mgr.h b/include/aca_comm_mgr.h index 05734ee5..75fa9e9d 100644 --- a/include/aca_comm_mgr.h +++ b/include/aca_comm_mgr.h @@ -16,6 +16,7 @@ #define ACA_COMM_MGR_H #include "goalstateprovisioner.grpc.pb.h" +#include "subscribeinfoprovisioner.grpc.pb.h" using std::string; @@ -37,6 +38,9 @@ class Aca_Comm_Manager { int update_goal_state(alcor::schema::GoalStateV2 &goal_state_message, alcor::schema::GoalStateOperationReply &gsOperationReply); + int update_subscribe_info(alcor::schema::NodeSubscribeInfo &subscribe_info_message, + alcor::schema::SubscribeOperationReply &subscribeOperationReply); + // compiler will flag error when below is called Aca_Comm_Manager(Aca_Comm_Manager const &) = delete; void operator=(Aca_Comm_Manager const &) = delete; diff --git a/include/aca_grpc_subscribe.h b/include/aca_grpc_subscribe.h new file mode 100644 index 00000000..32248c43 --- /dev/null +++ b/include/aca_grpc_subscribe.h @@ -0,0 +1,96 @@ +// MIT License +// Copyright(c) 2020 Futurewei Cloud +// +// Permission is hereby granted, +// free of charge, to any person obtaining a copy of this software and associated documentation files(the "Software"), to deal in the Software without restriction, +// including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and / or sell copies of the Software, and to permit persons +// to whom the Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +// WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + +#include + +#include +#include +#include "subscribeinfoprovisioner.grpc.pb.h" +#include "ctpl/ctpl_stl.h" + +using namespace alcor::schema; +using grpc::Server; +using grpc::ServerAsyncReader; +using grpc::ServerAsyncReaderWriter; +using grpc::ServerAsyncResponseWriter; +using grpc::ServerBuilder; +using grpc::ServerCompletionQueue; +using grpc::ServerContext; +using grpc::ServerReader; +using grpc::ServerReaderWriter; +using grpc::ServerWriter; +using grpc::Status; + +class SubscribeInfoProvisionerAsyncServer { + public: + ~SubscribeInfoProvisionerAsyncServer() + { + this->keepReadingFromCq_ = false; + } + + /* + Base class that represents a gRPC call. + When you have a new kind of rpc, add the corresponding enum to CallType + */ + struct AsyncSubscribeInfoProvionerCallBase { + /* + Currently there are two types of CallStatus, INIT and SENT + At the INIT state, a streaming/unary rpc call creates a new streaming/unary call instance, + requests the call and then processes the received data; + AT the SENT state, a streaming call doesn't do anything; but a unary call deletes its own instance, + since this call is already done. + */ + enum CallStatus { INIT, SENT, DESTROY }; + CallStatus status_; + grpc::ServerContext ctx_; + }; + + // struct for PushNodeSubscribeInfoAsyncCall, which is a unary gRPC call + // when adding a new unary rpc call, create a new struct just like PushNodeSubscribeInfoAsyncCall + struct PushNodeSubscribeInfoAsyncCall : public AsyncSubscribeInfoProvionerCallBase { + // Received SubscribeInfo + NodeSubscribeInfo subscribeInfo_; + // Reply to be sent + SubscribeOperationReply subscribeOperationReply_; + + // Object to send reply to client + grpc::ServerAsyncResponseWriter responder_; + + // Constructor + PushNodeSubscribeInfoAsyncCall() : responder_(&ctx_) + { + } + }; + + std::unique_ptr stub_; + std::shared_ptr chan_; + + Status ShutDownServer(); + void RunServer(int thread_pool_size); + void AsyncWorkder(); + /* + Add a corresponding function here to process a new kind of rpc call. + For unary rpcs, please refer to ProcessNodeSubscribeInfoAsyncCall + + */ + void ProcessPushNodeSubscribeInfoAsyncCall(AsyncSubscribeInfoProvionerCallBase *baseCall, + bool ok); + + private: + bool keepReadingFromCq_ = true; + std::unique_ptr server_; + std::unique_ptr cq_; + SubscribeInfoProvisioner::AsyncService service_; + ctpl::thread_pool thread_pool_; +}; diff --git a/include/aca_message_pulsar_consumer.h b/include/aca_message_pulsar_consumer.h index 4572c454..04db7881 100644 --- a/include/aca_message_pulsar_consumer.h +++ b/include/aca_message_pulsar_consumer.h @@ -26,6 +26,7 @@ using namespace pulsar; using std::string; +using std::vector; namespace aca_message_pulsar { @@ -37,7 +38,7 @@ class ACA_Message_Pulsar_Consumer { string unicast_subscription_name; // Subscription name of the unicast pulsar consumer string multicast_topic_name; //A string representation of the topic to be consumed, for example: /hostid/00000000-0000-0000-0000-000000000000/netwconf/ - string unicast_topic_name; + vector unicast_topic_name = vector(); ConsumerConfiguration multicast_consumer_config; //Configuration of the mulitcast pulsar consumer ConsumerConfiguration unicast_consumer_config; //Configuration of the unicast pulsar consumer @@ -59,12 +60,16 @@ class ACA_Message_Pulsar_Consumer { void setUnicastTopicName(string topic); - public: - ACA_Message_Pulsar_Consumer(string topic, string brokers, string subscription_name); - ~ACA_Message_Pulsar_Consumer(); + static ACA_Message_Pulsar_Consumer &get_instance(); + + ACA_Message_Pulsar_Consumer(); + + ~ACA_Message_Pulsar_Consumer(); + + void init(string topic, string brokers, string subscription_name); string getBrokers() const; @@ -76,13 +81,15 @@ class ACA_Message_Pulsar_Consumer { string getUnicastSubscriptionName() const; + static string getRecoveredTopicName(); + bool multicastConsumerDispatched(); bool unicastConsumerDispatched(int stickyHash); - //static void listener(Consumer consumer, const Message& message); + bool unicastUnsubscribeAll(); - + bool unicastResubscribe(bool unSubscribe, string topic="", string stickHash=""); }; } // namespace aca_message_pulsar diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 5a0f4992..36552570 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -7,6 +7,7 @@ set(SOURCES ./comm/aca_message_pulsar_producer.cpp ./comm/aca_comm_mgr.cpp ./comm/aca_grpc.cpp + ./comm/aca_grpc_subscribe.cpp ./comm/aca_grpc_client.cpp ./dp_abstraction/aca_goal_state_handler.cpp ./dp_abstraction/aca_dataplane_ovs.cpp diff --git a/src/aca_main.cpp b/src/aca_main.cpp index 8e4b52fe..d0e12f69 100644 --- a/src/aca_main.cpp +++ b/src/aca_main.cpp @@ -15,6 +15,7 @@ #include "aca_log.h" #include "aca_util.h" #include "aca_message_pulsar_consumer.h" +#include "aca_grpc_subscribe.h" #include "aca_grpc.h" #include "aca_grpc_client.h" @@ -29,6 +30,7 @@ #include "aca_ovs_control.h" #include "goalstateprovisioner.grpc.pb.h" +#include "subscribeinfoprovisioner.grpc.pb.h" #include #include #include /* for getopt */ @@ -44,23 +46,27 @@ using std::string; static char EMPTY_STRING[] = ""; static char BROKER_LIST[] = "pulsar://localhost:6650"; static char PULSAR_TOPIC[] = "Host-ts-1"; -static char PULSAR_SUBSCRIPTION_NAME[] = "Test-Subscription"; +static char PULSAR_SUBSCRIPTION_NAME[] = "test-subscription"; static char GRPC_SERVER_PORT[] = "50001"; +static char GRPC_SUBSCRIBE_SERVER_PORT[] = "50002"; static char OFCTL_COMMAND[] = "monitor"; static char OFCTL_TARGET[] = "br-int"; -using namespace std; +using namespace std; // Global variables std::thread *g_grpc_server_thread = NULL; +std::thread *g_grpc_subscribe_server_thread = NULL; std::thread *g_grpc_client_thread = NULL; GoalStateProvisionerAsyncServer *g_grpc_server = NULL; +SubscribeInfoProvisionerAsyncServer *g_grpc_subscribe_server = NULL; GoalStateProvisionerClientImpl *g_grpc_client = NULL; string g_broker_list = EMPTY_STRING; string g_pulsar_topic = EMPTY_STRING; string g_pulsar_subsription_name = EMPTY_STRING; -string g_pulsar_hashed_key = "0"; +string g_pulsar_hashed_key = "49775"; string g_grpc_server_port = EMPTY_STRING; +string g_grpc_subscribe_server_port = EMPTY_STRING; string g_ofctl_command = EMPTY_STRING; string g_ofctl_target = EMPTY_STRING; string g_ofctl_options = EMPTY_STRING; @@ -138,6 +144,23 @@ static void aca_cleanup() ACA_LOG_ERROR("%s", "Unable to call delete, grpc server thread pointer is null.\n"); } + if (g_grpc_subscribe_server != NULL) { + g_grpc_subscribe_server->ShutDownServer(); + delete g_grpc_subscribe_server; + g_grpc_subscribe_server = NULL; + ACA_LOG_INFO("%s", "Cleaned up grpc subscribe server.\n"); + } else { + ACA_LOG_ERROR("%s", "Unable to call delete, grpc subscribe server pointer is null.\n"); + } + + if (g_grpc_subscribe_server_thread != NULL) { + delete g_grpc_subscribe_server_thread; + g_grpc_subscribe_server_thread = NULL; + ACA_LOG_INFO("%s", "Cleaned up grpc subscribe server thread.\n"); + } else { + ACA_LOG_ERROR("%s", "Unable to call delete, grpc subscribe server thread pointer is null.\n"); + } + // Stop the grpc client if (g_grpc_client != NULL) { delete g_grpc_client; @@ -253,6 +276,9 @@ int main(int argc, char *argv[]) if (g_grpc_server_port == EMPTY_STRING) { g_grpc_server_port = GRPC_SERVER_PORT; } + if (g_grpc_subscribe_server_port == EMPTY_STRING) { + g_grpc_subscribe_server_port = GRPC_SUBSCRIBE_SERVER_PORT; + } if (g_ofctl_command == EMPTY_STRING) { g_ofctl_command = OFCTL_COMMAND; } @@ -265,6 +291,13 @@ int main(int argc, char *argv[]) &GoalStateProvisionerAsyncServer::RunServer, g_grpc_server, thread_pools_size)); g_grpc_server_thread->detach(); + + // Create a separate thread to get subsribe info for pulsar + g_grpc_subscribe_server = new SubscribeInfoProvisionerAsyncServer(); + g_grpc_subscribe_server_thread = new std::thread(std::bind( + &SubscribeInfoProvisionerAsyncServer::RunServer, g_grpc_subscribe_server, 1)); + g_grpc_subscribe_server_thread->detach(); + // Create a separate thread to run the grpc client. g_grpc_client = new GoalStateProvisionerClientImpl(); g_grpc_client_thread = new std::thread( @@ -292,9 +325,8 @@ int main(int argc, char *argv[]) //// monitor br-tun for arp request message //ACA_OVS_Control::get_instance().monitor("br-tun", "resume"); - ACA_Message_Pulsar_Consumer network_config_consumer(g_pulsar_topic, g_broker_list, g_pulsar_subsription_name); - //network_config_consumer.multicastConsumerDispatched(); - network_config_consumer.unicastConsumerDispatched(atoi(g_pulsar_hashed_key.c_str())); + ACA_Message_Pulsar_Consumer::get_instance().init(g_pulsar_topic, g_broker_list, g_pulsar_subsription_name); + ACA_Message_Pulsar_Consumer::get_instance().unicastConsumerDispatched(atoi(g_pulsar_hashed_key.c_str())); pause(); aca_cleanup(); diff --git a/src/comm/aca_comm_mgr.cpp b/src/comm/aca_comm_mgr.cpp index 7e2f1953..85804c78 100644 --- a/src/comm/aca_comm_mgr.cpp +++ b/src/comm/aca_comm_mgr.cpp @@ -17,12 +17,15 @@ #include "aca_comm_mgr.h" #include "aca_goal_state_handler.h" #include "aca_dhcp_state_handler.h" +#include "aca_message_pulsar_consumer.h" #include "goalstateprovisioner.grpc.pb.h" +#include "subscribeinfoprovisioner.grpc.pb.h" using namespace std; using namespace alcor::schema; using namespace aca_goal_state_handler; using namespace aca_dhcp_state_handler; +using namespace aca_message_pulsar; extern string g_rpc_server; extern string g_rpc_protocol; @@ -287,6 +290,36 @@ int Aca_Comm_Manager::update_goal_state(GoalStateV2 &goal_state_message, return rc; } +int Aca_Comm_Manager::update_subscribe_info(NodeSubscribeInfo &subscribe_info_message, + SubscribeOperationReply &subscribeOperationReply) +{ + int exec_command_rc = EXIT_SUCCESS; + int rc = EXIT_SUCCESS; + auto start = chrono::steady_clock::now(); + + + auto subscribe_finished_time = chrono::steady_clock::now(); + exec_command_rc = ACA_Message_Pulsar_Consumer::get_instance().unicastResubscribe(subscribe_info_message.subscribe_operation(), + subscribe_info_message.topic(), + subscribe_info_message.key()); + auto subscribe_operation_time = + cast_to_microseconds(subscribe_finished_time - start).count(); + + ACA_LOG_INFO("[METRICS] Elapsed time for subscribe operation took: %ld microseconds or %ld milliseconds\n", + subscribe_operation_time, us_to_ms(subscribe_operation_time)); + + OperationStatus operation_status; + + if (exec_command_rc == EXIT_SUCCESS) + operation_status = OperationStatus::SUCCESS; + else + operation_status = OperationStatus::FAILURE; + subscribeOperationReply.set_operationstatus(operation_status); + + return rc; +} + + void Aca_Comm_Manager::print_goal_state(GoalState parsed_struct) { if (g_debug_mode == false) { diff --git a/src/comm/aca_grpc.cpp b/src/comm/aca_grpc.cpp index 4c4c06cd..77af3745 100644 --- a/src/comm/aca_grpc.cpp +++ b/src/comm/aca_grpc.cpp @@ -174,7 +174,8 @@ void GoalStateProvisionerAsyncServer::ProcessPushGoalStatesStreamAsyncCall( // process goalstateV2 in the thread pool // It has read from the stream, now to GoalStateV2 should not be empty // and we need to process it. - ACA_LOG_DEBUG("%s\n", "This call has already read from the stream, now we process the gsv2..."); + ACA_LOG_DEBUG("%s\n", "ACA_GRPC: This call has already read from the stream, now we process the gsv2..."); + if (streamingCall->goalStateV2_.neighbor_states_size() == 1) { // if there's only one neighbor state, it means that it is pushed @@ -189,16 +190,17 @@ void GoalStateProvisionerAsyncServer::ProcessPushGoalStatesStreamAsyncCall( } std::chrono::_V2::steady_clock::time_point start = std::chrono::steady_clock::now(); + ACA_LOG_INFO("%s\n", "ACA_GRPC: Start updating GoalStateV2..."); int rc = Aca_Comm_Manager::get_instance().update_goal_state( streamingCall->goalStateV2_, streamingCall->gsOperationReply_); if (rc == EXIT_SUCCESS) { - ACA_LOG_INFO("Control Fast Path streaming - Successfully updated host with latest goal state %d.\n", + ACA_LOG_INFO("ACA_GRPC: Control Fast Path streaming - Successfully updated host with latest goal state %d.\n", rc); } else if (rc == EINPROGRESS) { - ACA_LOG_INFO("Control Fast Path streaming - Update host with latest goal state returned pending, rc=%d.\n", + ACA_LOG_INFO("ACA_GRPC: Control Fast Path streaming - Update host with latest goal state returned pending, rc=%d.\n", rc); } else { - ACA_LOG_ERROR("Control Fast Path streaming - Failed to update host with latest goal state, rc=%d.\n", + ACA_LOG_ERROR("ACA_GRPC: Control Fast Path streaming - Failed to update host with latest goal state, rc=%d.\n", rc); } std::chrono::_V2::steady_clock::time_point end = diff --git a/src/comm/aca_grpc_subscribe.cpp b/src/comm/aca_grpc_subscribe.cpp new file mode 100644 index 00000000..f7c2e7ef --- /dev/null +++ b/src/comm/aca_grpc_subscribe.cpp @@ -0,0 +1,189 @@ +/* + * + * Copyright 2015 gRPC authors. + * Copyright 2019 The Alcor Authors - file modified. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include "subscribeinfoprovisioner.grpc.pb.h" +#include "aca_comm_mgr.h" +#include "aca_log.h" +#include "aca_grpc_subscribe.h" + +extern string g_grpc_subscribe_server_port; + + +using namespace alcor::schema; +using aca_comm_manager::Aca_Comm_Manager; + +Status SubscribeInfoProvisionerAsyncServer::ShutDownServer() +{ + ACA_LOG_INFO("%s", "Shutdown server"); + server_->Shutdown(); + cq_->Shutdown(); + thread_pool_.stop(); + keepReadingFromCq_ = false; + return Status::OK; +} + +void SubscribeInfoProvisionerAsyncServer::ProcessPushNodeSubscribeInfoAsyncCall( + AsyncSubscribeInfoProvionerCallBase *baseCall, bool ok) +{ + ACA_LOG_DEBUG("Start of ProcessPushNodeSubscribeInfoAsyncCall, OK: %ld, call_status: %ld\n", + ok, baseCall->status_); + PushNodeSubscribeInfoAsyncCall *unaryCall = + static_cast(baseCall); + if (!ok) { + // maybe delete the instance and init a new one? + ACA_LOG_DEBUG("%s\n", "Got a PushNodeSubscribeInfoAsync Call that is NOT OK."); + delete (PushNodeSubscribeInfoAsyncCall *)baseCall; + PushNodeSubscribeInfoAsyncCall *newPushNodeSubscribeInfoAsyncCallInstance = + new PushNodeSubscribeInfoAsyncCall; + newPushNodeSubscribeInfoAsyncCallInstance->status_ = + AsyncSubscribeInfoProvionerCallBase::CallStatus::INIT; + // Request for the call + service_.RequestPushNodeSubscribeInfo( + &newPushNodeSubscribeInfoAsyncCallInstance->ctx_, /*Context of this call*/ + &newPushNodeSubscribeInfoAsyncCallInstance->subscribeInfo_, /*SubscribeInfo to receive*/ + &newPushNodeSubscribeInfoAsyncCallInstance->responder_, /*Responder of call*/ + cq_.get(), /*CQ for new call*/ + cq_.get(), /*CQ for finished call*/ + newPushNodeSubscribeInfoAsyncCallInstance /*The unique tag for the call*/ + ); + } else { + switch (unaryCall->status_) { + case AsyncSubscribeInfoProvionerCallBase::CallStatus::INIT: { + ACA_LOG_DEBUG("%s\n", "Initing a new PushNodeSubscribeInfoAsyncCallInstance, before processing the current one"); + PushNodeSubscribeInfoAsyncCall *newPushNodeSubscribeInfoAsyncCallInstance = + new PushNodeSubscribeInfoAsyncCall; + newPushNodeSubscribeInfoAsyncCallInstance->status_ = + AsyncSubscribeInfoProvionerCallBase::CallStatus::INIT; + // Request for the call + service_.RequestPushNodeSubscribeInfo( + &newPushNodeSubscribeInfoAsyncCallInstance->ctx_, /*Context of this call*/ + &newPushNodeSubscribeInfoAsyncCallInstance->subscribeInfo_, /*SubscribeInfo to receive*/ + &newPushNodeSubscribeInfoAsyncCallInstance->responder_, /*Responder of call*/ + cq_.get(), /*CQ for new call*/ + cq_.get(), /*CQ for finished call*/ + newPushNodeSubscribeInfoAsyncCallInstance /*The unique tag for the call*/ + ); + // process SubscribeInfo in the thread pool + ACA_LOG_DEBUG("%s\n", "Processing a PushNodeSubscribeInfo call..."); + ACA_LOG_DEBUG("%s\n", "Received a SubscribeInfo, need to process it"); + + int rc = Aca_Comm_Manager::get_instance().update_subscribe_info( + unaryCall->subscribeInfo_, unaryCall->subscribeOperationReply_); + if (rc == EXIT_SUCCESS) { + ACA_LOG_INFO("Successfully updated host with latest subscribe info %d.\n", + rc); + } else if (rc == EINPROGRESS) { + ACA_LOG_INFO("Update host with latest subscribe info returned pending, rc=%d.\n", + rc); + } else { + ACA_LOG_ERROR("Failed to update host with latest subscribe info , rc=%d.\n", + rc); + } + unaryCall->status_ = AsyncSubscribeInfoProvionerCallBase::CallStatus::SENT; + unaryCall->responder_.Finish(unaryCall->subscribeOperationReply_, Status::OK, baseCall); + ACA_LOG_DEBUG("%s\n", "V1: responder_->Finish called"); + + } break; + case AsyncSubscribeInfoProvionerCallBase::CallStatus::SENT: { + ACA_LOG_DEBUG("Finished processing %s gRPC call, deleting it.\n", + "PushNodeSubscribeInfo"); + delete unaryCall; + } break; + default: + break; + } + } +} + + + +void SubscribeInfoProvisionerAsyncServer::AsyncWorkder() +{ + while (keepReadingFromCq_) { + ACA_LOG_DEBUG("%s\n", "At the start of the while loop"); + AsyncSubscribeInfoProvionerCallBase *asyncCallBase = NULL; + bool ok = false; + if (!cq_.get()->Next((void **)&asyncCallBase, &ok)) { + ACA_LOG_DEBUG("Completion Queue Shut. Quitting\n"); + break; + } + ACA_LOG_DEBUG("Got message from CQ, is it OK? %ld\n", ok); + + ProcessPushNodeSubscribeInfoAsyncCall(asyncCallBase, ok); + } + ACA_LOG_DEBUG("%s\n", "Out of the for loop, seems like this server is shutting down."); +} + +void SubscribeInfoProvisionerAsyncServer::RunServer(int thread_pool_size) +{ + ACA_LOG_INFO("Start of RunServer, pool size %ld\n", thread_pool_size); + + thread_pool_.resize(thread_pool_size); + ACA_LOG_DEBUG("Async GRPC SERVER: Resized thread pool to %ld threads, start waiting for the pool to have enough threads\n", + thread_pool_size); + /* wait for thread pool to initialize*/ + while (thread_pool_.n_idle() != thread_pool_.size()) { + ACA_LOG_DEBUG("%s\n", "Still waiting...sleep 1 ms"); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + }; + ACA_LOG_DEBUG("Async GRPC SERVER: finised resizing thread pool to %ld threads\n", + thread_pool_size); + // Create the server + ServerBuilder builder; + string GRPC_SERVER_ADDRESS = "0.0.0.0:" + g_grpc_subscribe_server_port; + builder.AddListeningPort(GRPC_SERVER_ADDRESS, grpc::InsecureServerCredentials()); + builder.SetMaxMessageSize(INT_MAX); + builder.SetMaxReceiveMessageSize(INT_MAX); + builder.SetMaxSendMessageSize(INT_MAX); + builder.RegisterService(&service_); + cq_ = builder.AddCompletionQueue(); + server_ = builder.BuildAndStart(); + + // Test with one thread first, will move to multiple threads later + for (int i = 0; i < thread_pool_size; i++) { + // Instantiate a new PushNodeSubscribeInfo call object, and set its status + PushNodeSubscribeInfoAsyncCall *pushNodeSubscribeInfoAsyncCallInstance = + new PushNodeSubscribeInfoAsyncCall; + pushNodeSubscribeInfoAsyncCallInstance->status_ = + AsyncSubscribeInfoProvionerCallBase::CallStatus::INIT; + // Request for the call + service_.RequestPushNodeSubscribeInfo( + &pushNodeSubscribeInfoAsyncCallInstance->ctx_, /*Context of this call*/ + &pushNodeSubscribeInfoAsyncCallInstance->subscribeInfo_, /*Subscribe Info to receive*/ + &pushNodeSubscribeInfoAsyncCallInstance->responder_, /*Responder of call*/ + cq_.get(), /*CQ for new call*/ + cq_.get(), /*CQ for finished call*/ + pushNodeSubscribeInfoAsyncCallInstance /*The unique tag for the call*/ + ); + } + + for (int i = 0; i < thread_pool_size; i++) { + ACA_LOG_DEBUG("Pushing the %ldth async worker into the pool", i); + thread_pool_.push(std::bind(&SubscribeInfoProvisionerAsyncServer::AsyncWorkder, this)); + } +} diff --git a/src/comm/aca_message_pulsar_consumer.cpp b/src/comm/aca_message_pulsar_consumer.cpp index 5bbc9125..1517486d 100644 --- a/src/comm/aca_message_pulsar_consumer.cpp +++ b/src/comm/aca_message_pulsar_consumer.cpp @@ -30,54 +30,63 @@ using pulsar::StickyRange; namespace aca_message_pulsar { - void listener(Consumer consumer, const Message& message){ alcor::schema::GoalStateV2 deserialized_GoalState; alcor::schema::GoalStateOperationReply gsOperationalReply; int rc; Result result; - ACA_LOG_DEBUG("\n<=====incoming message: %s\n", - message.getDataAsString().c_str()); - + ACA_LOG_INFO("%s","ACA_PULSAR_MQ: Successfully received the incoming message.\n"); + ACA_LOG_INFO("%s","ACA_PULSAR_MQ: Start deserializing the message to GoalState...\n"); + ACA_LOG_INFO("%s","<====================================================>\n"); rc = Aca_Comm_Manager::get_instance().deserialize( (unsigned char *)message.getData(), message.getLength(), deserialized_GoalState); + ACA_LOG_INFO("%s","<====================================================>\n"); if (rc == EXIT_SUCCESS) { + ACA_LOG_INFO("%s","ACA_PULSAR_MQ: Start updating GoalState...\n"); + ACA_LOG_INFO("%s","<====================================================>\n"); rc = Aca_Comm_Manager::get_instance().update_goal_state( deserialized_GoalState, gsOperationalReply); - - + ACA_LOG_INFO("%s","<====================================================>\n"); if (rc != EXIT_SUCCESS) { - ACA_LOG_ERROR("Failed to update host with latest goal state, rc=%d.\n", rc); + ACA_LOG_ERROR("ACA_PULSAR_MQ: ERROR, failed to update host with latest goal state, rc=%d.\n", rc); } else { - ACA_LOG_INFO("Successfully updated host with latest goal state %d.\n", rc); + ACA_LOG_INFO("ACA_PULSAR_MQ: Successfully updated host with latest goal state, rc=%d.\n", rc); } } else { - ACA_LOG_ERROR("Deserialization failed with error code %d.\n", rc); + ACA_LOG_ERROR("ACA_PULSAR_MQ: ERROR, failed to deserialize the message with error code %d.\n", rc); } // Now acknowledge message consumer.acknowledge(message.getMessageId()); } -ACA_Message_Pulsar_Consumer::ACA_Message_Pulsar_Consumer(string topic, string brokers, string subscription_name) +ACA_Message_Pulsar_Consumer::ACA_Message_Pulsar_Consumer() { - setUnicastTopicName(topic); - setMulticastTopicName(topic); - setBrokers(brokers); - setUnicastSubscriptionName(subscription_name); - setMulticastSubscriptionName(subscription_name); - - ACA_LOG_DEBUG("Broker list: %s\n", this->brokers_list.c_str()); - ACA_LOG_DEBUG("Unicast consumer topic name: %s\n", this->unicast_topic_name.c_str()); - ACA_LOG_DEBUG("Unicast consumer subscription name: %s\n", this->unicast_subscription_name.c_str()); - ACA_LOG_DEBUG("Multicast consumer topic name: %s\n", this->multicast_topic_name.c_str()); - ACA_LOG_DEBUG("Multicast consumer subscription name: %s\n", this->multicast_subscription_name.c_str()); +} - // Create the clients - //this->ptr_multicast_client= new Client(brokers); - this->ptr_unicast_client = new Client(brokers); +ACA_Message_Pulsar_Consumer &ACA_Message_Pulsar_Consumer::get_instance() +{ + static ACA_Message_Pulsar_Consumer instance; + return instance; +} +void ACA_Message_Pulsar_Consumer::init(string topic, string brokers, string subscription_name){ + setUnicastTopicName(topic); + setMulticastTopicName(topic); + setBrokers(brokers); + setUnicastSubscriptionName(subscription_name); + setMulticastSubscriptionName(subscription_name); + + ACA_LOG_DEBUG("ACA_PULSAR_MQ: Broker list -> %s\n", this->brokers_list.c_str()); + ACA_LOG_DEBUG("ACA_PULSAR_MQ: Unicast consumer topic name -> %s\n", this->unicast_topic_name[0].c_str()); + ACA_LOG_DEBUG("ACA_PULSAR_MQ: Unicast consumer subscription name -> %s\n", this->unicast_subscription_name.c_str()); + ACA_LOG_DEBUG("ACA_PULSAR_MQ: Multicast consumer topic name -> %s\n", this->multicast_topic_name.c_str()); + ACA_LOG_DEBUG("ACA_PULSAR_MQ: Multicast consumer subscription name -> %s\n", this->multicast_subscription_name.c_str()); + + // Create the clients + this->ptr_multicast_client= new Client(brokers); + this->ptr_unicast_client = new Client(brokers); } ACA_Message_Pulsar_Consumer::~ACA_Message_Pulsar_Consumer() @@ -103,7 +112,10 @@ string ACA_Message_Pulsar_Consumer::getMulticastSubscriptionName() const string ACA_Message_Pulsar_Consumer::getUnicastTopicName() const { - return this->unicast_topic_name; + string topic = ""; + for (auto t: this->unicast_topic_name) + topic += (t+" "); + return topic; } string ACA_Message_Pulsar_Consumer::getUnicastSubscriptionName() const @@ -111,10 +123,7 @@ string ACA_Message_Pulsar_Consumer::getUnicastSubscriptionName() const return this->unicast_subscription_name; } - bool ACA_Message_Pulsar_Consumer::unicastConsumerDispatched(int stickyHash){ - Result result; - Consumer consumer; KeySharedPolicy keySharedPolicy; keySharedPolicy.setKeySharedMode(STICKY); @@ -125,9 +134,9 @@ bool ACA_Message_Pulsar_Consumer::unicastConsumerDispatched(int stickyHash){ //Use key shared mode this->unicast_consumer_config.setConsumerType(ConsumerKeyShared).setKeySharedPolicy(keySharedPolicy).setMessageListener(listener); - result = this->ptr_unicast_client->subscribe(this->unicast_topic_name,this->unicast_subscription_name,this->unicast_consumer_config,this->unicast_consumer); + Result result = this->ptr_unicast_client->subscribe(this->unicast_topic_name,this->unicast_subscription_name,this->unicast_consumer_config,this->unicast_consumer); if (result != Result::ResultOk){ - ACA_LOG_ERROR("Failed to subscribe unicast topic: %s\n", this->unicast_topic_name.c_str()); + ACA_LOG_ERROR("ACA_PULSAR_MQ: ERROR, failed to subscribe unicast topic -> %s\n", this->getUnicastTopicName().c_str()); return EXIT_FAILURE; } @@ -141,12 +150,43 @@ bool ACA_Message_Pulsar_Consumer::multicastConsumerDispatched(){ this->multicast_consumer_config.setMessageListener(listener); result = this->ptr_multicast_client->subscribe(this->multicast_topic_name,this->multicast_subscription_name,this->multicast_consumer_config,this->multicast_consumer); if (result != Result::ResultOk){ - ACA_LOG_ERROR("Failed to subscribe multicast topic: %s\n", this->multicast_topic_name.c_str()); + ACA_LOG_ERROR("ACA_PULSAR_MQ: ERROR, failed to subscribe multicast topic -> %s\n", this->multicast_topic_name.c_str()); return EXIT_FAILURE; } return EXIT_SUCCESS; } +bool ACA_Message_Pulsar_Consumer::unicastUnsubscribeAll() +{ + if(this->unicast_topic_name.empty()){ + ACA_LOG_INFO("ACA_PULSAR_MQ: Successfully to unsubscribe all the unicast topics.\n"); + return EXIT_SUCCESS; + } + + if (this->unicast_consumer.unsubscribe() == Result::ResultOk){ + ACA_LOG_INFO("ACA_PULSAR_MQ: Successfully to unsubscribe all the unicast topics.\n"); + this->unicast_topic_name.clear(); + return EXIT_SUCCESS; + } + else{ + ACA_LOG_ERROR("ACA_PULSAR_MQ: ERROR, failed to unsubscribe unicast topics -> %s.\n", this->getUnicastTopicName().c_str()); + return EXIT_FAILURE; + } +} + +bool ACA_Message_Pulsar_Consumer::unicastResubscribe(bool unSubscribe, string topic, string stickHash) +{ + if(!unSubscribe){ + if(this->unicast_consumer.unsubscribe() == Result::ResultOk){ // this unsubscribes topics in pulsar, but doesn't clean the topic list of Consumer. + setUnicastTopicName(topic); + return unicastConsumerDispatched(stoi(stickHash)); + } + return EXIT_FAILURE; + } + else{ + return unicastUnsubscribeAll(); + } +} void ACA_Message_Pulsar_Consumer::setBrokers(string brokers) { @@ -165,7 +205,7 @@ void ACA_Message_Pulsar_Consumer::setMulticastSubscriptionName(string subscripti void ACA_Message_Pulsar_Consumer::setUnicastTopicName(string topic) { - this->unicast_topic_name = topic; + this->unicast_topic_name.push_back(topic); } void ACA_Message_Pulsar_Consumer::setUnicastSubscriptionName(string subscription_name) @@ -173,4 +213,5 @@ void ACA_Message_Pulsar_Consumer::setUnicastSubscriptionName(string subscription this->unicast_subscription_name = subscription_name; } + } // namespace aca_message_pulsar diff --git a/src/comm/aca_message_pulsar_producer.cpp b/src/comm/aca_message_pulsar_producer.cpp index 115e50d0..39303d82 100644 --- a/src/comm/aca_message_pulsar_producer.cpp +++ b/src/comm/aca_message_pulsar_producer.cpp @@ -55,60 +55,56 @@ void ACA_Message_Pulsar_Producer::setTopicName(string topic) bool ACA_Message_Pulsar_Producer::publish(string message) { - Result result; - // Create a producer Producer producer; - result = this->ptr_client->createProducer(this->topic_name,producer); - if(result != ResultOk){ - ACA_LOG_ERROR("Failed to create producer, result=%d.\n", result); + Result createProducerResult = this->ptr_client->createProducer(this->topic_name,producer); + if(createProducerResult != ResultOk){ + ACA_LOG_ERROR("ACA_PULSAR_MQ: Failed to create producer, result=%d.\n", createProducerResult); return EXIT_FAILURE; } // Create a message Message msg = MessageBuilder().setContent(message).build(); - result = producer.send(msg); - if(result != ResultOk){ - ACA_LOG_ERROR("Failed to send message %s.\n", message.c_str()); + + if(producer.send(msg) != ResultOk){ + ACA_LOG_ERROR("ACA_PULSAR_MQ: Failed to send message -> %s.\n", message.c_str()); return EXIT_FAILURE; } + else{ + ACA_LOG_INFO("ACA_PULSAR_MQ: Successfully send message %s.\n", message.c_str()); - ACA_LOG_INFO("Successfully send message %s\n", message.c_str()); - - // Flush all produced messages - producer.flush(); - producer.close(); - return EXIT_SUCCESS; - + // Flush all produced messages + producer.flush(); + producer.close(); + return EXIT_SUCCESS; + } } bool ACA_Message_Pulsar_Producer::publish(string message, string orderingKey) { - Result result; - // Create a producer Producer producer; - result = this->ptr_client->createProducer(this->topic_name,producer); - if(result != ResultOk){ - ACA_LOG_ERROR("Failed to create producer, result=%d.\n", result); + Result createProducerResult = this->ptr_client->createProducer(this->topic_name,producer); + if(createProducerResult != ResultOk){ + ACA_LOG_ERROR("ACA_PULSAR_MQ: Failed to create producer, result=%d.\n", createProducerResult); return EXIT_FAILURE; } // Create a message Message msg = MessageBuilder().setContent(message).setOrderingKey(orderingKey).build(); - result = producer.send(msg); - if(result != ResultOk){ - ACA_LOG_ERROR("Failed to send message %s.\n", message.c_str()); + + if(producer.send(msg) != ResultOk){ + ACA_LOG_ERROR("ACA_PULSAR_MQ: Failed to send message %s.\n", message.c_str()); return EXIT_FAILURE; } + else{ + ACA_LOG_INFO("ACA_PULSAR_MQ: Successfully send message %s.\n", message.c_str()); - ACA_LOG_INFO("Successfully send message %s\n", message.c_str()); - - // Flush all produced messages - producer.flush(); - producer.close(); - return EXIT_SUCCESS; - + // Flush all produced messages + producer.flush(); + producer.close(); + return EXIT_SUCCESS; + } } void ACA_Message_Pulsar_Producer::setBrokers(string brokers) { diff --git a/src/grpc/CMakeLists.txt b/src/grpc/CMakeLists.txt index aa9daeb2..6334d721 100644 --- a/src/grpc/CMakeLists.txt +++ b/src/grpc/CMakeLists.txt @@ -24,8 +24,13 @@ set(aca_proto_srcs "${CMAKE_CURRENT_BINARY_DIR}/goalstateprovisioner.pb.cc") set(aca_proto_hdrs "${CMAKE_CURRENT_BINARY_DIR}/goalstateprovisioner.pb.h") set(aca_grpc_srcs "${CMAKE_CURRENT_BINARY_DIR}/goalstateprovisioner.grpc.pb.cc") set(aca_grpc_hdrs "${CMAKE_CURRENT_BINARY_DIR}/goalstateprovisioner.grpc.pb.h") + +set(aca_proto_sub_srcs "${CMAKE_CURRENT_BINARY_DIR}/subscribeinfoprovisioner.pb.cc") +set(aca_proto_sub_hdrs "${CMAKE_CURRENT_BINARY_DIR}/subscribeinfoprovisioner.pb.h") +set(aca_grpc_sub_srcs "${CMAKE_CURRENT_BINARY_DIR}/subscribeinfoprovisioner.grpc.pb.cc") +set(aca_grpc_sub_hdrs "${CMAKE_CURRENT_BINARY_DIR}/subscribeinfoprovisioner.grpc.pb.h") add_custom_command( - OUTPUT "${aca_proto_srcs}" "${aca_proto_hdrs}" "${aca_grpc_srcs}" "${aca_grpc_hdrs}" + OUTPUT "${aca_proto_srcs}" "${aca_proto_hdrs}" "${aca_grpc_srcs}" "${aca_grpc_hdrs}" "${aca_proto_sub_srcs}" "${aca_proto_sub_hdrs}" "${aca_grpc_sub_srcs}" "${aca_grpc_sub_hdrs}" COMMAND ${_PROTOBUF_PROTOC} ARGS --grpc_out "${CMAKE_CURRENT_BINARY_DIR}" --cpp_out "${CMAKE_CURRENT_BINARY_DIR}" @@ -37,4 +42,4 @@ add_custom_command( # Include generated *.pb.h files include_directories("${CMAKE_CURRENT_BINARY_DIR}") -ADD_LIBRARY(grpc ${aca_proto_srcs} ${aca_proto_hdrs} ${aca_grpc_srcs} ${aca_grpc_hdrs}) \ No newline at end of file +ADD_LIBRARY(grpc ${aca_proto_srcs} ${aca_proto_hdrs} ${aca_grpc_srcs} ${aca_grpc_hdrs} ${aca_proto_sub_srcs} ${aca_proto_sub_hdrs} ${aca_grpc_sub_srcs} ${aca_grpc_sub_hdrs}) \ No newline at end of file diff --git a/test/func_tests/gs_tests.cpp b/test/func_tests/gs_tests.cpp index 1b6a3bdf..3a883cf5 100644 --- a/test/func_tests/gs_tests.cpp +++ b/test/func_tests/gs_tests.cpp @@ -18,7 +18,9 @@ #include "aca_comm_mgr.h" #include "aca_grpc.h" #include "aca_grpc_client.h" +#include "aca_message_pulsar_producer.h" #include "goalstateprovisioner.grpc.pb.h" +#include "subscribeinfoprovisioner.grpc.pb.h" #include "goalstate.pb.h" #include "cppkafka/buffer.h" #include /* for getopt */ @@ -38,6 +40,7 @@ static char EMPTY_STRING[] = ""; static char LOCALHOST[] = "localhost"; static char GRPC_PORT[] = "50001"; +static char SUBSCRIBE_PORT[] = "50002"; using namespace std; using namespace alcor::schema; @@ -46,6 +49,7 @@ using aca_comm_manager::Aca_Comm_Manager; // Global variables string g_grpc_server_ip = EMPTY_STRING; string g_grpc_port = EMPTY_STRING; +string g_subscribe_port = EMPTY_STRING; string g_ofctl_command = EMPTY_STRING; string g_ofctl_target = EMPTY_STRING; string g_ofctl_options = EMPTY_STRING; @@ -54,6 +58,8 @@ string g_ncm_port = EMPTY_STRING; string g_grpc_server_port = EMPTY_STRING; // by default, this should run as GRCP client, unless specified by the corresponding flag. bool g_run_as_server = false; +bool g_run_as_topic_client = false; + GoalStateProvisionerAsyncServer *g_grpc_server = NULL; GoalStateProvisionerClientImpl *g_grpc_client = NULL; // GoalStateProvisionerServer *g_test_grcp_server = NULL; @@ -79,6 +85,7 @@ std::atomic_ulong g_total_vpcs_table_mutex_time(0); std::atomic_ulong g_total_update_GS_time(0); // total time for ACA message in microseconds std::atomic_ulong g_total_ACA_Message_time(0); + bool g_demo_mode = false; bool g_debug_mode = false; @@ -88,6 +95,7 @@ static string subnet_id_1 = "27330ae4-b718-11ea-b3de-111111111111"; static string subnet1_gw_ip = "10.10.0.1"; static string subnet1_gw_mac = "fa:16:3e:d7:f2:11"; static string vmac_address_1 = "fa:16:3e:d7:f2:6c"; +static string broker_list= "pulsar://localhost:6650"; using grpc::Channel; using grpc::ClientAsyncResponseReader; @@ -446,6 +454,48 @@ class GoalStateProvisionerClient { std::unique_ptr stub_; }; +class SubscribeInfoProvisionerClient { +public: + explicit SubscribeInfoProvisionerClient(std::shared_ptr channel) + : stub_(SubscribeInfoProvisioner::NewStub(channel)) + { + } + + void push_info(NodeSubscribeInfo &subscribeInfo, SubscribeOperationReply &reply) + { + ClientContext context; + + auto before_sync_call = std::chrono::steady_clock::now(); + + Status status = stub_->PushNodeSubscribeInfo(&context, subscribeInfo, &reply); + + auto after_sync_call = std::chrono::steady_clock::now(); + + auto sync_call_time = + cast_to_microseconds(after_sync_call - before_sync_call).count(); + + ACA_LOG_INFO("[METRICS] PushNodeSubscribeInfo sync call took: %ld microseconds or %ld milliseconds\n", + sync_call_time, us_to_ms(sync_call_time)); + + if (!status.ok()) { + ACA_LOG_ERROR("%s", "Subscribe info update failed\n"); + } + } + + void update_subscribe_info(const SubscribeOperation operation, const string key, const string topic, SubscribeOperationReply &reply) + { + NodeSubscribeInfo info; + info.set_subscribe_operation(operation); + info.set_key(key); + info.set_topic(topic); + + push_info(info, reply); + } + +private: + std::unique_ptr stub_; +}; + // function to handle ctrl-c and kill process static void aca_signal_handler(int sig_num) { @@ -704,6 +754,56 @@ int run_as_client() return rc; } +int run_as_topic_client() { + int rc=1; + string hashValue="49775"; + string key="9192a4d4-ffff-4ece-b3f0-8d36e3d88001"; + string updateTopic="update topic"; + SubscribeOperation operation=alcor::schema::Subscribe; + + ACA_LOG_INFO("%s", "-------------- setup local subscribe info client --------------\n"); + ACA_LOG_INFO("operation is %s\n","Subscribe"); + ACA_LOG_INFO("subscribe key is %s\n",key.c_str()); + ACA_LOG_INFO("subscribe topic is %s\n",updateTopic.c_str()); + ACA_LOG_INFO("subscribe server is %s\n",string (g_grpc_server_ip + ":" + g_subscribe_port).c_str()); + + auto before_subscribe_client = std::chrono::steady_clock::now(); + + SubscribeInfoProvisionerClient subscribe_client(grpc::CreateChannel( + g_grpc_server_ip + ":" + g_subscribe_port, grpc::InsecureChannelCredentials())); + + auto after_subscribe_client = std::chrono::steady_clock::now(); + auto client_time = + cast_to_microseconds(after_subscribe_client-before_subscribe_client).count(); + ACA_LOG_INFO("[METRICS] create subscribe client took: %ld microseconds or %ld milliseconds\n", + client_time, us_to_ms(client_time)); + + ACA_LOG_INFO("%s", "-------------- sending one subscribe info --------------\n"); + + SubscribeOperationReply reply; + auto before_send_info = std::chrono::steady_clock::now(); + + subscribe_client.update_subscribe_info(operation, hashValue, updateTopic, reply); + + auto after_send_info = std::chrono::steady_clock::now(); + auto send_info_time = + cast_to_microseconds(after_send_info-before_send_info).count(); + ACA_LOG_INFO("[METRICS] send_info call took: %ld microseconds or %ld milliseconds\n", + send_info_time, us_to_ms(send_info_time)); + + ACA_LOG_INFO("%s", "-------------- sending one empty goalstate message --------------\n"); + GoalStateV2 emptyGoalState; + string GoalStateString; + + if(emptyGoalState.SerializeToString(&GoalStateString)){ + ACA_LOG_INFO("%s","Successfully covert GoalStateV2 to message\n"); + } + + aca_message_pulsar::ACA_Message_Pulsar_Producer producer(broker_list, updateTopic); + rc = producer.publish(GoalStateString,key); + return rc; +} + int main(int argc, char *argv[]) { int option; @@ -714,7 +814,7 @@ int main(int argc, char *argv[]) signal(SIGINT, aca_signal_handler); signal(SIGTERM, aca_signal_handler); - while ((option = getopt(argc, argv, "s:p:dm")) != -1) { + while ((option = getopt(argc, argv, "s:p:dmt")) != -1) { switch (option) { case 's': g_grpc_server_ip = optarg; @@ -728,6 +828,9 @@ int main(int argc, char *argv[]) case 'm': g_run_as_server = true; break; + case 't': + g_run_as_topic_client = true; + break; default: /* the '?' case when the option is not recognized */ /* specifying port not avaiable for now */ fprintf(stderr, @@ -735,7 +838,8 @@ int main(int argc, char *argv[]) "\t\t[-s grpc server]\n" "\t\t[-p grpc port]\n" "\t\t[-d enable debug mode]\n" - "\t\t[-m If this flag is passed in, gs test runs as grpc server, which listens on localhost:54321; otherwise it runs as a grpc client]\n", + "\t\t[-m If this flag is passed in, gs test runs as grpc server, which listens on localhost:54321;]\n", + "\t\t[-t If this flag is passed in, gs test runs as grpc topic client, which publishes topic subscribe requests to localhost:50002; otherwise it runs as a grpc client]\n", argv[0]); exit(EXIT_FAILURE); } @@ -749,11 +853,18 @@ int main(int argc, char *argv[]) if (g_grpc_port == EMPTY_STRING) { g_grpc_port = GRPC_PORT; } + if(g_subscribe_port == EMPTY_STRING){ + g_subscribe_port = SUBSCRIBE_PORT; + } if (g_run_as_server) { rc = RunServer(); - } else { - rc = run_as_client(); + } + else if (g_run_as_topic_client){ + rc = run_as_topic_client(); + } + else { + rc = run_as_client(); } // Verify that the version of the library that we linked against is // compatible with the version of the headers we compiled against. @@ -762,4 +873,6 @@ int main(int argc, char *argv[]) aca_cleanup(); return rc; -} \ No newline at end of file +} + + diff --git a/test/gtest/aca_test_mq.cpp b/test/gtest/aca_test_mq.cpp index f5946d94..c2ce57a2 100644 --- a/test/gtest/aca_test_mq.cpp +++ b/test/gtest/aca_test_mq.cpp @@ -77,37 +77,27 @@ static string mq_test_topic = "Host-ts-1"; static string mq_subscription = "test_subscription"; static string mq_key="9192a4d4-ffff-4ece-b3f0-8d36e3d88001"; // 3dda2801-d675-4688-a63f-dcda8d327f50 9192a4d4-ffff-4ece-b3f0-8d36e3d88001 static int mq_hash=49775; // 21485 49775 +static string mq_update_topics[6] = + {"update-topic","update-topic2","update-topic3","update-topic4","update-topic5","update-topic6"}; // // Test suite: pulsar_test_cases -// +// This test suite contains three tests: +// 1. basic pulsar consumer and producer test. +// 2. pulsar consumer multi-subscribe test. +// 3. pulsar consumer resubscribe test. // Note: it requires a pulsar setup on localhost therefore this test is DISABLED by default. // You will need three terminals: // Terminal(1): run pulsar standalone. // Terminal(2): run pulsar consumer test case. -// Terminal(3): run pulsar producer test cases. - - -// This case tests the pulsar consumer implementation. -// First run this case by executing: -// ./aca_tests --gtest_also_run_disabled_tests --gtest_filter=*DISABLED_pulsar_consumer_test -// Then run the following producer test cases. - -TEST(pulsar_test_cases, DISABLED_pulsar_consumer_test) -{ - bool previous_demo_mode = g_demo_mode; - g_demo_mode = true; +// Terminal(3): run pulsar producer test case. - aca_test_reset_environment(); - ACA_Message_Pulsar_Consumer consumer(mq_test_topic, mq_broker_ip, mq_subscription); - consumer.multicastConsumerDispatched(); - pause(); +// 1. Basic pulsar consumer and producer test. +// Ensure you launched the pulsar then executing: +// sudo ./aca_tests --gtest_also_run_disabled_tests --gtest_filter=*DISABLED_pulsar_unicast_consumer_test +// sudo ./aca_tests --gtest_also_run_disabled_tests --gtest_filter=*DISABLED_pulsar_producer_test - g_demo_mode = previous_demo_mode; -} - -// sudo ./aca_tests --gtest_also_run_disabled_tests --gtest_filter=*DISABLED_pulsar_unicast_consumer_test TEST(pulsar_test_cases, DISABLED_pulsar_unicast_consumer_test) { string cmd_string; @@ -118,18 +108,14 @@ TEST(pulsar_test_cases, DISABLED_pulsar_unicast_consumer_test) aca_test_reset_environment(); - ACA_Message_Pulsar_Consumer consumer(mq_test_topic, mq_broker_ip, mq_subscription); - consumer.unicastConsumerDispatched(mq_hash); + ACA_Message_Pulsar_Consumer::get_instance().init(mq_test_topic, mq_broker_ip, mq_subscription); + ACA_Message_Pulsar_Consumer::get_instance().unicastConsumerDispatched(mq_hash); pause(); g_demo_mode = previous_demo_mode; } - -// This case tests the pulsar producer implementation and publishes a GoalState to the subscribed topic. -// First run pulsar_consumer_test then execute -// sudo ./aca_tests --gtest_also_run_disabled_tests --gtest_filter=*DISABLED_pulsar_hash_producer_test -TEST(pulsar_test_cases, DISABLED_pulsar_hash_producer_test) +TEST(pulsar_test_cases, DISABLED_pulsar_producer_test) { int retcode = 0; int overall_rc=0; @@ -139,35 +125,30 @@ TEST(pulsar_test_cases, DISABLED_pulsar_hash_producer_test) string GoalStateString; unsigned char serializedGoalState[length]; - GoalState GoalState_builder; - PortState *new_port_states = GoalState_builder.add_port_states(); - SubnetState *new_subnet_states = GoalState_builder.add_subnet_states(); - - ACA_OVS_L2_Programmer::get_instance().execute_ovsdb_command( - "del-br br-int", not_care_culminative_time, overall_rc); + GoalStateV2 GoalState_builder; + PortState new_port_states; + SubnetState new_subnet_states; - ACA_OVS_L2_Programmer::get_instance().execute_ovsdb_command( - "del-br br-tun", not_care_culminative_time, overall_rc); + aca_test_reset_environment(); - overall_rc = ACA_OVS_L2_Programmer::get_instance().setup_ovs_bridges_if_need(); - ASSERT_EQ(overall_rc, EXIT_SUCCESS); - overall_rc = EXIT_SUCCESS; + aca_test_create_default_port_state(&new_port_states); + auto &port_states_map = *GoalState_builder.mutable_port_states(); + port_states_map[port_id_1] = new_port_states; - // fill in port state structs - aca_test_create_default_port_state(new_port_states); + aca_test_create_default_subnet_state(&new_subnet_states); + auto &subnet_states_map = *GoalState_builder.mutable_subnet_states(); + subnet_states_map[subnet_id_1] = new_subnet_states; - // fill in subnet state structs - aca_test_create_default_subnet_state(new_subnet_states); if(GoalState_builder.SerializeToString(&GoalStateString)){ - ACA_LOG_INFO("%s","Successfully covert GoalState to message\n"); + ACA_LOG_INFO("%s","Successfully covert GoalStateV2 to message\n"); } ACA_Message_Pulsar_Producer producer(mq_broker_ip, mq_test_topic); retcode = producer.publish(GoalStateString,mq_key); EXPECT_EQ(retcode, EXIT_SUCCESS); - ACA_LOG_INFO("%s","Waiting for GoalState update.\n"); + ACA_LOG_INFO("%s","Waiting for GoalStateV2 update.\n"); sleep(1); ACA_OVS_L2_Programmer::get_instance().execute_ovsdb_command( @@ -177,19 +158,42 @@ TEST(pulsar_test_cases, DISABLED_pulsar_hash_producer_test) } -// This case tests the pulsar producer implementation and publishes a GoalStateV2 to the subscribed topic. -// First run pulsar_consumer_test then execute -// sudo ./aca_tests --gtest_also_run_disabled_tests --gtest_filter=*DISABLED_pulsar_producer_testv2 +// 2. Pulsar consumer multi-subscribe test. +// Ensure you launched the pulsar then executing: +// sudo ./aca_tests --gtest_also_run_disabled_tests --gtest_filter=*DISABLED_pulsar_unicast_consumer_multisubscribe_test +// sudo ./aca_tests --gtest_also_run_disabled_tests --gtest_filter=*DISABLED_pulsar_producer_multisubscribe_test -TEST(pulsar_test_cases, DISABLED_pulsar_producer_testv2) +TEST(pulsar_test_cases, DISABLED_pulsar_unicast_consumer_multisubscribe_test) { - int retcode=0; + string cmd_string; + + bool previous_demo_mode = g_demo_mode; + g_demo_mode = true; + + aca_test_reset_environment(); + + ACA_Message_Pulsar_Consumer consumer= + ACA_Message_Pulsar_Consumer::get_instance(); + consumer.init(mq_test_topic, mq_broker_ip, mq_subscription); + consumer.unicastConsumerDispatched(mq_hash); + for(int i = 0; i < size(mq_update_topics); i++){ + consumer.unicastResubscribe(false, mq_update_topics[i], to_string(mq_hash)); + } + ACA_LOG_INFO("Current subscribe topics are: %s\n",consumer.getUnicastTopicName().c_str()); + pause(); + + g_demo_mode = previous_demo_mode; +} + +TEST(pulsar_test_cases, DISABLED_pulsar_producer_multisubscribe_test) +{ + int retcode = 0; int overall_rc=0; + int length=1000; ulong not_care_culminative_time; string cmd_string; string GoalStateString; - - aca_test_reset_environment(); + unsigned char serializedGoalState[length]; GoalStateV2 GoalState_builder; PortState new_port_states; @@ -203,20 +207,47 @@ TEST(pulsar_test_cases, DISABLED_pulsar_producer_testv2) auto &subnet_states_map = *GoalState_builder.mutable_subnet_states(); subnet_states_map[subnet_id_1] = new_subnet_states; - if(GoalState_builder.SerializeToString(&GoalStateString)){ - ACA_LOG_INFO("%s","Successfully covert GoalStateV2 to message\n"); + for(int i=0; i< size(mq_update_topics); i++){ + aca_test_reset_environment(); + + if(GoalState_builder.SerializeToString(&GoalStateString)){ + ACA_LOG_INFO("%s","Successfully covert GoalStateV2 to message\n"); + } + + ACA_Message_Pulsar_Producer producer(mq_broker_ip, mq_update_topics[i]); + retcode = producer.publish(GoalStateString,mq_key); + EXPECT_EQ(retcode, EXIT_SUCCESS); + + ACA_LOG_INFO("%s","Waiting for GoalStateV2 update.\n"); + sleep(1); + + ACA_OVS_L2_Programmer::get_instance().execute_ovsdb_command( + "get Interface " + port_name_1 + " ofport", not_care_culminative_time, overall_rc); + EXPECT_EQ(overall_rc, EXIT_SUCCESS); + overall_rc = EXIT_SUCCESS; } +} - ACA_Message_Pulsar_Producer producer(mq_broker_ip, mq_test_topic); - retcode = producer.publish(GoalStateString); - EXPECT_EQ(retcode, EXIT_SUCCESS); +// 3. Pulsar consumer resubscribe test. +// Ensure you launched the pulsar then executing: +// sudo ./aca_tests --gtest_also_run_disabled_tests --gtest_filter=*DISABLED_pulsar_unicast_consumer_resubscribe_test - ACA_LOG_INFO("%s","Waiting for GoalStateV2 update.\n"); - sleep(2); +TEST(pulsar_test_cases, DISABLED_pulsar_unicast_consumer_resubscribe_test) +{ + string cmd_string; + string mq_update_topic="update topic"; + bool previous_demo_mode = g_demo_mode; + g_demo_mode = true; - ACA_OVS_L2_Programmer::get_instance().execute_ovsdb_command( - "get Interface " + port_name_1 + " ofport", not_care_culminative_time, overall_rc); - EXPECT_EQ(overall_rc, EXIT_SUCCESS); - overall_rc = EXIT_SUCCESS; + aca_test_reset_environment(); + ACA_Message_Pulsar_Consumer consumer= + ACA_Message_Pulsar_Consumer::get_instance(); + consumer.init(mq_update_topic, mq_broker_ip, mq_subscription); + consumer.unicastConsumerDispatched(mq_hash); + consumer.unicastResubscribe(true); + consumer.unicastResubscribe(false,mq_test_topic, to_string(mq_hash)); + pause(); + + g_demo_mode = previous_demo_mode; } \ No newline at end of file