diff --git a/CMakeLists.txt b/CMakeLists.txt index aac50d17c..66cd004c0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ set(AGENT_VERSION_MAJOR 2) set(AGENT_VERSION_MINOR 5) set(AGENT_VERSION_PATCH 0) -set(AGENT_VERSION_BUILD 7) +set(AGENT_VERSION_BUILD 8) set(AGENT_VERSION_RC "") # This minimum version is to support Visual Studio 2019 and C++ feature checking and FetchContent diff --git a/demo/agent/Devices.xml b/demo/agent/Devices.xml index f3fff6ef1..94c327256 100644 --- a/demo/agent/Devices.xml +++ b/demo/agent/Devices.xml @@ -984,7 +984,7 @@ - + @@ -1068,11 +1068,11 @@ - + - + diff --git a/demo/compose/agent/agentmqtt.cfg b/demo/compose/agent/agentmqtt.cfg index f68f972cc..09bed6fed 100644 --- a/demo/compose/agent/agentmqtt.cfg +++ b/demo/compose/agent/agentmqtt.cfg @@ -34,7 +34,7 @@ Directories { } Sinks { - Mqtt2Service { + MqttService { MqttHost = mosquitto } } diff --git a/src/mtconnect/agent.hpp b/src/mtconnect/agent.hpp index 493592b8c..c0c1a0821 100644 --- a/src/mtconnect/agent.hpp +++ b/src/mtconnect/agent.hpp @@ -213,7 +213,7 @@ namespace mtconnect { /// @brief Get the MTConnect schema version the agent is supporting /// @return The MTConnect schema version as a string const auto &getSchemaVersion() const { return m_schemaVersion; } - + /// @brief Get the validation state of the agent /// @returns the validation state of the agent bool isValidating() const { return m_validation; } diff --git a/src/mtconnect/entity/entity.hpp b/src/mtconnect/entity/entity.hpp index b8a9578db..d837667b0 100644 --- a/src/mtconnect/entity/entity.hpp +++ b/src/mtconnect/entity/entity.hpp @@ -96,7 +96,7 @@ namespace mtconnect { Entity(const std::string &name, const Properties &props) : m_name(name), m_properties(props) {} Entity(const Entity &entity) - : m_name(entity.m_name), m_properties(entity.m_properties), m_order(entity.m_order) + : m_name(entity.m_name), m_properties(entity.m_properties), m_order(entity.m_order) { if (entity.m_attributes) setAttributes(*entity.m_attributes); diff --git a/src/mtconnect/pipeline/json_mapper.cpp b/src/mtconnect/pipeline/json_mapper.cpp index 8d00e421f..0c05943f6 100644 --- a/src/mtconnect/pipeline/json_mapper.cpp +++ b/src/mtconnect/pipeline/json_mapper.cpp @@ -134,12 +134,12 @@ namespace mtconnect::pipeline { { LOG(warning) << "Error while parsing json: " << e->what(); } - + props.clear(); props["VALUE"] = "UNAVAILABLE"s; if (m_pipelineContext->m_contract->isValidating()) props["quality"] = "INVALID"s; - + obs = observation::Observation::make(dataItem, props, *m_timestamp, errors); } diff --git a/src/mtconnect/pipeline/shdr_token_mapper.cpp b/src/mtconnect/pipeline/shdr_token_mapper.cpp index ad8186663..7db74b6ca 100644 --- a/src/mtconnect/pipeline/shdr_token_mapper.cpp +++ b/src/mtconnect/pipeline/shdr_token_mapper.cpp @@ -280,8 +280,7 @@ namespace mtconnect { if (reqs != nullptr) { auto obs = zipProperties(dataItem, timestamp, *reqs, token, end, errors, - m_contract->getSchemaVersion(), - m_contract->isValidating()); + m_contract->getSchemaVersion(), m_contract->isValidating()); if (dataItem->getConstantValue()) return nullptr; if (obs && source) diff --git a/src/mtconnect/sink/rest_sink/websocket_session.hpp b/src/mtconnect/sink/rest_sink/websocket_session.hpp index 7a84bfbce..94e922b9c 100644 --- a/src/mtconnect/sink/rest_sink/websocket_session.hpp +++ b/src/mtconnect/sink/rest_sink/websocket_session.hpp @@ -189,10 +189,12 @@ namespace mtconnect::sink::rest_sink { if (m_busy || m_messageQueue.size() > 0) { + LOG(debug) << "Queuing Chunk for " << *requestId; m_messageQueue.emplace_back(chunk, complete, *requestId); } else { + LOG(debug) << "Writing Chunk for " << *requestId; send(chunk, complete, *requestId); } } @@ -235,7 +237,7 @@ namespace mtconnect::sink::rest_sink { auto ref = derived().shared_ptr(); - LOG(trace) << "writing chunk for ws: " << requestId; + LOG(debug) << "writing chunk for ws: " << requestId; m_busy = true; @@ -320,7 +322,7 @@ namespace mtconnect::sink::rest_sink { if (len == 0) { - LOG(trace) << "Empty message received"; + LOG(debug) << "Empty message received"; return; } @@ -330,6 +332,8 @@ namespace mtconnect::sink::rest_sink { auto buffer = beast::buffers_to_string(m_buffer.data()); m_buffer.consume(m_buffer.size()); + LOG(debug) << "Received :" << buffer.c_str(); + Document doc; doc.Parse(buffer.c_str(), len); @@ -432,6 +436,8 @@ namespace mtconnect::sink::rest_sink { } else { + LOG(debug) << "Received request id: " << id; + res.first->second.m_request = std::move(request); if (!m_dispatch(derived().shared_ptr(), res.first->second.m_request)) { @@ -454,7 +460,7 @@ namespace mtconnect::sink::rest_sink { beast::flat_buffer m_buffer; std::map m_requests; std::mutex m_mutex; - std::atomic_bool m_busy; + std::atomic_bool m_busy {false}; std::deque m_messageQueue; bool m_isOpen {false}; }; diff --git a/src/mtconnect/source/adapter/adapter_pipeline.cpp b/src/mtconnect/source/adapter/adapter_pipeline.cpp index 1ff5ada6e..cfde83adb 100644 --- a/src/mtconnect/source/adapter/adapter_pipeline.cpp +++ b/src/mtconnect/source/adapter/adapter_pipeline.cpp @@ -21,6 +21,7 @@ #include "mtconnect/configuration/agent_config.hpp" #include "mtconnect/configuration/config_options.hpp" #include "mtconnect/pipeline/convert_sample.hpp" +#include "mtconnect/pipeline/correct_timestamp.hpp" #include "mtconnect/pipeline/deliver.hpp" #include "mtconnect/pipeline/delta_filter.hpp" #include "mtconnect/pipeline/duplicate_filter.hpp" @@ -28,7 +29,6 @@ #include "mtconnect/pipeline/timestamp_extractor.hpp" #include "mtconnect/pipeline/topic_mapper.hpp" #include "mtconnect/pipeline/upcase_value.hpp" -#include "mtconnect/pipeline/correct_timestamp.hpp" #include "mtconnect/pipeline/validator.hpp" #include "mtconnect/source/adapter/adapter.hpp" diff --git a/src/mtconnect/source/loopback_source.cpp b/src/mtconnect/source/loopback_source.cpp index 39f72cd52..65e629ef1 100644 --- a/src/mtconnect/source/loopback_source.cpp +++ b/src/mtconnect/source/loopback_source.cpp @@ -21,6 +21,7 @@ #include "mtconnect/device_model/device.hpp" #include "mtconnect/entity/xml_parser.hpp" #include "mtconnect/pipeline/convert_sample.hpp" +#include "mtconnect/pipeline/correct_timestamp.hpp" #include "mtconnect/pipeline/deliver.hpp" #include "mtconnect/pipeline/delta_filter.hpp" #include "mtconnect/pipeline/duplicate_filter.hpp" @@ -28,7 +29,6 @@ #include "mtconnect/pipeline/timestamp_extractor.hpp" #include "mtconnect/pipeline/upcase_value.hpp" #include "mtconnect/pipeline/validator.hpp" -#include "mtconnect/pipeline/correct_timestamp.hpp" using namespace std; @@ -59,7 +59,7 @@ namespace mtconnect::source { // Convert values if (IsOptionSet(m_options, configuration::ConversionRequired)) next = next->bind(make_shared()); - + if (IsOptionSet(m_options, configuration::CorrectTimestamps)) next = next->bind(make_shared(m_context)); diff --git a/test_package/correct_timestamp_test.cpp b/test_package/correct_timestamp_test.cpp index 5d73e0cae..814f19a6a 100644 --- a/test_package/correct_timestamp_test.cpp +++ b/test_package/correct_timestamp_test.cpp @@ -23,12 +23,12 @@ #include "mtconnect/buffer/checkpoint.hpp" #include "mtconnect/observation/observation.hpp" +#include "mtconnect/pipeline/correct_timestamp.hpp" #include "mtconnect/pipeline/deliver.hpp" #include "mtconnect/pipeline/delta_filter.hpp" #include "mtconnect/pipeline/period_filter.hpp" #include "mtconnect/pipeline/pipeline.hpp" #include "mtconnect/pipeline/shdr_token_mapper.hpp" -#include "mtconnect/pipeline/correct_timestamp.hpp" using namespace mtconnect; using namespace mtconnect::pipeline; @@ -169,46 +169,46 @@ TEST_F(ValidateTimestampTest, should_change_timestamp_if_time_is_moving_backward TEST_F(ValidateTimestampTest, should_handle_timestamp_in_the_future) { makeDataItem({{"id", "a"s}, {"type", "EXECUTION"s}, {"category", "EVENT"s}}); - + auto filter = make_shared(m_context); m_mapper->bind(filter); filter->bind(make_shared(m_context)); - + auto now = chrono::system_clock::now(); - + { auto os = observe({"a", "READY"}, now - 1s); auto list = os->getValue(); ASSERT_EQ(1, list.size()); - + auto obs = dynamic_pointer_cast(list.front()); ASSERT_EQ(now - 1s, obs->getTimestamp()); } - + { auto os = observe({"a", "ACTIVE"}, now + 1s); auto list = os->getValue(); ASSERT_EQ(1, list.size()); - + auto obs = dynamic_pointer_cast(list.front()); ASSERT_EQ(now + 1s, obs->getTimestamp()); } - + { auto os = observe({"a", "READY"}, now); auto list = os->getValue(); ASSERT_EQ(1, list.size()); - + auto obs = dynamic_pointer_cast(list.front()); ASSERT_LT(now, obs->getTimestamp()); ASSERT_GT(now + 10ms, obs->getTimestamp()); } - + { auto os = observe({"a", "ACTIVE"}, now + 2s); auto list = os->getValue(); ASSERT_EQ(1, list.size()); - + auto obs = dynamic_pointer_cast(list.front()); ASSERT_EQ(now + 2s, obs->getTimestamp()); } diff --git a/test_package/observation_validation_test.cpp b/test_package/observation_validation_test.cpp index 50b34b0fe..bd45fbab2 100644 --- a/test_package/observation_validation_test.cpp +++ b/test_package/observation_validation_test.cpp @@ -28,10 +28,10 @@ #include "mtconnect/device_model/data_item/data_item.hpp" #include "mtconnect/entity/entity.hpp" #include "mtconnect/observation/observation.hpp" -#include "mtconnect/pipeline/validator.hpp" +#include "mtconnect/pipeline/json_mapper.hpp" #include "mtconnect/pipeline/shdr_token_mapper.hpp" #include "mtconnect/pipeline/timestamp_extractor.hpp" -#include "mtconnect/pipeline/json_mapper.hpp" +#include "mtconnect/pipeline/validator.hpp" using namespace mtconnect; using namespace mtconnect::pipeline; @@ -52,7 +52,9 @@ int main(int argc, char *argv[]) class MockPipelineContract : public PipelineContract { public: - MockPipelineContract(int32_t schemaVersion, DataItemPtr &dataItem) : m_schemaVersion(schemaVersion), m_dataItem(dataItem) {} + MockPipelineContract(int32_t schemaVersion, DataItemPtr &dataItem) + : m_schemaVersion(schemaVersion), m_dataItem(dataItem) + {} DevicePtr findDevice(const std::string &) override { return m_device; } DataItemPtr findDataItem(const std::string &device, const std::string &name) override { @@ -74,7 +76,7 @@ class MockPipelineContract : public PipelineContract int32_t m_schemaVersion; DataItemPtr &m_dataItem; DevicePtr m_device; - bool m_validation { true }; + bool m_validation {true}; }; /// @brief Validation tests for observations @@ -85,7 +87,7 @@ class ObservationValidationTest : public testing::Test { ErrorList errors; m_dataItem = - DataItem::make({{"id", "exec"s}, {"category", "EVENT"s}, {"type", "EXECUTION"s}}, errors); + DataItem::make({{"id", "exec"s}, {"category", "EVENT"s}, {"type", "EXECUTION"s}}, errors); m_context = make_shared(); m_context->m_contract = make_unique(SCHEMA_VERSION(2, 5), m_dataItem); @@ -248,14 +250,15 @@ TEST_F(ObservationValidationTest, should_validate_data_item_types) { auto contract = static_cast(m_context->m_contract.get()); contract->m_schemaVersion = SCHEMA_VERSION(2, 5); - + shared_ptr mapper; mapper = make_shared(m_context, "", 2); mapper->bind(make_shared(TypeGuard(RUN))); ErrorList errors; - m_dataItem = - DataItem::make({{"id", "pos"s}, {"category", "SAMPLE"s}, {"type", "POSITION"s}, {"units", "MILLIMETER"s}}, errors); + m_dataItem = DataItem::make( + {{"id", "pos"s}, {"category", "SAMPLE"s}, {"type", "POSITION"s}, {"units", "MILLIMETER"s}}, + errors); auto ts = make_shared(); ts->m_tokens = {{"pos"s, "ABC"s}}; @@ -265,21 +268,20 @@ TEST_F(ObservationValidationTest, should_validate_data_item_types) auto observations = (*mapper)(ts); auto &r = *observations; ASSERT_EQ(typeid(Observations), typeid(r)); - + auto oblist = observations->getValue(); ASSERT_EQ(1, oblist.size()); - + auto oi = oblist.begin(); - + { auto sample = dynamic_pointer_cast(*oi++); ASSERT_TRUE(sample); ASSERT_EQ(m_dataItem, sample->getDataItem()); ASSERT_TRUE(sample->isUnavailable()); - + ASSERT_EQ("INVALID", sample->get("quality")); } - } TEST_F(ObservationValidationTest, should_not_validate_if_validation_is_off) @@ -287,62 +289,62 @@ TEST_F(ObservationValidationTest, should_not_validate_if_validation_is_off) auto contract = static_cast(m_context->m_contract.get()); contract->m_schemaVersion = SCHEMA_VERSION(2, 5); contract->m_validation = false; - + shared_ptr mapper; mapper = make_shared(m_context, "", 2); mapper->bind(make_shared(TypeGuard(RUN))); - + ErrorList errors; - m_dataItem = - DataItem::make({{"id", "pos"s}, {"category", "SAMPLE"s}, {"type", "POSITION"s}, {"units", "MILLIMETER"s}}, errors); - + m_dataItem = DataItem::make( + {{"id", "pos"s}, {"category", "SAMPLE"s}, {"type", "POSITION"s}, {"units", "MILLIMETER"s}}, + errors); + auto ts = make_shared(); ts->m_tokens = {{"pos"s, "ABC"s}}; ts->m_timestamp = chrono::system_clock::now(); ts->setProperty("timestamp", ts->m_timestamp); - + auto observations = (*mapper)(ts); auto &r = *observations; ASSERT_EQ(typeid(Observations), typeid(r)); - + auto oblist = observations->getValue(); ASSERT_EQ(1, oblist.size()); - + auto oi = oblist.begin(); - + { auto sample = dynamic_pointer_cast(*oi++); ASSERT_TRUE(sample); ASSERT_EQ(m_dataItem, sample->getDataItem()); ASSERT_TRUE(sample->isUnavailable()); - + ASSERT_FALSE(sample->hasProperty("quality")); } - } TEST_F(ObservationValidationTest, should_validate_json_data_item_types) { using namespace mtconnect::device_model; - + ErrorList errors; auto contract = static_cast(m_context->m_contract.get()); contract->m_schemaVersion = SCHEMA_VERSION(2, 5); Properties dev { - {"id", "3"s}, {"name", "DeviceTest2"s}, {"uuid", "UnivUniqId2"s}, {"iso841Class", "6"s}}; + {"id", "3"s}, {"name", "DeviceTest2"s}, {"uuid", "UnivUniqId2"s}, {"iso841Class", "6"s}}; auto device = dynamic_pointer_cast(Device::getFactory()->make("Device", dev, errors)); - shared_ptr mapper; mapper = make_shared(m_context); mapper->bind(make_shared(TypeGuard(RUN))); - - m_dataItem = - DataItem::make({{"id", "pos"s}, {"category", "SAMPLE"s}, {"type", "POSITION"s}, {"units", "MILLIMETER"s}}, errors); + + m_dataItem = DataItem::make( + {{"id", "pos"s}, {"category", "SAMPLE"s}, {"type", "POSITION"s}, {"units", "MILLIMETER"s}}, + errors); contract->m_dataItem = m_dataItem; device->addDataItem(m_dataItem, errors); ASSERT_EQ(0, errors.size()); - + auto jm = make_shared(); jm->setValue(R"( { @@ -351,19 +353,19 @@ TEST_F(ObservationValidationTest, should_validate_json_data_item_types) } )"s); jm->m_device = device; - - auto observations = (*mapper)(jm); + + auto observations = (*mapper)(jm); auto oblist = observations->getValue(); ASSERT_EQ(1, oblist.size()); - + auto oi = oblist.begin(); - + { auto sample = dynamic_pointer_cast(*oi++); ASSERT_TRUE(sample); ASSERT_EQ(m_dataItem, sample->getDataItem()); ASSERT_TRUE(sample->isUnavailable()); - + ASSERT_EQ("INVALID", sample->get("quality")); - } + } }