diff --git a/CMakeLists.txt b/CMakeLists.txt index 2e85fc9f0..cdef6a6ee 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,7 +2,7 @@ set(AGENT_VERSION_MAJOR 2) set(AGENT_VERSION_MINOR 5) set(AGENT_VERSION_PATCH 0) -set(AGENT_VERSION_BUILD 4) +set(AGENT_VERSION_BUILD 5) set(AGENT_VERSION_RC "") # This minimum version is to support Visual Studio 2019 and C++ feature checking and FetchContent diff --git a/README.md b/README.md index c911b749c..2d4fb6962 100755 --- a/README.md +++ b/README.md @@ -672,6 +672,10 @@ Configuration Parameters *Default*: `false` +* `CorrectTimestamps` - Verify time is always progressing forward for each data item and correct if not. + + *Default*: `false` + * `Validation` - Turns on validation of model components and observations *Default*: `false` diff --git a/agent_lib/CMakeLists.txt b/agent_lib/CMakeLists.txt index 0943f4de6..bf25989d4 100644 --- a/agent_lib/CMakeLists.txt +++ b/agent_lib/CMakeLists.txt @@ -189,6 +189,7 @@ set(AGENT_SOURCES "${SOURCE_DIR}/pipeline/topic_mapper.hpp" "${SOURCE_DIR}/pipeline/transform.hpp" "${SOURCE_DIR}/pipeline/upcase_value.hpp" + "${SOURCE_DIR}/pipeline/correct_timestamp.hpp" "${SOURCE_DIR}/pipeline/validator.hpp" # src/pipeline SOURCE_FILES_ONLY diff --git a/src/mtconnect/agent.hpp b/src/mtconnect/agent.hpp index 3556d02b4..493592b8c 100644 --- a/src/mtconnect/agent.hpp +++ b/src/mtconnect/agent.hpp @@ -213,6 +213,10 @@ namespace mtconnect { /// @brief Get the MTConnect schema version the agent is supporting /// @return The MTConnect schema version as a string const auto &getSchemaVersion() const { return m_schemaVersion; } + + /// @brief Get the validation state of the agent + /// @returns the validation state of the agent + bool isValidating() const { return m_validation; } /// @brief Get the integer schema version based on configuration. /// @returns the schema version as an integer [major * 100 + minor] as a 32bit integer. @@ -589,6 +593,7 @@ namespace mtconnect { } } int32_t getSchemaVersion() const override { return m_agent->getIntSchemaVersion(); } + bool isValidating() const override { return m_agent->isValidating(); } void deliverObservation(observation::ObservationPtr obs) override { m_agent->receiveObservation(obs); diff --git a/src/mtconnect/configuration/agent_config.cpp b/src/mtconnect/configuration/agent_config.cpp index 19b576fd1..45cd0b240 100644 --- a/src/mtconnect/configuration/agent_config.cpp +++ b/src/mtconnect/configuration/agent_config.cpp @@ -830,7 +830,8 @@ namespace mtconnect::configuration { {configuration::TlsClientCAs, ""s}, {configuration::SuppressIPAddress, false}, {configuration::AllowPutFrom, ""s}, - {configuration::Validation, false}}); + {configuration::Validation, false}, + {configuration::CorrectTimestamps, false}}); m_workerThreadCount = *GetOption(options, configuration::WorkerThreads); m_monitorFiles = *GetOption(options, configuration::MonitorConfigFiles); diff --git a/src/mtconnect/configuration/config_options.hpp b/src/mtconnect/configuration/config_options.hpp index 9c2857c48..b0ca2e951 100644 --- a/src/mtconnect/configuration/config_options.hpp +++ b/src/mtconnect/configuration/config_options.hpp @@ -77,6 +77,7 @@ namespace mtconnect { DECLARE_CONFIGURATION(EnableSourceDeviceModels); DECLARE_CONFIGURATION(WorkerThreads); DECLARE_CONFIGURATION(Validation); + DECLARE_CONFIGURATION(CorrectTimestamps); ///@} /// @name MQTT Configuration diff --git a/src/mtconnect/entity/entity.hpp b/src/mtconnect/entity/entity.hpp index 8f45cd319..b8a9578db 100644 --- a/src/mtconnect/entity/entity.hpp +++ b/src/mtconnect/entity/entity.hpp @@ -95,7 +95,12 @@ namespace mtconnect { /// @param props entity properties Entity(const std::string &name, const Properties &props) : m_name(name), m_properties(props) {} - Entity(const Entity &entity) = default; + Entity(const Entity &entity) + : m_name(entity.m_name), m_properties(entity.m_properties), m_order(entity.m_order) + { + if (entity.m_attributes) + setAttributes(*entity.m_attributes); + } virtual ~Entity() {} /// @brief Get a shared pointer @@ -348,10 +353,21 @@ namespace mtconnect { auto erase(Properties::iterator &it) { return m_properties.erase(it); } /// @brief tells the entity which properties are attributes for XML generation /// @param[in] a the attributes - void setAttributes(AttributeSet a) { m_attributes = a; } + void setAttributes(AttributeSet a) + { + std::unique_ptr set(new AttributeSet(a)); + m_attributes.swap(set); + } /// @brief get the attributes for XML generation /// @return attribute set - const auto &getAttributes() const { return m_attributes; } + const auto &getAttributes() const + { + static AttributeSet empty; + if (m_attributes) + return *m_attributes; + else + return empty; + } /// @brief compare two entities for equality /// @param other the other entity @@ -436,7 +452,8 @@ namespace mtconnect { QName m_name; Properties m_properties; OrderMapPtr m_order; - AttributeSet m_attributes; + std::unique_ptr m_attributes; + std::unique_ptr m_errors; }; /// @brief variant visitor to compare two entity parameter values for equality diff --git a/src/mtconnect/pipeline/correct_timestamp.hpp b/src/mtconnect/pipeline/correct_timestamp.hpp new file mode 100644 index 000000000..0ea340911 --- /dev/null +++ b/src/mtconnect/pipeline/correct_timestamp.hpp @@ -0,0 +1,86 @@ +// +// Copyright Copyright 2009-2024, AMT – The Association For Manufacturing Technology (“AMT”) +// All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +#pragma once + +#include "mtconnect/config.hpp" +#include "transform.hpp" + +namespace mtconnect::pipeline { + /// @brief Filter duplicates + class AGENT_LIB_API CorrectTimestamp : public Transform + { + protected: + struct State : TransformState + { + std::unordered_map m_timestamps; + }; + + public: + CorrectTimestamp(const CorrectTimestamp &) = default; + /// @brief Create a duplicate filter with shared state from the context + /// @param context the context + CorrectTimestamp(PipelineContextPtr context) + : Transform("ValidateTimestamp"), + m_context(context), + m_state(context->getSharedState(m_name)) + { + m_guard = TypeGuard(RUN); + } + ~CorrectTimestamp() override = default; + + /// @brief check if the entity is a duplicate + /// @param[in] entity the entity to check + /// @return the result of the transform if not a duplicate or an empty entity + entity::EntityPtr operator()(entity::EntityPtr &&entity) override + { + using namespace observation; + + auto obs = std::dynamic_pointer_cast(entity); + if (obs->isOrphan()) + return entity::EntityPtr(); + + auto di = obs->getDataItem(); + auto &id = di->getId(); + auto ts = obs->getTimestamp(); + + std::lock_guard guard(*m_state); + + auto last = m_state->m_timestamps.find(id); + if (last != m_state->m_timestamps.end()) + { + if (ts < last->second) + { + LOG(debug) << "Observation for data item " << id << " has timestamp " << format(ts) + << " thats is before " << format(last->second); + + // Set the timestamp to now. + ts = std::chrono::system_clock::now(); + obs->setTimestamp(ts); + } + } + + m_state->m_timestamps.emplace(id, ts); + + return obs; + } + + protected: + PipelineContextPtr m_context; + std::shared_ptr m_state; + }; +} // namespace mtconnect::pipeline diff --git a/src/mtconnect/pipeline/duplicate_filter.hpp b/src/mtconnect/pipeline/duplicate_filter.hpp index c9fbf09d0..0493baa3b 100644 --- a/src/mtconnect/pipeline/duplicate_filter.hpp +++ b/src/mtconnect/pipeline/duplicate_filter.hpp @@ -25,12 +25,6 @@ namespace mtconnect::pipeline { class AGENT_LIB_API DuplicateFilter : public Transform { public: - /// @brief Shared states to check for duplicates - struct State : TransformState - { - std::unordered_map m_values; - }; - DuplicateFilter(const DuplicateFilter &) = default; /// @brief Create a duplicate filter with shared state from the context /// @param context the context diff --git a/src/mtconnect/pipeline/json_mapper.cpp b/src/mtconnect/pipeline/json_mapper.cpp index 9be7b4e52..8d00e421f 100644 --- a/src/mtconnect/pipeline/json_mapper.cpp +++ b/src/mtconnect/pipeline/json_mapper.cpp @@ -134,14 +134,19 @@ namespace mtconnect::pipeline { { LOG(warning) << "Error while parsing json: " << e->what(); } + + props.clear(); + props["VALUE"] = "UNAVAILABLE"s; + if (m_pipelineContext->m_contract->isValidating()) + props["quality"] = "INVALID"s; + + obs = observation::Observation::make(dataItem, props, *m_timestamp, errors); } - else - { - if (m_source) - dataItem->setDataSource(*m_source); - m_entities.push_back(obs); - m_forward(std::move(obs)); - } + + if (m_source) + dataItem->setDataSource(*m_source); + m_entities.push_back(obs); + m_forward(std::move(obs)); } } diff --git a/src/mtconnect/pipeline/json_mapper.hpp b/src/mtconnect/pipeline/json_mapper.hpp index d437a6b5f..34835ecf0 100644 --- a/src/mtconnect/pipeline/json_mapper.hpp +++ b/src/mtconnect/pipeline/json_mapper.hpp @@ -24,8 +24,6 @@ #include "mtconnect/entity/entity.hpp" #include "mtconnect/observation/observation.hpp" #include "mtconnect/pipeline/timestamp_extractor.hpp" -#include "shdr_tokenizer.hpp" -#include "timestamp_extractor.hpp" #include "topic_mapper.hpp" #include "transform.hpp" diff --git a/src/mtconnect/pipeline/pipeline_contract.hpp b/src/mtconnect/pipeline/pipeline_contract.hpp index 0066834c1..0e38733fc 100644 --- a/src/mtconnect/pipeline/pipeline_contract.hpp +++ b/src/mtconnect/pipeline/pipeline_contract.hpp @@ -76,6 +76,9 @@ namespace mtconnect { /// @brief get the current schema version as an integer /// @returns the schema version as an integer [major * 100 + minor] as a 32bit integer. virtual int32_t getSchemaVersion() const = 0; + /// @brief `true` if validation is turned on for the agent. + /// @returns the validation state for the pipeline + virtual bool isValidating() const = 0; /// @brief iterate through all the data items calling `fun` for each /// @param[in] fun The function or lambda to call virtual void eachDataItem(EachDataItem fun) = 0; diff --git a/src/mtconnect/pipeline/shdr_token_mapper.cpp b/src/mtconnect/pipeline/shdr_token_mapper.cpp index 264476613..ad8186663 100644 --- a/src/mtconnect/pipeline/shdr_token_mapper.cpp +++ b/src/mtconnect/pipeline/shdr_token_mapper.cpp @@ -137,7 +137,7 @@ namespace mtconnect { const entity::Requirements &reqs, TokenList::const_iterator &token, const TokenList::const_iterator &end, ErrorList &errors, - int32_t schemaVersion) + int32_t schemaVersion, bool validation) { NAMED_SCOPE("zipProperties"); Properties props; @@ -168,6 +168,10 @@ namespace mtconnect { { LOG(warning) << "Cannot convert value for data item id '" << dataItem->getId() << "': " << *token << " - " << e.what(); + if (schemaVersion >= SCHEMA_VERSION(2, 5) && validation) + { + props.insert_or_assign("quality", "INVALID"s); + } } } @@ -276,7 +280,8 @@ namespace mtconnect { if (reqs != nullptr) { auto obs = zipProperties(dataItem, timestamp, *reqs, token, end, errors, - m_contract->getSchemaVersion()); + m_contract->getSchemaVersion(), + m_contract->isValidating()); if (dataItem->getConstantValue()) return nullptr; if (obs && source) diff --git a/src/mtconnect/pipeline/validator.hpp b/src/mtconnect/pipeline/validator.hpp index 352cc30f6..ff3abf09d 100644 --- a/src/mtconnect/pipeline/validator.hpp +++ b/src/mtconnect/pipeline/validator.hpp @@ -39,7 +39,7 @@ namespace mtconnect::pipeline { Validator(PipelineContextPtr context) : Transform("Validator"), m_contract(context->m_contract.get()) { - m_guard = TypeGuard(RUN) || TypeGuard(SKIP); + m_guard = TypeGuard(RUN) || TypeGuard(SKIP); } /// @brief validate the Event @@ -49,14 +49,14 @@ namespace mtconnect::pipeline { { using namespace observation; using namespace mtconnect::validation::observations; - auto evt = std::dynamic_pointer_cast(entity); + auto obs = std::dynamic_pointer_cast(entity); - auto di = evt->getDataItem(); - if (evt->isUnavailable() || di->isDataSet()) + auto di = obs->getDataItem(); + if (obs->isUnavailable() || di->isDataSet()) { - evt->setProperty("quality", std::string("VALID")); + obs->setProperty("quality", std::string("VALID")); } - else + else if (auto evt = std::dynamic_pointer_cast(obs)) { auto &value = evt->getValue(); @@ -114,8 +114,15 @@ namespace mtconnect::pipeline { evt->setProperty("quality", std::string("UNVERIFIABLE")); } } + else if (auto spl = std::dynamic_pointer_cast(obs)) + { + } + else + { + obs->setProperty("quality", std::string("VALID")); + } - return next(std::move(evt)); + return next(std::move(obs)); } protected: diff --git a/src/mtconnect/source/adapter/adapter_pipeline.cpp b/src/mtconnect/source/adapter/adapter_pipeline.cpp index 6432a6cfc..1ff5ada6e 100644 --- a/src/mtconnect/source/adapter/adapter_pipeline.cpp +++ b/src/mtconnect/source/adapter/adapter_pipeline.cpp @@ -28,6 +28,7 @@ #include "mtconnect/pipeline/timestamp_extractor.hpp" #include "mtconnect/pipeline/topic_mapper.hpp" #include "mtconnect/pipeline/upcase_value.hpp" +#include "mtconnect/pipeline/correct_timestamp.hpp" #include "mtconnect/pipeline/validator.hpp" #include "mtconnect/source/adapter/adapter.hpp" @@ -140,6 +141,9 @@ namespace mtconnect { if (IsOptionSet(m_options, configuration::ConversionRequired)) next = next->bind(make_shared()); + if (IsOptionSet(m_options, configuration::CorrectTimestamps)) + next = next->bind(make_shared(m_context)); + // Validate Values if (IsOptionSet(m_options, configuration::Validation)) next = next->bind(make_shared(m_context)); diff --git a/src/mtconnect/source/adapter/mqtt/mqtt_adapter.cpp b/src/mtconnect/source/adapter/mqtt/mqtt_adapter.cpp index 379b3534e..583c979cc 100644 --- a/src/mtconnect/source/adapter/mqtt/mqtt_adapter.cpp +++ b/src/mtconnect/source/adapter/mqtt/mqtt_adapter.cpp @@ -82,13 +82,13 @@ namespace mtconnect { {configuration::RealTime, false}, {configuration::RelativeTime, false}}); loadTopics(block, m_options); - + if (!HasOption(m_options, configuration::MqttHost) && HasOption(m_options, configuration::Host)) { m_options[configuration::MqttHost] = m_options[configuration::Host]; } - + if (!HasOption(m_options, configuration::MqttPort)) { if (HasOption(m_options, configuration::Port)) @@ -100,7 +100,7 @@ namespace mtconnect { m_options[configuration::MqttPort] = 1883; } } - + m_handler = m_pipeline.makeHandler(); auto clientHandler = make_unique(); @@ -147,9 +147,9 @@ namespace mtconnect { m_client = make_shared(m_ioContext, m_options, std::move(clientHandler)); } - + m_name = m_client->getUrl(); - + if (auto ident = GetOption(m_options, configuration::AdapterIdentity)) { m_identity = *ident; @@ -157,7 +157,7 @@ namespace mtconnect { else { stringstream identity; - + identity << m_name; auto topics = GetOption(m_options, configuration::Topics); if (topics) @@ -170,7 +170,7 @@ namespace mtconnect { sha1.process_bytes(identity.str().c_str(), identity.str().length()); boost::uuids::detail::sha1::digest_type digest; sha1.get_digest(digest); - + identity.str(""); identity << std::hex << digest[0] << digest[1] << digest[2]; m_identity = string("_") + (identity.str()).substr(0, 10); diff --git a/test_package/CMakeLists.txt b/test_package/CMakeLists.txt index c4f0a1a0a..4a3702dbd 100644 --- a/test_package/CMakeLists.txt +++ b/test_package/CMakeLists.txt @@ -281,6 +281,7 @@ add_agent_test(mtconnect_xml_transform FALSE pipeline) add_agent_test(response_document FALSE pipeline) add_agent_test(json_mapping FALSE pipeline) add_agent_test(observation_validation TRUE pipeline) +add_agent_test(correct_timestamp TRUE pipeline) add_agent_test(agent TRUE core) add_agent_test(globals FALSE core) diff --git a/test_package/agent_adapter_test.cpp b/test_package/agent_adapter_test.cpp index bad9f9600..b086e69a0 100644 --- a/test_package/agent_adapter_test.cpp +++ b/test_package/agent_adapter_test.cpp @@ -84,6 +84,7 @@ struct MockPipelineContract : public PipelineContract void deliverConnectStatus(entity::EntityPtr, const StringList &dev, bool flag) override {} void sourceFailed(const std::string &id) override { m_failed = true; } const ObservationPtr checkDuplicate(const ObservationPtr &obs) const override { return obs; } + bool isValidating() const override { return false; } bool m_failed = false; std::string m_result; diff --git a/test_package/correct_timestamp_test.cpp b/test_package/correct_timestamp_test.cpp new file mode 100644 index 000000000..da41bc36c --- /dev/null +++ b/test_package/correct_timestamp_test.cpp @@ -0,0 +1,167 @@ +// +// Copyright Copyright 2009-2024, AMT – The Association For Manufacturing Technology (“AMT”) +// All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Ensure that gtest is the first header otherwise Windows raises an error +#include +// Keep this comment to keep gtest.h above. (clang-format off/on is not working here!) + +#include + +#include "mtconnect/buffer/checkpoint.hpp" +#include "mtconnect/observation/observation.hpp" +#include "mtconnect/pipeline/deliver.hpp" +#include "mtconnect/pipeline/delta_filter.hpp" +#include "mtconnect/pipeline/period_filter.hpp" +#include "mtconnect/pipeline/pipeline.hpp" +#include "mtconnect/pipeline/shdr_token_mapper.hpp" +#include "mtconnect/pipeline/correct_timestamp.hpp" + +using namespace mtconnect; +using namespace mtconnect::pipeline; +using namespace mtconnect::observation; +using namespace mtconnect::asset; +using namespace device_model; +using namespace data_item; +using namespace entity; +using namespace std; +using namespace std::literals; +using namespace std::chrono_literals; + +// main +int main(int argc, char *argv[]) +{ + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +class MockPipelineContract : public PipelineContract +{ +public: + MockPipelineContract(std::map &items) : m_dataItems(items) {} + DevicePtr findDevice(const std::string &device) override { return nullptr; } + DataItemPtr findDataItem(const std::string &device, const std::string &name) override + { + return m_dataItems[name]; + } + void eachDataItem(EachDataItem fun) override {} + void deliverObservation(observation::ObservationPtr obs) override {} + void deliverAsset(AssetPtr) override {} + void deliverDevices(std::list) override {} + void deliverDevice(DevicePtr) override {} + void deliverAssetCommand(entity::EntityPtr) override {} + int32_t getSchemaVersion() const override { return IntDefaultSchemaVersion(); } + bool isValidating() const override { return false; } + void deliverCommand(entity::EntityPtr) override {} + void deliverConnectStatus(entity::EntityPtr, const StringList &, bool) override {} + void sourceFailed(const std::string &id) override {} + const ObservationPtr checkDuplicate(const ObservationPtr &obs) const override { return nullptr; } + + std::map &m_dataItems; +}; + +class ValidateTimestampTest : public testing::Test +{ +protected: + void SetUp() override + { + ErrorList errors; + m_component = Component::make("Linear", {{"id", "x"s}, {"name", "X"s}}, errors); + + m_context = make_shared(); + m_context->m_contract = make_unique(m_dataItems); + m_mapper = make_shared(m_context); + m_mapper->bind(make_shared(TypeGuard(RUN))); + } + + void TearDown() override { m_dataItems.clear(); } + + DataItemPtr makeDataItem(Properties attributes) + { + ErrorList errors; + auto di = DataItem::make(attributes, errors); + m_dataItems.emplace(di->getId(), di); + m_component->addDataItem(di, errors); + + return di; + } + + const EntityPtr observe(TokenList tokens, Timestamp now) + { + auto ts = make_shared(); + ts->m_tokens = tokens; + ts->m_timestamp = now; + ts->setProperty("timestamp", ts->m_timestamp); + + return (*m_mapper)(ts); + } + + shared_ptr m_mapper; + std::map m_dataItems; + shared_ptr m_context; + ComponentPtr m_component; +}; + +TEST_F(ValidateTimestampTest, should_not_change_timestamp_if_time_is_moving_forward) +{ + makeDataItem({{"id", "a"s}, {"type", "EXECUTION"s}, {"category", "EVENT"s}}); + + auto filter = make_shared(m_context); + m_mapper->bind(filter); + filter->bind(make_shared(m_context)); + + auto now = chrono::system_clock::now(); + + auto os1 = observe({"a", "READY"}, now); + auto list1 = os1->getValue(); + ASSERT_EQ(1, list1.size()); + + auto obs1 = dynamic_pointer_cast(list1.front()); + ASSERT_EQ(now, obs1->getTimestamp()); + + auto os2 = observe({"a", "ACTIVE"}, now + 1s); + auto list2 = os2->getValue(); + ASSERT_EQ(1, list2.size()); + + auto obs2 = dynamic_pointer_cast(list2.front()); + ASSERT_EQ(now + 1s, obs2->getTimestamp()); +} + +TEST_F(ValidateTimestampTest, should_change_timestamp_if_time_is_moving_backward) +{ + makeDataItem({{"id", "a"s}, {"type", "EXECUTION"s}, {"category", "EVENT"s}}); + + auto filter = make_shared(m_context); + m_mapper->bind(filter); + filter->bind(make_shared(m_context)); + + auto now = chrono::system_clock::now(); + + auto os1 = observe({"a", "READY"}, now); + auto list1 = os1->getValue(); + ASSERT_EQ(1, list1.size()); + + auto obs1 = dynamic_pointer_cast(list1.front()); + ASSERT_EQ(now, obs1->getTimestamp()); + + auto os2 = observe({"a", "ACTIVE"}, now - 1s); + auto list2 = os2->getValue(); + ASSERT_EQ(1, list2.size()); + + auto obs2 = dynamic_pointer_cast(list2.front()); + ASSERT_NE(now - 1s, obs2->getTimestamp()); + ASSERT_LE(now, obs2->getTimestamp()); +} diff --git a/test_package/data_item_mapping_test.cpp b/test_package/data_item_mapping_test.cpp index a17523e1b..f6bb107d6 100644 --- a/test_package/data_item_mapping_test.cpp +++ b/test_package/data_item_mapping_test.cpp @@ -58,6 +58,7 @@ class MockPipelineContract : public PipelineContract void deliverDevices(std::list) override {} void deliverDevice(DevicePtr) override {} int32_t getSchemaVersion() const override { return m_schemaVersion; } + bool isValidating() const override { return false; } void deliverAssetCommand(entity::EntityPtr) override {} void deliverCommand(entity::EntityPtr) override {} void deliverConnectStatus(entity::EntityPtr, const StringList &, bool) override {} diff --git a/test_package/duplicate_filter_test.cpp b/test_package/duplicate_filter_test.cpp index 744f43205..3d891457c 100644 --- a/test_package/duplicate_filter_test.cpp +++ b/test_package/duplicate_filter_test.cpp @@ -74,6 +74,7 @@ class MockPipelineContract : public PipelineContract { return m_checkpoint.checkDuplicate(obs); } + bool isValidating() const override { return false; } std::map &m_dataItems; buffer::Checkpoint m_checkpoint; diff --git a/test_package/embedded_ruby_test.cpp b/test_package/embedded_ruby_test.cpp index 5469a091b..8cffce101 100644 --- a/test_package/embedded_ruby_test.cpp +++ b/test_package/embedded_ruby_test.cpp @@ -92,6 +92,7 @@ namespace { void deliverDevice(DevicePtr) override {} void deliverAssetCommand(entity::EntityPtr c) override { m_command = c; } int32_t getSchemaVersion() const override { return IntDefaultSchemaVersion(); } + bool isValidating() const override { return false; } void deliverCommand(entity::EntityPtr c) override { m_command = c; } void deliverConnectStatus(entity::EntityPtr, const StringList &, bool) override {} void sourceFailed(const std::string &id) override {} diff --git a/test_package/json_mapping_test.cpp b/test_package/json_mapping_test.cpp index 72a0ee72e..d08c1d292 100644 --- a/test_package/json_mapping_test.cpp +++ b/test_package/json_mapping_test.cpp @@ -67,6 +67,7 @@ class MockPipelineContract : public PipelineContract void sourceFailed(const std::string &id) override {} const ObservationPtr checkDuplicate(const ObservationPtr &obs) const override { return obs; } int32_t getSchemaVersion() const override { return SCHEMA_VERSION(2, 3); }; + bool isValidating() const override { return false; } std::map &m_dataItems; std::map &m_devices; diff --git a/test_package/mqtt_adapter_test.cpp b/test_package/mqtt_adapter_test.cpp index b1a9a6403..da4efeddb 100644 --- a/test_package/mqtt_adapter_test.cpp +++ b/test_package/mqtt_adapter_test.cpp @@ -51,9 +51,7 @@ int main(int argc, char *argv[]) class MockPipelineContract : public PipelineContract { public: - MockPipelineContract(int32_t schemaVersion) - : m_schemaVersion(schemaVersion) - {} + MockPipelineContract(int32_t schemaVersion) : m_schemaVersion(schemaVersion) {} DevicePtr findDevice(const std::string &) override { return nullptr; } DataItemPtr findDataItem(const std::string &device, const std::string &name) override { @@ -70,7 +68,8 @@ class MockPipelineContract : public PipelineContract void deliverConnectStatus(entity::EntityPtr, const StringList &, bool) override {} void sourceFailed(const std::string &id) override {} const ObservationPtr checkDuplicate(const ObservationPtr &obs) const override { return obs; } - + bool isValidating() const override { return false; } + int32_t m_schemaVersion; }; @@ -87,10 +86,10 @@ TEST_F(MqttAdapterTest, should_create_a_unique_id_based_on_topic) asio::io_context ioc; asio::io_context::strand strand(ioc); ConfigOptions options {{configuration::Url, "mqtt://mybroker.com:1883"s}, - {configuration::Host, "mybroker.com"s}, - {configuration::Port, 1883}, - {configuration::Protocol, "mqtt"s}, - {configuration::Topics, StringList {"pipeline/#"s}}}; + {configuration::Host, "mybroker.com"s}, + {configuration::Port, 1883}, + {configuration::Protocol, "mqtt"s}, + {configuration::Topics, StringList {"pipeline/#"s}}}; boost::property_tree::ptree tree; pipeline::PipelineContextPtr context = make_shared(); context->m_contract = make_unique(SCHEMA_VERSION(2, 5)); @@ -105,18 +104,18 @@ TEST_F(MqttAdapterTest, should_change_if_topics_change) asio::io_context ioc; asio::io_context::strand strand(ioc); ConfigOptions options {{configuration::Url, "mqtt://mybroker.com:1883"s}, - {configuration::Host, "mybroker.com"s}, - {configuration::Port, 1883}, - {configuration::Protocol, "mqtt"s}, - {configuration::Topics, StringList {"pipeline/#"s}}}; + {configuration::Host, "mybroker.com"s}, + {configuration::Port, 1883}, + {configuration::Protocol, "mqtt"s}, + {configuration::Topics, StringList {"pipeline/#"s}}}; boost::property_tree::ptree tree; pipeline::PipelineContextPtr context = make_shared(); context->m_contract = make_unique(SCHEMA_VERSION(2, 5)); auto adapter = make_unique(ioc, context, options, tree); - + ASSERT_EQ("mqtt://mybroker.com:1883/", adapter->getName()); ASSERT_EQ("_89c11f795e", adapter->getIdentity()); - + options[configuration::Topics] = StringList {"pipline/#"s, "topic/"s}; adapter = make_unique(ioc, context, options, tree); @@ -128,21 +127,21 @@ TEST_F(MqttAdapterTest, should_change_if_port_changes) asio::io_context ioc; asio::io_context::strand strand(ioc); ConfigOptions options {{configuration::Url, "mqtt://mybroker.com:1883"s}, - {configuration::Host, "mybroker.com"s}, - {configuration::Port, 1883}, - {configuration::Protocol, "mqtt"s}, - {configuration::Topics, StringList {"pipeline/#"s}}}; + {configuration::Host, "mybroker.com"s}, + {configuration::Port, 1883}, + {configuration::Protocol, "mqtt"s}, + {configuration::Topics, StringList {"pipeline/#"s}}}; boost::property_tree::ptree tree; pipeline::PipelineContextPtr context = make_shared(); context->m_contract = make_unique(SCHEMA_VERSION(2, 5)); auto adapter = make_unique(ioc, context, options, tree); - + ASSERT_EQ("mqtt://mybroker.com:1883/", adapter->getName()); ASSERT_EQ("_89c11f795e", adapter->getIdentity()); - + options[configuration::Port] = 1882; adapter = make_unique(ioc, context, options, tree); - + ASSERT_EQ("mqtt://mybroker.com:1882/", adapter->getName()); ASSERT_EQ("_7042e8f45e", adapter->getIdentity()); } @@ -152,21 +151,21 @@ TEST_F(MqttAdapterTest, should_change_if_host_changes) asio::io_context ioc; asio::io_context::strand strand(ioc); ConfigOptions options {{configuration::Url, "mqtt://mybroker.com:1883"s}, - {configuration::Host, "mybroker.com"s}, - {configuration::Port, 1883}, - {configuration::Protocol, "mqtt"s}, - {configuration::Topics, StringList {"pipeline/#"s}}}; + {configuration::Host, "mybroker.com"s}, + {configuration::Port, 1883}, + {configuration::Protocol, "mqtt"s}, + {configuration::Topics, StringList {"pipeline/#"s}}}; boost::property_tree::ptree tree; pipeline::PipelineContextPtr context = make_shared(); context->m_contract = make_unique(SCHEMA_VERSION(2, 5)); auto adapter = make_unique(ioc, context, options, tree); - + ASSERT_EQ("mqtt://mybroker.com:1883/", adapter->getName()); ASSERT_EQ("_89c11f795e", adapter->getIdentity()); - + options[configuration::Host] = "localhost"s; adapter = make_unique(ioc, context, options, tree); - + ASSERT_EQ("mqtt://localhost:1883/", adapter->getName()); ASSERT_EQ("_4cd2e64d4e", adapter->getIdentity()); } @@ -176,16 +175,16 @@ TEST_F(MqttAdapterTest, should_be_able_to_set_adapter_identity) asio::io_context ioc; asio::io_context::strand strand(ioc); ConfigOptions options {{configuration::Url, "mqtt://mybroker.com:1883"s}, - {configuration::Host, "mybroker.com"s}, - {configuration::Port, 1883}, - {configuration::Protocol, "mqtt"s}, - {configuration::AdapterIdentity, "MyIdentity"s}, - {configuration::Topics, StringList {"pipeline/#"s}}}; + {configuration::Host, "mybroker.com"s}, + {configuration::Port, 1883}, + {configuration::Protocol, "mqtt"s}, + {configuration::AdapterIdentity, "MyIdentity"s}, + {configuration::Topics, StringList {"pipeline/#"s}}}; boost::property_tree::ptree tree; pipeline::PipelineContextPtr context = make_shared(); context->m_contract = make_unique(SCHEMA_VERSION(2, 5)); auto adapter = make_unique(ioc, context, options, tree); - + ASSERT_EQ("mqtt://mybroker.com:1883/", adapter->getName()); ASSERT_EQ("MyIdentity", adapter->getIdentity()); } diff --git a/test_package/mtconnect_xml_transform_test.cpp b/test_package/mtconnect_xml_transform_test.cpp index 226826e48..21e3b010e 100644 --- a/test_package/mtconnect_xml_transform_test.cpp +++ b/test_package/mtconnect_xml_transform_test.cpp @@ -61,6 +61,7 @@ class MockPipelineContract : public PipelineContract void deliverConnectStatus(entity::EntityPtr, const StringList &, bool) override {} void sourceFailed(const std::string &id) override {} const ObservationPtr checkDuplicate(const ObservationPtr &obs) const override { return obs; } + bool isValidating() const override { return false; } DevicePtr m_device; }; diff --git a/test_package/observation_validation_test.cpp b/test_package/observation_validation_test.cpp index f2e014848..50b34b0fe 100644 --- a/test_package/observation_validation_test.cpp +++ b/test_package/observation_validation_test.cpp @@ -29,6 +29,9 @@ #include "mtconnect/entity/entity.hpp" #include "mtconnect/observation/observation.hpp" #include "mtconnect/pipeline/validator.hpp" +#include "mtconnect/pipeline/shdr_token_mapper.hpp" +#include "mtconnect/pipeline/timestamp_extractor.hpp" +#include "mtconnect/pipeline/json_mapper.hpp" using namespace mtconnect; using namespace mtconnect::pipeline; @@ -49,11 +52,11 @@ int main(int argc, char *argv[]) class MockPipelineContract : public PipelineContract { public: - MockPipelineContract(int32_t schemaVersion) : m_schemaVersion(schemaVersion) {} - DevicePtr findDevice(const std::string &) override { return nullptr; } + MockPipelineContract(int32_t schemaVersion, DataItemPtr &dataItem) : m_schemaVersion(schemaVersion), m_dataItem(dataItem) {} + DevicePtr findDevice(const std::string &) override { return m_device; } DataItemPtr findDataItem(const std::string &device, const std::string &name) override { - return nullptr; + return m_dataItem; } void eachDataItem(EachDataItem fun) override {} void deliverObservation(observation::ObservationPtr obs) override {} @@ -61,6 +64,7 @@ class MockPipelineContract : public PipelineContract void deliverDevices(std::list) override {} void deliverDevice(DevicePtr device) override {} int32_t getSchemaVersion() const override { return m_schemaVersion; } + bool isValidating() const override { return m_validation; } void deliverAssetCommand(entity::EntityPtr) override {} void deliverCommand(entity::EntityPtr) override {} void deliverConnectStatus(entity::EntityPtr, const StringList &, bool) override {} @@ -68,6 +72,9 @@ class MockPipelineContract : public PipelineContract const ObservationPtr checkDuplicate(const ObservationPtr &obs) const override { return obs; } int32_t m_schemaVersion; + DataItemPtr &m_dataItem; + DevicePtr m_device; + bool m_validation { true }; }; /// @brief Validation tests for observations @@ -76,15 +83,15 @@ class ObservationValidationTest : public testing::Test protected: void SetUp() override { + ErrorList errors; + m_dataItem = + DataItem::make({{"id", "exec"s}, {"category", "EVENT"s}, {"type", "EXECUTION"s}}, errors); + m_context = make_shared(); - m_context->m_contract = make_unique(SCHEMA_VERSION(2, 5)); + m_context->m_contract = make_unique(SCHEMA_VERSION(2, 5), m_dataItem); m_validator = make_shared(m_context); m_validator->bind(make_shared(TypeGuard(RUN))); m_time = Timestamp(date::sys_days(2021_y / jan / 19_d)) + 10h + 1min; - - ErrorList errors; - m_dataItem = - DataItem::make({{"id", "exec"s}, {"category", "EVENT"s}, {"type", "EXECUTION"s}}, errors); } void TearDown() override @@ -236,3 +243,127 @@ TEST_F(ObservationValidationTest, should_be_invalid_if_entry_has_not_been_introd ASSERT_EQ("INVALID", quality); ASSERT_FALSE(evt->hasProperty("deprecated")); } + +TEST_F(ObservationValidationTest, should_validate_data_item_types) +{ + auto contract = static_cast(m_context->m_contract.get()); + contract->m_schemaVersion = SCHEMA_VERSION(2, 5); + + shared_ptr mapper; + mapper = make_shared(m_context, "", 2); + mapper->bind(make_shared(TypeGuard(RUN))); + + ErrorList errors; + m_dataItem = + DataItem::make({{"id", "pos"s}, {"category", "SAMPLE"s}, {"type", "POSITION"s}, {"units", "MILLIMETER"s}}, errors); + + auto ts = make_shared(); + ts->m_tokens = {{"pos"s, "ABC"s}}; + ts->m_timestamp = chrono::system_clock::now(); + ts->setProperty("timestamp", ts->m_timestamp); + + auto observations = (*mapper)(ts); + auto &r = *observations; + ASSERT_EQ(typeid(Observations), typeid(r)); + + auto oblist = observations->getValue(); + ASSERT_EQ(1, oblist.size()); + + auto oi = oblist.begin(); + + { + auto sample = dynamic_pointer_cast(*oi++); + ASSERT_TRUE(sample); + ASSERT_EQ(m_dataItem, sample->getDataItem()); + ASSERT_TRUE(sample->isUnavailable()); + + ASSERT_EQ("INVALID", sample->get("quality")); + } + +} + +TEST_F(ObservationValidationTest, should_not_validate_if_validation_is_off) +{ + auto contract = static_cast(m_context->m_contract.get()); + contract->m_schemaVersion = SCHEMA_VERSION(2, 5); + contract->m_validation = false; + + shared_ptr mapper; + mapper = make_shared(m_context, "", 2); + mapper->bind(make_shared(TypeGuard(RUN))); + + ErrorList errors; + m_dataItem = + DataItem::make({{"id", "pos"s}, {"category", "SAMPLE"s}, {"type", "POSITION"s}, {"units", "MILLIMETER"s}}, errors); + + auto ts = make_shared(); + ts->m_tokens = {{"pos"s, "ABC"s}}; + ts->m_timestamp = chrono::system_clock::now(); + ts->setProperty("timestamp", ts->m_timestamp); + + auto observations = (*mapper)(ts); + auto &r = *observations; + ASSERT_EQ(typeid(Observations), typeid(r)); + + auto oblist = observations->getValue(); + ASSERT_EQ(1, oblist.size()); + + auto oi = oblist.begin(); + + { + auto sample = dynamic_pointer_cast(*oi++); + ASSERT_TRUE(sample); + ASSERT_EQ(m_dataItem, sample->getDataItem()); + ASSERT_TRUE(sample->isUnavailable()); + + ASSERT_FALSE(sample->hasProperty("quality")); + } + +} + +TEST_F(ObservationValidationTest, should_validate_json_data_item_types) +{ + using namespace mtconnect::device_model; + + ErrorList errors; + auto contract = static_cast(m_context->m_contract.get()); + contract->m_schemaVersion = SCHEMA_VERSION(2, 5); + Properties dev { + {"id", "3"s}, {"name", "DeviceTest2"s}, {"uuid", "UnivUniqId2"s}, {"iso841Class", "6"s}}; + auto device = dynamic_pointer_cast(Device::getFactory()->make("Device", dev, errors)); + + + shared_ptr mapper; + mapper = make_shared(m_context); + mapper->bind(make_shared(TypeGuard(RUN))); + + m_dataItem = + DataItem::make({{"id", "pos"s}, {"category", "SAMPLE"s}, {"type", "POSITION"s}, {"units", "MILLIMETER"s}}, errors); + contract->m_dataItem = m_dataItem; + device->addDataItem(m_dataItem, errors); + ASSERT_EQ(0, errors.size()); + + auto jm = make_shared(); + jm->setValue(R"( +{ + "timestamp": "2023-11-09T11:20:00Z", + "pos": "ABC" +} +)"s); + jm->m_device = device; + + auto observations = (*mapper)(jm); + auto oblist = observations->getValue(); + ASSERT_EQ(1, oblist.size()); + + auto oi = oblist.begin(); + + { + auto sample = dynamic_pointer_cast(*oi++); + ASSERT_TRUE(sample); + ASSERT_EQ(m_dataItem, sample->getDataItem()); + ASSERT_TRUE(sample->isUnavailable()); + + ASSERT_EQ("INVALID", sample->get("quality")); + } +} diff --git a/test_package/period_filter_test.cpp b/test_package/period_filter_test.cpp index 02d5029ea..49ee0833c 100644 --- a/test_package/period_filter_test.cpp +++ b/test_package/period_filter_test.cpp @@ -71,6 +71,7 @@ struct MockPipelineContract : public PipelineContract void deliverConnectStatus(entity::EntityPtr, const StringList &, bool) override {} void sourceFailed(const std::string &id) override {} const ObservationPtr checkDuplicate(const ObservationPtr &obs) const override { return obs; } + bool isValidating() const override { return false; } std::map &m_dataItems; diff --git a/test_package/response_document_test.cpp b/test_package/response_document_test.cpp index ae38d50e6..5bd4896bc 100644 --- a/test_package/response_document_test.cpp +++ b/test_package/response_document_test.cpp @@ -63,6 +63,7 @@ class MockPipelineContract : public PipelineContract void deliverConnectStatus(entity::EntityPtr, const StringList &, bool) override {} void sourceFailed(const std::string &id) override {} const ObservationPtr checkDuplicate(const ObservationPtr &obs) const override { return obs; } + bool isValidating() const override { return false; } DevicePtr m_device; }; diff --git a/test_package/topic_mapping_test.cpp b/test_package/topic_mapping_test.cpp index de279ac71..a83824e5e 100644 --- a/test_package/topic_mapping_test.cpp +++ b/test_package/topic_mapping_test.cpp @@ -63,6 +63,7 @@ class MockPipelineContract : public PipelineContract void deliverConnectStatus(entity::EntityPtr, const StringList &, bool) override {} void sourceFailed(const std::string &id) override {} const ObservationPtr checkDuplicate(const ObservationPtr &obs) const override { return obs; } + bool isValidating() const override { return false; } std::map &m_dataItems; std::map &m_devices;