From 2c28c330f4d3dfff96bbbcbaa11fd75e8de5dab1 Mon Sep 17 00:00:00 2001 From: subject-name-here Date: Sat, 16 Feb 2019 12:02:08 +0300 Subject: [PATCH 01/15] added protocol, modified gitignore --- .gitignore | 1 + tcp/server/.gitkeep | 0 tcp/server/include/request.hpp | 17 +++++++++++++++++ tcp/server/include/response.hpp | 15 +++++++++++++++ 4 files changed, 33 insertions(+) delete mode 100644 tcp/server/.gitkeep create mode 100644 tcp/server/include/request.hpp create mode 100644 tcp/server/include/response.hpp diff --git a/.gitignore b/.gitignore index f2e1fa5..7233182 100644 --- a/.gitignore +++ b/.gitignore @@ -45,6 +45,7 @@ cmake_install.cmake install_manifest.txt compile_commands.json CTestTestfile.cmake +cmake-build-debug/ build ### LaTeX ### diff --git a/tcp/server/.gitkeep b/tcp/server/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/tcp/server/include/request.hpp b/tcp/server/include/request.hpp new file mode 100644 index 0000000..32e2559 --- /dev/null +++ b/tcp/server/include/request.hpp @@ -0,0 +1,17 @@ +#pragma once + +struct dctp_request_header { + uint8_t type; + uint32_t id; + int64_t first_operand; + int64_t second_operand; +}; + +enum request_type { + PLUS = 1, + MINUS, + MULT, + DIV, + SQRT, + FACT +}; \ No newline at end of file diff --git a/tcp/server/include/response.hpp b/tcp/server/include/response.hpp new file mode 100644 index 0000000..245529b --- /dev/null +++ b/tcp/server/include/response.hpp @@ -0,0 +1,15 @@ +#pragma once + +struct dctp_response_header { + uint8_t return_code; + uint32_t id; + int64_t result; +}; + +enum return_code { + OK = 0, + OVERFLOW, + DIV_BY_ZERO, + FACT_OF_NEGATIVE, + SQRT_OF_NEGATIVE +}; \ No newline at end of file From 73f07aea4e59d6a7da7c7031dc73a71e9687a644 Mon Sep 17 00:00:00 2001 From: subject-name-here Date: Sat, 16 Feb 2019 12:02:32 +0300 Subject: [PATCH 02/15] added main --- tcp/server/CMakeLists.txt | 11 +++++++++ tcp/server/src/main.cpp | 52 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+) create mode 100644 tcp/server/CMakeLists.txt create mode 100644 tcp/server/src/main.cpp diff --git a/tcp/server/CMakeLists.txt b/tcp/server/CMakeLists.txt new file mode 100644 index 0000000..728e659 --- /dev/null +++ b/tcp/server/CMakeLists.txt @@ -0,0 +1,11 @@ +cmake_minimum_required(VERSION 3.13) +project(server) + +set(CMAKE_CXX_STANDARD 14) + +set(SOURCE_FILES + src/main.cpp include/response.hpp include/request.hpp) + +include_directories(include) + +add_executable(server ${SOURCE_FILES}) diff --git a/tcp/server/src/main.cpp b/tcp/server/src/main.cpp new file mode 100644 index 0000000..93142d5 --- /dev/null +++ b/tcp/server/src/main.cpp @@ -0,0 +1,52 @@ +#include + +#include + +#include +#include +#include + +#include + +int main(int argc, char **argv) { + int socket_descriptor = socket(AF_INET, SOCK_STREAM, 0); + + if (socket_descriptor < 0) { + perror("ERROR opening socket"); + exit(1); + } + + struct sockaddr_in server_addr{}, client_addr{}; + socklen_t client_size = sizeof(client_addr); + + bzero((char *) &server_addr, sizeof(server_addr)); + + uint16_t port_number = 22229; + if (argc > 1) { + port_number = static_cast(atoi(argv[1])); // NOLINT(cert-err34-c) + } + + server_addr.sin_family = AF_INET; + server_addr.sin_addr.s_addr = INADDR_ANY; + server_addr.sin_port = htons(port_number); + + if (bind(socket_descriptor, (struct sockaddr *) &server_addr, sizeof(server_addr)) < 0) { + perror("ERROR on binding"); + exit(1); + } + + listen(socket_descriptor, 5); + + // something meaningless + bool isTerminated = false; + while (!isTerminated) { + int new_socket_descriptor = accept(socket_descriptor, (struct sockaddr *) &client_addr, &client_size); + if (new_socket_descriptor < 0) { + perror("ERROR on accept"); + exit(1); + } + isTerminated = true; + } + + return 0; +} \ No newline at end of file From 561de82fe860f02db540100e1cf37272fb6289f2 Mon Sep 17 00:00:00 2001 From: subject-name-here Date: Sat, 16 Feb 2019 14:53:26 +0300 Subject: [PATCH 03/15] fixed response --- tcp/server/include/response.hpp | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tcp/server/include/response.hpp b/tcp/server/include/response.hpp index 245529b..ca7205b 100644 --- a/tcp/server/include/response.hpp +++ b/tcp/server/include/response.hpp @@ -1,7 +1,10 @@ #pragma once +#include + struct dctp_response_header { uint8_t return_code; + uint8_t operation_type; uint32_t id; int64_t result; }; @@ -12,4 +15,9 @@ enum return_code { DIV_BY_ZERO, FACT_OF_NEGATIVE, SQRT_OF_NEGATIVE +}; + +enum operation_type { + FAST = 1, + SLOW }; \ No newline at end of file From b9fc7435344e8cc36cef32a2596b7d06b0e2cbf8 Mon Sep 17 00:00:00 2001 From: subject-name-here Date: Sat, 16 Feb 2019 15:26:43 +0300 Subject: [PATCH 04/15] modified response --- tcp/server/include/response.hpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tcp/server/include/response.hpp b/tcp/server/include/response.hpp index ca7205b..3355580 100644 --- a/tcp/server/include/response.hpp +++ b/tcp/server/include/response.hpp @@ -11,10 +11,11 @@ struct dctp_response_header { enum return_code { OK = 0, + WAIT_FOR_RESULT, OVERFLOW, DIV_BY_ZERO, FACT_OF_NEGATIVE, - SQRT_OF_NEGATIVE + SQRT_OF_NEGATIVE, }; enum operation_type { From 0b776bdc5fe4c280880efa565324c482d6bcb7c2 Mon Sep 17 00:00:00 2001 From: karvozavr Date: Sat, 16 Feb 2019 16:30:05 +0300 Subject: [PATCH 05/15] Add client --- tcp/client/CMakeLists.txt | 17 ++++ tcp/client/include/CalcuatorServerDriver.cpp | 75 +++++++++++++++++ tcp/client/include/CalcuatorServerDriver.hpp | 50 ++++++++++++ tcp/client/include/CalculatorApp.cpp | 85 ++++++++++++++++++++ tcp/client/include/CalculatorApp.hpp | 31 +++++++ tcp/client/include/ConcurrentQueue.hpp | 33 ++++++++ tcp/client/include/requests.hpp | 45 +++++++++++ tcp/client/include/socketUtils.hpp | 36 +++++++++ tcp/client/src/main.cpp | 11 +++ tcp/client/src/requests.cpp | 18 +++++ tcp/client/src/socketUtils.cpp | 80 ++++++++++++++++++ 11 files changed, 481 insertions(+) create mode 100644 tcp/client/CMakeLists.txt create mode 100644 tcp/client/include/CalcuatorServerDriver.cpp create mode 100644 tcp/client/include/CalcuatorServerDriver.hpp create mode 100644 tcp/client/include/CalculatorApp.cpp create mode 100644 tcp/client/include/CalculatorApp.hpp create mode 100644 tcp/client/include/ConcurrentQueue.hpp create mode 100644 tcp/client/include/requests.hpp create mode 100644 tcp/client/include/socketUtils.hpp create mode 100644 tcp/client/src/main.cpp create mode 100644 tcp/client/src/requests.cpp create mode 100644 tcp/client/src/socketUtils.cpp diff --git a/tcp/client/CMakeLists.txt b/tcp/client/CMakeLists.txt new file mode 100644 index 0000000..965d762 --- /dev/null +++ b/tcp/client/CMakeLists.txt @@ -0,0 +1,17 @@ +cmake_minimum_required(VERSION 2.8) +project(calculator) +set(CMAKE_CXX_STANDARD 17) + +file(GLOB SOURCE_FILES "src/*.cpp") + +add_executable(calculator ${SOURCE_FILES} include/CalculatorApp.cpp include/CalculatorApp.hpp include/CalcuatorServerDriver.cpp include/CalcuatorServerDriver.hpp src/main.cpp include/requests.hpp include/ConcurrentQueue.hpp src/requests.cpp) + +target_include_directories(calculator PRIVATE include) + +find_package(Threads REQUIRED) +if(THREADS_HAVE_PTHREAD_ARG) + target_compile_options(calculator PUBLIC "-pthread") +endif() +if(CMAKE_THREAD_LIBS_INIT) + target_link_libraries(calculator "${CMAKE_THREAD_LIBS_INIT}") +endif() diff --git a/tcp/client/include/CalcuatorServerDriver.cpp b/tcp/client/include/CalcuatorServerDriver.cpp new file mode 100644 index 0000000..2513718 --- /dev/null +++ b/tcp/client/include/CalcuatorServerDriver.cpp @@ -0,0 +1,75 @@ +// +// Created by karvozavr on 16/02/19. +// + +#include "CalcuatorServerDriver.hpp" + +CalcuatorServerDriver::~CalcuatorServerDriver() { + shutdown(m_socket, SHUT_RDWR); + close(m_socket); +} + +void CalcuatorServerDriver::connect() { + m_socket = connectToServer(m_host, m_port); + m_readingThread = std::thread(&CalcuatorServerDriver::readingThreadTask, this); +} + +void CalcuatorServerDriver::readingThreadTask() { + while (true) { + auto response = readObject(m_socket); + if (response.errorCode != WAIT_FOR_RESULT) { + if (response.operationType == SLOW) { + m_longResults.push(response); + } else if (response.operationType == FAST) { + m_instantResults.push(response); + } + } + } +} + +bool CalcuatorServerDriver::hasResult() { + return !m_longResults.empty(); +} + +CalculatorResponse CalcuatorServerDriver::getResult() { + return m_longResults.pop(); +} + +void CalcuatorServerDriver::factorial(uint32_t id, int64_t arg) { + sendRequest({FACT, id, arg, 0}); +} + +void CalcuatorServerDriver::sqrt(uint32_t id, int64_t arg) { + sendRequest({SQRT, id, arg, 0}); +} + +CalculatorResponse CalcuatorServerDriver::plus(uint32_t id, int64_t arg1, int64_t arg2) { + sendRequest({PLUS, id, arg1, arg2}); + return getResponse(); +} + +CalculatorResponse CalcuatorServerDriver::minus(uint32_t id, int64_t arg1, int64_t arg2) { + sendRequest({MINUS, id, arg1, arg2}); + return getResponse(); +} + +CalculatorResponse CalcuatorServerDriver::multiply(uint32_t id, int64_t arg1, int64_t arg2) { + sendRequest({MULT, id, arg1, arg2}); + return getResponse(); +} + +CalculatorResponse CalcuatorServerDriver::divide(uint32_t id, int64_t arg1, int64_t arg2) { + sendRequest({DIV, id, arg1, arg2}); + return getResponse(); +} + +void CalcuatorServerDriver::sendRequest(CalculatorRequest const &request) { + writeObject(m_socket, request); +} + +CalculatorResponse CalcuatorServerDriver::getResponse() { + while (m_instantResults.empty()) {} + return m_instantResults.pop(); +} + + diff --git a/tcp/client/include/CalcuatorServerDriver.hpp b/tcp/client/include/CalcuatorServerDriver.hpp new file mode 100644 index 0000000..a720266 --- /dev/null +++ b/tcp/client/include/CalcuatorServerDriver.hpp @@ -0,0 +1,50 @@ +#include + +#pragma once + +#include +#include +#include +#include +#include "requests.hpp" +#include "socketUtils.hpp" +#include "ConcurrentQueue.hpp" + +class CalcuatorServerDriver { +public: + CalcuatorServerDriver(std::string host, uint16_t port) : m_host(std::move(host)), m_port(port) {} + + ~CalcuatorServerDriver(); + + void connect(); + + bool hasResult(); + + CalculatorResponse getResult(); + + void factorial(uint32_t id, int64_t arg); + + void sqrt(uint32_t id, int64_t arg); + + CalculatorResponse plus(uint32_t id, int64_t arg1, int64_t arg2); + + CalculatorResponse minus(uint32_t id, int64_t arg1, int64_t arg2); + + CalculatorResponse multiply(uint32_t id, int64_t arg1, int64_t arg2); + + CalculatorResponse divide(uint32_t id, int64_t arg1, int64_t arg2); + +private: + void sendRequest(CalculatorRequest const &request); + + CalculatorResponse getResponse(); + + void readingThreadTask(); + + ConcurrentQueue m_longResults; + ConcurrentQueue m_instantResults; + int m_socket = 0; + std::string m_host; + uint16_t m_port; + std::thread m_readingThread; +}; diff --git a/tcp/client/include/CalculatorApp.cpp b/tcp/client/include/CalculatorApp.cpp new file mode 100644 index 0000000..8d4d77f --- /dev/null +++ b/tcp/client/include/CalculatorApp.cpp @@ -0,0 +1,85 @@ +#include "CalculatorApp.hpp" + +void CalculatorApp::start() { + m_driver.connect(); + + printEntryMessage(); + + std::string line; + printPrompt(m_currentComputation); + + while (std::getline(std::cin, line)) { + processInput(line); + ++m_currentComputation; + printPrompt(m_currentComputation); + } +} + +void CalculatorApp::printPrompt(uint32_t computationId) { + std::cout << "In [" << computationId << "]: "; +} + +void CalculatorApp::processInput(std::string &line) { + std::istringstream iss(line); + std::vector results((std::istream_iterator(iss)), + std::istream_iterator()); + + if (results.size() == 2 || results.size() == 3 || results.empty()) { + if (results.size() == 2) { + if (results[0] == "fact") { + m_driver.factorial(m_currentComputation, static_cast(std::stoull(results[1]))); + } else if (results[0] == "sqrt") { + m_driver.sqrt(m_currentComputation, static_cast(std::stoull(results[1]))); + } + } + if (results.size() == 3 && results[0].length() == 1) { + switch (results[0][0]) { + case '+': + m_driver.plus(m_currentComputation, + static_cast(std::stoull(results[1])), + static_cast(std::stoull(results[2]))); + break; + case '-': + m_driver.minus(m_currentComputation, + static_cast(std::stoull(results[1])), + static_cast(std::stoull(results[2]))); + break; + case '*': + m_driver.multiply(m_currentComputation, + static_cast(std::stoull(results[1])), + static_cast(std::stoull(results[2]))); + break; + case '/': + m_driver.divide(m_currentComputation, + static_cast(std::stoull(results[1])), + static_cast(std::stoull(results[2]))); + break; + default: + std::cout << "Syntax error." << std::endl; + break; + } + } + } else { + std::cout << "Syntax error." << std::endl; + } + + printLine(); + + while (m_driver.hasResult()) { + auto result = m_driver.getResult(); + if (result.errorCode != 0) { + printResult(result.computationId, errorCodeToString(result.errorCode)); + } else { + printResult(result.computationId, result.result); + } + printLine(); + } +} + +void CalculatorApp::printEntryMessage() { + std::cout << "Welcome!\n\tOnline calculator 1.0\n\n\n\n\tSupported operations: " << std::endl; +} + +void CalculatorApp::printLine(const std::string &line) { + std::cout << line << std::endl; +} diff --git a/tcp/client/include/CalculatorApp.hpp b/tcp/client/include/CalculatorApp.hpp new file mode 100644 index 0000000..2d0fbcd --- /dev/null +++ b/tcp/client/include/CalculatorApp.hpp @@ -0,0 +1,31 @@ +#pragma once + +#include +#include +#include +#include "CalcuatorServerDriver.hpp" +#include "socketUtils.hpp" + +class CalculatorApp { +public: + CalculatorApp(std::string const &host, uint16_t port) : m_driver(host, port) {} + + void start(); + +private: + void printPrompt(uint32_t computationId); + + void processInput(std::string &line); + + template + void printResult(uint32_t id, T const &value) { + std::cout << "Out [" << id << "]: " << value << std::endl; + } + + void printEntryMessage(); + + void printLine(std::string const &line = ""); + + uint32_t m_currentComputation = 0; + CalcuatorServerDriver m_driver; +}; \ No newline at end of file diff --git a/tcp/client/include/ConcurrentQueue.hpp b/tcp/client/include/ConcurrentQueue.hpp new file mode 100644 index 0000000..0f4c886 --- /dev/null +++ b/tcp/client/include/ConcurrentQueue.hpp @@ -0,0 +1,33 @@ +#pragma once + +#include +#include + +template +class ConcurrentQueue { +public: + void push(T const &response) { + m_queueMutex.lock(); + m_resultsQueue.push(response); + m_queueMutex.unlock(); + } + + T pop() { + m_queueMutex.lock(); + auto result = m_resultsQueue.front(); + m_resultsQueue.pop(); + m_queueMutex.unlock(); + return result; + } + + bool empty() { + m_queueMutex.lock(); + bool isEmpty = m_resultsQueue.empty(); + m_queueMutex.unlock(); + return !isEmpty; + } + +private: + std::mutex m_queueMutex; + std::queue m_resultsQueue; +}; diff --git a/tcp/client/include/requests.hpp b/tcp/client/include/requests.hpp new file mode 100644 index 0000000..bc3168e --- /dev/null +++ b/tcp/client/include/requests.hpp @@ -0,0 +1,45 @@ +#pragma once + +#include +#include + +#pragma pack(0) +struct CalculatorResponse { + uint8_t errorCode; + uint8_t operationType; + uint32_t computationId; + int64_t result; +}; + +enum OperationType { + FAST = 1, + SLOW +}; + +enum ErrorCode { + OK = 0, + WAIT_FOR_RESULT, + OVERFLOW, + DIV_BY_ZERO, + FACT_OF_NEGATIVE, + SQRT_OF_NEGATIVE, +}; + +std::string errorCodeToString(uint8_t code); + +#pragma pack(0) +struct CalculatorRequest { + uint8_t type; + uint32_t computationId; + int64_t firstOperand; + int64_t secondOperand; +}; + +enum RequestType { + PLUS = 1, + MINUS, + MULT, + DIV, + SQRT, + FACT +}; diff --git a/tcp/client/include/socketUtils.hpp b/tcp/client/include/socketUtils.hpp new file mode 100644 index 0000000..03ed503 --- /dev/null +++ b/tcp/client/include/socketUtils.hpp @@ -0,0 +1,36 @@ +#pragma once + +#include +#include + +#include +#include +#include +#include + +void writeToSocket(int socketDescriptor, const void *buffer, size_t size); + +void readFromSocket(int socketDescriptor, uint8_t *buffer, size_t size); + +void printError(const std::string &s); + +void error(const std::string &s); + +void error(const std::string &type, const std::string &s); + +int connectToServer(std::string const &hostname, uint16_t port); + +template +void writeObject(int socketDescriptor, const T &object) { + writeToSocket(socketDescriptor, (uint8_t const *) &object, sizeof(T)); +} + +template +T readObject(int socketDescriptor) { + T object; + readFromSocket(socketDescriptor, (uint8_t *) &object, sizeof(T)); + + return object; +} + +std::string read_until_zero(int *ptr, char *buffer, size_t buffer_size); diff --git a/tcp/client/src/main.cpp b/tcp/client/src/main.cpp new file mode 100644 index 0000000..ccc4cf7 --- /dev/null +++ b/tcp/client/src/main.cpp @@ -0,0 +1,11 @@ +#include +#include + +int main(int argc, char *argv[]) { + if (argc != 3) { + error("Invalid argumets", "Usage: calculator "); + } + + CalculatorApp app(std::string(argv[1]), static_cast(atoi(argv[2]))); + app.start(); +} \ No newline at end of file diff --git a/tcp/client/src/requests.cpp b/tcp/client/src/requests.cpp new file mode 100644 index 0000000..defabe9 --- /dev/null +++ b/tcp/client/src/requests.cpp @@ -0,0 +1,18 @@ +#include "requests.hpp" + +std::string errorCodeToString(uint8_t code) { + switch (code) { + case OK: + return "OK"; + case OVERFLOW: + return "Integer overflow"; + case DIV_BY_ZERO: + return "Zero division error"; + case FACT_OF_NEGATIVE: + return "Factorial of negative number"; + case SQRT_OF_NEGATIVE: + return "Sqrt of negative number"; + default: + return "Unknown error"; + } +} \ No newline at end of file diff --git a/tcp/client/src/socketUtils.cpp b/tcp/client/src/socketUtils.cpp new file mode 100644 index 0000000..921cfe4 --- /dev/null +++ b/tcp/client/src/socketUtils.cpp @@ -0,0 +1,80 @@ +#include "socketUtils.hpp" + +void writeToSocket(int socket_descriptor, const void *buf, size_t size) { + ssize_t n = write(socket_descriptor, buf, size); + if (n <= 0) { + error("write", "failed to write to socket!"); + } + + if (n != size) { + error("write", "unexpected end of socket! (Probably server disconnected)"); + exit(0); + } +} + +void readFromSocket(int socket_descriptor, uint8_t *buf, size_t size) { + ssize_t n; + while((n = read(socket_descriptor, buf, size)) < size) { + if (n <= 0) { + error("read", "failed to read from socket!"); + exit(0); + } + + buf += n; + size -= n; + } +} + +void printError(const std::string &s) { + std::cerr << s << std::endl; +} + +void error(const std::string &s) { + printError("Error: " + s); + exit(0); +} + +void error(const std::string &type, const std::string &s) { + printError("Error " + type + ": " + s); + exit(0); +} + +std::string read_until_zero(int* ptr, char* buffer, size_t buffer_size) { + std::string dest; + while (*ptr < buffer_size && buffer[*ptr] != '\0') { + dest += buffer[*ptr]; + (*ptr)++; + } + (*ptr)++; + + return dest; +} + +int connectToServer(std::string const &hostname, uint16_t port) { + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + struct sockaddr_in serv_addr{}; + struct hostent *server; + + if (sockfd < 0) { + error("Error opening socket"); + } + + server = gethostbyname(hostname.c_str()); + + if (server == nullptr) { + error("Connection", "Error, no such host"); + } + + bzero((char *) &serv_addr, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + bcopy(server->h_addr, (char *) &serv_addr.sin_addr.s_addr, (size_t) server->h_length); + serv_addr.sin_port = htons(port); + + /* Now connect to the server */ + if (connect(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { + perror("Error connecting"); + exit(1); + } + + return sockfd; +} From 5a806dcfc4293dbd04c6664863cd92a4c44ab83b Mon Sep 17 00:00:00 2001 From: subject-name-here Date: Sat, 16 Feb 2019 16:44:18 +0300 Subject: [PATCH 06/15] added undone server --- tcp/server/CMakeLists.txt | 10 ++- tcp/server/include/request.hpp | 3 + tcp/server/include/response.hpp | 1 + tcp/server/include/server.h | 31 ++++++++ tcp/server/include/util.hpp | 2 + tcp/server/src/main.cpp | 43 +--------- tcp/server/src/server.cpp | 135 ++++++++++++++++++++++++++++++++ 7 files changed, 184 insertions(+), 41 deletions(-) create mode 100644 tcp/server/include/server.h create mode 100644 tcp/server/include/util.hpp create mode 100644 tcp/server/src/server.cpp diff --git a/tcp/server/CMakeLists.txt b/tcp/server/CMakeLists.txt index 728e659..661409b 100644 --- a/tcp/server/CMakeLists.txt +++ b/tcp/server/CMakeLists.txt @@ -4,8 +4,16 @@ project(server) set(CMAKE_CXX_STANDARD 14) set(SOURCE_FILES - src/main.cpp include/response.hpp include/request.hpp) + src/main.cpp include/response.hpp include/request.hpp src/server.cpp include/server.h) include_directories(include) add_executable(server ${SOURCE_FILES}) + +find_package(Threads REQUIRED) +if(THREADS_HAVE_PTHREAD_ARG) + target_compile_options(server PUBLIC "-pthread") +endif() +if(CMAKE_THREAD_LIBS_INIT) + target_link_libraries(server "${CMAKE_THREAD_LIBS_INIT}") +endif() \ No newline at end of file diff --git a/tcp/server/include/request.hpp b/tcp/server/include/request.hpp index 32e2559..7a8ca93 100644 --- a/tcp/server/include/request.hpp +++ b/tcp/server/include/request.hpp @@ -1,5 +1,8 @@ #pragma once +#include + +#pragma pack(0) struct dctp_request_header { uint8_t type; uint32_t id; diff --git a/tcp/server/include/response.hpp b/tcp/server/include/response.hpp index 3355580..bdf3db6 100644 --- a/tcp/server/include/response.hpp +++ b/tcp/server/include/response.hpp @@ -2,6 +2,7 @@ #include +#pragma pack(0) struct dctp_response_header { uint8_t return_code; uint8_t operation_type; diff --git a/tcp/server/include/server.h b/tcp/server/include/server.h new file mode 100644 index 0000000..24acc4f --- /dev/null +++ b/tcp/server/include/server.h @@ -0,0 +1,31 @@ +#pragma once + +#include +#include +#include +#include + +class server { +public: + explicit server(uint16_t port); + + ~server(); + + void start(); + + void client_handler(int socket_descriptor); + + void process_sqrt(int socket_descriptor, struct dctp_request_header request); + + void process_fact(int socket_descriptor, struct dctp_request_header request); +private: + uint16_t port_number; + int socket_descriptor; + sockaddr_in server_addr{}; + sockaddr_in client_addr{}; + bool isTerminated = false; + + std::vector socket_pool{}; + std::vector slow_ops_pool{}; +}; + diff --git a/tcp/server/include/util.hpp b/tcp/server/include/util.hpp new file mode 100644 index 0000000..3f59c93 --- /dev/null +++ b/tcp/server/include/util.hpp @@ -0,0 +1,2 @@ +#pragma once + diff --git a/tcp/server/src/main.cpp b/tcp/server/src/main.cpp index 93142d5..73643ce 100644 --- a/tcp/server/src/main.cpp +++ b/tcp/server/src/main.cpp @@ -1,52 +1,15 @@ #include -#include - -#include -#include -#include - -#include +#include "server.h" int main(int argc, char **argv) { - int socket_descriptor = socket(AF_INET, SOCK_STREAM, 0); - - if (socket_descriptor < 0) { - perror("ERROR opening socket"); - exit(1); - } - - struct sockaddr_in server_addr{}, client_addr{}; - socklen_t client_size = sizeof(client_addr); - - bzero((char *) &server_addr, sizeof(server_addr)); - uint16_t port_number = 22229; if (argc > 1) { port_number = static_cast(atoi(argv[1])); // NOLINT(cert-err34-c) } - server_addr.sin_family = AF_INET; - server_addr.sin_addr.s_addr = INADDR_ANY; - server_addr.sin_port = htons(port_number); - - if (bind(socket_descriptor, (struct sockaddr *) &server_addr, sizeof(server_addr)) < 0) { - perror("ERROR on binding"); - exit(1); - } - - listen(socket_descriptor, 5); - - // something meaningless - bool isTerminated = false; - while (!isTerminated) { - int new_socket_descriptor = accept(socket_descriptor, (struct sockaddr *) &client_addr, &client_size); - if (new_socket_descriptor < 0) { - perror("ERROR on accept"); - exit(1); - } - isTerminated = true; - } + server s(port_number); + s.start(); return 0; } \ No newline at end of file diff --git a/tcp/server/src/server.cpp b/tcp/server/src/server.cpp new file mode 100644 index 0000000..0e87482 --- /dev/null +++ b/tcp/server/src/server.cpp @@ -0,0 +1,135 @@ +#include "server.h" + +#include + +#include +#include +#include + +#include + +#include +#include +#include +#include +#include + + +server::server(uint16_t port_number) : port_number(port_number) { + socket_descriptor = socket(AF_INET, SOCK_STREAM, 0); + + if (socket_descriptor < 0) { + perror("ERROR opening socket"); + exit(1); + } + + bzero((char *) &server_addr, sizeof(server_addr)); + server_addr.sin_family = AF_INET; + server_addr.sin_addr.s_addr = INADDR_ANY; + server_addr.sin_port = htons(port_number); + + if (bind(socket_descriptor, (struct sockaddr *) &server_addr, sizeof(server_addr)) < 0) { + perror("ERROR on binding"); + exit(1); + } +} + +server::~server() { + for (auto slow_operation : this->slow_ops_pool) { + slow_operation->join(); + } + + close(socket_descriptor); + + for (auto socket_thread : this->socket_pool) { + socket_thread->join(); + } +} + +void server::start() { + listen(socket_descriptor, 5); + + while (!this->isTerminated) { + socklen_t client_size = sizeof(client_addr); + int new_socket_descriptor = accept(socket_descriptor, (struct sockaddr *) &client_addr, &client_size); + if (new_socket_descriptor < 0) { + perror("ERROR on accept"); + exit(1); + } + + std::thread *thread = new std::thread(&server::client_handler, this, new_socket_descriptor); + socket_pool.push_back(thread); + } +} + +void server::client_handler(int socket_descriptor) { + while (true) { + struct dctp_request_header request{}; + ssize_t c = read(socket_descriptor, &request, sizeof(request)); + + if (c < 0) { + perror("ERROR reading from socket"); + exit(1); + } + + struct dctp_response_header response{}; + std::cout << "kek\n"; + std::cout << request.type << " " << request.id << request.first_operand << request.second_operand << "\n"; + int64_t result; + std::thread *thread; + switch (request.type) { + case PLUS: + result = request.first_operand + request.second_operand; + response = {OK, FAST, request.id, result}; + break; + case MINUS: + result = request.first_operand - request.second_operand; + response = {OK, FAST, request.id, result}; + break; + case MULT: + result = request.first_operand * request.second_operand; + response = {OK, FAST, request.id, result}; + break; + case DIV: + if (request.second_operand == 0) { + response = {DIV_BY_ZERO, FAST, request.id, 0}; + } else { + result = request.first_operand / request.second_operand; + response = {OK, FAST, request.id, result}; + } + break; + case FACT: + response = {WAIT_FOR_RESULT, SLOW, request.id, 0}; + thread = new std::thread(&server::process_fact, this, socket_descriptor, request); + slow_ops_pool.push_back(thread); + break; + case SQRT: + response = {WAIT_FOR_RESULT, SLOW, request.id, 0}; + thread = new std::thread(&server::process_sqrt, this, socket_descriptor, request); + slow_ops_pool.push_back(thread); + break; + default:break; + } + std::cout << response.return_code << " " << response.id << " " << response.result << "\n"; + + c = write(socket_descriptor, &response, sizeof(response)); + std::cout << c << std::endl; + std::cout << sizeof(response) << std::endl; + if (c < 0) { + perror("ERROR writing from socket"); + exit(1); + } + + } +} + +void server::process_sqrt(int socket_descriptor, struct dctp_request_header request) { + +} + +void server::process_fact(int socket_descriptor, struct dctp_request_header request) { + +} + + + From d34da548bc88c98e8cc9202f952c6e35472be532 Mon Sep 17 00:00:00 2001 From: subject-name-here Date: Sat, 16 Feb 2019 19:11:46 +0300 Subject: [PATCH 07/15] more update --- tcp/server/CMakeLists.txt | 2 +- tcp/server/include/request.hpp | 14 ++++- tcp/server/include/response.hpp | 15 ++++- tcp/server/include/server.h | 1 + tcp/server/include/util.hpp | 9 +++ tcp/server/src/server.cpp | 99 ++++++++++++++++++++++++++------- 6 files changed, 114 insertions(+), 26 deletions(-) diff --git a/tcp/server/CMakeLists.txt b/tcp/server/CMakeLists.txt index 661409b..1f56738 100644 --- a/tcp/server/CMakeLists.txt +++ b/tcp/server/CMakeLists.txt @@ -4,7 +4,7 @@ project(server) set(CMAKE_CXX_STANDARD 14) set(SOURCE_FILES - src/main.cpp include/response.hpp include/request.hpp src/server.cpp include/server.h) + src/main.cpp include/response.hpp include/request.hpp src/server.cpp include/server.h include/util.hpp) include_directories(include) diff --git a/tcp/server/include/request.hpp b/tcp/server/include/request.hpp index 7a8ca93..58f1945 100644 --- a/tcp/server/include/request.hpp +++ b/tcp/server/include/request.hpp @@ -2,7 +2,8 @@ #include -#pragma pack(0) +#pragma pack(push, 1) + struct dctp_request_header { uint8_t type; uint32_t id; @@ -10,6 +11,8 @@ struct dctp_request_header { int64_t second_operand; }; +#pragma pack(pop) + enum request_type { PLUS = 1, MINUS, @@ -17,4 +20,11 @@ enum request_type { DIV, SQRT, FACT -}; \ No newline at end of file +}; + +std::string request_to_string(struct dctp_request_header request) { + return "Id: " + std::to_string(request.id) + + "; type: " + std::to_string(request.type) + + "; first operand: " + std::to_string(request.first_operand) + + "; second operand: " + std::to_string(request.second_operand); +} \ No newline at end of file diff --git a/tcp/server/include/response.hpp b/tcp/server/include/response.hpp index bdf3db6..1f929ca 100644 --- a/tcp/server/include/response.hpp +++ b/tcp/server/include/response.hpp @@ -2,7 +2,8 @@ #include -#pragma pack(0) +#pragma pack(push, 1) + struct dctp_response_header { uint8_t return_code; uint8_t operation_type; @@ -10,6 +11,8 @@ struct dctp_response_header { int64_t result; }; +#pragma pack(pop) + enum return_code { OK = 0, WAIT_FOR_RESULT, @@ -17,9 +20,17 @@ enum return_code { DIV_BY_ZERO, FACT_OF_NEGATIVE, SQRT_OF_NEGATIVE, + UNKNOWN_OPERATION }; enum operation_type { FAST = 1, SLOW -}; \ No newline at end of file +}; + +std::string response_to_string(struct dctp_response_header response) { + return "Return code: " + std::to_string(response.return_code) + + "; type: " + std::to_string(response.operation_type) + + "; id: " + std::to_string(response.id) + + "; result: " + std::to_string(response.result); +} \ No newline at end of file diff --git a/tcp/server/include/server.h b/tcp/server/include/server.h index 24acc4f..2680b00 100644 --- a/tcp/server/include/server.h +++ b/tcp/server/include/server.h @@ -23,6 +23,7 @@ class server { int socket_descriptor; sockaddr_in server_addr{}; sockaddr_in client_addr{}; + bool isInitialized = false; bool isTerminated = false; std::vector socket_pool{}; diff --git a/tcp/server/include/util.hpp b/tcp/server/include/util.hpp index 3f59c93..67dcb6c 100644 --- a/tcp/server/include/util.hpp +++ b/tcp/server/include/util.hpp @@ -1,2 +1,11 @@ #pragma once +#include + +void log(const std::string &s) { + std::cout << s << std::endl; +} + +void log_error(const std::string &s) { + std::cerr << "Error: " << s << std::endl; +} \ No newline at end of file diff --git a/tcp/server/src/server.cpp b/tcp/server/src/server.cpp index 0e87482..ea6e739 100644 --- a/tcp/server/src/server.cpp +++ b/tcp/server/src/server.cpp @@ -8,19 +8,23 @@ #include +#include #include #include #include #include #include +#include + server::server(uint16_t port_number) : port_number(port_number) { - socket_descriptor = socket(AF_INET, SOCK_STREAM, 0); + log("Server: initialization begin."); + socket_descriptor = socket(AF_INET, SOCK_STREAM, 0); if (socket_descriptor < 0) { - perror("ERROR opening socket"); - exit(1); + log_error("can't open socket."); + return; } bzero((char *) &server_addr, sizeof(server_addr)); @@ -29,12 +33,17 @@ server::server(uint16_t port_number) : port_number(port_number) { server_addr.sin_port = htons(port_number); if (bind(socket_descriptor, (struct sockaddr *) &server_addr, sizeof(server_addr)) < 0) { - perror("ERROR on binding"); - exit(1); + log_error("can't bind."); + return; } + isInitialized = true; + + log("Server: initialization success."); } server::~server() { + log("Server: destruction begin."); + for (auto slow_operation : this->slow_ops_pool) { slow_operation->join(); } @@ -44,37 +53,41 @@ server::~server() { for (auto socket_thread : this->socket_pool) { socket_thread->join(); } + + log("Server: destruction success."); } void server::start() { + if (!this->isInitialized) { + return; + } listen(socket_descriptor, 5); while (!this->isTerminated) { socklen_t client_size = sizeof(client_addr); int new_socket_descriptor = accept(socket_descriptor, (struct sockaddr *) &client_addr, &client_size); if (new_socket_descriptor < 0) { - perror("ERROR on accept"); - exit(1); + log_error("failed to accept"); + continue; } - + log("Server: new socket connected. " + std::to_string(new_socket_descriptor)); std::thread *thread = new std::thread(&server::client_handler, this, new_socket_descriptor); socket_pool.push_back(thread); } } void server::client_handler(int socket_descriptor) { - while (true) { + while (!this->isTerminated) { struct dctp_request_header request{}; ssize_t c = read(socket_descriptor, &request, sizeof(request)); - if (c < 0) { - perror("ERROR reading from socket"); - exit(1); + log_error("failed to read request."); + continue; } + log("Server: socket " + std::to_string(socket_descriptor) + " sent request: " + request_to_string(request)); + struct dctp_response_header response{}; - std::cout << "kek\n"; - std::cout << request.type << " " << request.id << request.first_operand << request.second_operand << "\n"; int64_t result; std::thread *thread; switch (request.type) { @@ -108,27 +121,71 @@ void server::client_handler(int socket_descriptor) { thread = new std::thread(&server::process_sqrt, this, socket_descriptor, request); slow_ops_pool.push_back(thread); break; - default:break; + default: + response = {UNKNOWN_OPERATION, FAST, request.id, 0}; + break; } - std::cout << response.return_code << " " << response.id << " " << response.result << "\n"; c = write(socket_descriptor, &response, sizeof(response)); - std::cout << c << std::endl; - std::cout << sizeof(response) << std::endl; + + log("Server: sent to socket " + + std::to_string(socket_descriptor) + + " response (size = " + std::to_string(c) + "): " + + response_to_string(response)); + if (c < 0) { - perror("ERROR writing from socket"); - exit(1); + log_error("can't write to socket"); } - } + + close(socket_descriptor); } void server::process_sqrt(int socket_descriptor, struct dctp_request_header request) { + sleep(2); + struct dctp_response_header response{}; + if (request.first_operand < 0) { + response = {SQRT_OF_NEGATIVE, SLOW, request.id, 0}; + } else { + auto result = static_cast(sqrt(request.first_operand)); + response = {OK, SLOW, request.id, result}; + } + + ssize_t c = write(socket_descriptor, &response, sizeof(response)); + + log("Server: sent to socket " + + std::to_string(socket_descriptor) + + " response (size = " + std::to_string(c) + "): " + + response_to_string(response)); + if (c < 0) { + log_error("can't write to socket"); + } } void server::process_fact(int socket_descriptor, struct dctp_request_header request) { + sleep(2); + struct dctp_response_header response{}; + if (request.first_operand < 0) { + response = {FACT_OF_NEGATIVE, SLOW, request.id, 0}; + } else { + auto result = 1; + for (int i = 1; i < fmin(request.first_operand, 20); i++) { + result *= i; + } + response = {OK, SLOW, request.id, result}; + } + ssize_t c = write(socket_descriptor, &response, sizeof(response)); + + log("Server: sent to socket " + + std::to_string(socket_descriptor) + + " response (size = " + std::to_string(c) + "): " + + response_to_string(response)); + + if (c < 0) { + log_error("can't write to socket"); + } } From 540df45d5dd52590b2ec2c13c7e379e2c87dd775 Mon Sep 17 00:00:00 2001 From: subject-name-here Date: Sat, 16 Feb 2019 20:06:13 +0300 Subject: [PATCH 08/15] something works --- tcp/server/include/response.hpp | 2 +- tcp/server/include/server.h | 3 +++ tcp/server/src/server.cpp | 27 +++++++++++++++++++++++---- 3 files changed, 27 insertions(+), 5 deletions(-) diff --git a/tcp/server/include/response.hpp b/tcp/server/include/response.hpp index 1f929ca..94abe15 100644 --- a/tcp/server/include/response.hpp +++ b/tcp/server/include/response.hpp @@ -16,7 +16,7 @@ struct dctp_response_header { enum return_code { OK = 0, WAIT_FOR_RESULT, - OVERFLOW, + OVERFLOW, // not used DIV_BY_ZERO, FACT_OF_NEGATIVE, SQRT_OF_NEGATIVE, diff --git a/tcp/server/include/server.h b/tcp/server/include/server.h index 2680b00..796d5ea 100644 --- a/tcp/server/include/server.h +++ b/tcp/server/include/server.h @@ -4,6 +4,7 @@ #include #include #include +#include class server { public: @@ -26,7 +27,9 @@ class server { bool isInitialized = false; bool isTerminated = false; + std::mutex socket_pool_lock; std::vector socket_pool{}; + std::mutex slow_pool_lock; std::vector slow_ops_pool{}; }; diff --git a/tcp/server/src/server.cpp b/tcp/server/src/server.cpp index ea6e739..78269a4 100644 --- a/tcp/server/src/server.cpp +++ b/tcp/server/src/server.cpp @@ -72,7 +72,9 @@ void server::start() { } log("Server: new socket connected. " + std::to_string(new_socket_descriptor)); std::thread *thread = new std::thread(&server::client_handler, this, new_socket_descriptor); + socket_pool_lock.lock(); socket_pool.push_back(thread); + socket_pool_lock.unlock(); } } @@ -84,6 +86,10 @@ void server::client_handler(int socket_descriptor) { log_error("failed to read request."); continue; } + if (c != sizeof(request)) { + log_error("socket has been closed unexpectedly."); + break; + } log("Server: socket " + std::to_string(socket_descriptor) + " sent request: " + request_to_string(request)); @@ -114,12 +120,16 @@ void server::client_handler(int socket_descriptor) { case FACT: response = {WAIT_FOR_RESULT, SLOW, request.id, 0}; thread = new std::thread(&server::process_fact, this, socket_descriptor, request); + slow_pool_lock.lock(); slow_ops_pool.push_back(thread); + slow_pool_lock.unlock(); break; case SQRT: response = {WAIT_FOR_RESULT, SLOW, request.id, 0}; thread = new std::thread(&server::process_sqrt, this, socket_descriptor, request); + slow_pool_lock.lock(); slow_ops_pool.push_back(thread); + slow_pool_lock.unlock(); break; default: response = {UNKNOWN_OPERATION, FAST, request.id, 0}; @@ -136,9 +146,14 @@ void server::client_handler(int socket_descriptor) { if (c < 0) { log_error("can't write to socket"); } + if (c != sizeof(response)) { + log_error("socket has been closed unexpectedly."); + break; + } } close(socket_descriptor); + log("Server: closed socket " + std::to_string(socket_descriptor)); } void server::process_sqrt(int socket_descriptor, struct dctp_request_header request) { @@ -161,6 +176,9 @@ void server::process_sqrt(int socket_descriptor, struct dctp_request_header requ if (c < 0) { log_error("can't write to socket"); } + if (c != sizeof(response)) { + log_error("socket has been closed unexpectedly."); + } } void server::process_fact(int socket_descriptor, struct dctp_request_header request) { @@ -169,8 +187,8 @@ void server::process_fact(int socket_descriptor, struct dctp_request_header requ if (request.first_operand < 0) { response = {FACT_OF_NEGATIVE, SLOW, request.id, 0}; } else { - auto result = 1; - for (int i = 1; i < fmin(request.first_operand, 20); i++) { + int64_t result = 1; + for (int i = 1; i <= fmin(request.first_operand, 20); i++) { result *= i; } response = {OK, SLOW, request.id, result}; @@ -186,7 +204,8 @@ void server::process_fact(int socket_descriptor, struct dctp_request_header requ if (c < 0) { log_error("can't write to socket"); } + if (c != sizeof(response)) { + log_error("socket has been closed unexpectedly."); + } } - - From 565a67fe18a5cad03de63500835dd2a1417ff10a Mon Sep 17 00:00:00 2001 From: karvozavr Date: Sat, 16 Feb 2019 20:06:44 +0300 Subject: [PATCH 09/15] Done --- tcp/client/CMakeLists.txt | 2 +- tcp/client/include/CalculatorApp.hpp | 2 + tcp/client/include/ConcurrentQueue.hpp | 2 +- tcp/client/include/requests.hpp | 6 ++- .../CalcuatorServerDriver.cpp | 0 tcp/client/{include => src}/CalculatorApp.cpp | 51 +++++++++++-------- 6 files changed, 38 insertions(+), 25 deletions(-) rename tcp/client/{include => src}/CalcuatorServerDriver.cpp (100%) rename tcp/client/{include => src}/CalculatorApp.cpp (50%) diff --git a/tcp/client/CMakeLists.txt b/tcp/client/CMakeLists.txt index 965d762..bae8c75 100644 --- a/tcp/client/CMakeLists.txt +++ b/tcp/client/CMakeLists.txt @@ -4,7 +4,7 @@ set(CMAKE_CXX_STANDARD 17) file(GLOB SOURCE_FILES "src/*.cpp") -add_executable(calculator ${SOURCE_FILES} include/CalculatorApp.cpp include/CalculatorApp.hpp include/CalcuatorServerDriver.cpp include/CalcuatorServerDriver.hpp src/main.cpp include/requests.hpp include/ConcurrentQueue.hpp src/requests.cpp) +add_executable(calculator ${SOURCE_FILES} src/CalculatorApp.cpp include/CalculatorApp.hpp src/CalcuatorServerDriver.cpp include/CalcuatorServerDriver.hpp src/main.cpp include/requests.hpp include/ConcurrentQueue.hpp src/requests.cpp) target_include_directories(calculator PRIVATE include) diff --git a/tcp/client/include/CalculatorApp.hpp b/tcp/client/include/CalculatorApp.hpp index 2d0fbcd..84dca42 100644 --- a/tcp/client/include/CalculatorApp.hpp +++ b/tcp/client/include/CalculatorApp.hpp @@ -28,4 +28,6 @@ class CalculatorApp { uint32_t m_currentComputation = 0; CalcuatorServerDriver m_driver; + + void printResponse(const CalculatorResponse &response); }; \ No newline at end of file diff --git a/tcp/client/include/ConcurrentQueue.hpp b/tcp/client/include/ConcurrentQueue.hpp index 0f4c886..16639f8 100644 --- a/tcp/client/include/ConcurrentQueue.hpp +++ b/tcp/client/include/ConcurrentQueue.hpp @@ -24,7 +24,7 @@ class ConcurrentQueue { m_queueMutex.lock(); bool isEmpty = m_resultsQueue.empty(); m_queueMutex.unlock(); - return !isEmpty; + return isEmpty; } private: diff --git a/tcp/client/include/requests.hpp b/tcp/client/include/requests.hpp index bc3168e..075a99d 100644 --- a/tcp/client/include/requests.hpp +++ b/tcp/client/include/requests.hpp @@ -3,13 +3,14 @@ #include #include -#pragma pack(0) +#pragma pack(push, 1) struct CalculatorResponse { uint8_t errorCode; uint8_t operationType; uint32_t computationId; int64_t result; }; +#pragma pack(pop) enum OperationType { FAST = 1, @@ -27,13 +28,14 @@ enum ErrorCode { std::string errorCodeToString(uint8_t code); -#pragma pack(0) +#pragma pack(push, 1) struct CalculatorRequest { uint8_t type; uint32_t computationId; int64_t firstOperand; int64_t secondOperand; }; +#pragma pack(pop) enum RequestType { PLUS = 1, diff --git a/tcp/client/include/CalcuatorServerDriver.cpp b/tcp/client/src/CalcuatorServerDriver.cpp similarity index 100% rename from tcp/client/include/CalcuatorServerDriver.cpp rename to tcp/client/src/CalcuatorServerDriver.cpp diff --git a/tcp/client/include/CalculatorApp.cpp b/tcp/client/src/CalculatorApp.cpp similarity index 50% rename from tcp/client/include/CalculatorApp.cpp rename to tcp/client/src/CalculatorApp.cpp index 8d4d77f..6e4ab50 100644 --- a/tcp/client/include/CalculatorApp.cpp +++ b/tcp/client/src/CalculatorApp.cpp @@ -31,33 +31,37 @@ void CalculatorApp::processInput(std::string &line) { } else if (results[0] == "sqrt") { m_driver.sqrt(m_currentComputation, static_cast(std::stoull(results[1]))); } - } - if (results.size() == 3 && results[0].length() == 1) { + } else if (results.size() == 3 && results[0].length() == 1) { + CalculatorResponse response{}; switch (results[0][0]) { case '+': - m_driver.plus(m_currentComputation, - static_cast(std::stoull(results[1])), - static_cast(std::stoull(results[2]))); + response = m_driver.plus(m_currentComputation, + static_cast(std::stoull(results[1])), + static_cast(std::stoull(results[2]))); break; case '-': - m_driver.minus(m_currentComputation, - static_cast(std::stoull(results[1])), - static_cast(std::stoull(results[2]))); + response = m_driver.minus(m_currentComputation, + static_cast(std::stoull(results[1])), + static_cast(std::stoull(results[2]))); break; case '*': - m_driver.multiply(m_currentComputation, - static_cast(std::stoull(results[1])), - static_cast(std::stoull(results[2]))); + response = m_driver.multiply(m_currentComputation, + static_cast(std::stoull(results[1])), + static_cast(std::stoull(results[2]))); break; case '/': - m_driver.divide(m_currentComputation, - static_cast(std::stoull(results[1])), - static_cast(std::stoull(results[2]))); + response = m_driver.divide(m_currentComputation, + static_cast(std::stoull(results[1])), + static_cast(std::stoull(results[2]))); break; default: std::cout << "Syntax error." << std::endl; break; } + + printResponse(response); + } else { + std::cout << "Syntax error." << std::endl; } } else { std::cout << "Syntax error." << std::endl; @@ -66,18 +70,23 @@ void CalculatorApp::processInput(std::string &line) { printLine(); while (m_driver.hasResult()) { - auto result = m_driver.getResult(); - if (result.errorCode != 0) { - printResult(result.computationId, errorCodeToString(result.errorCode)); - } else { - printResult(result.computationId, result.result); - } + printResponse(m_driver.getResult()); printLine(); } } +void CalculatorApp::printResponse(const CalculatorResponse &response) { + if (response.errorCode != OK) { + printResult(response.computationId, errorCodeToString(response.errorCode)); + } else { + printResult(response.computationId, response.result); + } +} + void CalculatorApp::printEntryMessage() { - std::cout << "Welcome!\n\tOnline calculator 1.0\n\n\n\n\tSupported operations: " << std::endl; + std::cout + << "Welcome!\n\tOnline calculator 1.0\n\n\tSupported operations: + - * / fact sqrt\n\tUse prefix notation (e.g + 2 3).\n" + << std::endl; } void CalculatorApp::printLine(const std::string &line) { From caaab3ffbc24c0a84becfe7b9ceaf139aa3f6703 Mon Sep 17 00:00:00 2001 From: subject-name-here Date: Sun, 17 Feb 2019 10:11:55 +0300 Subject: [PATCH 10/15] small fixes --- tcp/server/src/main.cpp | 16 ++++++++++++++-- tcp/server/src/server.cpp | 10 ++++++---- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/tcp/server/src/main.cpp b/tcp/server/src/main.cpp index 73643ce..d699cda 100644 --- a/tcp/server/src/main.cpp +++ b/tcp/server/src/main.cpp @@ -1,15 +1,27 @@ #include +#include #include "server.h" +server* s_pointer; +void signal_handler( int signum ) { + std::cout << "Interrupt signal (" << signum << ") received.\n"; + + delete s_pointer; + exit(signum); +} + int main(int argc, char **argv) { + std::signal(SIGINT, signal_handler); + std::signal(SIGTERM, signal_handler); + uint16_t port_number = 22229; if (argc > 1) { port_number = static_cast(atoi(argv[1])); // NOLINT(cert-err34-c) } - server s(port_number); - s.start(); + s_pointer = new server(port_number); + s_pointer->start(); return 0; } \ No newline at end of file diff --git a/tcp/server/src/server.cpp b/tcp/server/src/server.cpp index 78269a4..a23b485 100644 --- a/tcp/server/src/server.cpp +++ b/tcp/server/src/server.cpp @@ -1,7 +1,5 @@ #include "server.h" -#include - #include #include #include @@ -26,6 +24,10 @@ server::server(uint16_t port_number) : port_number(port_number) { log_error("can't open socket."); return; } + int k = 1; + if (setsockopt(socket_descriptor, SOL_SOCKET, SO_REUSEADDR, &k, sizeof(int)) < 0) { + log_error("setsockopt(SO_REUSEADDR) failed"); + } bzero((char *) &server_addr, sizeof(server_addr)); server_addr.sin_family = AF_INET; @@ -43,13 +45,13 @@ server::server(uint16_t port_number) : port_number(port_number) { server::~server() { log("Server: destruction begin."); + this->isTerminated = true; + close(socket_descriptor); for (auto slow_operation : this->slow_ops_pool) { slow_operation->join(); } - close(socket_descriptor); - for (auto socket_thread : this->socket_pool) { socket_thread->join(); } From 405d93b3e6d81c38eb8e06787f115494997938d6 Mon Sep 17 00:00:00 2001 From: subject-name-here Date: Fri, 22 Feb 2019 23:04:45 +0300 Subject: [PATCH 11/15] added overflow, moved thread logic to different classes, small fixes --- tcp/server/CMakeLists.txt | 2 +- tcp/server/include/pools.h | 26 ++++++++++++ tcp/server/include/server.h | 10 ++--- tcp/server/include/util.hpp | 44 ++++++++++++++++++++ tcp/server/src/main.cpp | 8 ++-- tcp/server/src/pools.cpp | 49 ++++++++++++++++++++++ tcp/server/src/server.cpp | 82 ++++++++----------------------------- 7 files changed, 144 insertions(+), 77 deletions(-) create mode 100644 tcp/server/include/pools.h create mode 100644 tcp/server/src/pools.cpp diff --git a/tcp/server/CMakeLists.txt b/tcp/server/CMakeLists.txt index 1f56738..1f9047e 100644 --- a/tcp/server/CMakeLists.txt +++ b/tcp/server/CMakeLists.txt @@ -4,7 +4,7 @@ project(server) set(CMAKE_CXX_STANDARD 14) set(SOURCE_FILES - src/main.cpp include/response.hpp include/request.hpp src/server.cpp include/server.h include/util.hpp) + src/main.cpp include/response.hpp include/request.hpp src/server.cpp include/server.h include/util.hpp src/pools.cpp include/pools.h) include_directories(include) diff --git a/tcp/server/include/pools.h b/tcp/server/include/pools.h new file mode 100644 index 0000000..13d8ea2 --- /dev/null +++ b/tcp/server/include/pools.h @@ -0,0 +1,26 @@ +#pragma once + +#include +#include +#include +#include + +class socket_pool { +public: + void insert(int socket_descriptor, std::thread* thread); + void remove(int socket_descriptor); + void clear(); +private: + std::mutex lock; + std::unordered_map pool; +}; + +class socket_int_pool { +public: + void insert(int socket_descriptor, int id, std::thread* thread); + void remove(int socket_descriptor, int id); + void clear(); +private: + std::mutex lock; + std::map, std::thread*> pool; +}; diff --git a/tcp/server/include/server.h b/tcp/server/include/server.h index 796d5ea..0421d1a 100644 --- a/tcp/server/include/server.h +++ b/tcp/server/include/server.h @@ -4,7 +4,7 @@ #include #include #include -#include +#include "pools.h" class server { public: @@ -12,7 +12,7 @@ class server { ~server(); - void start(); + void wait_for_clients(); void client_handler(int socket_descriptor); @@ -27,9 +27,7 @@ class server { bool isInitialized = false; bool isTerminated = false; - std::mutex socket_pool_lock; - std::vector socket_pool{}; - std::mutex slow_pool_lock; - std::vector slow_ops_pool{}; + socket_pool client_socket_pool; + socket_int_pool slow_ops_pool; }; diff --git a/tcp/server/include/util.hpp b/tcp/server/include/util.hpp index 67dcb6c..cc9191d 100644 --- a/tcp/server/include/util.hpp +++ b/tcp/server/include/util.hpp @@ -8,4 +8,48 @@ void log(const std::string &s) { void log_error(const std::string &s) { std::cerr << "Error: " << s << std::endl; +} + +ssize_t socket_write_response(int socket_descriptor, struct dctp_response_header& response) { + ssize_t c = write(socket_descriptor, &response, sizeof(response)); + + log("Server: sent to socket " + + std::to_string(socket_descriptor) + + " response (size = " + std::to_string(c) + "): " + + response_to_string(response)); + + if (c < 0) { + log_error("can't write to socket"); + return -1; + } + if (c == 0) { + log_error("socket has closed."); + return -1; + } + if (c > 0 && c != sizeof(response)) { + log_error("data was not written fully."); + return -1; + } + + return c; +} + +ssize_t socket_read_request(int socket_descriptor, struct dctp_request_header& request) { + ssize_t c = read(socket_descriptor, &request, sizeof(request)); + log("Server: socket " + std::to_string(socket_descriptor) + " sent request: " + request_to_string(request)); + + if (c < 0) { + log_error("error while reading request"); + return -1; + } + if (c == 0) { + log_error("socket has closed."); + return -1; + } + if (c > 0 && c != sizeof(request)) { + log_error("data was not read fully."); + return -1; + } + + return c; } \ No newline at end of file diff --git a/tcp/server/src/main.cpp b/tcp/server/src/main.cpp index d699cda..5c72320 100644 --- a/tcp/server/src/main.cpp +++ b/tcp/server/src/main.cpp @@ -3,11 +3,11 @@ #include "server.h" -server* s_pointer; +server* server_pointer; void signal_handler( int signum ) { std::cout << "Interrupt signal (" << signum << ") received.\n"; - delete s_pointer; + delete server_pointer; exit(signum); } @@ -20,8 +20,8 @@ int main(int argc, char **argv) { port_number = static_cast(atoi(argv[1])); // NOLINT(cert-err34-c) } - s_pointer = new server(port_number); - s_pointer->start(); + server_pointer = new server(port_number); + server_pointer->wait_for_clients(); return 0; } \ No newline at end of file diff --git a/tcp/server/src/pools.cpp b/tcp/server/src/pools.cpp new file mode 100644 index 0000000..11bfeba --- /dev/null +++ b/tcp/server/src/pools.cpp @@ -0,0 +1,49 @@ +#include + +#include +#include +#include + +void socket_pool::insert(int socket_descriptor, std::thread *thread) { + lock.lock(); + pool[socket_descriptor] = thread; + lock.unlock(); +} + +void socket_pool::remove(int socket_descriptor) { + lock.lock(); + pool.erase(socket_descriptor); + lock.unlock(); +} + +void socket_pool::clear() { + for (auto socket_thread_pair : this->pool) { + int socket_descriptor = socket_thread_pair.first; + close(socket_descriptor); + std::thread* client_thread = socket_thread_pair.second; + client_thread->join(); + } + this->pool.clear(); +} + +void socket_int_pool::insert(int socket_descriptor, int id, std::thread *thread) { + lock.lock(); + pool[{socket_descriptor, id}] = thread; + lock.unlock(); +} + +void socket_int_pool::remove(int socket_descriptor, int id) { + lock.lock(); + pool.erase({socket_descriptor, id}); + lock.unlock(); +} + +void socket_int_pool::clear() { + for (auto socket_id_thread : this->pool) { + auto socket_and_id = socket_id_thread.first; + close(socket_and_id.first); + std::thread* slow_op_thread = socket_id_thread.second; + slow_op_thread->join(); + } + this->pool.clear(); +} diff --git a/tcp/server/src/server.cpp b/tcp/server/src/server.cpp index a23b485..035a74d 100644 --- a/tcp/server/src/server.cpp +++ b/tcp/server/src/server.cpp @@ -48,18 +48,13 @@ server::~server() { this->isTerminated = true; close(socket_descriptor); - for (auto slow_operation : this->slow_ops_pool) { - slow_operation->join(); - } - - for (auto socket_thread : this->socket_pool) { - socket_thread->join(); - } + this->slow_ops_pool.clear(); + this->client_socket_pool.clear(); log("Server: destruction success."); } -void server::start() { +void server::wait_for_clients() { if (!this->isInitialized) { return; } @@ -74,27 +69,17 @@ void server::start() { } log("Server: new socket connected. " + std::to_string(new_socket_descriptor)); std::thread *thread = new std::thread(&server::client_handler, this, new_socket_descriptor); - socket_pool_lock.lock(); - socket_pool.push_back(thread); - socket_pool_lock.unlock(); + client_socket_pool.insert(new_socket_descriptor, thread); } } void server::client_handler(int socket_descriptor) { while (!this->isTerminated) { struct dctp_request_header request{}; - ssize_t c = read(socket_descriptor, &request, sizeof(request)); - if (c < 0) { - log_error("failed to read request."); - continue; - } - if (c != sizeof(request)) { - log_error("socket has been closed unexpectedly."); + if (socket_read_request(socket_descriptor, request) < 0) { break; } - log("Server: socket " + std::to_string(socket_descriptor) + " sent request: " + request_to_string(request)); - struct dctp_response_header response{}; int64_t result; std::thread *thread; @@ -122,38 +107,23 @@ void server::client_handler(int socket_descriptor) { case FACT: response = {WAIT_FOR_RESULT, SLOW, request.id, 0}; thread = new std::thread(&server::process_fact, this, socket_descriptor, request); - slow_pool_lock.lock(); - slow_ops_pool.push_back(thread); - slow_pool_lock.unlock(); + slow_ops_pool.insert(socket_descriptor, request.id, thread); break; case SQRT: response = {WAIT_FOR_RESULT, SLOW, request.id, 0}; thread = new std::thread(&server::process_sqrt, this, socket_descriptor, request); - slow_pool_lock.lock(); - slow_ops_pool.push_back(thread); - slow_pool_lock.unlock(); + slow_ops_pool.insert(socket_descriptor, request.id, thread); break; default: response = {UNKNOWN_OPERATION, FAST, request.id, 0}; break; } - - c = write(socket_descriptor, &response, sizeof(response)); - - log("Server: sent to socket " - + std::to_string(socket_descriptor) - + " response (size = " + std::to_string(c) + "): " - + response_to_string(response)); - - if (c < 0) { - log_error("can't write to socket"); - } - if (c != sizeof(response)) { - log_error("socket has been closed unexpectedly."); + if (socket_write_response(socket_descriptor, response) < 0) { break; } } + client_socket_pool.remove(socket_descriptor); close(socket_descriptor); log("Server: closed socket " + std::to_string(socket_descriptor)); } @@ -168,19 +138,8 @@ void server::process_sqrt(int socket_descriptor, struct dctp_request_header requ response = {OK, SLOW, request.id, result}; } - ssize_t c = write(socket_descriptor, &response, sizeof(response)); - - log("Server: sent to socket " - + std::to_string(socket_descriptor) - + " response (size = " + std::to_string(c) + "): " - + response_to_string(response)); - - if (c < 0) { - log_error("can't write to socket"); - } - if (c != sizeof(response)) { - log_error("socket has been closed unexpectedly."); - } + socket_write_response(socket_descriptor, response); + slow_ops_pool.remove(socket_descriptor, request.id); } void server::process_fact(int socket_descriptor, struct dctp_request_header request) { @@ -188,26 +147,17 @@ void server::process_fact(int socket_descriptor, struct dctp_request_header requ struct dctp_response_header response{}; if (request.first_operand < 0) { response = {FACT_OF_NEGATIVE, SLOW, request.id, 0}; + } else if (request.first_operand > 20) { + response = {OVERFLOW, SLOW, request.id, 0}; } else { int64_t result = 1; - for (int i = 1; i <= fmin(request.first_operand, 20); i++) { + for (int i = 1; i <= request.first_operand; i++) { result *= i; } response = {OK, SLOW, request.id, result}; } - ssize_t c = write(socket_descriptor, &response, sizeof(response)); - - log("Server: sent to socket " - + std::to_string(socket_descriptor) - + " response (size = " + std::to_string(c) + "): " - + response_to_string(response)); - - if (c < 0) { - log_error("can't write to socket"); - } - if (c != sizeof(response)) { - log_error("socket has been closed unexpectedly."); - } + socket_write_response(socket_descriptor, response); + slow_ops_pool.remove(socket_descriptor, request.id); } From 7d75d667136cb0243dfe414edcd4a04bd1c84225 Mon Sep 17 00:00:00 2001 From: subject-name-here Date: Sat, 16 Mar 2019 18:55:11 +0300 Subject: [PATCH 12/15] udp init --- udp/client/CMakeLists.txt | 17 ++ udp/client/include/CalcuatorServerDriver.hpp | 50 ++++++ udp/client/include/CalculatorApp.hpp | 33 ++++ udp/client/include/ConcurrentQueue.hpp | 33 ++++ udp/client/include/requests.hpp | 47 ++++++ udp/client/include/socketUtils.hpp | 36 ++++ udp/client/src/CalcuatorServerDriver.cpp | 75 +++++++++ udp/client/src/CalculatorApp.cpp | 94 +++++++++++ udp/client/src/main.cpp | 11 ++ udp/client/src/requests.cpp | 18 ++ udp/client/src/socketUtils.cpp | 80 +++++++++ udp/server/CMakeLists.txt | 19 +++ udp/server/include/pools.h | 26 +++ udp/server/include/request.hpp | 30 ++++ udp/server/include/response.hpp | 36 ++++ udp/server/include/server.h | 33 ++++ udp/server/include/util.hpp | 55 +++++++ udp/server/src/main.cpp | 27 +++ udp/server/src/pools.cpp | 49 ++++++ udp/server/src/server.cpp | 163 +++++++++++++++++++ 20 files changed, 932 insertions(+) create mode 100644 udp/client/CMakeLists.txt create mode 100644 udp/client/include/CalcuatorServerDriver.hpp create mode 100644 udp/client/include/CalculatorApp.hpp create mode 100644 udp/client/include/ConcurrentQueue.hpp create mode 100644 udp/client/include/requests.hpp create mode 100644 udp/client/include/socketUtils.hpp create mode 100644 udp/client/src/CalcuatorServerDriver.cpp create mode 100644 udp/client/src/CalculatorApp.cpp create mode 100644 udp/client/src/main.cpp create mode 100644 udp/client/src/requests.cpp create mode 100644 udp/client/src/socketUtils.cpp create mode 100644 udp/server/CMakeLists.txt create mode 100644 udp/server/include/pools.h create mode 100644 udp/server/include/request.hpp create mode 100644 udp/server/include/response.hpp create mode 100644 udp/server/include/server.h create mode 100644 udp/server/include/util.hpp create mode 100644 udp/server/src/main.cpp create mode 100644 udp/server/src/pools.cpp create mode 100644 udp/server/src/server.cpp diff --git a/udp/client/CMakeLists.txt b/udp/client/CMakeLists.txt new file mode 100644 index 0000000..bae8c75 --- /dev/null +++ b/udp/client/CMakeLists.txt @@ -0,0 +1,17 @@ +cmake_minimum_required(VERSION 2.8) +project(calculator) +set(CMAKE_CXX_STANDARD 17) + +file(GLOB SOURCE_FILES "src/*.cpp") + +add_executable(calculator ${SOURCE_FILES} src/CalculatorApp.cpp include/CalculatorApp.hpp src/CalcuatorServerDriver.cpp include/CalcuatorServerDriver.hpp src/main.cpp include/requests.hpp include/ConcurrentQueue.hpp src/requests.cpp) + +target_include_directories(calculator PRIVATE include) + +find_package(Threads REQUIRED) +if(THREADS_HAVE_PTHREAD_ARG) + target_compile_options(calculator PUBLIC "-pthread") +endif() +if(CMAKE_THREAD_LIBS_INIT) + target_link_libraries(calculator "${CMAKE_THREAD_LIBS_INIT}") +endif() diff --git a/udp/client/include/CalcuatorServerDriver.hpp b/udp/client/include/CalcuatorServerDriver.hpp new file mode 100644 index 0000000..a720266 --- /dev/null +++ b/udp/client/include/CalcuatorServerDriver.hpp @@ -0,0 +1,50 @@ +#include + +#pragma once + +#include +#include +#include +#include +#include "requests.hpp" +#include "socketUtils.hpp" +#include "ConcurrentQueue.hpp" + +class CalcuatorServerDriver { +public: + CalcuatorServerDriver(std::string host, uint16_t port) : m_host(std::move(host)), m_port(port) {} + + ~CalcuatorServerDriver(); + + void connect(); + + bool hasResult(); + + CalculatorResponse getResult(); + + void factorial(uint32_t id, int64_t arg); + + void sqrt(uint32_t id, int64_t arg); + + CalculatorResponse plus(uint32_t id, int64_t arg1, int64_t arg2); + + CalculatorResponse minus(uint32_t id, int64_t arg1, int64_t arg2); + + CalculatorResponse multiply(uint32_t id, int64_t arg1, int64_t arg2); + + CalculatorResponse divide(uint32_t id, int64_t arg1, int64_t arg2); + +private: + void sendRequest(CalculatorRequest const &request); + + CalculatorResponse getResponse(); + + void readingThreadTask(); + + ConcurrentQueue m_longResults; + ConcurrentQueue m_instantResults; + int m_socket = 0; + std::string m_host; + uint16_t m_port; + std::thread m_readingThread; +}; diff --git a/udp/client/include/CalculatorApp.hpp b/udp/client/include/CalculatorApp.hpp new file mode 100644 index 0000000..84dca42 --- /dev/null +++ b/udp/client/include/CalculatorApp.hpp @@ -0,0 +1,33 @@ +#pragma once + +#include +#include +#include +#include "CalcuatorServerDriver.hpp" +#include "socketUtils.hpp" + +class CalculatorApp { +public: + CalculatorApp(std::string const &host, uint16_t port) : m_driver(host, port) {} + + void start(); + +private: + void printPrompt(uint32_t computationId); + + void processInput(std::string &line); + + template + void printResult(uint32_t id, T const &value) { + std::cout << "Out [" << id << "]: " << value << std::endl; + } + + void printEntryMessage(); + + void printLine(std::string const &line = ""); + + uint32_t m_currentComputation = 0; + CalcuatorServerDriver m_driver; + + void printResponse(const CalculatorResponse &response); +}; \ No newline at end of file diff --git a/udp/client/include/ConcurrentQueue.hpp b/udp/client/include/ConcurrentQueue.hpp new file mode 100644 index 0000000..16639f8 --- /dev/null +++ b/udp/client/include/ConcurrentQueue.hpp @@ -0,0 +1,33 @@ +#pragma once + +#include +#include + +template +class ConcurrentQueue { +public: + void push(T const &response) { + m_queueMutex.lock(); + m_resultsQueue.push(response); + m_queueMutex.unlock(); + } + + T pop() { + m_queueMutex.lock(); + auto result = m_resultsQueue.front(); + m_resultsQueue.pop(); + m_queueMutex.unlock(); + return result; + } + + bool empty() { + m_queueMutex.lock(); + bool isEmpty = m_resultsQueue.empty(); + m_queueMutex.unlock(); + return isEmpty; + } + +private: + std::mutex m_queueMutex; + std::queue m_resultsQueue; +}; diff --git a/udp/client/include/requests.hpp b/udp/client/include/requests.hpp new file mode 100644 index 0000000..075a99d --- /dev/null +++ b/udp/client/include/requests.hpp @@ -0,0 +1,47 @@ +#pragma once + +#include +#include + +#pragma pack(push, 1) +struct CalculatorResponse { + uint8_t errorCode; + uint8_t operationType; + uint32_t computationId; + int64_t result; +}; +#pragma pack(pop) + +enum OperationType { + FAST = 1, + SLOW +}; + +enum ErrorCode { + OK = 0, + WAIT_FOR_RESULT, + OVERFLOW, + DIV_BY_ZERO, + FACT_OF_NEGATIVE, + SQRT_OF_NEGATIVE, +}; + +std::string errorCodeToString(uint8_t code); + +#pragma pack(push, 1) +struct CalculatorRequest { + uint8_t type; + uint32_t computationId; + int64_t firstOperand; + int64_t secondOperand; +}; +#pragma pack(pop) + +enum RequestType { + PLUS = 1, + MINUS, + MULT, + DIV, + SQRT, + FACT +}; diff --git a/udp/client/include/socketUtils.hpp b/udp/client/include/socketUtils.hpp new file mode 100644 index 0000000..03ed503 --- /dev/null +++ b/udp/client/include/socketUtils.hpp @@ -0,0 +1,36 @@ +#pragma once + +#include +#include + +#include +#include +#include +#include + +void writeToSocket(int socketDescriptor, const void *buffer, size_t size); + +void readFromSocket(int socketDescriptor, uint8_t *buffer, size_t size); + +void printError(const std::string &s); + +void error(const std::string &s); + +void error(const std::string &type, const std::string &s); + +int connectToServer(std::string const &hostname, uint16_t port); + +template +void writeObject(int socketDescriptor, const T &object) { + writeToSocket(socketDescriptor, (uint8_t const *) &object, sizeof(T)); +} + +template +T readObject(int socketDescriptor) { + T object; + readFromSocket(socketDescriptor, (uint8_t *) &object, sizeof(T)); + + return object; +} + +std::string read_until_zero(int *ptr, char *buffer, size_t buffer_size); diff --git a/udp/client/src/CalcuatorServerDriver.cpp b/udp/client/src/CalcuatorServerDriver.cpp new file mode 100644 index 0000000..2513718 --- /dev/null +++ b/udp/client/src/CalcuatorServerDriver.cpp @@ -0,0 +1,75 @@ +// +// Created by karvozavr on 16/02/19. +// + +#include "CalcuatorServerDriver.hpp" + +CalcuatorServerDriver::~CalcuatorServerDriver() { + shutdown(m_socket, SHUT_RDWR); + close(m_socket); +} + +void CalcuatorServerDriver::connect() { + m_socket = connectToServer(m_host, m_port); + m_readingThread = std::thread(&CalcuatorServerDriver::readingThreadTask, this); +} + +void CalcuatorServerDriver::readingThreadTask() { + while (true) { + auto response = readObject(m_socket); + if (response.errorCode != WAIT_FOR_RESULT) { + if (response.operationType == SLOW) { + m_longResults.push(response); + } else if (response.operationType == FAST) { + m_instantResults.push(response); + } + } + } +} + +bool CalcuatorServerDriver::hasResult() { + return !m_longResults.empty(); +} + +CalculatorResponse CalcuatorServerDriver::getResult() { + return m_longResults.pop(); +} + +void CalcuatorServerDriver::factorial(uint32_t id, int64_t arg) { + sendRequest({FACT, id, arg, 0}); +} + +void CalcuatorServerDriver::sqrt(uint32_t id, int64_t arg) { + sendRequest({SQRT, id, arg, 0}); +} + +CalculatorResponse CalcuatorServerDriver::plus(uint32_t id, int64_t arg1, int64_t arg2) { + sendRequest({PLUS, id, arg1, arg2}); + return getResponse(); +} + +CalculatorResponse CalcuatorServerDriver::minus(uint32_t id, int64_t arg1, int64_t arg2) { + sendRequest({MINUS, id, arg1, arg2}); + return getResponse(); +} + +CalculatorResponse CalcuatorServerDriver::multiply(uint32_t id, int64_t arg1, int64_t arg2) { + sendRequest({MULT, id, arg1, arg2}); + return getResponse(); +} + +CalculatorResponse CalcuatorServerDriver::divide(uint32_t id, int64_t arg1, int64_t arg2) { + sendRequest({DIV, id, arg1, arg2}); + return getResponse(); +} + +void CalcuatorServerDriver::sendRequest(CalculatorRequest const &request) { + writeObject(m_socket, request); +} + +CalculatorResponse CalcuatorServerDriver::getResponse() { + while (m_instantResults.empty()) {} + return m_instantResults.pop(); +} + + diff --git a/udp/client/src/CalculatorApp.cpp b/udp/client/src/CalculatorApp.cpp new file mode 100644 index 0000000..6e4ab50 --- /dev/null +++ b/udp/client/src/CalculatorApp.cpp @@ -0,0 +1,94 @@ +#include "CalculatorApp.hpp" + +void CalculatorApp::start() { + m_driver.connect(); + + printEntryMessage(); + + std::string line; + printPrompt(m_currentComputation); + + while (std::getline(std::cin, line)) { + processInput(line); + ++m_currentComputation; + printPrompt(m_currentComputation); + } +} + +void CalculatorApp::printPrompt(uint32_t computationId) { + std::cout << "In [" << computationId << "]: "; +} + +void CalculatorApp::processInput(std::string &line) { + std::istringstream iss(line); + std::vector results((std::istream_iterator(iss)), + std::istream_iterator()); + + if (results.size() == 2 || results.size() == 3 || results.empty()) { + if (results.size() == 2) { + if (results[0] == "fact") { + m_driver.factorial(m_currentComputation, static_cast(std::stoull(results[1]))); + } else if (results[0] == "sqrt") { + m_driver.sqrt(m_currentComputation, static_cast(std::stoull(results[1]))); + } + } else if (results.size() == 3 && results[0].length() == 1) { + CalculatorResponse response{}; + switch (results[0][0]) { + case '+': + response = m_driver.plus(m_currentComputation, + static_cast(std::stoull(results[1])), + static_cast(std::stoull(results[2]))); + break; + case '-': + response = m_driver.minus(m_currentComputation, + static_cast(std::stoull(results[1])), + static_cast(std::stoull(results[2]))); + break; + case '*': + response = m_driver.multiply(m_currentComputation, + static_cast(std::stoull(results[1])), + static_cast(std::stoull(results[2]))); + break; + case '/': + response = m_driver.divide(m_currentComputation, + static_cast(std::stoull(results[1])), + static_cast(std::stoull(results[2]))); + break; + default: + std::cout << "Syntax error." << std::endl; + break; + } + + printResponse(response); + } else { + std::cout << "Syntax error." << std::endl; + } + } else { + std::cout << "Syntax error." << std::endl; + } + + printLine(); + + while (m_driver.hasResult()) { + printResponse(m_driver.getResult()); + printLine(); + } +} + +void CalculatorApp::printResponse(const CalculatorResponse &response) { + if (response.errorCode != OK) { + printResult(response.computationId, errorCodeToString(response.errorCode)); + } else { + printResult(response.computationId, response.result); + } +} + +void CalculatorApp::printEntryMessage() { + std::cout + << "Welcome!\n\tOnline calculator 1.0\n\n\tSupported operations: + - * / fact sqrt\n\tUse prefix notation (e.g + 2 3).\n" + << std::endl; +} + +void CalculatorApp::printLine(const std::string &line) { + std::cout << line << std::endl; +} diff --git a/udp/client/src/main.cpp b/udp/client/src/main.cpp new file mode 100644 index 0000000..ccc4cf7 --- /dev/null +++ b/udp/client/src/main.cpp @@ -0,0 +1,11 @@ +#include +#include + +int main(int argc, char *argv[]) { + if (argc != 3) { + error("Invalid argumets", "Usage: calculator "); + } + + CalculatorApp app(std::string(argv[1]), static_cast(atoi(argv[2]))); + app.start(); +} \ No newline at end of file diff --git a/udp/client/src/requests.cpp b/udp/client/src/requests.cpp new file mode 100644 index 0000000..defabe9 --- /dev/null +++ b/udp/client/src/requests.cpp @@ -0,0 +1,18 @@ +#include "requests.hpp" + +std::string errorCodeToString(uint8_t code) { + switch (code) { + case OK: + return "OK"; + case OVERFLOW: + return "Integer overflow"; + case DIV_BY_ZERO: + return "Zero division error"; + case FACT_OF_NEGATIVE: + return "Factorial of negative number"; + case SQRT_OF_NEGATIVE: + return "Sqrt of negative number"; + default: + return "Unknown error"; + } +} \ No newline at end of file diff --git a/udp/client/src/socketUtils.cpp b/udp/client/src/socketUtils.cpp new file mode 100644 index 0000000..921cfe4 --- /dev/null +++ b/udp/client/src/socketUtils.cpp @@ -0,0 +1,80 @@ +#include "socketUtils.hpp" + +void writeToSocket(int socket_descriptor, const void *buf, size_t size) { + ssize_t n = write(socket_descriptor, buf, size); + if (n <= 0) { + error("write", "failed to write to socket!"); + } + + if (n != size) { + error("write", "unexpected end of socket! (Probably server disconnected)"); + exit(0); + } +} + +void readFromSocket(int socket_descriptor, uint8_t *buf, size_t size) { + ssize_t n; + while((n = read(socket_descriptor, buf, size)) < size) { + if (n <= 0) { + error("read", "failed to read from socket!"); + exit(0); + } + + buf += n; + size -= n; + } +} + +void printError(const std::string &s) { + std::cerr << s << std::endl; +} + +void error(const std::string &s) { + printError("Error: " + s); + exit(0); +} + +void error(const std::string &type, const std::string &s) { + printError("Error " + type + ": " + s); + exit(0); +} + +std::string read_until_zero(int* ptr, char* buffer, size_t buffer_size) { + std::string dest; + while (*ptr < buffer_size && buffer[*ptr] != '\0') { + dest += buffer[*ptr]; + (*ptr)++; + } + (*ptr)++; + + return dest; +} + +int connectToServer(std::string const &hostname, uint16_t port) { + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + struct sockaddr_in serv_addr{}; + struct hostent *server; + + if (sockfd < 0) { + error("Error opening socket"); + } + + server = gethostbyname(hostname.c_str()); + + if (server == nullptr) { + error("Connection", "Error, no such host"); + } + + bzero((char *) &serv_addr, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + bcopy(server->h_addr, (char *) &serv_addr.sin_addr.s_addr, (size_t) server->h_length); + serv_addr.sin_port = htons(port); + + /* Now connect to the server */ + if (connect(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { + perror("Error connecting"); + exit(1); + } + + return sockfd; +} diff --git a/udp/server/CMakeLists.txt b/udp/server/CMakeLists.txt new file mode 100644 index 0000000..1f9047e --- /dev/null +++ b/udp/server/CMakeLists.txt @@ -0,0 +1,19 @@ +cmake_minimum_required(VERSION 3.13) +project(server) + +set(CMAKE_CXX_STANDARD 14) + +set(SOURCE_FILES + src/main.cpp include/response.hpp include/request.hpp src/server.cpp include/server.h include/util.hpp src/pools.cpp include/pools.h) + +include_directories(include) + +add_executable(server ${SOURCE_FILES}) + +find_package(Threads REQUIRED) +if(THREADS_HAVE_PTHREAD_ARG) + target_compile_options(server PUBLIC "-pthread") +endif() +if(CMAKE_THREAD_LIBS_INIT) + target_link_libraries(server "${CMAKE_THREAD_LIBS_INIT}") +endif() \ No newline at end of file diff --git a/udp/server/include/pools.h b/udp/server/include/pools.h new file mode 100644 index 0000000..13d8ea2 --- /dev/null +++ b/udp/server/include/pools.h @@ -0,0 +1,26 @@ +#pragma once + +#include +#include +#include +#include + +class socket_pool { +public: + void insert(int socket_descriptor, std::thread* thread); + void remove(int socket_descriptor); + void clear(); +private: + std::mutex lock; + std::unordered_map pool; +}; + +class socket_int_pool { +public: + void insert(int socket_descriptor, int id, std::thread* thread); + void remove(int socket_descriptor, int id); + void clear(); +private: + std::mutex lock; + std::map, std::thread*> pool; +}; diff --git a/udp/server/include/request.hpp b/udp/server/include/request.hpp new file mode 100644 index 0000000..58f1945 --- /dev/null +++ b/udp/server/include/request.hpp @@ -0,0 +1,30 @@ +#pragma once + +#include + +#pragma pack(push, 1) + +struct dctp_request_header { + uint8_t type; + uint32_t id; + int64_t first_operand; + int64_t second_operand; +}; + +#pragma pack(pop) + +enum request_type { + PLUS = 1, + MINUS, + MULT, + DIV, + SQRT, + FACT +}; + +std::string request_to_string(struct dctp_request_header request) { + return "Id: " + std::to_string(request.id) + + "; type: " + std::to_string(request.type) + + "; first operand: " + std::to_string(request.first_operand) + + "; second operand: " + std::to_string(request.second_operand); +} \ No newline at end of file diff --git a/udp/server/include/response.hpp b/udp/server/include/response.hpp new file mode 100644 index 0000000..94abe15 --- /dev/null +++ b/udp/server/include/response.hpp @@ -0,0 +1,36 @@ +#pragma once + +#include + +#pragma pack(push, 1) + +struct dctp_response_header { + uint8_t return_code; + uint8_t operation_type; + uint32_t id; + int64_t result; +}; + +#pragma pack(pop) + +enum return_code { + OK = 0, + WAIT_FOR_RESULT, + OVERFLOW, // not used + DIV_BY_ZERO, + FACT_OF_NEGATIVE, + SQRT_OF_NEGATIVE, + UNKNOWN_OPERATION +}; + +enum operation_type { + FAST = 1, + SLOW +}; + +std::string response_to_string(struct dctp_response_header response) { + return "Return code: " + std::to_string(response.return_code) + + "; type: " + std::to_string(response.operation_type) + + "; id: " + std::to_string(response.id) + + "; result: " + std::to_string(response.result); +} \ No newline at end of file diff --git a/udp/server/include/server.h b/udp/server/include/server.h new file mode 100644 index 0000000..0421d1a --- /dev/null +++ b/udp/server/include/server.h @@ -0,0 +1,33 @@ +#pragma once + +#include +#include +#include +#include +#include "pools.h" + +class server { +public: + explicit server(uint16_t port); + + ~server(); + + void wait_for_clients(); + + void client_handler(int socket_descriptor); + + void process_sqrt(int socket_descriptor, struct dctp_request_header request); + + void process_fact(int socket_descriptor, struct dctp_request_header request); +private: + uint16_t port_number; + int socket_descriptor; + sockaddr_in server_addr{}; + sockaddr_in client_addr{}; + bool isInitialized = false; + bool isTerminated = false; + + socket_pool client_socket_pool; + socket_int_pool slow_ops_pool; +}; + diff --git a/udp/server/include/util.hpp b/udp/server/include/util.hpp new file mode 100644 index 0000000..cc9191d --- /dev/null +++ b/udp/server/include/util.hpp @@ -0,0 +1,55 @@ +#pragma once + +#include + +void log(const std::string &s) { + std::cout << s << std::endl; +} + +void log_error(const std::string &s) { + std::cerr << "Error: " << s << std::endl; +} + +ssize_t socket_write_response(int socket_descriptor, struct dctp_response_header& response) { + ssize_t c = write(socket_descriptor, &response, sizeof(response)); + + log("Server: sent to socket " + + std::to_string(socket_descriptor) + + " response (size = " + std::to_string(c) + "): " + + response_to_string(response)); + + if (c < 0) { + log_error("can't write to socket"); + return -1; + } + if (c == 0) { + log_error("socket has closed."); + return -1; + } + if (c > 0 && c != sizeof(response)) { + log_error("data was not written fully."); + return -1; + } + + return c; +} + +ssize_t socket_read_request(int socket_descriptor, struct dctp_request_header& request) { + ssize_t c = read(socket_descriptor, &request, sizeof(request)); + log("Server: socket " + std::to_string(socket_descriptor) + " sent request: " + request_to_string(request)); + + if (c < 0) { + log_error("error while reading request"); + return -1; + } + if (c == 0) { + log_error("socket has closed."); + return -1; + } + if (c > 0 && c != sizeof(request)) { + log_error("data was not read fully."); + return -1; + } + + return c; +} \ No newline at end of file diff --git a/udp/server/src/main.cpp b/udp/server/src/main.cpp new file mode 100644 index 0000000..5c72320 --- /dev/null +++ b/udp/server/src/main.cpp @@ -0,0 +1,27 @@ +#include +#include + +#include "server.h" + +server* server_pointer; +void signal_handler( int signum ) { + std::cout << "Interrupt signal (" << signum << ") received.\n"; + + delete server_pointer; + exit(signum); +} + +int main(int argc, char **argv) { + std::signal(SIGINT, signal_handler); + std::signal(SIGTERM, signal_handler); + + uint16_t port_number = 22229; + if (argc > 1) { + port_number = static_cast(atoi(argv[1])); // NOLINT(cert-err34-c) + } + + server_pointer = new server(port_number); + server_pointer->wait_for_clients(); + + return 0; +} \ No newline at end of file diff --git a/udp/server/src/pools.cpp b/udp/server/src/pools.cpp new file mode 100644 index 0000000..11bfeba --- /dev/null +++ b/udp/server/src/pools.cpp @@ -0,0 +1,49 @@ +#include + +#include +#include +#include + +void socket_pool::insert(int socket_descriptor, std::thread *thread) { + lock.lock(); + pool[socket_descriptor] = thread; + lock.unlock(); +} + +void socket_pool::remove(int socket_descriptor) { + lock.lock(); + pool.erase(socket_descriptor); + lock.unlock(); +} + +void socket_pool::clear() { + for (auto socket_thread_pair : this->pool) { + int socket_descriptor = socket_thread_pair.first; + close(socket_descriptor); + std::thread* client_thread = socket_thread_pair.second; + client_thread->join(); + } + this->pool.clear(); +} + +void socket_int_pool::insert(int socket_descriptor, int id, std::thread *thread) { + lock.lock(); + pool[{socket_descriptor, id}] = thread; + lock.unlock(); +} + +void socket_int_pool::remove(int socket_descriptor, int id) { + lock.lock(); + pool.erase({socket_descriptor, id}); + lock.unlock(); +} + +void socket_int_pool::clear() { + for (auto socket_id_thread : this->pool) { + auto socket_and_id = socket_id_thread.first; + close(socket_and_id.first); + std::thread* slow_op_thread = socket_id_thread.second; + slow_op_thread->join(); + } + this->pool.clear(); +} diff --git a/udp/server/src/server.cpp b/udp/server/src/server.cpp new file mode 100644 index 0000000..035a74d --- /dev/null +++ b/udp/server/src/server.cpp @@ -0,0 +1,163 @@ +#include "server.h" + +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include + +#include + + +server::server(uint16_t port_number) : port_number(port_number) { + log("Server: initialization begin."); + + socket_descriptor = socket(AF_INET, SOCK_STREAM, 0); + if (socket_descriptor < 0) { + log_error("can't open socket."); + return; + } + int k = 1; + if (setsockopt(socket_descriptor, SOL_SOCKET, SO_REUSEADDR, &k, sizeof(int)) < 0) { + log_error("setsockopt(SO_REUSEADDR) failed"); + } + + bzero((char *) &server_addr, sizeof(server_addr)); + server_addr.sin_family = AF_INET; + server_addr.sin_addr.s_addr = INADDR_ANY; + server_addr.sin_port = htons(port_number); + + if (bind(socket_descriptor, (struct sockaddr *) &server_addr, sizeof(server_addr)) < 0) { + log_error("can't bind."); + return; + } + isInitialized = true; + + log("Server: initialization success."); +} + +server::~server() { + log("Server: destruction begin."); + this->isTerminated = true; + close(socket_descriptor); + + this->slow_ops_pool.clear(); + this->client_socket_pool.clear(); + + log("Server: destruction success."); +} + +void server::wait_for_clients() { + if (!this->isInitialized) { + return; + } + listen(socket_descriptor, 5); + + while (!this->isTerminated) { + socklen_t client_size = sizeof(client_addr); + int new_socket_descriptor = accept(socket_descriptor, (struct sockaddr *) &client_addr, &client_size); + if (new_socket_descriptor < 0) { + log_error("failed to accept"); + continue; + } + log("Server: new socket connected. " + std::to_string(new_socket_descriptor)); + std::thread *thread = new std::thread(&server::client_handler, this, new_socket_descriptor); + client_socket_pool.insert(new_socket_descriptor, thread); + } +} + +void server::client_handler(int socket_descriptor) { + while (!this->isTerminated) { + struct dctp_request_header request{}; + if (socket_read_request(socket_descriptor, request) < 0) { + break; + } + + struct dctp_response_header response{}; + int64_t result; + std::thread *thread; + switch (request.type) { + case PLUS: + result = request.first_operand + request.second_operand; + response = {OK, FAST, request.id, result}; + break; + case MINUS: + result = request.first_operand - request.second_operand; + response = {OK, FAST, request.id, result}; + break; + case MULT: + result = request.first_operand * request.second_operand; + response = {OK, FAST, request.id, result}; + break; + case DIV: + if (request.second_operand == 0) { + response = {DIV_BY_ZERO, FAST, request.id, 0}; + } else { + result = request.first_operand / request.second_operand; + response = {OK, FAST, request.id, result}; + } + break; + case FACT: + response = {WAIT_FOR_RESULT, SLOW, request.id, 0}; + thread = new std::thread(&server::process_fact, this, socket_descriptor, request); + slow_ops_pool.insert(socket_descriptor, request.id, thread); + break; + case SQRT: + response = {WAIT_FOR_RESULT, SLOW, request.id, 0}; + thread = new std::thread(&server::process_sqrt, this, socket_descriptor, request); + slow_ops_pool.insert(socket_descriptor, request.id, thread); + break; + default: + response = {UNKNOWN_OPERATION, FAST, request.id, 0}; + break; + } + if (socket_write_response(socket_descriptor, response) < 0) { + break; + } + } + + client_socket_pool.remove(socket_descriptor); + close(socket_descriptor); + log("Server: closed socket " + std::to_string(socket_descriptor)); +} + +void server::process_sqrt(int socket_descriptor, struct dctp_request_header request) { + sleep(2); + struct dctp_response_header response{}; + if (request.first_operand < 0) { + response = {SQRT_OF_NEGATIVE, SLOW, request.id, 0}; + } else { + auto result = static_cast(sqrt(request.first_operand)); + response = {OK, SLOW, request.id, result}; + } + + socket_write_response(socket_descriptor, response); + slow_ops_pool.remove(socket_descriptor, request.id); +} + +void server::process_fact(int socket_descriptor, struct dctp_request_header request) { + sleep(2); + struct dctp_response_header response{}; + if (request.first_operand < 0) { + response = {FACT_OF_NEGATIVE, SLOW, request.id, 0}; + } else if (request.first_operand > 20) { + response = {OVERFLOW, SLOW, request.id, 0}; + } else { + int64_t result = 1; + for (int i = 1; i <= request.first_operand; i++) { + result *= i; + } + response = {OK, SLOW, request.id, result}; + } + + socket_write_response(socket_descriptor, response); + slow_ops_pool.remove(socket_descriptor, request.id); +} + From bd76b1025a553b63e453016649da23bc6839e56c Mon Sep 17 00:00:00 2001 From: karvozavr Date: Mon, 18 Mar 2019 21:43:26 +0300 Subject: [PATCH 13/15] Change to UDP --- udp/client/CMakeLists.txt | 2 +- udp/client/include/CalcuatorServerDriver.hpp | 51 ++++--- udp/client/include/CalculatorApp.hpp | 26 ++-- udp/client/include/ConcurrentMap.hpp | 48 +++++++ udp/client/include/ConcurrentQueue.hpp | 40 +++--- udp/client/include/PacketManager.hpp | 35 +++++ udp/client/include/requests.hpp | 46 +++---- udp/client/include/socketUtils.hpp | 12 +- udp/client/src/CalcuatorServerDriver.cpp | 106 ++++++++++----- udp/client/src/CalculatorApp.cpp | 132 +++++++++---------- udp/client/src/main.cpp | 10 +- udp/client/src/requests.cpp | 28 ++-- udp/client/src/socketUtils.cpp | 99 +++++++------- 13 files changed, 389 insertions(+), 246 deletions(-) create mode 100644 udp/client/include/ConcurrentMap.hpp create mode 100644 udp/client/include/PacketManager.hpp diff --git a/udp/client/CMakeLists.txt b/udp/client/CMakeLists.txt index bae8c75..5f080c9 100644 --- a/udp/client/CMakeLists.txt +++ b/udp/client/CMakeLists.txt @@ -4,7 +4,7 @@ set(CMAKE_CXX_STANDARD 17) file(GLOB SOURCE_FILES "src/*.cpp") -add_executable(calculator ${SOURCE_FILES} src/CalculatorApp.cpp include/CalculatorApp.hpp src/CalcuatorServerDriver.cpp include/CalcuatorServerDriver.hpp src/main.cpp include/requests.hpp include/ConcurrentQueue.hpp src/requests.cpp) +add_executable(calculator ${SOURCE_FILES}) target_include_directories(calculator PRIVATE include) diff --git a/udp/client/include/CalcuatorServerDriver.hpp b/udp/client/include/CalcuatorServerDriver.hpp index a720266..89979db 100644 --- a/udp/client/include/CalcuatorServerDriver.hpp +++ b/udp/client/include/CalcuatorServerDriver.hpp @@ -9,42 +9,53 @@ #include "requests.hpp" #include "socketUtils.hpp" #include "ConcurrentQueue.hpp" +#include "ConcurrentMap.hpp" +#include "PacketManager.hpp" class CalcuatorServerDriver { public: - CalcuatorServerDriver(std::string host, uint16_t port) : m_host(std::move(host)), m_port(port) {} + CalcuatorServerDriver(std::string host, uint16_t port) : m_host(std::move(host)), m_port(port) {} - ~CalcuatorServerDriver(); + ~CalcuatorServerDriver(); - void connect(); + void initialize(); - bool hasResult(); + bool hasResult(); - CalculatorResponse getResult(); + CalculatorResponse getResult(); - void factorial(uint32_t id, int64_t arg); + void factorial(uint32_t id, int64_t arg); - void sqrt(uint32_t id, int64_t arg); + void sqrt(uint32_t id, int64_t arg); - CalculatorResponse plus(uint32_t id, int64_t arg1, int64_t arg2); + CalculatorResponse plus(uint32_t id, int64_t arg1, int64_t arg2); - CalculatorResponse minus(uint32_t id, int64_t arg1, int64_t arg2); + CalculatorResponse minus(uint32_t id, int64_t arg1, int64_t arg2); - CalculatorResponse multiply(uint32_t id, int64_t arg1, int64_t arg2); + CalculatorResponse multiply(uint32_t id, int64_t arg1, int64_t arg2); - CalculatorResponse divide(uint32_t id, int64_t arg1, int64_t arg2); + CalculatorResponse divide(uint32_t id, int64_t arg1, int64_t arg2); private: - void sendRequest(CalculatorRequest const &request); + void sendRequest(CalculatorRequest const &request); - CalculatorResponse getResponse(); + void sendRequestImpl(CalculatorRequest const &request); - void readingThreadTask(); + void checkForComputations(); - ConcurrentQueue m_longResults; - ConcurrentQueue m_instantResults; - int m_socket = 0; - std::string m_host; - uint16_t m_port; - std::thread m_readingThread; + CalculatorResponse getResponse(uint32_t computationId); + + void readingThreadTask(); + + ConcurrentQueue m_longResults; + ConcurrentMap m_instantResults; + int m_socket = 0; + std::string m_host; + uint16_t m_port; + sockaddr_in m_server; + std::thread m_readingThread; + PacketManager m_packetManager; + CalculatorRequest m_lastRequest{}; + std::vector m_longComputations; + volatile bool m_terminated = false; }; diff --git a/udp/client/include/CalculatorApp.hpp b/udp/client/include/CalculatorApp.hpp index 84dca42..546f3da 100644 --- a/udp/client/include/CalculatorApp.hpp +++ b/udp/client/include/CalculatorApp.hpp @@ -8,26 +8,26 @@ class CalculatorApp { public: - CalculatorApp(std::string const &host, uint16_t port) : m_driver(host, port) {} + CalculatorApp(std::string const &host, uint16_t port) : m_driver(host, port) {} - void start(); + void start(); private: - void printPrompt(uint32_t computationId); + void printPrompt(uint32_t computationId); - void processInput(std::string &line); + void processInput(std::string &line); - template - void printResult(uint32_t id, T const &value) { - std::cout << "Out [" << id << "]: " << value << std::endl; - } + template + void printResult(uint32_t id, T const &value) { + std::cout << "Out [" << id << "]: " << value << std::endl; + } - void printEntryMessage(); + void printEntryMessage(); - void printLine(std::string const &line = ""); + void printLine(std::string const &line = ""); - uint32_t m_currentComputation = 0; - CalcuatorServerDriver m_driver; + uint32_t m_currentComputation = 0; + CalcuatorServerDriver m_driver; - void printResponse(const CalculatorResponse &response); + void printResponse(const CalculatorResponse &response); }; \ No newline at end of file diff --git a/udp/client/include/ConcurrentMap.hpp b/udp/client/include/ConcurrentMap.hpp new file mode 100644 index 0000000..5cf20f3 --- /dev/null +++ b/udp/client/include/ConcurrentMap.hpp @@ -0,0 +1,48 @@ +#pragma once + +#include +#include + +template +class ConcurrentMap { +public: + void add(Key const &key, Value const &value) { + m_setMutex.lock(); + m_map[key] = value; + m_setMutex.unlock(); + } + + std::optional get(Key const &key) { + m_setMutex.lock(); + + if (m_map.find(key) != m_map.end()) { + auto result = m_map[key]; + m_setMutex.unlock(); + return result; + } + + m_setMutex.unlock(); + return std::optional(); + } + + void remove(Key const &key) { + m_setMutex.lock(); + + if (m_map.find(key) != m_map.end()) { + m_map.erase(key); + } + + m_setMutex.unlock(); + } + + bool empty() { + m_setMutex.lock(); + bool isEmpty = m_map.empty(); + m_setMutex.unlock(); + return isEmpty; + } + +private: + std::mutex m_setMutex; + std::unordered_map m_map; +}; diff --git a/udp/client/include/ConcurrentQueue.hpp b/udp/client/include/ConcurrentQueue.hpp index 16639f8..65d7a20 100644 --- a/udp/client/include/ConcurrentQueue.hpp +++ b/udp/client/include/ConcurrentQueue.hpp @@ -6,28 +6,28 @@ template class ConcurrentQueue { public: - void push(T const &response) { - m_queueMutex.lock(); - m_resultsQueue.push(response); - m_queueMutex.unlock(); - } + void push(T const &element) { + m_queueMutex.lock(); + m_resultsQueue.push(element); + m_queueMutex.unlock(); + } - T pop() { - m_queueMutex.lock(); - auto result = m_resultsQueue.front(); - m_resultsQueue.pop(); - m_queueMutex.unlock(); - return result; - } + T pop() { + m_queueMutex.lock(); + auto result = m_resultsQueue.front(); + m_resultsQueue.pop(); + m_queueMutex.unlock(); + return result; + } - bool empty() { - m_queueMutex.lock(); - bool isEmpty = m_resultsQueue.empty(); - m_queueMutex.unlock(); - return isEmpty; - } + bool empty() { + m_queueMutex.lock(); + bool isEmpty = m_resultsQueue.empty(); + m_queueMutex.unlock(); + return isEmpty; + } private: - std::mutex m_queueMutex; - std::queue m_resultsQueue; + std::mutex m_queueMutex; + std::queue m_resultsQueue; }; diff --git a/udp/client/include/PacketManager.hpp b/udp/client/include/PacketManager.hpp new file mode 100644 index 0000000..462dc8c --- /dev/null +++ b/udp/client/include/PacketManager.hpp @@ -0,0 +1,35 @@ +#pragma once + +#include +#include "socketUtils.hpp" + +class PacketManager { +public: + + explicit PacketManager() {} + + PacketManager(sockaddr_in const &host, uint16_t port, int socket) + : m_host(host), m_port(port), m_socket(socket) {} + + void sendMessage(uint8_t const *data, size_t size) { + ssize_t sent = sendto(m_socket, data, size, 0, (const sockaddr *) &m_host, sizeof(m_host)); + + if (size != sent) { + error("Server unavailable."); + } + } + + void receiveMessage(uint8_t *data, size_t size) { + socklen_t addrLength = sizeof(m_host); + ssize_t received = recvfrom(m_socket, (void *) data, size, 0, (sockaddr *) &m_host, &addrLength); + + if (size != received) { + error("Lost connection."); + } + } + +private: + int m_socket; + sockaddr_in m_host; + uint16_t m_port; +}; \ No newline at end of file diff --git a/udp/client/include/requests.hpp b/udp/client/include/requests.hpp index 075a99d..7cfaa00 100644 --- a/udp/client/include/requests.hpp +++ b/udp/client/include/requests.hpp @@ -5,43 +5,45 @@ #pragma pack(push, 1) struct CalculatorResponse { - uint8_t errorCode; - uint8_t operationType; - uint32_t computationId; - int64_t result; + uint8_t errorCode; + uint8_t type; + uint32_t computationId; + int64_t result; }; #pragma pack(pop) enum OperationType { - FAST = 1, - SLOW + FAST = 1, + SLOW }; enum ErrorCode { - OK = 0, - WAIT_FOR_RESULT, - OVERFLOW, - DIV_BY_ZERO, - FACT_OF_NEGATIVE, - SQRT_OF_NEGATIVE, + OK = 0, + WAIT_FOR_RESULT, + OVERFLOW, + DIV_BY_ZERO, + FACT_OF_NEGATIVE, + SQRT_OF_NEGATIVE, + UNKNOWN_OP }; std::string errorCodeToString(uint8_t code); #pragma pack(push, 1) struct CalculatorRequest { - uint8_t type; - uint32_t computationId; - int64_t firstOperand; - int64_t secondOperand; + uint8_t type; + uint32_t computationId; + int64_t firstOperand; + int64_t secondOperand; }; #pragma pack(pop) enum RequestType { - PLUS = 1, - MINUS, - MULT, - DIV, - SQRT, - FACT + PLUS = 1, + MINUS, + MULT, + DIV, + SQRT, + FACT, + LONG_OP_RESULT }; diff --git a/udp/client/include/socketUtils.hpp b/udp/client/include/socketUtils.hpp index 03ed503..fb9ccec 100644 --- a/udp/client/include/socketUtils.hpp +++ b/udp/client/include/socketUtils.hpp @@ -18,19 +18,21 @@ void error(const std::string &s); void error(const std::string &type, const std::string &s); -int connectToServer(std::string const &hostname, uint16_t port); +sockaddr_in initializeHost(std::string const &hostname, uint16_t port); + +int initializeSocket(); template void writeObject(int socketDescriptor, const T &object) { - writeToSocket(socketDescriptor, (uint8_t const *) &object, sizeof(T)); + writeToSocket(socketDescriptor, (uint8_t const *) &object, sizeof(T)); } template T readObject(int socketDescriptor) { - T object; - readFromSocket(socketDescriptor, (uint8_t *) &object, sizeof(T)); + T object; + readFromSocket(socketDescriptor, (uint8_t *) &object, sizeof(T)); - return object; + return object; } std::string read_until_zero(int *ptr, char *buffer, size_t buffer_size); diff --git a/udp/client/src/CalcuatorServerDriver.cpp b/udp/client/src/CalcuatorServerDriver.cpp index 2513718..534bf74 100644 --- a/udp/client/src/CalcuatorServerDriver.cpp +++ b/udp/client/src/CalcuatorServerDriver.cpp @@ -5,71 +5,117 @@ #include "CalcuatorServerDriver.hpp" CalcuatorServerDriver::~CalcuatorServerDriver() { - shutdown(m_socket, SHUT_RDWR); - close(m_socket); + m_terminated = true; + shutdown(m_socket, SHUT_RDWR); + close(m_socket); } -void CalcuatorServerDriver::connect() { - m_socket = connectToServer(m_host, m_port); - m_readingThread = std::thread(&CalcuatorServerDriver::readingThreadTask, this); +void CalcuatorServerDriver::initialize() { + m_socket = initializeSocket(); + m_server = initializeHost(m_host, m_port); + m_readingThread = std::thread(&CalcuatorServerDriver::readingThreadTask, this); + m_packetManager = PacketManager(m_server, m_port, m_socket); } void CalcuatorServerDriver::readingThreadTask() { - while (true) { - auto response = readObject(m_socket); - if (response.errorCode != WAIT_FOR_RESULT) { - if (response.operationType == SLOW) { - m_longResults.push(response); - } else if (response.operationType == FAST) { - m_instantResults.push(response); - } - } + while (!m_terminated) { + CalculatorResponse response{}; + m_packetManager.receiveMessage(reinterpret_cast(&response), sizeof(response)); + + if (response.errorCode != WAIT_FOR_RESULT) { + if (response.type == SLOW) { + m_longResults.push(response); + } else if (response.type == FAST) { + m_instantResults.add(response.computationId, response); + } + } else { + m_instantResults.add(response.computationId, response); } + } } bool CalcuatorServerDriver::hasResult() { - return !m_longResults.empty(); + return !m_longResults.empty(); } CalculatorResponse CalcuatorServerDriver::getResult() { - return m_longResults.pop(); + return m_longResults.pop(); } void CalcuatorServerDriver::factorial(uint32_t id, int64_t arg) { - sendRequest({FACT, id, arg, 0}); + sendRequest({FACT, id, arg, 0}); + getResponse(id); } void CalcuatorServerDriver::sqrt(uint32_t id, int64_t arg) { - sendRequest({SQRT, id, arg, 0}); + sendRequest({SQRT, id, arg, 0}); + getResponse(id); } CalculatorResponse CalcuatorServerDriver::plus(uint32_t id, int64_t arg1, int64_t arg2) { - sendRequest({PLUS, id, arg1, arg2}); - return getResponse(); + sendRequest({PLUS, id, arg1, arg2}); + return getResponse(id); } CalculatorResponse CalcuatorServerDriver::minus(uint32_t id, int64_t arg1, int64_t arg2) { - sendRequest({MINUS, id, arg1, arg2}); - return getResponse(); + sendRequest({MINUS, id, arg1, arg2}); + return getResponse(id); } CalculatorResponse CalcuatorServerDriver::multiply(uint32_t id, int64_t arg1, int64_t arg2) { - sendRequest({MULT, id, arg1, arg2}); - return getResponse(); + sendRequest({MULT, id, arg1, arg2}); + return getResponse(id); } CalculatorResponse CalcuatorServerDriver::divide(uint32_t id, int64_t arg1, int64_t arg2) { - sendRequest({DIV, id, arg1, arg2}); - return getResponse(); + sendRequest({DIV, id, arg1, arg2}); + return getResponse(id); +} + +void CalcuatorServerDriver::sendRequestImpl(CalculatorRequest const &request) { + m_lastRequest = request; + m_packetManager.sendMessage(reinterpret_cast(&request), sizeof(request)); } void CalcuatorServerDriver::sendRequest(CalculatorRequest const &request) { - writeObject(m_socket, request); + checkForComputations(); + sendRequestImpl(request); +} + +CalculatorResponse CalcuatorServerDriver::getResponse(uint32_t computationId) { + const int COUNTER_INITIAL = 5; + const int MAX_TIMES_RESEND = 10; + const int MILLISECONDS_WAIT = 100; + + int counter = COUNTER_INITIAL; + int timesResend = 0; + + std::optional response = m_instantResults.get(computationId); + + while (!response.has_value()) { + if (counter == 0) { + if (timesResend == MAX_TIMES_RESEND) { + error("Lost connection."); + } + sendRequest(m_lastRequest); + ++timesResend; + counter = COUNTER_INITIAL; + } + + std::this_thread::sleep_for(std::chrono::milliseconds(MILLISECONDS_WAIT)); + response = m_instantResults.get(computationId); + + --counter; + } + + return response.value(); } -CalculatorResponse CalcuatorServerDriver::getResponse() { - while (m_instantResults.empty()) {} - return m_instantResults.pop(); +void CalcuatorServerDriver::checkForComputations() { + for (uint32_t id : m_longComputations) { + sendRequestImpl({LONG_OP_RESULT, id, 0, 0}); + getResponse(id); + } } diff --git a/udp/client/src/CalculatorApp.cpp b/udp/client/src/CalculatorApp.cpp index 6e4ab50..5ab12a8 100644 --- a/udp/client/src/CalculatorApp.cpp +++ b/udp/client/src/CalculatorApp.cpp @@ -1,94 +1,94 @@ #include "CalculatorApp.hpp" void CalculatorApp::start() { - m_driver.connect(); + m_driver.initialize(); - printEntryMessage(); + printEntryMessage(); - std::string line; - printPrompt(m_currentComputation); + std::string line; + printPrompt(m_currentComputation); - while (std::getline(std::cin, line)) { - processInput(line); - ++m_currentComputation; - printPrompt(m_currentComputation); - } + while (std::getline(std::cin, line)) { + processInput(line); + ++m_currentComputation; + printPrompt(m_currentComputation); + } } void CalculatorApp::printPrompt(uint32_t computationId) { - std::cout << "In [" << computationId << "]: "; + std::cout << "In [" << computationId << "]: "; } void CalculatorApp::processInput(std::string &line) { - std::istringstream iss(line); - std::vector results((std::istream_iterator(iss)), - std::istream_iterator()); + std::istringstream iss(line); + std::vector results((std::istream_iterator(iss)), + std::istream_iterator()); - if (results.size() == 2 || results.size() == 3 || results.empty()) { - if (results.size() == 2) { - if (results[0] == "fact") { - m_driver.factorial(m_currentComputation, static_cast(std::stoull(results[1]))); - } else if (results[0] == "sqrt") { - m_driver.sqrt(m_currentComputation, static_cast(std::stoull(results[1]))); - } - } else if (results.size() == 3 && results[0].length() == 1) { - CalculatorResponse response{}; - switch (results[0][0]) { - case '+': - response = m_driver.plus(m_currentComputation, - static_cast(std::stoull(results[1])), - static_cast(std::stoull(results[2]))); - break; - case '-': - response = m_driver.minus(m_currentComputation, - static_cast(std::stoull(results[1])), - static_cast(std::stoull(results[2]))); - break; - case '*': - response = m_driver.multiply(m_currentComputation, - static_cast(std::stoull(results[1])), - static_cast(std::stoull(results[2]))); - break; - case '/': - response = m_driver.divide(m_currentComputation, - static_cast(std::stoull(results[1])), - static_cast(std::stoull(results[2]))); - break; - default: - std::cout << "Syntax error." << std::endl; - break; - } + if (results.size() == 2 || results.size() == 3 || results.empty()) { + if (results.size() == 2) { + if (results[0] == "fact") { + m_driver.factorial(m_currentComputation, static_cast(std::stoull(results[1]))); + } else if (results[0] == "sqrt") { + m_driver.sqrt(m_currentComputation, static_cast(std::stoull(results[1]))); + } + } else if (results.size() == 3 && results[0].length() == 1) { + CalculatorResponse response{}; + switch (results[0][0]) { + case '+': + response = m_driver.plus(m_currentComputation, + static_cast(std::stoull(results[1])), + static_cast(std::stoull(results[2]))); + break; + case '-': + response = m_driver.minus(m_currentComputation, + static_cast(std::stoull(results[1])), + static_cast(std::stoull(results[2]))); + break; + case '*': + response = m_driver.multiply(m_currentComputation, + static_cast(std::stoull(results[1])), + static_cast(std::stoull(results[2]))); + break; + case '/': + response = m_driver.divide(m_currentComputation, + static_cast(std::stoull(results[1])), + static_cast(std::stoull(results[2]))); + break; + default: + std::cout << "Syntax error." << std::endl; + break; + } - printResponse(response); - } else { - std::cout << "Syntax error." << std::endl; - } + printResponse(response); } else { - std::cout << "Syntax error." << std::endl; + std::cout << "Syntax error." << std::endl; } + } else { + std::cout << "Syntax error." << std::endl; + } - printLine(); + printLine(); - while (m_driver.hasResult()) { - printResponse(m_driver.getResult()); - printLine(); - } + while (m_driver.hasResult()) { + printResponse(m_driver.getResult()); + printLine(); + } } void CalculatorApp::printResponse(const CalculatorResponse &response) { - if (response.errorCode != OK) { - printResult(response.computationId, errorCodeToString(response.errorCode)); - } else { - printResult(response.computationId, response.result); - } + if (response.errorCode != OK) { + printResult(response.computationId, errorCodeToString(response.errorCode)); + } else { + printResult(response.computationId, response.result); + } } void CalculatorApp::printEntryMessage() { - std::cout - << "Welcome!\n\tOnline calculator 1.0\n\n\tSupported operations: + - * / fact sqrt\n\tUse prefix notation (e.g + 2 3).\n" - << std::endl; + std::cout + << "Welcome!\n\tOnline calculator 1.0\n\n\tSupported operations: + - * / fact sqrt\n\tUse prefix notation (e.g + 2 3).\n" + << std::endl; } void CalculatorApp::printLine(const std::string &line) { - std::cout << line << std::endl; + std::cout << line << std::endl; } diff --git a/udp/client/src/main.cpp b/udp/client/src/main.cpp index ccc4cf7..e86bd75 100644 --- a/udp/client/src/main.cpp +++ b/udp/client/src/main.cpp @@ -2,10 +2,10 @@ #include int main(int argc, char *argv[]) { - if (argc != 3) { - error("Invalid argumets", "Usage: calculator "); - } + if (argc != 3) { + error("Invalid argumets", "Usage: calculator "); + } - CalculatorApp app(std::string(argv[1]), static_cast(atoi(argv[2]))); - app.start(); + CalculatorApp app(std::string(argv[1]), static_cast(atoi(argv[2]))); + app.start(); } \ No newline at end of file diff --git a/udp/client/src/requests.cpp b/udp/client/src/requests.cpp index defabe9..a51420a 100644 --- a/udp/client/src/requests.cpp +++ b/udp/client/src/requests.cpp @@ -1,18 +1,18 @@ #include "requests.hpp" std::string errorCodeToString(uint8_t code) { - switch (code) { - case OK: - return "OK"; - case OVERFLOW: - return "Integer overflow"; - case DIV_BY_ZERO: - return "Zero division error"; - case FACT_OF_NEGATIVE: - return "Factorial of negative number"; - case SQRT_OF_NEGATIVE: - return "Sqrt of negative number"; - default: - return "Unknown error"; - } + switch (code) { + case OK: + return "OK"; + case OVERFLOW: + return "Integer overflow"; + case DIV_BY_ZERO: + return "Zero division error"; + case FACT_OF_NEGATIVE: + return "Factorial of negative number"; + case SQRT_OF_NEGATIVE: + return "Sqrt of negative number"; + default: + return "Unknown error"; + } } \ No newline at end of file diff --git a/udp/client/src/socketUtils.cpp b/udp/client/src/socketUtils.cpp index 921cfe4..c47d7a3 100644 --- a/udp/client/src/socketUtils.cpp +++ b/udp/client/src/socketUtils.cpp @@ -1,80 +1,79 @@ #include "socketUtils.hpp" void writeToSocket(int socket_descriptor, const void *buf, size_t size) { - ssize_t n = write(socket_descriptor, buf, size); - if (n <= 0) { - error("write", "failed to write to socket!"); - } + ssize_t n = write(socket_descriptor, buf, size); + if (n <= 0) { + error("write", "failed to write to socket!"); + } - if (n != size) { - error("write", "unexpected end of socket! (Probably server disconnected)"); - exit(0); - } + if (n != size) { + error("write", "unexpected end of socket! (Probably server disconnected)"); + exit(0); + } } void readFromSocket(int socket_descriptor, uint8_t *buf, size_t size) { - ssize_t n; - while((n = read(socket_descriptor, buf, size)) < size) { - if (n <= 0) { - error("read", "failed to read from socket!"); - exit(0); - } - - buf += n; - size -= n; + ssize_t n; + while ((n = read(socket_descriptor, buf, size)) < size) { + if (n <= 0) { + error("read", "failed to read from socket!"); + exit(0); } + + buf += n; + size -= n; + } } void printError(const std::string &s) { - std::cerr << s << std::endl; + std::cerr << s << std::endl; } void error(const std::string &s) { - printError("Error: " + s); - exit(0); + printError("Error: " + s); + exit(0); } void error(const std::string &type, const std::string &s) { - printError("Error " + type + ": " + s); - exit(0); + printError("Error " + type + ": " + s); + exit(0); } -std::string read_until_zero(int* ptr, char* buffer, size_t buffer_size) { - std::string dest; - while (*ptr < buffer_size && buffer[*ptr] != '\0') { - dest += buffer[*ptr]; - (*ptr)++; - } +std::string read_until_zero(int *ptr, char *buffer, size_t buffer_size) { + std::string dest; + while (*ptr < buffer_size && buffer[*ptr] != '\0') { + dest += buffer[*ptr]; (*ptr)++; + } + (*ptr)++; - return dest; + return dest; } -int connectToServer(std::string const &hostname, uint16_t port) { - int sockfd = socket(AF_INET, SOCK_STREAM, 0); - struct sockaddr_in serv_addr{}; - struct hostent *server; +sockaddr_in initializeHost(std::string const &hostname, uint16_t port) { + hostent *server; - if (sockfd < 0) { - error("Error opening socket"); - } + server = gethostbyname(hostname.c_str()); + sockaddr_in serv_addr{}; - server = gethostbyname(hostname.c_str()); + if (server == nullptr) { + error("Connection", "Error, no such host"); + } - if (server == nullptr) { - error("Connection", "Error, no such host"); - } + bzero((char *) &serv_addr, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + bcopy(server->h_addr, (char *) &serv_addr.sin_addr.s_addr, (size_t) server->h_length); + serv_addr.sin_port = htons(port); - bzero((char *) &serv_addr, sizeof(serv_addr)); - serv_addr.sin_family = AF_INET; - bcopy(server->h_addr, (char *) &serv_addr.sin_addr.s_addr, (size_t) server->h_length); - serv_addr.sin_port = htons(port); + return serv_addr; +} - /* Now connect to the server */ - if (connect(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) { - perror("Error connecting"); - exit(1); - } +int initializeSocket() { + int sockfd = socket(AF_INET, SOCK_DGRAM, 0); + + if (sockfd < 0) { + error("Error opening socket"); + } - return sockfd; + return sockfd; } From 0ff9788a39c75f0163c49098aced48a12d988841 Mon Sep 17 00:00:00 2001 From: karvozavr Date: Mon, 18 Mar 2019 23:48:11 +0300 Subject: [PATCH 14/15] Fix long computations response bug --- udp/client/include/CalcuatorServerDriver.hpp | 3 ++- udp/client/src/CalcuatorServerDriver.cpp | 18 ++++++++++-------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/udp/client/include/CalcuatorServerDriver.hpp b/udp/client/include/CalcuatorServerDriver.hpp index 89979db..51f1db9 100644 --- a/udp/client/include/CalcuatorServerDriver.hpp +++ b/udp/client/include/CalcuatorServerDriver.hpp @@ -6,6 +6,7 @@ #include #include #include +#include #include "requests.hpp" #include "socketUtils.hpp" #include "ConcurrentQueue.hpp" @@ -56,6 +57,6 @@ class CalcuatorServerDriver { std::thread m_readingThread; PacketManager m_packetManager; CalculatorRequest m_lastRequest{}; - std::vector m_longComputations; + std::unordered_set m_longComputations; volatile bool m_terminated = false; }; diff --git a/udp/client/src/CalcuatorServerDriver.cpp b/udp/client/src/CalcuatorServerDriver.cpp index 534bf74..9c3142b 100644 --- a/udp/client/src/CalcuatorServerDriver.cpp +++ b/udp/client/src/CalcuatorServerDriver.cpp @@ -1,7 +1,3 @@ -// -// Created by karvozavr on 16/02/19. -// - #include "CalcuatorServerDriver.hpp" CalcuatorServerDriver::~CalcuatorServerDriver() { @@ -13,8 +9,8 @@ CalcuatorServerDriver::~CalcuatorServerDriver() { void CalcuatorServerDriver::initialize() { m_socket = initializeSocket(); m_server = initializeHost(m_host, m_port); - m_readingThread = std::thread(&CalcuatorServerDriver::readingThreadTask, this); m_packetManager = PacketManager(m_server, m_port, m_socket); + m_readingThread = std::thread(&CalcuatorServerDriver::readingThreadTask, this); } void CalcuatorServerDriver::readingThreadTask() { @@ -25,6 +21,7 @@ void CalcuatorServerDriver::readingThreadTask() { if (response.errorCode != WAIT_FOR_RESULT) { if (response.type == SLOW) { m_longResults.push(response); + m_instantResults.add(response.computationId, response); } else if (response.type == FAST) { m_instantResults.add(response.computationId, response); } @@ -39,17 +36,21 @@ bool CalcuatorServerDriver::hasResult() { } CalculatorResponse CalcuatorServerDriver::getResult() { - return m_longResults.pop(); + auto response = m_longResults.pop(); + m_longComputations.erase(response.computationId); + return response; } void CalcuatorServerDriver::factorial(uint32_t id, int64_t arg) { sendRequest({FACT, id, arg, 0}); getResponse(id); + m_longComputations.insert(id); } void CalcuatorServerDriver::sqrt(uint32_t id, int64_t arg) { sendRequest({SQRT, id, arg, 0}); getResponse(id); + m_longComputations.insert(id); } CalculatorResponse CalcuatorServerDriver::plus(uint32_t id, int64_t arg1, int64_t arg2) { @@ -84,7 +85,7 @@ void CalcuatorServerDriver::sendRequest(CalculatorRequest const &request) { CalculatorResponse CalcuatorServerDriver::getResponse(uint32_t computationId) { const int COUNTER_INITIAL = 5; - const int MAX_TIMES_RESEND = 10; + const int MAX_TIMES_RESEND = 5; const int MILLISECONDS_WAIT = 100; int counter = COUNTER_INITIAL; @@ -97,7 +98,7 @@ CalculatorResponse CalcuatorServerDriver::getResponse(uint32_t computationId) { if (timesResend == MAX_TIMES_RESEND) { error("Lost connection."); } - sendRequest(m_lastRequest); + sendRequestImpl(m_lastRequest); ++timesResend; counter = COUNTER_INITIAL; } @@ -108,6 +109,7 @@ CalculatorResponse CalcuatorServerDriver::getResponse(uint32_t computationId) { --counter; } + m_instantResults.remove(computationId); return response.value(); } From fe3dace5d2e8dba190afc75b5b565c9a43cef3bb Mon Sep 17 00:00:00 2001 From: subject-name-here Date: Mon, 18 Mar 2019 23:49:14 +0300 Subject: [PATCH 15/15] done udp server --- udp/server/.gitkeep | 0 udp/server/include/pools.h | 21 +++-- udp/server/include/request.hpp | 3 +- udp/server/include/response.hpp | 6 +- udp/server/include/server.h | 9 ++- udp/server/include/util.hpp | 45 +++++++---- udp/server/src/pools.cpp | 51 +++++++----- udp/server/src/server.cpp | 133 +++++++++++++++++--------------- 8 files changed, 157 insertions(+), 111 deletions(-) delete mode 100644 udp/server/.gitkeep diff --git a/udp/server/.gitkeep b/udp/server/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/udp/server/include/pools.h b/udp/server/include/pools.h index 13d8ea2..eae46f1 100644 --- a/udp/server/include/pools.h +++ b/udp/server/include/pools.h @@ -4,23 +4,28 @@ #include #include #include +#include +#include "response.hpp" -class socket_pool { +class socket_int_pool { public: - void insert(int socket_descriptor, std::thread* thread); - void remove(int socket_descriptor); + void insert(int socket_descriptor, int id, std::thread* thread); + void remove(int socket_descriptor, int id); void clear(); private: std::mutex lock; - std::unordered_map pool; + std::map, std::thread*> pool; }; -class socket_int_pool { +bool operator <(const struct sockaddr_in& x, const struct sockaddr_in& y); + +class long_computation_response_pool { public: - void insert(int socket_descriptor, int id, std::thread* thread); - void remove(int socket_descriptor, int id); + void insert(struct sockaddr_in client_addr, int id, int type, dctp_response_header response); + dctp_response_header get(struct sockaddr_in client_addr, int id, int type); + bool contains(struct sockaddr_in client_addr, int id, int type); void clear(); private: std::mutex lock; - std::map, std::thread*> pool; + std::map, dctp_response_header> pool; }; diff --git a/udp/server/include/request.hpp b/udp/server/include/request.hpp index 58f1945..2682e83 100644 --- a/udp/server/include/request.hpp +++ b/udp/server/include/request.hpp @@ -19,7 +19,8 @@ enum request_type { MULT, DIV, SQRT, - FACT + FACT, + LONG_COMPUTATION_RESULT }; std::string request_to_string(struct dctp_request_header request) { diff --git a/udp/server/include/response.hpp b/udp/server/include/response.hpp index 94abe15..ae7ac9b 100644 --- a/udp/server/include/response.hpp +++ b/udp/server/include/response.hpp @@ -16,7 +16,7 @@ struct dctp_response_header { enum return_code { OK = 0, WAIT_FOR_RESULT, - OVERFLOW, // not used + OVERFLOW, DIV_BY_ZERO, FACT_OF_NEGATIVE, SQRT_OF_NEGATIVE, @@ -28,9 +28,9 @@ enum operation_type { SLOW }; -std::string response_to_string(struct dctp_response_header response) { +inline std::string response_to_string(struct dctp_response_header response) { return "Return code: " + std::to_string(response.return_code) + "; type: " + std::to_string(response.operation_type) + "; id: " + std::to_string(response.id) + "; result: " + std::to_string(response.result); -} \ No newline at end of file +} diff --git a/udp/server/include/server.h b/udp/server/include/server.h index 0421d1a..99bfe0b 100644 --- a/udp/server/include/server.h +++ b/udp/server/include/server.h @@ -14,7 +14,12 @@ class server { void wait_for_clients(); - void client_handler(int socket_descriptor); + void client_handler(struct dctp_request_header request); + dctp_response_header handle_new_request(struct dctp_request_header request); + struct dctp_response_header handle_slow_function( + struct dctp_request_header request, + void(server::* process)(int, struct dctp_request_header) + ); void process_sqrt(int socket_descriptor, struct dctp_request_header request); @@ -27,7 +32,7 @@ class server { bool isInitialized = false; bool isTerminated = false; - socket_pool client_socket_pool; socket_int_pool slow_ops_pool; + long_computation_response_pool computation_response_pool; }; diff --git a/udp/server/include/util.hpp b/udp/server/include/util.hpp index cc9191d..222a9e2 100644 --- a/udp/server/include/util.hpp +++ b/udp/server/include/util.hpp @@ -10,46 +10,63 @@ void log_error(const std::string &s) { std::cerr << "Error: " << s << std::endl; } -ssize_t socket_write_response(int socket_descriptor, struct dctp_response_header& response) { - ssize_t c = write(socket_descriptor, &response, sizeof(response)); +ssize_t socket_write_response(int socket_descriptor, struct dctp_response_header& response, sockaddr_in& client_addr) { + socklen_t client_size = sizeof(client_addr); + ssize_t sent_bytes = sendto( + socket_descriptor, + &response, + sizeof(response), + 0, + (struct sockaddr *) &client_addr, + client_size + ); log("Server: sent to socket " + std::to_string(socket_descriptor) - + " response (size = " + std::to_string(c) + "): " + + " response (size = " + std::to_string(sent_bytes) + "): " + response_to_string(response)); - if (c < 0) { + if (sent_bytes < 0) { log_error("can't write to socket"); return -1; } - if (c == 0) { + if (sent_bytes == 0) { log_error("socket has closed."); return -1; } - if (c > 0 && c != sizeof(response)) { + if (sent_bytes > 0 && sent_bytes != sizeof(response)) { log_error("data was not written fully."); return -1; } - return c; + return sent_bytes; } -ssize_t socket_read_request(int socket_descriptor, struct dctp_request_header& request) { - ssize_t c = read(socket_descriptor, &request, sizeof(request)); +ssize_t socket_read_request(int socket_descriptor, struct dctp_request_header& request, sockaddr_in& client_addr) { + socklen_t client_size = sizeof(client_addr); + ssize_t read_bytes = recvfrom( + socket_descriptor, + (void *) &request, + sizeof(request), + 0, + (struct sockaddr *) &client_addr, + &client_size + ); + log("Server: socket " + std::to_string(socket_descriptor) + " sent request: " + request_to_string(request)); - if (c < 0) { + if (read_bytes < 0) { log_error("error while reading request"); return -1; } - if (c == 0) { + if (read_bytes == 0) { log_error("socket has closed."); return -1; } - if (c > 0 && c != sizeof(request)) { - log_error("data was not read fully."); + if (read_bytes > 0 && read_bytes != sizeof(request)) { + log_error("data was not read fully (read " + std::to_string(read_bytes) + " bytes)."); return -1; } - return c; + return read_bytes; } \ No newline at end of file diff --git a/udp/server/src/pools.cpp b/udp/server/src/pools.cpp index 11bfeba..e31e1e2 100644 --- a/udp/server/src/pools.cpp +++ b/udp/server/src/pools.cpp @@ -4,46 +4,55 @@ #include #include -void socket_pool::insert(int socket_descriptor, std::thread *thread) { +void socket_int_pool::insert(int socket_descriptor, int id, std::thread *thread) { lock.lock(); - pool[socket_descriptor] = thread; + pool[{socket_descriptor, id}] = thread; lock.unlock(); } -void socket_pool::remove(int socket_descriptor) { +void socket_int_pool::remove(int socket_descriptor, int id) { lock.lock(); - pool.erase(socket_descriptor); + pool.erase({socket_descriptor, id}); lock.unlock(); } -void socket_pool::clear() { - for (auto socket_thread_pair : this->pool) { - int socket_descriptor = socket_thread_pair.first; - close(socket_descriptor); - std::thread* client_thread = socket_thread_pair.second; - client_thread->join(); +void socket_int_pool::clear() { + for (auto socket_id_thread : this->pool) { + auto socket_and_id = socket_id_thread.first; + close(socket_and_id.first); + std::thread* slow_op_thread = socket_id_thread.second; + slow_op_thread->join(); } this->pool.clear(); } -void socket_int_pool::insert(int socket_descriptor, int id, std::thread *thread) { + +bool operator <(const struct sockaddr_in& x, const struct sockaddr_in& y) { + return x.sin_addr.s_addr < y.sin_addr.s_addr || x.sin_port < y.sin_port; +} + +void long_computation_response_pool::insert(sockaddr_in client_addr, int id, int type, dctp_response_header response) { lock.lock(); - pool[{socket_descriptor, id}] = thread; + pool[{client_addr, id, type}] = response; lock.unlock(); } -void socket_int_pool::remove(int socket_descriptor, int id) { +dctp_response_header long_computation_response_pool::get(struct sockaddr_in client_addr, int id, int type) { + dctp_response_header response{}; lock.lock(); - pool.erase({socket_descriptor, id}); + response = pool[{client_addr, id, type}]; lock.unlock(); + return response; } -void socket_int_pool::clear() { - for (auto socket_id_thread : this->pool) { - auto socket_and_id = socket_id_thread.first; - close(socket_and_id.first); - std::thread* slow_op_thread = socket_id_thread.second; - slow_op_thread->join(); - } +void long_computation_response_pool::clear() { this->pool.clear(); } + +bool long_computation_response_pool::contains(struct sockaddr_in client_addr, int id, int type) { + bool result; + lock.lock(); + result = pool.find(std::make_tuple(client_addr, id, type)) != pool.end(); + lock.unlock(); + return result; +} diff --git a/udp/server/src/server.cpp b/udp/server/src/server.cpp index 035a74d..8100924 100644 --- a/udp/server/src/server.cpp +++ b/udp/server/src/server.cpp @@ -19,7 +19,7 @@ server::server(uint16_t port_number) : port_number(port_number) { log("Server: initialization begin."); - socket_descriptor = socket(AF_INET, SOCK_STREAM, 0); + socket_descriptor = socket(AF_INET, SOCK_DGRAM, 0); if (socket_descriptor < 0) { log_error("can't open socket."); return; @@ -49,7 +49,7 @@ server::~server() { close(socket_descriptor); this->slow_ops_pool.clear(); - this->client_socket_pool.clear(); + this->computation_response_pool.clear(); log("Server: destruction success."); } @@ -58,78 +58,87 @@ void server::wait_for_clients() { if (!this->isInitialized) { return; } - listen(socket_descriptor, 5); while (!this->isTerminated) { - socklen_t client_size = sizeof(client_addr); - int new_socket_descriptor = accept(socket_descriptor, (struct sockaddr *) &client_addr, &client_size); - if (new_socket_descriptor < 0) { - log_error("failed to accept"); + dctp_request_header request{}; + if (socket_read_request(socket_descriptor, request, client_addr) < 0) { continue; } - log("Server: new socket connected. " + std::to_string(new_socket_descriptor)); - std::thread *thread = new std::thread(&server::client_handler, this, new_socket_descriptor); - client_socket_pool.insert(new_socket_descriptor, thread); + + log("Server: new client connected. " + std::to_string(client_addr.sin_addr.s_addr)); + client_handler(request); + log("Server: stopped work with client. " + std::to_string(client_addr.sin_addr.s_addr)); } } -void server::client_handler(int socket_descriptor) { - while (!this->isTerminated) { - struct dctp_request_header request{}; - if (socket_read_request(socket_descriptor, request) < 0) { - break; - } +void server::client_handler(struct dctp_request_header request) { + struct dctp_response_header response{}; + if (computation_response_pool.contains(client_addr, request.id, request.type)) { + response = computation_response_pool.get(client_addr, request.id, request.type); + } else { + response = handle_new_request(request); + computation_response_pool.insert(client_addr, request.id, request.type, response); + } - struct dctp_response_header response{}; - int64_t result; - std::thread *thread; - switch (request.type) { - case PLUS: - result = request.first_operand + request.second_operand; - response = {OK, FAST, request.id, result}; - break; - case MINUS: - result = request.first_operand - request.second_operand; - response = {OK, FAST, request.id, result}; - break; - case MULT: - result = request.first_operand * request.second_operand; + socket_write_response(socket_descriptor, response, client_addr); +} + +dctp_response_header server::handle_new_request(struct dctp_request_header request) { + struct dctp_response_header response{}; + int64_t result; + switch (request.type) { + case PLUS: + result = request.first_operand + request.second_operand; + response = {OK, FAST, request.id, result}; + break; + case MINUS: + result = request.first_operand - request.second_operand; + response = {OK, FAST, request.id, result}; + break; + case MULT: + result = request.first_operand * request.second_operand; + response = {OK, FAST, request.id, result}; + break; + case DIV: + if (request.second_operand == 0) { + response = {DIV_BY_ZERO, FAST, request.id, 0}; + } else { + result = request.first_operand / request.second_operand; response = {OK, FAST, request.id, result}; - break; - case DIV: - if (request.second_operand == 0) { - response = {DIV_BY_ZERO, FAST, request.id, 0}; - } else { - result = request.first_operand / request.second_operand; - response = {OK, FAST, request.id, result}; - } - break; - case FACT: - response = {WAIT_FOR_RESULT, SLOW, request.id, 0}; - thread = new std::thread(&server::process_fact, this, socket_descriptor, request); - slow_ops_pool.insert(socket_descriptor, request.id, thread); - break; - case SQRT: - response = {WAIT_FOR_RESULT, SLOW, request.id, 0}; - thread = new std::thread(&server::process_sqrt, this, socket_descriptor, request); - slow_ops_pool.insert(socket_descriptor, request.id, thread); - break; - default: - response = {UNKNOWN_OPERATION, FAST, request.id, 0}; - break; - } - if (socket_write_response(socket_descriptor, response) < 0) { + } + break; + case FACT: + response = handle_slow_function(request, &server::process_fact); + break; + case SQRT: + response = handle_slow_function(request, &server::process_sqrt); + break; + case LONG_COMPUTATION_RESULT: + // If it's not in computation pool, then it doesn't exists. + response = {UNKNOWN_OPERATION, SLOW, request.id, 0}; + break; + default: + response = {UNKNOWN_OPERATION, FAST, request.id, 0}; break; - } } - client_socket_pool.remove(socket_descriptor); - close(socket_descriptor); - log("Server: closed socket " + std::to_string(socket_descriptor)); + return response; +} + +struct dctp_response_header server::handle_slow_function( + struct dctp_request_header request, + void(server::* process)(int, struct dctp_request_header) + ) { + struct dctp_response_header response{WAIT_FOR_RESULT, SLOW, request.id, 0}; + computation_response_pool.insert(client_addr, request.id, request_type::LONG_COMPUTATION_RESULT, response); + + std::thread* thread = new std::thread(process, this, socket_descriptor, request); + slow_ops_pool.insert(socket_descriptor, request.id, thread); + return response; } void server::process_sqrt(int socket_descriptor, struct dctp_request_header request) { - sleep(2); + sleep(3); struct dctp_response_header response{}; if (request.first_operand < 0) { response = {SQRT_OF_NEGATIVE, SLOW, request.id, 0}; @@ -138,12 +147,12 @@ void server::process_sqrt(int socket_descriptor, struct dctp_request_header requ response = {OK, SLOW, request.id, result}; } - socket_write_response(socket_descriptor, response); + computation_response_pool.insert(client_addr, request.id, request_type::LONG_COMPUTATION_RESULT, response); slow_ops_pool.remove(socket_descriptor, request.id); } void server::process_fact(int socket_descriptor, struct dctp_request_header request) { - sleep(2); + sleep(3); struct dctp_response_header response{}; if (request.first_operand < 0) { response = {FACT_OF_NEGATIVE, SLOW, request.id, 0}; @@ -157,7 +166,7 @@ void server::process_fact(int socket_descriptor, struct dctp_request_header requ response = {OK, SLOW, request.id, result}; } - socket_write_response(socket_descriptor, response); + computation_response_pool.insert(client_addr, request.id, request_type::LONG_COMPUTATION_RESULT, response); slow_ops_pool.remove(socket_descriptor, request.id); }