Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
set(AGENT_VERSION_MAJOR 2)
set(AGENT_VERSION_MINOR 5)
set(AGENT_VERSION_PATCH 0)
set(AGENT_VERSION_BUILD 3)
set(AGENT_VERSION_BUILD 4)
set(AGENT_VERSION_RC "")

# This minimum version is to support Visual Studio 2019 and C++ feature checking and FetchContent
Expand Down
7 changes: 1 addition & 6 deletions src/mtconnect/device_model/agent_device.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,11 @@ namespace mtconnect {
GetOption<bool>(adapter->getOptions(), config::SuppressIPAddress).value_or(false);
auto id = adapter->getIdentity();

stringstream name;
name << adapter->getHost() << ':' << adapter->getPort();

ErrorList errors;
Properties attrs {{"id", id}};
if (!suppress)
{
stringstream name;
name << adapter->getHost() << ':' << adapter->getPort();
attrs["name"] = name.str();
attrs["name"] = adapter->getName();
}
else
{
Expand Down
47 changes: 37 additions & 10 deletions src/mtconnect/source/adapter/mqtt/mqtt_adapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ namespace mtconnect {
AddOptions(block, m_options,
{{configuration::UUID, string()},
{configuration::Manufacturer, string()},
{configuration::AdapterIdentity, string()},
{configuration::Station, string()},
{configuration::Url, string()},
{configuration::Topics, StringList()},
Expand All @@ -81,13 +82,13 @@ namespace mtconnect {
{configuration::RealTime, false},
{configuration::RelativeTime, false}});
loadTopics(block, m_options);

if (!HasOption(m_options, configuration::MqttHost) &&
HasOption(m_options, configuration::Host))
{
m_options[configuration::MqttHost] = m_options[configuration::Host];
}

if (!HasOption(m_options, configuration::MqttPort))
{
if (HasOption(m_options, configuration::Port))
Expand All @@ -99,29 +100,29 @@ namespace mtconnect {
m_options[configuration::MqttPort] = 1883;
}
}

m_handler = m_pipeline.makeHandler();
auto clientHandler = make_unique<ClientHandler>();

m_pipeline.m_handler = m_handler.get();

clientHandler->m_connecting = [this](shared_ptr<MqttClient> client) {
m_handler->m_connecting(client->getIdentity());
m_handler->m_connecting(m_identity);
};

clientHandler->m_connected = [this](shared_ptr<MqttClient> client) {
client->connectComplete();
m_handler->m_connected(client->getIdentity());
m_handler->m_connected(m_identity);
subscribeToTopics();
};

clientHandler->m_disconnected = [this](shared_ptr<MqttClient> client) {
m_handler->m_disconnected(client->getIdentity());
m_handler->m_disconnected(m_identity);
};

clientHandler->m_receive = [this](shared_ptr<MqttClient> client, const std::string &topic,
const std::string &payload) {
m_handler->m_processMessage(topic, payload, client->getIdentity());
m_handler->m_processMessage(topic, payload, m_identity);
};

if (IsOptionSet(m_options, configuration::MqttTls) &&
Expand All @@ -146,11 +147,37 @@ namespace mtconnect {
m_client = make_shared<mtconnect::mqtt_client::MqttTcpClient>(m_ioContext, m_options,
std::move(clientHandler));
}

m_identity = m_client->getIdentity();

m_name = m_client->getUrl();

if (auto ident = GetOption<string>(m_options, configuration::AdapterIdentity))
{
m_identity = *ident;
}
else
{
stringstream identity;

identity << m_name;
auto topics = GetOption<StringList>(m_options, configuration::Topics);
if (topics)
{
for (const auto &s : *topics)
identity << s;
}

boost::uuids::detail::sha1 sha1;
sha1.process_bytes(identity.str().c_str(), identity.str().length());
boost::uuids::detail::sha1::digest_type digest;
sha1.get_digest(digest);

identity.str("");
identity << std::hex << digest[0] << digest[1] << digest[2];
m_identity = string("_") + (identity.str()).substr(0, 10);

m_options[configuration::AdapterIdentity] = m_identity;
}

m_options[configuration::AdapterIdentity] = m_name;
m_pipeline.build(m_options);
}

Expand Down
2 changes: 1 addition & 1 deletion test_package/agent_device_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ TEST_F(AgentDeviceTest, should_add_component_and_data_items_for_adapter)

ASSERT_XML_PATH_COUNT(doc, ADAPTERS_PATH "/*", 1);
ASSERT_XML_PATH_EQUAL(doc, ADAPTER_PATH "@id", ID_PREFIX);
ASSERT_XML_PATH_EQUAL(doc, ADAPTER_PATH "@name", "127.0.0.1:21788");
ASSERT_XML_PATH_EQUAL(doc, ADAPTER_PATH "@name", "shdr://127.0.0.1:21788");

ASSERT_XML_PATH_EQUAL(
doc, ADAPTER_DATA_ITEMS_PATH "/m:DataItem[@id='" ID_PREFIX "_adapter_uri']@type",
Expand Down
139 changes: 138 additions & 1 deletion test_package/mqtt_adapter_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,19 @@
#include <string>
#include <vector>

#include "mtconnect/configuration/config_options.hpp"
#include "mtconnect/pipeline/pipeline_context.hpp"
#include "mtconnect/source/adapter/mqtt/mqtt_adapter.hpp"

using namespace std;
using namespace mtconnect;
using namespace mtconnect::source::adapter;
using namespace mtconnect::source::adapter::mqtt_adapter;
using namespace mtconnect::pipeline;
using namespace mtconnect::asset;

namespace asio = boost::asio;
using namespace std::literals;

// main
int main(int argc, char *argv[])
Expand All @@ -43,6 +48,32 @@ int main(int argc, char *argv[])
return RUN_ALL_TESTS();
}

class MockPipelineContract : public PipelineContract
{
public:
MockPipelineContract(int32_t schemaVersion)
: m_schemaVersion(schemaVersion)
{}
DevicePtr findDevice(const std::string &) override { return nullptr; }
DataItemPtr findDataItem(const std::string &device, const std::string &name) override
{
return nullptr;
}
void eachDataItem(EachDataItem fun) override {}
void deliverObservation(observation::ObservationPtr obs) override {}
void deliverAsset(AssetPtr) override {}
void deliverDevices(std::list<DevicePtr>) override {}
void deliverDevice(DevicePtr) override {}
int32_t getSchemaVersion() const override { return m_schemaVersion; }
void deliverAssetCommand(entity::EntityPtr) override {}
void deliverCommand(entity::EntityPtr) override {}
void deliverConnectStatus(entity::EntityPtr, const StringList &, bool) override {}
void sourceFailed(const std::string &id) override {}
const ObservationPtr checkDuplicate(const ObservationPtr &obs) const override { return obs; }

int32_t m_schemaVersion;
};

class MqttAdapterTest : public testing::Test
{
protected:
Expand All @@ -51,4 +82,110 @@ class MqttAdapterTest : public testing::Test
void TearDown() override {}
};

TEST_F(MqttAdapterTest, should_find_data_item_from_topic) {}
TEST_F(MqttAdapterTest, should_create_a_unique_id_based_on_topic)
{
asio::io_context ioc;
asio::io_context::strand strand(ioc);
ConfigOptions options {{configuration::Url, "mqtt://mybroker.com:1883"s},
{configuration::Host, "mybroker.com"s},
{configuration::Port, 1883},
{configuration::Protocol, "mqtt"s},
{configuration::Topics, StringList {"pipeline/#"s}}};
boost::property_tree::ptree tree;
pipeline::PipelineContextPtr context = make_shared<pipeline::PipelineContext>();
context->m_contract = make_unique<MockPipelineContract>(SCHEMA_VERSION(2, 5));
auto adapter = make_unique<MqttAdapter>(ioc, context, options, tree);

ASSERT_EQ("mqtt://mybroker.com:1883/", adapter->getName());
ASSERT_EQ("_89c11f795e", adapter->getIdentity());
}

TEST_F(MqttAdapterTest, should_change_if_topics_change)
{
asio::io_context ioc;
asio::io_context::strand strand(ioc);
ConfigOptions options {{configuration::Url, "mqtt://mybroker.com:1883"s},
{configuration::Host, "mybroker.com"s},
{configuration::Port, 1883},
{configuration::Protocol, "mqtt"s},
{configuration::Topics, StringList {"pipeline/#"s}}};
boost::property_tree::ptree tree;
pipeline::PipelineContextPtr context = make_shared<pipeline::PipelineContext>();
context->m_contract = make_unique<MockPipelineContract>(SCHEMA_VERSION(2, 5));
auto adapter = make_unique<MqttAdapter>(ioc, context, options, tree);

ASSERT_EQ("mqtt://mybroker.com:1883/", adapter->getName());
ASSERT_EQ("_89c11f795e", adapter->getIdentity());

options[configuration::Topics] = StringList {"pipline/#"s, "topic/"s};
adapter = make_unique<MqttAdapter>(ioc, context, options, tree);

ASSERT_EQ("_29e17b8870", adapter->getIdentity());
}

TEST_F(MqttAdapterTest, should_change_if_port_changes)
{
asio::io_context ioc;
asio::io_context::strand strand(ioc);
ConfigOptions options {{configuration::Url, "mqtt://mybroker.com:1883"s},
{configuration::Host, "mybroker.com"s},
{configuration::Port, 1883},
{configuration::Protocol, "mqtt"s},
{configuration::Topics, StringList {"pipeline/#"s}}};
boost::property_tree::ptree tree;
pipeline::PipelineContextPtr context = make_shared<pipeline::PipelineContext>();
context->m_contract = make_unique<MockPipelineContract>(SCHEMA_VERSION(2, 5));
auto adapter = make_unique<MqttAdapter>(ioc, context, options, tree);

ASSERT_EQ("mqtt://mybroker.com:1883/", adapter->getName());
ASSERT_EQ("_89c11f795e", adapter->getIdentity());

options[configuration::Port] = 1882;
adapter = make_unique<MqttAdapter>(ioc, context, options, tree);

ASSERT_EQ("mqtt://mybroker.com:1882/", adapter->getName());
ASSERT_EQ("_7042e8f45e", adapter->getIdentity());
}

TEST_F(MqttAdapterTest, should_change_if_host_changes)
{
asio::io_context ioc;
asio::io_context::strand strand(ioc);
ConfigOptions options {{configuration::Url, "mqtt://mybroker.com:1883"s},
{configuration::Host, "mybroker.com"s},
{configuration::Port, 1883},
{configuration::Protocol, "mqtt"s},
{configuration::Topics, StringList {"pipeline/#"s}}};
boost::property_tree::ptree tree;
pipeline::PipelineContextPtr context = make_shared<pipeline::PipelineContext>();
context->m_contract = make_unique<MockPipelineContract>(SCHEMA_VERSION(2, 5));
auto adapter = make_unique<MqttAdapter>(ioc, context, options, tree);

ASSERT_EQ("mqtt://mybroker.com:1883/", adapter->getName());
ASSERT_EQ("_89c11f795e", adapter->getIdentity());

options[configuration::Host] = "localhost"s;
adapter = make_unique<MqttAdapter>(ioc, context, options, tree);

ASSERT_EQ("mqtt://localhost:1883/", adapter->getName());
ASSERT_EQ("_4cd2e64d4e", adapter->getIdentity());
}

TEST_F(MqttAdapterTest, should_be_able_to_set_adapter_identity)
{
asio::io_context ioc;
asio::io_context::strand strand(ioc);
ConfigOptions options {{configuration::Url, "mqtt://mybroker.com:1883"s},
{configuration::Host, "mybroker.com"s},
{configuration::Port, 1883},
{configuration::Protocol, "mqtt"s},
{configuration::AdapterIdentity, "MyIdentity"s},
{configuration::Topics, StringList {"pipeline/#"s}}};
boost::property_tree::ptree tree;
pipeline::PipelineContextPtr context = make_shared<pipeline::PipelineContext>();
context->m_contract = make_unique<MockPipelineContract>(SCHEMA_VERSION(2, 5));
auto adapter = make_unique<MqttAdapter>(ioc, context, options, tree);

ASSERT_EQ("mqtt://mybroker.com:1883/", adapter->getName());
ASSERT_EQ("MyIdentity", adapter->getIdentity());
}