diff --git a/.github/workflows/sharedmemtest-c++.yml b/.github/workflows/sharedmemtest-c++.yml new file mode 100644 index 00000000..1df14f27 --- /dev/null +++ b/.github/workflows/sharedmemtest-c++.yml @@ -0,0 +1,39 @@ +name: shared-mem-test-c++ + +env: + usingSharedMem: '' +on: + push: + branches: + - 'main' + +jobs: + + test: + runs-on: ubuntu-latest + container: ubuntu:latest + + services: + model: + image: linusseelinger/model-exahype-tsunami + ports: + - 4242:4242 + options: --ipc=host + + steps: + - + name: Checkout + uses: actions/checkout@v2 + - + name: Dependencies + run: | + apt update; DEBIAN_FRONTEND="noninteractive" apt install -y g++ libssl-dev + - + name: Build and run + run: | + cd clients/c++ && ./build.sh && checkShMem=$(./http-client model:4242 | grep -o "not accessible"); echo "CheckShMem=$checkShMem" >> $GITHUB_ENV + - + name: Check if Shared Memory is used + if: env.CheckShMem == 'not accessible' + run: | + echo "Shared Memory not available, failed"; exit 1 diff --git a/.github/workflows/sharedmemtest-python.yml b/.github/workflows/sharedmemtest-python.yml new file mode 100644 index 00000000..fbf60610 --- /dev/null +++ b/.github/workflows/sharedmemtest-python.yml @@ -0,0 +1,38 @@ +name: shared-mem-test-python + +on: + push: + branches: + - 'main' + +jobs: + + test: + runs-on: ubuntu-latest + container: ubuntu:latest + + services: + model: + image: linusseelinger/model-exahype-tsunami:latest + ports: + - 4242:4242 + options: --ipc=host + + steps: + - + name: Checkout + uses: actions/checkout@v2 + - + name: Dependencies + run: | + apt update && DEBIAN_FRONTEND="noninteractive" apt install -y python3-pip && pip3 install umbridge + - + name: Build and run + run: | + cd clients/python && checkShMem=$(python3 umbridge-client.py http://model:4242 | grep -o "not accessible"); echo "CheckShMem=$checkShMem" >> $GITHUB_ENV + - + name: Check if Shared Memory is used + if: env.CheckShMem == 'not accessible' + run: | + echo "Shared Memory not available, 
failed"; exit 1 + diff --git a/lib/umbridge.h b/lib/umbridge.h index 8347a52e..02ffd48d 100644 --- a/lib/umbridge.h +++ b/lib/umbridge.h @@ -1,6 +1,15 @@ #ifndef UMBRIDGE #define UMBRIDGE +// Only enable shared memory functionality on Linux as it supports POSIX standard (Apple Mac probably too, needs testing of shared memory and pthread_self). +// TO-DO?: Future support for Windows will require a shared memory vector implementation using WinAPI considering different behaviour than POSIX. +#if defined __linux__ +#define SUPPORT_POSIX_SHMEM +#endif +#ifdef SUPPORT_POSIX_SHMEM +#include +#include +#endif // #define LOGGING // Increase timeout to allow for long-running models. @@ -86,6 +95,59 @@ namespace umbridge { } } +#ifdef SUPPORT_POSIX_SHMEM + class SharedMemoryVector + { + public: + SharedMemoryVector(std::size_t size, std::string shmem_name, bool create) + : length(size * sizeof(double)), shmem_name(shmem_name) { + int oflags = O_RDWR; + if (create) { + created = true; + oflags |= O_CREAT; + } + + int fd = shm_open(shmem_name.c_str(), oflags, 0644); // Create shared memory + ftruncate(fd, length); // Set size of shared memory + if(fd < 0){ + throw std::runtime_error("Shared Memory object could not be created or found by name"); + } + + ptr = (u_char *)mmap(NULL, length, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); // Map shared memory to process + close(fd); + + assert(ptr); + } + + SharedMemoryVector(const std::vector& vector, std::string shmem_name) + : SharedMemoryVector(vector.size(), shmem_name, true) { + SetVector(vector); + } + + std::vector GetVector() { + std::vector vector(length / sizeof(double)); + memcpy(vector.data(), ptr, length); + return vector; + } + + void SetVector(const std::vector& vector) { + memcpy(ptr, vector.data(), length); + } + + ~SharedMemoryVector() { + munmap(ptr, length); + if (created) + shm_unlink(shmem_name.c_str()); + } + + private: + bool created = false; + u_char *ptr = nullptr; + off_t length = 0; + std::string shmem_name; 
+ }; +#endif + // Client-side Model connecting to a server for the actual evaluations etc. class HTTPModel : public Model { public: @@ -114,9 +176,34 @@ namespace umbridge { supportsGradient = supported_features.value("Gradient", false); supportsApplyJacobian = supported_features.value("ApplyJacobian", false); supportsApplyHessian = supported_features.value("ApplyHessian", false); +#ifdef SUPPORT_POSIX_SHMEM + supportsEvaluateShMem = supported_features.value("EvaluateShMem", false); + supportsGradientShMem = supported_features.value("GradientShMem", false); + supportsApplyJacobianShMem = supported_features.value("ApplyJacobianShMem", false); + supportsApplyHessianShMem = supported_features.value("ApplyHessianShMem", false); +#endif } else { throw std::runtime_error("POST ModelInfo failed with error type '" + to_string(res.error()) + "'"); } +#ifdef SUPPORT_POSIX_SHMEM + // Test whether client and server are able to communicate through shared memory. Disables ShMem if test fails. + unsigned long int tid = pthread_self(); + request_body["tid"] = std::to_string(tid); + std::vector testvec = {12345.0}; + SharedMemoryVector shmem_input(testvec, "/umbridge_test_shmem_in_" + std::to_string(tid)); + SharedMemoryVector shmem_output(1, "/umbridge_test_shmem_out_" + std::to_string(tid), true); + auto res = cli.Post("/TestShMem", headers, request_body.dump(), "application/json"); + + if (shmem_output.GetVector()[0] != testvec[0]) { + supportsEvaluateShMem = false; + supportsApplyJacobianShMem = false; + supportsApplyHessianShMem = false; + supportsGradientShMem = false; + std::cout << "Server not accessible via shared memory" << std::endl; + } else { + std::cout << "Server accessible via shared memory" << std::endl; + } +#endif } std::vector GetInputSizes(const json& config_json = json::parse("{}")) const override { @@ -154,9 +241,43 @@ namespace umbridge { } std::vector> Evaluate(const std::vector>& inputs, json config_json = json::parse("{}")) override { +#ifdef 
SUPPORT_POSIX_SHMEM + if (supportsEvaluateShMem) { + unsigned int tid = pthread_self(); + std::vector> shmem_inputs; + for (int i = 0; i < inputs.size(); i++) { + shmem_inputs.push_back(std::make_unique(inputs[i], "/umbridge_in_" + std::to_string(tid) + "_" + std::to_string(i))); + } + std::vector> shmem_outputs; + std::vector output_sizes = GetOutputSizes(config_json); // Potential optimization: Avoid this call (e.g. share output memory with appropriate dimension from server side, sync with client via POSIX semaphore) + for (int i = 0; i < output_sizes.size(); i++) { + shmem_outputs.push_back(std::make_unique(output_sizes[i], "/umbridge_out_" + std::to_string(tid) + "_" + std::to_string(i), true)); + } - json request_body; - request_body["name"] = name; + json request_body; + request_body["tid"] = std::to_string(tid); + request_body["name"] = name; + request_body["config"] = config_json; + request_body["shmem_name"] = "/umbridge"; + request_body["shmem_num_inputs"] = inputs.size(); + for (int i = 0; i < inputs.size(); i++) { + request_body["shmem_size_" + std::to_string(i)] = inputs[i].size(); + } + if (auto res = cli.Post("/EvaluateShMem", headers, request_body.dump(), "application/json")) { + json response_body = parse_result_with_error_handling(res); + + std::vector> outputs(output_sizes.size()); + for (int i = 0; i < output_sizes.size(); i++) { + outputs[i] = shmem_outputs[i]->GetVector(); + } + return outputs; + } else { + throw std::runtime_error("POST Evaluate failed with error type '" + to_string(res.error()) + "'"); + } + } else { +#endif + json request_body; + request_body["name"] = name; request_body["input"] = json::parse("[]"); for (std::size_t i = 0; i < inputs.size(); i++) { @@ -164,17 +285,20 @@ namespace umbridge { } request_body["config"] = config_json; - if (auto res = cli.Post("/Evaluate", headers, request_body.dump(), "application/json")) { - json response_body = parse_result_with_error_handling(res); + if (auto res = cli.Post("/Evaluate", 
headers, request_body.dump(), "application/json")) { + json response_body = parse_result_with_error_handling(res); - std::vector> outputs(response_body["output"].size()); - for (std::size_t i = 0; i < response_body["output"].size(); i++) { - outputs[i] = response_body["output"][i].get>(); + std::vector> outputs(response_body["output"].size()); + for (std::size_t i = 0; i < response_body["output"].size(); i++) { + outputs[i] = response_body["output"][i].get>(); + } + return outputs; + } else { + throw std::runtime_error("POST Evaluate failed with error type '" + to_string(res.error()) + "'"); } - return outputs; - } else { - throw std::runtime_error("POST Evaluate failed with error type '" + to_string(res.error()) + "'"); +#ifdef SUPPORT_POSIX_SHMEM } +#endif } std::vector Gradient(unsigned int outWrt, @@ -184,23 +308,58 @@ namespace umbridge { json config_json = json::parse("{}")) override { - json request_body; - request_body["name"] = name; - request_body["outWrt"] = outWrt; - request_body["inWrt"] = inWrt; - for (std::size_t i = 0; i < inputs.size(); i++) { - request_body["input"][i] = inputs[i]; - } - request_body["sens"] = sens; - request_body["config"] = config_json; +#ifdef SUPPORT_POSIX_SHMEM + if (supportsGradientShMem) { + unsigned int tid = pthread_self(); + std::vector> shmem_inputs; + for (int i = 0; i < inputs.size(); i++) { + shmem_inputs.push_back(std::make_unique(inputs[i], "/umbridge_in_" + std::to_string(tid) + "_" + std::to_string(i))); + } + SharedMemoryVector shmem_output(inputs[inWrt].size(), "/umbridge_out_" + std::to_string(tid) + "_" + std::to_string(0), true); - if (auto res = cli.Post("/Gradient", headers, request_body.dump(), "application/json")) { - json response_body = parse_result_with_error_handling(res); + json request_body; + request_body["tid"] = std::to_string(tid); + request_body["name"] = name; + request_body["config"] = config_json; + request_body["outWrt"] = outWrt; + request_body["inWrt"] = inWrt; + 
request_body["shmem_name"] = "/umbridge"; + request_body["sens"] = sens; + request_body["shmem_num_inputs"] = inputs.size(); + for (int i = 0; i < inputs.size(); i++) { + request_body["shmem_size_" + std::to_string(i)] = inputs[i].size(); + } + if (auto res = cli.Post("/GradientShMem", headers, request_body.dump(), "application/json")) { + json response_body = parse_result_with_error_handling(res); - return response_body["output"].get>(); + std::vector output(inputs[inWrt].size()); + output = shmem_output.GetVector(); + return output; + } else { + throw std::runtime_error("POST Gradient failed with error type '" + to_string(res.error()) + "'"); + } } else { - throw std::runtime_error("POST Gradient failed with error type '" + to_string(res.error()) + "'"); +#endif + json request_body; + request_body["name"] = name; + request_body["outWrt"] = outWrt; + request_body["inWrt"] = inWrt; + for (std::size_t i = 0; i < inputs.size(); i++) { + request_body["input"][i] = inputs[i]; + } + request_body["sens"] = sens; + request_body["config"] = config_json; + + if (auto res = cli.Post("/Gradient", headers, request_body.dump(), "application/json")) { + json response_body = parse_result_with_error_handling(res); + + return response_body["output"].get>(); + } else { + throw std::runtime_error("POST Gradient failed with error type '" + to_string(res.error()) + "'"); + } +#ifdef SUPPORT_POSIX_SHMEM } +#endif } std::vector ApplyJacobian(unsigned int outWrt, @@ -209,23 +368,59 @@ namespace umbridge { const std::vector& vec, json config_json = json::parse("{}")) override { - json request_body; - request_body["name"] = name; - request_body["outWrt"] = outWrt; - request_body["inWrt"] = inWrt; - for (std::size_t i = 0; i < inputs.size(); i++) { - request_body["input"][i] = inputs[i]; - } - request_body["vec"] = vec; - request_body["config"] = config_json; +#ifdef SUPPORT_POSIX_SHMEM + if (supportsApplyJacobianShMem) { + unsigned int tid = pthread_self(); + std::vector> shmem_inputs; + 
for (int i = 0; i < inputs.size(); i++) { + shmem_inputs.push_back(std::make_unique(inputs[i], "/umbridge_in_" + std::to_string(tid) + "_" + std::to_string(i))); + } + std::vector output_sizes = GetOutputSizes(config_json); // Potential optimization: Avoid this call (e.g. share output memory with appropriate dimension from server side, sync with client via POSIX semaphore) + SharedMemoryVector shmem_output(output_sizes[outWrt], "/umbridge_out_" + std::to_string(tid) + "_" + std::to_string(0), true); - if (auto res = cli.Post("/ApplyJacobian", headers, request_body.dump(), "application/json")) { - json response_body = parse_result_with_error_handling(res); + json request_body; + request_body["tid"] = std::to_string(tid); + request_body["name"] = name; + request_body["config"] = config_json; + request_body["outWrt"] = outWrt; + request_body["inWrt"] = inWrt; + request_body["vec"] = vec; + request_body["shmem_name"] = "/umbridge"; + request_body["shmem_num_inputs"] = inputs.size(); + for (int i = 0; i < inputs.size(); i++) { + request_body["shmem_size_" + std::to_string(i)] = inputs[i].size(); + } + if (auto res = cli.Post("/ApplyJacobianShMem", headers, request_body.dump(), "application/json")) { + json response_body = parse_result_with_error_handling(res); - return response_body["output"].get>(); + std::vector output(output_sizes[outWrt]); + output = shmem_output.GetVector(); + return output; + } else { + throw std::runtime_error("POST ApplyJacobian failed with error type '" + to_string(res.error()) + "'"); + } } else { - throw std::runtime_error("POST ApplyJacobian failed with error type '" + to_string(res.error()) + "'"); +#endif + json request_body; + request_body["name"] = name; + request_body["outWrt"] = outWrt; + request_body["inWrt"] = inWrt; + for (std::size_t i = 0; i < inputs.size(); i++) { + request_body["input"][i] = inputs[i]; + } + request_body["vec"] = vec; + request_body["config"] = config_json; + + if (auto res = cli.Post("/ApplyJacobian", headers, 
request_body.dump(), "application/json")) { + json response_body = parse_result_with_error_handling(res); + + return response_body["output"].get>(); + } else { + throw std::runtime_error("POST ApplyJacobian failed with error type '" + to_string(res.error()) + "'"); + } +#ifdef SUPPORT_POSIX_SHMEM } +#endif } std::vector ApplyHessian(unsigned int outWrt, @@ -236,25 +431,64 @@ namespace umbridge { const std::vector& vec, json config_json = json::parse("{}")) override { - json request_body; - request_body["name"] = name; - request_body["outWrt"] = outWrt; - request_body["inWrt1"] = inWrt1; - request_body["inWrt2"] = inWrt2; - for (std::size_t i = 0; i < inputs.size(); i++) { - request_body["input"][i] = inputs[i]; - } - request_body["sens"] = sens; - request_body["vec"] = vec; - request_body["config"] = config_json; +#ifdef SUPPORT_POSIX_SHMEM + if (supportsApplyHessianShMem) { + unsigned int tid = pthread_self(); + std::vector> shmem_inputs; + for (int i = 0; i < inputs.size(); i++) { + shmem_inputs.push_back(std::make_unique(inputs[i], "/umbridge_in_" + std::to_string(tid) + "_" + std::to_string(i))); + } + std::vector output_sizes = GetOutputSizes(config_json); // Potential optimization: Avoid this call (e.g. 
share output memory with appropriate dimension from server side, sync with client via POSIX semaphore) - if (auto res = cli.Post("/ApplyHessian", headers, request_body.dump(), "application/json")) { - json response_body = parse_result_with_error_handling(res); + SharedMemoryVector shmem_output(output_sizes[outWrt], "/umbridge_out_" + std::to_string(tid) + "_" + std::to_string(0), true); + + json request_body; + request_body["tid"] = std::to_string(tid); + request_body["name"] = name; + request_body["config"] = config_json; + request_body["outWrt"] = outWrt; + request_body["inWrt1"] = inWrt1; + request_body["inWrt2"] = inWrt2; + request_body["shmem_name"] = "/umbridge"; + request_body["sens"] = sens; + request_body["vec"] = vec; + request_body["shmem_num_inputs"] = inputs.size(); + for (int i = 0; i < inputs.size(); i++) { + request_body["shmem_size_" + std::to_string(i)] = inputs[i].size(); + } + if (auto res = cli.Post("/ApplyHessianShMem", headers, request_body.dump(), "application/json")) { + json response_body = parse_result_with_error_handling(res); - return response_body["output"].get>(); + std::vector output(output_sizes[outWrt]); + output = shmem_output.GetVector(); + return output; + } else { + throw std::runtime_error("POST ApplyHessian failed with error type '" + to_string(res.error()) + "'"); + } } else { - throw std::runtime_error("POST ApplyHessian failed with error type '" + to_string(res.error()) + "'"); +#endif + json request_body; + request_body["name"] = name; + request_body["outWrt"] = outWrt; + request_body["inWrt1"] = inWrt1; + request_body["inWrt2"] = inWrt2; + for (std::size_t i = 0; i < inputs.size(); i++) { + request_body["input"][i] = inputs[i]; + } + request_body["sens"] = sens; + request_body["vec"] = vec; + request_body["config"] = config_json; + + if (auto res = cli.Post("/ApplyHessian", headers, request_body.dump(), "application/json")) { + json response_body = parse_result_with_error_handling(res); + + return 
response_body["output"].get>(); + } else { + throw std::runtime_error("POST ApplyHessian failed with error type '" + to_string(res.error()) + "'"); + } +#ifdef SUPPORT_POSIX_SHMEM } +#endif } bool SupportsEvaluate() override { @@ -279,6 +513,10 @@ namespace umbridge { bool supportsGradient = false; bool supportsApplyJacobian = false; bool supportsApplyHessian = false; + bool supportsEvaluateShMem = false; + bool supportsGradientShMem = false; + bool supportsApplyJacobianShMem = false; + bool supportsApplyHessianShMem = false; json parse_result_with_error_handling(const httplib::Result& res) const { json response_body; @@ -482,7 +720,47 @@ namespace umbridge { res.set_content(response_body.dump(), "application/json"); }); +#ifdef SUPPORT_POSIX_SHMEM + svr.Post("/EvaluateShMem", [&](const httplib::Request &req, httplib::Response &res) { + json request_body = json::parse(req.body); + if (!check_model_exists(models, request_body["name"], res)) + return; + Model& model = get_model_from_name(models, request_body["name"]); + + if (!model.SupportsEvaluate()) { + write_unsupported_feature_response(res, "Evaluate"); + return; + } + + std::vector> inputs; + for (int i = 0; i < request_body["shmem_num_inputs"].get(); i++) { + SharedMemoryVector shmem_input(request_body["shmem_size_" + std::to_string(i)].get(), request_body["shmem_name"].get() + "_in_" + request_body["tid"].get() + "_" + std::to_string(i), false); + inputs.push_back(shmem_input.GetVector()); + } + std::vector> shmem_outputs; + for (int i = 0; i < model.GetOutputSizes().size(); i++) { + shmem_outputs.push_back(std::make_unique(model.GetOutputSizes()[i], request_body["shmem_name"].get() + "_out_" + request_body["tid"].get() + "_" + std::to_string(i), false)); + } + + json empty_default_config; + json config_json = request_body.value("config", empty_default_config); + + if (!check_input_sizes(inputs, config_json, model, res)) + return; + + const std::lock_guard model_lock(model_mutex); + std::vector> outputs = 
model.Evaluate(inputs, config_json); + + if (!check_output_sizes(outputs, config_json, model, res)) + return; + + for (std::size_t i = 0; i < outputs.size(); i++) { + shmem_outputs[i]->SetVector(outputs[i]); + } + json response_body; + res.set_content(response_body.dump(), "application/json"); }); +#endif svr.Post("/Gradient", [&](const httplib::Request &req, httplib::Response &res) { json request_body = json::parse(req.body); if (error_checks && !check_model_exists(models, request_body["name"], res)) @@ -531,6 +809,52 @@ namespace umbridge { res.set_content(response_body.dump(), "application/json"); }); +#ifdef SUPPORT_POSIX_SHMEM + svr.Post("/GradientShMem", [&](const httplib::Request &req, httplib::Response &res) { + json request_body = json::parse(req.body); + if (!check_model_exists(models, request_body["name"], res)) + return; + Model& model = get_model_from_name(models, request_body["name"]); + + if (!model.SupportsGradient()) { + write_unsupported_feature_response(res, "Gradient"); + return; + } + + unsigned int inWrt = request_body.at("inWrt"); + unsigned int outWrt = request_body.at("outWrt"); + + std::vector> inputs; + for (int i = 0; i < request_body["shmem_num_inputs"].get(); i++) { + SharedMemoryVector shmem_input(request_body["shmem_size_" + std::to_string(i)].get(), request_body["shmem_name"].get() + "_in_" + request_body["tid"].get() + "_" + std::to_string(i), false); + inputs.push_back(shmem_input.GetVector()); + } + SharedMemoryVector shmem_output(inputs[inWrt].size(), request_body["shmem_name"].get() + "_out_" + request_body["tid"].get() + "_" + std::to_string(0), false); + + std::vector sens = request_body.at("sens"); + + json empty_default_config; + json config_json = request_body.value("config", empty_default_config); + + if (!check_input_wrt(inWrt, config_json, model, res)) + return; + if (!check_output_wrt(outWrt, config_json, model, res)) + return; + if (!check_input_sizes(inputs, config_json, model, res)) + return; + if 
(!check_sensitivity_size(sens, outWrt, config_json, model, res)) + return; + + const std::lock_guard model_lock(model_mutex); + std::vector gradient = model.Gradient(outWrt, inWrt, inputs, sens, config_json); + + shmem_output.SetVector(gradient); + json response_body; + + + res.set_content(response_body.dump(), "application/json"); + }); +#endif svr.Post("/ApplyJacobian", [&](const httplib::Request &req, httplib::Response &res) { json request_body = json::parse(req.body); @@ -578,9 +902,51 @@ namespace umbridge { json response_body; response_body["output"] = jacobian_action; - res.set_content(response_body.dump(), "application/json"); - }); + res.set_content(response_body.dump(), "application/json"); }); +#ifdef SUPPORT_POSIX_SHMEM + svr.Post("/ApplyJacobianShMem", [&](const httplib::Request &req, httplib::Response &res) { + json request_body = json::parse(req.body); + if (!check_model_exists(models, request_body["name"], res)) + return; + Model& model = get_model_from_name(models, request_body["name"]); + + if (!model.SupportsApplyJacobian()) { + write_unsupported_feature_response(res, "ApplyJacobian"); + return; + } + + unsigned int inWrt = request_body.at("inWrt"); + unsigned int outWrt = request_body.at("outWrt"); + + std::vector> inputs; + for (int i = 0; i < request_body["shmem_num_inputs"].get(); i++) { + SharedMemoryVector shmem_input(request_body["shmem_size_" + std::to_string(i)].get(), request_body["shmem_name"].get() + "_in_" + request_body["tid"].get() + "_" + std::to_string(i), false); + inputs.push_back(shmem_input.GetVector()); + } + SharedMemoryVector shmem_output(model.GetOutputSizes()[outWrt], request_body["shmem_name"].get() + "_out_" + request_body["tid"].get() + "_" + std::to_string(0), false); + + std::vector vec = request_body.at("vec"); + + json empty_default_config; + json config_json = request_body.value("config", empty_default_config); + if (!check_input_wrt(inWrt, config_json, model, res)) + return; + if (!check_output_wrt(outWrt, 
config_json, model, res)) + return; + if (!check_input_sizes(inputs, config_json, model, res)) + return; + if (!check_vector_size(vec, inWrt, config_json, model, res)) + return; + + const std::lock_guard model_lock(model_mutex); + std::vector jacobian_action = model.ApplyJacobian(outWrt, inWrt, inputs, vec, config_json); + + json response_body; + shmem_output.SetVector(jacobian_action); + + res.set_content(response_body.dump(), "application/json"); }); +#endif svr.Post("/ApplyHessian", [&](const httplib::Request &req, httplib::Response &res) { json request_body = json::parse(req.body); if (error_checks && !check_model_exists(models, request_body["name"], res)) @@ -633,7 +999,55 @@ namespace umbridge { res.set_content(response_body.dump(), "application/json"); }); +#ifdef SUPPORT_POSIX_SHMEM + svr.Post("/ApplyHessianShMem", [&](const httplib::Request &req, httplib::Response &res) { + json request_body = json::parse(req.body); + if (!check_model_exists(models, request_body["name"], res)) + return; + Model& model = get_model_from_name(models, request_body["name"]); + + if (!model.SupportsApplyHessian()) { + write_unsupported_feature_response(res, "ApplyHessian"); + return; + } + + unsigned int outWrt = request_body.at("outWrt"); + unsigned int inWrt1 = request_body.at("inWrt1"); + unsigned int inWrt2 = request_body.at("inWrt2"); + + std::vector> inputs; + for (int i = 0; i < request_body["shmem_num_inputs"].get(); i++) { + SharedMemoryVector shmem_input(request_body["shmem_size_" + std::to_string(i)].get(), request_body["shmem_name"].get() + "_in_" + request_body["tid"].get() + "_" + std::to_string(i), false); + inputs.push_back(shmem_input.GetVector()); + } + SharedMemoryVector shmem_output(model.GetOutputSizes()[outWrt], request_body["shmem_name"].get() + "_out_" + request_body["tid"].get() + "_" + std::to_string(0), false); + + std::vector sens = request_body.at("sens"); + std::vector vec = request_body.at("vec"); + + json empty_default_config; + json config_json = 
request_body.value("config", empty_default_config); + + if (!check_input_wrt(inWrt1, config_json, model, res)) + return; + if (!check_input_wrt(inWrt2, config_json, model, res)) + return; + if (!check_output_wrt(outWrt, config_json, model, res)) + return; + if (!check_input_sizes(inputs, config_json, model, res)) + return; + if (!check_sensitivity_size(sens, outWrt, config_json, model, res)) + return; + + const std::lock_guard model_lock(model_mutex); + std::vector hessian_action = model.ApplyHessian(outWrt, inWrt1, inWrt2, inputs, sens, vec, config_json); + + json response_body; + shmem_output.SetVector(hessian_action); + res.set_content(response_body.dump(), "application/json"); + }); +#endif svr.Get("/Info", [&](const httplib::Request &, httplib::Response &res) { json response_body; response_body["protocolVersion"] = 1.0; @@ -655,10 +1069,13 @@ namespace umbridge { json response_body; response_body["support"] = {}; response_body["support"]["Evaluate"] = model.SupportsEvaluate(); + response_body["support"]["EvaluateShMem"] = model.SupportsEvaluate(); response_body["support"]["Gradient"] = model.SupportsGradient(); + response_body["support"]["GradientShMem"] = model.SupportsGradient(); response_body["support"]["ApplyJacobian"] = model.SupportsApplyJacobian(); + response_body["support"]["ApplyJacobianShMem"] = model.SupportsApplyJacobian(); response_body["support"]["ApplyHessian"] = model.SupportsApplyHessian(); - + response_body["support"]["ApplyHessianShMem"] = model.SupportsApplyHessian(); res.set_content(response_body.dump(), "application/json"); }); @@ -691,7 +1108,24 @@ namespace umbridge { res.set_content(response_body.dump(), "application/json"); }); - +#ifdef SUPPORT_POSIX_SHMEM + svr.Post("/TestShMem", [&](const httplib::Request &req, httplib::Response &res) { + json request_body = json::parse(req.body); + if (!check_model_exists(models, request_body["name"], res)) + return; + Model &model = get_model_from_name(models, request_body["name"]); + json 
response_body; + try { + SharedMemoryVector shmem_input(1, "/umbridge_test_shmem_in_" + request_body["tid"].get(), false); + SharedMemoryVector shmem_output(1, "/umbridge_test_shmem_out_" + request_body["tid"].get(), false); + std::vector value = shmem_input.GetVector(); + shmem_output.SetVector(value); + response_body["value"] = value; + } + catch(std::exception){} + res.set_content(response_body.dump(), "application/json"); + }); +#endif std::cout << "Listening on port " << port << "..." << std::endl; #ifdef LOGGING diff --git a/umbridge/um.py b/umbridge/um.py index ee525889..e35f7edf 100755 --- a/umbridge/um.py +++ b/umbridge/um.py @@ -2,6 +2,9 @@ import requests import asyncio from concurrent.futures import ThreadPoolExecutor +from multiprocessing import shared_memory +import numpy as np +import threading class Model(object): @@ -51,6 +54,38 @@ def __init__(self, url, name): self.__supports_gradient = response["support"].get("Gradient", False) self.__supports_apply_jacobian = response["support"].get("ApplyJacobian", False) self.__supports_apply_hessian = response["support"].get("ApplyHessian", False) + self.__supports_evaluate_shmem = response["support"].get("EvaluateShMem", False) + self.__supports_gradient_shmem = response["support"].get("GradientShMem", False) + self.__supports_apply_jacobian_shmem = response["support"].get("ApplyJacobianShMem", False) + self.__supports_apply_hessian_shmem = response["support"].get("ApplyHessianShMem", False) + + #Test whether client and server are able to communicate through shared memory. Disables ShMem if test fails. 
+ testvec = [12345.0] + tid = threading.get_native_id() + input["tid"] = str(tid) + shm_c_in = shared_memory.SharedMemory("/umbridge_test_shmem_in_" + str(tid), True, 8) + raw_shmem_input = np.ndarray(1, dtype=np.float64, buffer=shm_c_in.buf) + raw_shmem_input[:] = testvec[0] + shm_c_out = shared_memory.SharedMemory("/umbridge_test_shmem_out_" + str(tid), create=True, size=8) + raw_shmem_output = np.ndarray(1, dtype=np.float64, buffer=shm_c_out.buf) + try: response = requests.post(f"{self.url}/TestShMem", json=input).json() + except: pass + result = [] + result.append(raw_shmem_output.tolist()[0]) + shm_c_in.close() + shm_c_in.unlink() + shm_c_out.close() + shm_c_out.unlink() + + if(result[0] != testvec[0]): + self.__supports_evaluate_shmem = False + self.__supports_gradient_shmem = False + self.__supports_apply_jacobian_shmem = False + self.__supports_apply_hessian_shmem = False + print("Server not accessible via shared memory") + else: + print("Server accessible via shared memory") + def get_input_sizes(self, config={}): input = {} @@ -78,6 +113,18 @@ def supports_apply_jacobian(self): def supports_apply_hessian(self): return self.__supports_apply_hessian + def supports_evaluate_shmem(self): + return self.__supports_evaluate_shmem + + def supports_gradient_shmem(self): + return self.__supports_gradient_shmem + + def supports_apply_jacobian_shmem(self): + return self.__supports_apply_jacobian_shmem + + def supports_apply_hessian_shmem(self): + return self.__supports_apply_hessian_shmem + def __check_input_is_list_of_lists(self,parameters): if not isinstance(parameters, list): raise Exception("Parameters must be a list of lists!") @@ -88,72 +135,230 @@ def __call__(self, parameters, config={}): if not self.supports_evaluate(): raise Exception('Evaluation not supported by model!') self.__check_input_is_list_of_lists(parameters) - - inputParams = {} - inputParams["name"] = self.name - inputParams["input"] = parameters - inputParams["config"] = config - response = 
requests.post(f"{self.url}/Evaluate", json=inputParams).json() - - if "error" in response: - raise Exception(f'Model returned error of type {response["error"]["type"]}: {response["error"]["message"]}') - return response["output"] - + if(self.supports_evaluate_shmem()): + tid = threading.get_native_id() + inputParams = {} + inputParams["tid"] = str(tid) + inputParams["name"] = self.name + inputParams["config"] = config + inputParams["shmem_name"] = "/umbridge" + inputParams["shmem_num_inputs"] = len(parameters) + buffers = [] + + for i in range(len(parameters)): + inputParams["shmem_size_" + str(i)] = len(parameters[i]) + shm_c_in = shared_memory.SharedMemory(inputParams["shmem_name"] + "_in_" + str(tid) + f"_{i}", create=True, size=len(parameters[i])*8) + raw_shmem_input = np.ndarray((len(parameters[i]),), dtype=np.float64, buffer=shm_c_in.buf) + raw_shmem_input[:] = parameters[i] + buffers.append(shm_c_in) + output_sizes = self.get_output_sizes(config) + + for i in range(len(output_sizes)): + shm_c_out = shared_memory.SharedMemory(inputParams["shmem_name"] + "_out_" + str(tid) + f"_{i}", create=True, size=output_sizes[i]*8) + raw_shmem_input = np.ndarray((output_sizes[i],), dtype=np.float64, buffer=shm_c_in.buf) + buffers.append(shm_c_out) + response = requests.post(f"{self.url}/EvaluateShMem", json=inputParams).json() + output = [] + for i in range(len(output_sizes)): + shm_c_out = shared_memory.SharedMemory(inputParams["shmem_name"] + "_out_" + str(tid) + f"_{i}", create=False, size=output_sizes[i]*8) + raw_shmem_output = np.ndarray((output_sizes[i],), dtype=np.float64, buffer=shm_c_out.buf) + output.append(raw_shmem_output.tolist()) + + for buffer in buffers: + buffer.close() + buffer.unlink() + + if response is not None and "error" in response: + raise Exception(f'Model returned error of type {response["error"]["type"]}: {response["error"]["message"]}') + return output + + else: + + inputParams = {} + inputParams["name"] = self.name + inputParams["input"] = 
parameters + inputParams["config"] = config + response = requests.post(f"{self.url}/Evaluate", json=inputParams).json() + + if "error" in response: + raise Exception(f'Model returned error of type {response["error"]["type"]}: {response["error"]["message"]}') + return response["output"] + def gradient(self, out_wrt, in_wrt, parameters, sens, config={}): if not self.supports_gradient(): raise Exception('Gradient not supported by model!') self.__check_input_is_list_of_lists(parameters) - - inputParams = {} - inputParams["name"] = self.name - inputParams["outWrt"] = out_wrt - inputParams["inWrt"] = in_wrt - inputParams["input"] = parameters - inputParams["sens"] = sens - inputParams["config"] = config - response = requests.post(f"{self.url}/Gradient", json=inputParams).json() - - if "error" in response: - raise Exception(f'Model returned error of type {response["error"]["type"]}: {response["error"]["message"]}') - return response["output"] + if(self.supports_gradient_shmem()): + tid = threading.get_native_id() + inputParams = {} + inputParams["tid"] = str(tid) + inputParams["name"] = self.name + inputParams["outWrt"] = out_wrt + inputParams["inWrt"] = in_wrt + inputParams["sens"] = sens + inputParams["config"] = config + inputParams["shmem_name"] = "/umbridge" + inputParams["shmem_num_inputs"] = len(parameters) + buffers = [] + + for i in range(len(parameters)): + inputParams["shmem_size_" + str(i)] = len(parameters[i]) + shm_c_in = shared_memory.SharedMemory(inputParams["shmem_name"] + "_in_" + str(tid) + f"_{i}", create=True, size=len(parameters[i])*8) + raw_shmem_input = np.ndarray((len(parameters[i]),), dtype=np.float64, buffer=shm_c_in.buf) + raw_shmem_input[:] = parameters[i] + buffers.append(shm_c_in) + + shm_c_out = shared_memory.SharedMemory(inputParams["shmem_name"] + "_out_" + str(tid) + f"_{0}", create=True, size=len(parameters[in_wrt])*8) + raw_shmem_output = np.ndarray((len(parameters[in_wrt]),), dtype=np.float64, buffer=shm_c_out.buf) + 
buffers.append(shm_c_out) + response = requests.post(f"{self.url}/GradientShMem", json=inputParams).json() + + output = [] + shm_c_out = shared_memory.SharedMemory(inputParams["shmem_name"] + "_out_" + str(tid) + f"_{0}", create=False, size=len(parameters[in_wrt])*8) + raw_shmem_output = np.ndarray((len(parameters[in_wrt]),), dtype=np.float64, buffer=shm_c_out.buf) + output = raw_shmem_output.tolist() + for buffer in buffers: + buffer.close() + buffer.unlink() + + if response is not None and "error" in response: + raise Exception(f'Model returned error of type {response["error"]["type"]}: {response["error"]["message"]}') + return output + + else: + + inputParams = {} + inputParams["name"] = self.name + inputParams["outWrt"] = out_wrt + inputParams["inWrt"] = in_wrt + inputParams["input"] = parameters + inputParams["sens"] = sens + inputParams["config"] = config + response = requests.post(f"{self.url}/Gradient", json=inputParams).json() + + if "error" in response: + raise Exception(f'Model returned error of type {response["error"]["type"]}: {response["error"]["message"]}') + return response["output"] def apply_jacobian(self, out_wrt, in_wrt, parameters, vec, config={}): if not self.supports_apply_jacobian(): raise Exception('ApplyJacobian not supported by model!') self.__check_input_is_list_of_lists(parameters) - - inputParams = {} - inputParams["name"] = self.name - inputParams["outWrt"] = out_wrt - inputParams["inWrt"] = in_wrt - inputParams["input"] = parameters - inputParams["vec"] = vec - inputParams["config"] = config - response = requests.post(f"{self.url}/ApplyJacobian", json=inputParams).json() - - if "error" in response: - raise Exception(f'Model returned error of type {response["error"]["type"]}: {response["error"]["message"]}') - return response["output"] + if(self.supports_apply_jacobian_shmem()): + tid = threading.get_native_id() + inputParams = {} + inputParams["tid"] = str(tid) + inputParams["name"] = self.name + inputParams["outWrt"] = out_wrt + 
inputParams["inWrt"] = in_wrt + inputParams["vec"] = vec + inputParams["config"] = config + inputParams["shmem_name"] = "/umbridge" + inputParams["shmem_num_inputs"] = len(parameters) + buffers = [] + + for i in range(len(parameters)): + inputParams["shmem_size_" + str(i)] = len(parameters[i]) + shm_c_in = shared_memory.SharedMemory(inputParams["shmem_name"] + "_in_" + str(tid) + f"_{i}" , create=True, size=len(parameters[i])*8) + raw_shmem_input = np.ndarray((len(parameters[i]),), dtype=np.float64, buffer=shm_c_in.buf) + raw_shmem_input[:] = parameters[i] + buffers.append(shm_c_in) + + output_sizes = self.get_output_sizes(config) + + shm_c_out = shared_memory.SharedMemory(inputParams["shmem_name"] + "_out_" + str(tid) + f"_{0}", create=True, size=output_sizes[out_wrt]*8) + raw_shmem_output = np.ndarray((output_sizes[out_wrt],), dtype=np.float64, buffer=shm_c_out.buf) + buffers.append(shm_c_out) + + response = requests.post(f"{self.url}/ApplyJacobianShMem", json=inputParams).json() + + output = [] + shm_c_out = shared_memory.SharedMemory(inputParams["shmem_name"] + "_out_" + str(tid) + f"_{0}", create=False, size=output_sizes[out_wrt]*8) + raw_shmem_output = np.ndarray((output_sizes[out_wrt],), dtype=np.float64, buffer=shm_c_out.buf) + output = raw_shmem_output.tolist() + for buffer in buffers: + buffer.close() + buffer.unlink() + if response is not None and "error" in response: + raise Exception(f'Model returned error of type {response["error"]["type"]}: {response["error"]["message"]}') + return output + + else: + + inputParams = {} + inputParams["name"] = self.name + inputParams["outWrt"] = out_wrt + inputParams["inWrt"] = in_wrt + inputParams["input"] = parameters + inputParams["vec"] = vec + inputParams["config"] = config + response = requests.post(f"{self.url}/ApplyJacobian", json=inputParams).json() + + if "error" in response: + raise Exception(f'Model returned error of type {response["error"]["type"]}: {response["error"]["message"]}') + return 
response["output"] def apply_hessian(self, out_wrt, in_wrt1, in_wrt2, parameters, sens, vec, config={}): if not self.supports_apply_hessian(): raise Exception('ApplyHessian not supported by model!') self.__check_input_is_list_of_lists(parameters) - - inputParams = {} - inputParams["name"] = self.name - inputParams["outWrt"] = out_wrt - inputParams["inWrt1"] = in_wrt1 - inputParams["inWrt2"] = in_wrt2 - inputParams["input"] = parameters - inputParams["sens"] = sens - inputParams["vec"] = vec - inputParams["config"] = config - response = requests.post(f"{self.url}/ApplyHessian", json=inputParams).json() - - if "error" in response: - raise Exception(f'Model returned error of type {response["error"]["type"]}: {response["error"]["message"]}') - return response["output"] + if(self.supports_apply_hessian_shmem()): + tid = threading.get_native_id() + inputParams = {} + inputParams["tid"] = str(tid) + inputParams["name"] = self.name + inputParams["outWrt"] = out_wrt + inputParams["inWrt1"] = in_wrt1 + inputParams["inWrt2"] = in_wrt2 + inputParams["sens"] = sens + inputParams["vec"] = vec + inputParams["config"] = config + inputParams["shmem_name"] = "/umbridge" + inputParams["shmem_num_inputs"] = len(parameters) + buffers = [] + + for i in range(len(parameters)): + inputParams["shmem_size_" + str(i)] = len(parameters[i]) + shm_c_in = shared_memory.SharedMemory(inputParams["shmem_name"] + "_in_" + str(tid) + f"_{i}", create=True, size=len(parameters[i])*8) + raw_shmem_input = np.ndarray((len(parameters[i]),), dtype=np.float64, buffer=shm_c_in.buf) + raw_shmem_input[:] = parameters[i] + buffers.append(shm_c_in) + + output_sizes = self.get_output_sizes(config) + + shm_c_out = shared_memory.SharedMemory(inputParams["shmem_name"] + "_out_" + str(tid) + f"_{0}", create=True, size=output_sizes[out_wrt]*8) + raw_shmem_output = np.ndarray((output_sizes[out_wrt],), dtype=np.float64, buffer=shm_c_out.buf) + buffers.append(shm_c_out) + + response = 
requests.post(f"{self.url}/ApplyHessianShMem", json=inputParams).json() + + output = [] + shm_c_out = shared_memory.SharedMemory(inputParams["shmem_name"] + "_out_" + str(tid) + f"_{0}", create=False, size=output_sizes[out_wrt]*8) + raw_shmem_output = np.ndarray((output_sizes[out_wrt],), dtype=np.float64, buffer=shm_c_out.buf) + output = raw_shmem_output.tolist() + for buffer in buffers: + buffer.close() + buffer.unlink() + if response is not None and "error" in response: + raise Exception(f'Model returned error of type {response["error"]["type"]}: {response["error"]["message"]}') + return output + + else: + + inputParams = {} + inputParams["name"] = self.name + inputParams["outWrt"] = out_wrt + inputParams["inWrt1"] = in_wrt1 + inputParams["inWrt2"] = in_wrt2 + inputParams["input"] = parameters + inputParams["sens"] = sens + inputParams["vec"] = vec + inputParams["config"] = config + response = requests.post(f"{self.url}/ApplyHessian", json=inputParams).json() + + if "error" in response: + raise Exception(f'Model returned error of type {response["error"]["type"]}: {response["error"]["message"]}') + return response["output"] def serve_models(models, port=4242, max_workers=1): @@ -182,7 +387,6 @@ def get_model_from_name(name): @routes.post('/Evaluate') async def evaluate(request): - req_json = await request.json() model_name = req_json["name"] model = get_model_from_name(model_name) @@ -224,6 +428,59 @@ async def evaluate(request): return web.Response(text=f"{{\"output\": {output} }}") + @routes.post('/EvaluateShMem') + async def evaluate(request): + req_json = await request.json() + model_name = req_json["name"] + model = get_model_from_name(model_name) + if model is None: + return model_not_found_response(req_json["name"]) + if not model.supports_evaluate(): + return error_response("UnsupportedFeature", "Evaluate not supported by model!", 400) + + config = {} + if "config" in req_json: + config = req_json["config"] + + parameters = [] + for i in 
range(req_json["shmem_num_inputs"]): + shm_c_in = shared_memory.SharedMemory(req_json["shmem_name"] + "_in_" + str(req_json["tid"]) + f"_{i}", False, req_json[f"shmem_size_{i}"]) + raw_shmem_parameter = np.ndarray((req_json[f"shmem_size_{i}"],), dtype=np.float64, buffer=shm_c_in.buf) + parameters.append(raw_shmem_parameter.tolist()) + shm_c_in.close() + + # Check if parameter dimensions match model input sizes + if len(parameters) != len(model.get_input_sizes(config)): + return error_response("InvalidInput", "Number of input parameters does not match model number of model inputs!", 400) + for i in range(len(parameters)): + if len(parameters[i]) != model.get_input_sizes(config)[i]: + return error_response("InvalidInput", f"Input parameter {i} has invalid length! Expected {model.get_input_sizes(config)[i]} but got {len(parameters[i])}.", 400) + + output_future = model_executor.submit(model.__call__, parameters, config) + output = await asyncio.wrap_future(output_future) + + # Check if output is a list of lists + if not isinstance(output, list): + return error_response("InvalidOutput", "Model output is not a list of lists!", 500) + if not all (isinstance(x, list) for x in output): + return error_response("InvalidOutput", "Model output is not a list of lists!", 500) + + # Check if output dimensions match model output sizes + if len(output) != len(model.get_output_sizes(config)): + return error_response("InvalidOutput", "Number of output vectors returned by model does not match number of model outputs declared by model!", 500) + for i in range(len(output)): + if len(output[i]) != model.get_output_sizes(config)[i]: + return error_response("InvalidOutput", f"Output vector {i} has invalid length! 
Model declared {model.get_output_sizes(config)[i]} but returned {len(output[i])}.", 500) + + # Write output to shared memory + for i in range(len(output)): + shm_c_out = shared_memory.SharedMemory(req_json["shmem_name"] + "_out_" + str(req_json["tid"]) + f"_{i}", create=False, size=len(output[i])*8) + raw_shmem_output = np.ndarray((len(output[i]),), dtype=np.float64, buffer=shm_c_out.buf) + raw_shmem_output[:] = output[i] + shm_c_out.close() + + return web.Response(text="{}") + @routes.post('/Gradient') async def gradient(request): @@ -275,6 +532,65 @@ async def gradient(request): return web.Response(text=f"{{\"output\": {output} }}") + + @routes.post('/GradientShMem') + async def gradient(request): + req_json = await request.json() + model_name = req_json["name"] + model = get_model_from_name(model_name) + if model is None: + return model_not_found_response(req_json["name"]) + if not model.supports_gradient(): + return error_response("UnsupportedFeature", "Gradient not supported by model!", 400) + + out_wrt = req_json["outWrt"] + in_wrt = req_json["inWrt"] + sens = req_json["sens"] + config = {} + if "config" in req_json: + config = req_json["config"] + parameters = [] + for i in range(req_json["shmem_num_inputs"]): + shm_c_in = shared_memory.SharedMemory(req_json["shmem_name"] + "_in_" + str(req_json["tid"]) + f"_{i}", False, req_json[f"shmem_size_{i}"]) + raw_shmem_parameter = np.ndarray((req_json[f"shmem_size_{i}"],), dtype=np.float64, buffer=shm_c_in.buf) + parameters.append(raw_shmem_parameter.tolist()) + shm_c_in.close() + + # Check if parameter dimensions match model input sizes + if len(parameters) != len(model.get_input_sizes(config)): + return error_response("InvalidInput", "Number of input parameters does not match model number of model inputs!", 400) + for i in range(len(parameters)): + if len(parameters[i]) != model.get_input_sizes(config)[i]: + return error_response("InvalidInput", f"Input parameter {i} has invalid length! 
Expected {model.get_input_sizes(config)[i]} but got {len(parameters[i])}.", 400) + # Check if outWrt is not between zero and number of outputs + if out_wrt < 0 or out_wrt >= len(model.get_output_sizes(config)): + return error_response("InvalidInput", "Invalid outWrt index! Expected between 0 and number of outputs minus one, but got " + str(out_wrt), 400) + # Check if inWrt is between zero and number of inputs + if in_wrt < 0 or in_wrt >= len(model.get_input_sizes(config)): + return error_response("InvalidInput", "Invalid inWrt index! Expected between 0 and number of inputs minus one, but got " + str(in_wrt), 400) + # Check if sensitivity vector length matches model output outWrt + if len(sens) != model.get_output_sizes(config)[out_wrt]: + return error_response("InvalidInput", f"Sensitivity vector sens has invalid length! Expected {model.get_output_sizes(config)[out_wrt]} but got {len(sens)}.", 400) + + output_future = model_executor.submit(model.gradient, out_wrt, in_wrt, parameters, sens, config) + output = await asyncio.wrap_future(output_future) + + # Check if output is a list + if not isinstance(output, list): + return error_response("InvalidOutput", "Model output is not a list!", 500) + + # Check if output dimension matches model ipuut size inWrt + if len(output) != model.get_input_sizes(config)[in_wrt]: + return error_response("InvalidOutput", f"Output vector has invalid length! 
Model declared {model.get_input_sizes(config)[in_wrt]} but returned {len(output)}.", 500) + + # Write output to shared memory + shm_c_out = shared_memory.SharedMemory(req_json["shmem_name"] + "_out_" + str(req_json["tid"]) + f"_{0}", create=False, size=len(output)*8) + raw_shmem_output = np.ndarray((len(output),), dtype=np.float64, buffer=shm_c_out.buf) + raw_shmem_output[:] = output + shm_c_out.close() + + return web.Response(text="{}") + @routes.post('/ApplyJacobian') async def applyjacobian(request): @@ -326,6 +642,64 @@ async def applyjacobian(request): return web.Response(text=f"{{\"output\": {output} }}") + @routes.post('/ApplyJacobianShMem') + async def applyjacobian(request): + req_json = await request.json() + model_name = req_json["name"] + model = get_model_from_name(model_name) + if model is None: + return model_not_found_response(req_json["name"]) + if not model.supports_apply_jacobian(): + return error_response("UnsupportedFeature", "ApplyJacobian not supported by model!", 400) + + out_wrt = req_json["outWrt"] + in_wrt = req_json["inWrt"] + parameters = [] + for i in range(req_json["shmem_num_inputs"]): + shm_c_in = shared_memory.SharedMemory(req_json["shmem_name"] + "_in_" + str(req_json["tid"]) + f"_{i}", False, req_json[f"shmem_size_{i}"]) + raw_shmem_parameter = np.ndarray((req_json[f"shmem_size_{i}"],), dtype=np.float64, buffer=shm_c_in.buf) + parameters.append(raw_shmem_parameter.tolist()) + shm_c_in.close() + vec = req_json["vec"] + config = {} + if "config" in req_json: + config = req_json["config"] + + # Check if parameter dimensions match model input sizes + if len(parameters) != len(model.get_input_sizes(config)): + return error_response("InvalidInput", "Number of input parameters does not match model number of model inputs!", 400) + for i in range(len(parameters)): + if len(parameters[i]) != model.get_input_sizes(config)[i]: + return error_response("InvalidInput", f"Input parameter {i} has invalid length! 
Expected {model.get_input_sizes(config)[i]} but got {len(parameters[i])}.", 400) + # Check if outWrt is not between zero and number of outputs + if out_wrt < 0 or out_wrt >= len(model.get_output_sizes(config)): + return error_response("InvalidInput", "Invalid outWrt index! Expected between 0 and number of outputs minus one, but got " + str(out_wrt), 400) + # Check if inWrt is between zero and number of inputs + if in_wrt < 0 or in_wrt >= len(model.get_input_sizes(config)): + return error_response("InvalidInput", "Invalid inWrt index! Expected between 0 and number of inputs minus one, but got " + str(in_wrt), 400) + # Check if vector length matches model input inWrt + if len(vec) != model.get_input_sizes(config)[in_wrt]: + return error_response("InvalidInput", f"Vector vec has invalid length! Expected {model.get_input_sizes(config)[in_wrt]} but got {len(vec)}.", 400) + + output_future = model_executor.submit(model.apply_jacobian, out_wrt, in_wrt, parameters, vec, config) + output = await asyncio.wrap_future(output_future) + + # Check if output is a list + if not isinstance(output, list): + return error_response("InvalidOutput", "Model output is not a list!", 500) + + # Check if output dimension matches model output size outWrt + if len(output) != model.get_output_sizes(config)[out_wrt]: + return error_response("InvalidOutput", f"Output vector has invalid length! 
Model declared {model.get_output_sizes(config)[out_wrt]} but returned {len(output)}.", 500) + + # Write output to shared memory + shm_c_out = shared_memory.SharedMemory(req_json["shmem_name"] + "_out_" + str(req_json["tid"]) + f"_{0}", create=False, size=len(output)*8) + raw_shmem_output = np.ndarray((len(output),), dtype=np.float64, buffer=shm_c_out.buf) + raw_shmem_output[:] = output + shm_c_out.close() + + return web.Response(text="{}") + @routes.post('/ApplyHessian') async def applyhessian(request): @@ -378,6 +752,66 @@ async def applyhessian(request): return error_response("InvalidOutput", f"Output vector has invalid length! Model declared {output_sizes[out_wrt]} but returned {len(output)}.", 500) return web.Response(text=f"{{\"output\": {output} }}") + + @routes.post('/ApplyHessianShMem') + async def applyhessian(request): + req_json = await request.json() + model_name = req_json["name"] + model = get_model_from_name(model_name) + if model is None: + return model_not_found_response(req_json["name"]) + if not model.supports_apply_hessian(): + return error_response("UnsupportedFeature", "ApplyHessian not supported by model!", 400) + + out_wrt = req_json["outWrt"] + in_wrt1 = req_json["inWrt1"] + in_wrt2 = req_json["inWrt2"] + parameters = [] + for i in range(req_json["shmem_num_inputs"]): + shm_c_in = shared_memory.SharedMemory(req_json["shmem_name"] + "_in_" + str(req_json["tid"]) + f"_{i}", False, req_json[f"shmem_size_{i}"]) + raw_shmem_parameter = np.ndarray((req_json[f"shmem_size_{i}"],), dtype=np.float64, buffer=shm_c_in.buf) + parameters.append(raw_shmem_parameter.tolist()) + shm_c_in.close() + sens = req_json["sens"] + vec = req_json["vec"] + config = {} + if "config" in req_json: + config = req_json["config"] + + # Check if parameter dimensions match model input sizes + if len(parameters) != len(model.get_input_sizes(config)): + return error_response("InvalidInput", "Number of input parameters does not match model number of model inputs!", 400) + for i 
in range(len(parameters)): + if len(parameters[i]) != model.get_input_sizes(config)[i]: + return error_response("InvalidInput", f"Input parameter {i} has invalid length! Expected {model.get_input_sizes(config)[i]} but got {len(parameters[i])}.", 400) + # Check if outWrt is not between zero and number of outputs + if out_wrt < 0 or out_wrt >= len(model.get_output_sizes(config)): + return error_response("InvalidInput", "Invalid outWrt index! Expected between 0 and number of outputs minus one, but got " + str(out_wrt), 400) + # Check if inWrt is between zero and number of inputs + if in_wrt1 < 0 or in_wrt1 >= len(model.get_input_sizes(config)): + return error_response("InvalidInput", "Invalid inWrt1 index! Expected between 0 and number of inputs minus one, but got " + str(in_wrt1), 400) + # Check if inWrt is between zero and number of inputs + if in_wrt2 < 0 or in_wrt2 >= len(model.get_input_sizes(config)): + return error_response("InvalidInput", "Invalid inWrt2 index! Expected between 0 and number of inputs minus one, but got " + str(in_wrt2), 400) + + output_future = model_executor.submit(model.apply_hessian, out_wrt, in_wrt1, in_wrt2, parameters, sens, vec, config) + output = await asyncio.wrap_future(output_future) + + # Check if output is a list + if not isinstance(output, list): + return error_response("InvalidOutput", "Model output is not a list!", 500) + + # Check if output dimension matches model output size outWrt + if len(output) != model.get_output_sizes(config)[out_wrt]: + return error_response("InvalidOutput", f"Output vector has invalid length! 
Model declared {model.get_output_sizes(config)[out_wrt]} but returned {len(output)}.", 500) + + # Write output to shared memory + shm_c_out = shared_memory.SharedMemory(req_json["shmem_name"] + "_out_" + str(req_json["tid"]) + f"_{0}", create=False, size=len(output)*8) + raw_shmem_output = np.ndarray((len(output),), dtype=np.float64, buffer=shm_c_out.buf) + raw_shmem_output[:] = output + shm_c_out.close() + + return web.Response(text="{}") @routes.post('/InputSizes') async def get_input_sizes(request): @@ -414,12 +848,40 @@ async def modelinfo(request): return model_not_found_response(req_json["name"]) response_body = {"support": {}} response_body["support"]["Evaluate"] = model.supports_evaluate() + response_body["support"]["EvaluateShMem"] = model.supports_evaluate() response_body["support"]["Gradient"] = model.supports_gradient() + response_body["support"]["GradientShMem"] = model.supports_gradient() response_body["support"]["ApplyJacobian"] = model.supports_apply_jacobian() + response_body["support"]["ApplyJacobianShMem"] = model.supports_apply_jacobian() response_body["support"]["ApplyHessian"] = model.supports_apply_hessian() + response_body["support"]["ApplyHessianShMem"] = model.supports_apply_hessian() return web.json_response(response_body) + @routes.post('/TestShMem') + async def test_shmem(request): + req_json = await request.json() + model_name = req_json["name"] + model = get_model_from_name(model_name) + if model is None: + return model_not_found_response(req_json["name"]) + response_body= {} + try:#in case the test fails, FileNotFoundError will be thrown + parameters = [] + shm_c_in = shared_memory.SharedMemory("/umbridge_test_shmem_in_" + str(req_json["tid"]), False, 8) + raw_shmem_parameter = np.ndarray(1, dtype=np.float64, buffer=shm_c_in.buf) + parameters.append(raw_shmem_parameter.tolist()) + shm_c_in.close() + + shm_c_out = shared_memory.SharedMemory("/umbridge_test_shmem_out_" + str(req_json["tid"]), create=False, size=8) + raw_shmem_output = 
np.ndarray(1, dtype=np.float64, buffer=shm_c_out.buf) + raw_shmem_output[:] = parameters[0] + shm_c_out.close() + response_body["value"] = parameters[0] + except: + pass + return web.json_response(response_body) + @routes.get('/Info') async def info(request): response_body = {}