diff --git a/Framework/CMakeLists.txt b/Framework/CMakeLists.txt index 5f48077f10..7666082d51 100644 --- a/Framework/CMakeLists.txt +++ b/Framework/CMakeLists.txt @@ -99,7 +99,6 @@ add_library(O2QualityControl src/Check.cxx src/Aggregator.cxx src/HashDataDescription.cxx - src/ServiceDiscovery.cxx src/Triggers.cxx src/TriggerHelpers.cxx src/PostProcessingRunner.cxx diff --git a/Framework/include/QualityControl/AggregatorRunner.h b/Framework/include/QualityControl/AggregatorRunner.h index bec022a223..a31e4b5c9d 100644 --- a/Framework/include/QualityControl/AggregatorRunner.h +++ b/Framework/include/QualityControl/AggregatorRunner.h @@ -51,10 +51,6 @@ class ConfigurationInterface; namespace o2::quality_control { -namespace core -{ -class ServiceDiscovery; -} namespace checker { class Aggregator; @@ -150,7 +146,6 @@ class AggregatorRunner : public framework::Task void initDatabase(); void initMonitoring(); - void initServiceDiscovery(); void initLibraries(); void initAggregators(); @@ -203,9 +198,6 @@ class AggregatorRunner : public framework::Task int mTotalNumberObjectsReceived; int mTotalNumberAggregatorExecuted; int mTotalNumberObjectsProduced; - - // Service discovery - std::shared_ptr mServiceDiscovery; }; } // namespace o2::quality_control::checker diff --git a/Framework/include/QualityControl/ObjectsManager.h b/Framework/include/QualityControl/ObjectsManager.h index dbd8968cae..7a3f3bbd32 100644 --- a/Framework/include/QualityControl/ObjectsManager.h +++ b/Framework/include/QualityControl/ObjectsManager.h @@ -23,10 +23,7 @@ #include "QualityControl/MonitorObjectCollection.h" #include // stl -#include #include -#include -#include class TObject; @@ -49,8 +46,6 @@ enum class PublicationPolicy { Forever }; -class ServiceDiscovery; - /// \brief Keeps the list of encapsulated objects to publish and does the actual publication. /// /// Keeps a list of the objects to publish, encapsulates them and does the actual publication. @@ -69,7 +64,7 @@ class ObjectsManager * @param parallelTaskID ID of a parallel Task, use 0 if there is only one. * @param noDiscovery If true disables the use of ServiceDiscovery */ - ObjectsManager(std::string taskName, std::string taskClass, std::string detectorName, std::string consulUrl, int parallelTaskID = 0, bool noDiscovery = false); + ObjectsManager(std::string taskName, std::string taskClass, std::string detectorName, int parallelTaskID = 0); virtual ~ObjectsManager(); static const std::string gDrawOptionsKey; @@ -202,19 +197,6 @@ class ObjectsManager */ MonitorObject* getMonitorObject(size_t index); - /** - * \brief Update the list of objects stored in the Service Discovery. - * Update the list of objects stored in the Service Discovery. - */ - void updateServiceDiscovery(); - - /** - * \brief Remove all objects from the ServiceDiscovery. - * Remove all objects from the ServiceDiscovery even though they still might be published by the task. - * This is typically used at End of Activity. - */ - void removeAllFromServiceDiscovery(); - /** * \brief Sets the validity interval of all registered objects. */ @@ -237,8 +219,6 @@ class ObjectsManager std::string mTaskName; std::string mTaskClass; std::string mDetectorName; - std::unique_ptr mServiceDiscovery; - bool mUpdateServiceDiscovery; Activity mActivity; std::vector mMovingWindowsList; diff --git a/Framework/include/QualityControl/PostProcessingRunner.h b/Framework/include/QualityControl/PostProcessingRunner.h index ce67fa157d..779e0edb63 100644 --- a/Framework/include/QualityControl/PostProcessingRunner.h +++ b/Framework/include/QualityControl/PostProcessingRunner.h @@ -19,8 +19,8 @@ #include #include -#include #include +#include #include "QualityControl/PostProcessingConfig.h" #include "QualityControl/PostProcessingInterface.h" #include "QualityControl/PostProcessingRunnerConfig.h" diff --git a/Framework/include/QualityControl/ServiceDiscovery.h b/Framework/include/QualityControl/ServiceDiscovery.h deleted file mode 100644 index f46f06e426..0000000000 --- a/Framework/include/QualityControl/ServiceDiscovery.h +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright 2019-2020 CERN and copyright holders of ALICE O2. -// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. -// All rights not expressly granted are reserved. -// -// This software is distributed under the terms of the GNU General Public -// License v3 (GPL Version 3), copied verbatim in the file "COPYING". -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -/// -/// \author Adam Wegrzynek -/// - -#ifndef QC_SERVICEDISCOVERY_H -#define QC_SERVICEDISCOVERY_H - -#include -#include -#include -#include -#include -#include - -typedef void CURL; - -namespace o2::quality_control::core -{ - -/// \brief Information service for QC -/// -/// Register a endpoint to Consul which then performs health checks it -/// Allow to publish list of online objects -class ServiceDiscovery -{ - public: - /// Sets up CURL and health check - /// \param url Consul URL - /// \param name Service name - /// \param id Unique instance ID - /// \param healthEndUrl Local endpoint that is then used for health checks - ServiceDiscovery(const std::string& url, const std::string& name, const std::string& id, const std::string& healthEndUrl = ""); - - /// Stops the health thread and deregisteres from Consul health checks - ~ServiceDiscovery(); - - /// Registers list of online objects by sending HTTP PUT request to Consul server - /// \param objects List of comma separated objects - bool _register(const std::string& objects); - - /// Deregisters service - void deregister(); - - static constexpr size_t HealthPortRangeStart = 47800; ///< Health check port range start - static constexpr size_t HealthPortRangeEnd = 47899; ///< Health check port range end - - private: - /// Custom deleter of CURL object - static void deleteCurl(CURL* curl); - - /// CURL instance - std::unique_ptr curlHandle; - - const std::string mConsulUrl; ///< Consul URL - const std::string mName; ///< Instance (service) Name - const std::string mId; ///< Instance (service) ID - std::string mHealthUrl; ///< hostname of health check endpoint - size_t mHealthPort = 0; ///< Port of the health check endpoint - std::mutex mHealthPortMutex; ///< Mutex for the port - std::condition_variable mHealthPortCV; ///< Condition varaible for the port - std::atomic mHealthPortAssigned; ///< Port of the health check is ready. - std::thread mHealthThread; ///< Health check thread - std::atomic mThreadRunning; ///< Health check thread running flag - - /// Initializes CURL - CURL* initCurl(); - - /// Sends PUT request - bool send(const std::string& path, std::string&& request); - - /// Health check thread loop + port computation - void runHealthServer(); -}; - -} // namespace o2::quality_control::core -#endif // QC_SERVICEDISCOVERY_H diff --git a/Framework/src/AggregatorRunner.cxx b/Framework/src/AggregatorRunner.cxx index 6225d8178a..4ce15a2aa0 100644 --- a/Framework/src/AggregatorRunner.cxx +++ b/Framework/src/AggregatorRunner.cxx @@ -33,7 +33,6 @@ // QC #include "QualityControl/DatabaseFactory.h" #include "QualityControl/QcInfoLogger.h" -#include "QualityControl/ServiceDiscovery.h" #include "QualityControl/Aggregator.h" #include "QualityControl/runnerUtils.h" #include "QualityControl/InfrastructureSpecReader.h" @@ -73,9 +72,6 @@ AggregatorRunner::AggregatorRunner(AggregatorRunnerConfig arc, const std::vector AggregatorRunner::~AggregatorRunner() { ILOG(Debug, Trace) << "AggregatorRunner destructor (" << this << ")" << ENDM; - if (mServiceDiscovery != nullptr) { - mServiceDiscovery->deregister(); - } } void AggregatorRunner::prepareInputs() @@ -127,7 +123,6 @@ void AggregatorRunner::init(framework::InitContext& iCtx) } initDatabase(); initMonitoring(); - initServiceDiscovery(); initAggregators(); } catch (...) { ILOG(Fatal) << "Unexpected exception during initialization: " @@ -253,18 +248,6 @@ void AggregatorRunner::initMonitoring() mTimer.reset(1000000); // 10 s. } -void AggregatorRunner::initServiceDiscovery() -{ - auto consulUrl = mRunnerConfig.consulUrl; - if (consulUrl.empty()) { - mServiceDiscovery = nullptr; - ILOG(Warning, Support) << "Service Discovery disabled" << ENDM; - return; - } - mServiceDiscovery = std::make_shared(consulUrl, mDeviceName, mDeviceName); - ILOG(Info, Devel) << "ServiceDiscovery initialized"; -} - void AggregatorRunner::initAggregators() { ILOG(Info, Devel) << "Initialization of the aggregators" << ENDM; diff --git a/Framework/src/CheckRunner.cxx b/Framework/src/CheckRunner.cxx index b677294e5e..416fdbe889 100644 --- a/Framework/src/CheckRunner.cxx +++ b/Framework/src/CheckRunner.cxx @@ -29,14 +29,12 @@ #include // QC #include "QualityControl/DatabaseFactory.h" -#include "QualityControl/ServiceDiscovery.h" #include "QualityControl/runnerUtils.h" #include "QualityControl/InfrastructureSpecReader.h" #include "QualityControl/CheckRunnerFactory.h" #include "QualityControl/RootClassFactory.h" #include "QualityControl/ConfigParamGlo.h" #include "QualityControl/Bookkeeping.h" -#include "QualityControl/WorkflowType.h" #include @@ -164,9 +162,6 @@ CheckRunner::CheckRunner(CheckRunnerConfig checkRunnerConfig, InputSpec input) CheckRunner::~CheckRunner() { ILOG(Debug, Trace) << "CheckRunner destructor (" << this << ")" << ENDM; - if (mServiceDiscovery != nullptr) { - mServiceDiscovery->deregister(); - } } void CheckRunner::init(framework::InitContext& iCtx) @@ -176,7 +171,6 @@ void CheckRunner::init(framework::InitContext& iCtx) Bookkeeping::getInstance().init(mConfig.bookkeepingUrl); initDatabase(); initMonitoring(); - initServiceDiscovery(); initLibraries(); // we have to load libraries before we load ConfigurableParams, otherwise the corresponding ROOT dictionaries won't be found if (!ConfigParamGlo::keyValues.empty()) { @@ -413,7 +407,6 @@ void CheckRunner::updateServiceDiscovery(const QualityObjectsType& qualityObject objects += path + ","; } objects.pop_back(); // remove last comma - mServiceDiscovery->_register(objects); } void CheckRunner::initDatabase() @@ -431,17 +424,6 @@ void CheckRunner::initMonitoring() mTimer.reset(10000000); // 10 s. } -void CheckRunner::initServiceDiscovery() -{ - if (mConfig.consulUrl.empty()) { - mServiceDiscovery = nullptr; - ILOG(Warning, Support) << "Service Discovery disabled" << ENDM; - return; - } - mServiceDiscovery = std::make_shared(mConfig.consulUrl, mDeviceName, mDeviceName); - ILOG(Info, Support) << "ServiceDiscovery initialized" << ENDM; -} - void CheckRunner::initLibraries() { std::set moduleNames; diff --git a/Framework/src/ObjectsManager.cxx b/Framework/src/ObjectsManager.cxx index 0345dac7c1..5cdbc095fc 100644 --- a/Framework/src/ObjectsManager.cxx +++ b/Framework/src/ObjectsManager.cxx @@ -17,7 +17,6 @@ #include "QualityControl/ObjectsManager.h" #include "QualityControl/QcInfoLogger.h" -#include "QualityControl/ServiceDiscovery.h" #include "QualityControl/MonitorObjectCollection.h" #include #include @@ -36,23 +35,14 @@ namespace o2::quality_control::core const std::string ObjectsManager::gDrawOptionsKey = "drawOptions"; const std::string ObjectsManager::gDisplayHintsKey = "displayHints"; -ObjectsManager::ObjectsManager(std::string taskName, std::string taskClass, std::string detectorName, std::string consulUrl, int parallelTaskID, bool noDiscovery) - : mTaskName(std::move(taskName)), mTaskClass(std::move(taskClass)), mDetectorName(std::move(detectorName)), mUpdateServiceDiscovery(false) +ObjectsManager::ObjectsManager(std::string taskName, std::string taskClass, std::string detectorName, int parallelTaskID) + : mTaskName(std::move(taskName)), mTaskClass(std::move(taskClass)), mDetectorName(std::move(detectorName)) { mMonitorObjects = std::make_unique(); mMonitorObjects->SetOwner(true); mMonitorObjects->SetName(mTaskName.c_str()); mMonitorObjects->setDetector(mDetectorName); mMonitorObjects->setTaskName(mTaskName); - - // register with the discovery service - if (!noDiscovery && !consulUrl.empty()) { - std::string uniqueTaskID = mTaskName + "_" + std::to_string(parallelTaskID); - mServiceDiscovery = std::make_unique(consulUrl, mTaskName, uniqueTaskID); - } else { - ILOG(Warning, Support) << "Service Discovery disabled" << ENDM; - mServiceDiscovery = nullptr; - } } ObjectsManager::~ObjectsManager() @@ -81,39 +71,9 @@ void ObjectsManager::startPublishingImpl(TObject* object, PublicationPolicy publ newObject->setActivity(mActivity); newObject->setCreateMovingWindow(std::find(mMovingWindowsList.begin(), mMovingWindowsList.end(), object->GetName()) != mMovingWindowsList.end()); mMonitorObjects->Add(newObject); - mUpdateServiceDiscovery = true; mPublicationPoliciesForMOs[newObject] = publicationPolicy; } -void ObjectsManager::updateServiceDiscovery() -{ - if (!mUpdateServiceDiscovery || mServiceDiscovery == nullptr) { - return; - } - // prepare the string of comma separated objects and publish it - string objects; - for (auto tobj : *mMonitorObjects) { - auto* mo = dynamic_cast(tobj); - if (mo) { - objects += mo->getPath() + ","; - } else { - ILOG(Error, Devel) << "ObjectsManager::updateServiceDiscovery : dynamic_cast returned nullptr." << ENDM; - } - } - objects.pop_back(); - mServiceDiscovery->_register(objects); - mUpdateServiceDiscovery = false; -} - -void ObjectsManager::removeAllFromServiceDiscovery() -{ - if (mServiceDiscovery == nullptr) { - return; - } - mServiceDiscovery->_register(""); - mUpdateServiceDiscovery = true; -} - void ObjectsManager::stopPublishing(TObject* object) { if (!object) { @@ -161,7 +121,6 @@ void ObjectsManager::stopPublishing(PublicationPolicy policy) void ObjectsManager::stopPublishingAll() { - removeAllFromServiceDiscovery(); mMonitorObjects->Clear(); mPublicationPoliciesForMOs.clear(); } diff --git a/Framework/src/PostProcessingRunner.cxx b/Framework/src/PostProcessingRunner.cxx index b2a7636c82..d54cb50c24 100644 --- a/Framework/src/PostProcessingRunner.cxx +++ b/Framework/src/PostProcessingRunner.cxx @@ -94,7 +94,7 @@ void PostProcessingRunner::init(const PostProcessingRunnerConfig& runnerConfig, mSourceDatabase = configureDatabase(mRunnerConfig.sourceDatabase, "Source"); mDestinationDatabase = configureDatabase(mRunnerConfig.destinationDatabase, "Destination"); - mObjectManager = std::make_shared(mTaskConfig.taskName, mTaskConfig.className, mTaskConfig.detectorName, mRunnerConfig.consulUrl); + mObjectManager = std::make_shared(mTaskConfig.taskName, mTaskConfig.className, mTaskConfig.detectorName); mObjectManager->setActivity(mActivity); mServices.registerService(mSourceDatabase.get()); if (mPublicationCallback == nullptr) { diff --git a/Framework/src/ServiceDiscovery.cxx b/Framework/src/ServiceDiscovery.cxx deleted file mode 100644 index ca21e77dbc..0000000000 --- a/Framework/src/ServiceDiscovery.cxx +++ /dev/null @@ -1,227 +0,0 @@ -// Copyright 2019-2020 CERN and copyright holders of ALICE O2. -// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. -// All rights not expressly granted are reserved. -// -// This software is distributed under the terms of the GNU General Public -// License v3 (GPL Version 3), copied verbatim in the file "COPYING". -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -/// -/// \author Adam Wegrzynek -/// - -#include "QualityControl/ServiceDiscovery.h" -#include "QualityControl/QcInfoLogger.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -using boost::asio::ip::tcp; -using namespace boost::system; -namespace o2::quality_control::core -{ - -ServiceDiscovery::ServiceDiscovery(const std::string& url, const std::string& name, const std::string& id, const std::string& healthEndUrl) - : curlHandle(initCurl(), &ServiceDiscovery::deleteCurl), mConsulUrl(url), mName(name), mId(id), mHealthUrl(healthEndUrl) -{ - mHealthUrl = mHealthUrl.empty() ? boost::asio::ip::host_name() : mHealthUrl; - mHealthPortAssigned = false; - - mHealthThread = std::thread([this] { -#ifdef __linux__ - std::string threadName = "QC/SrvcDiscov"; - pthread_setname_np(pthread_self(), threadName.c_str()); -#endif - runHealthServer(); - }); - if (!_register("")) { - ILOG(Error, Support) << "Could not register to ServiceDiscovery." << ENDM; - } -} - -ServiceDiscovery::~ServiceDiscovery() -{ - ILOG(Debug, Devel) << "ServiceDiscovery destructor" << ENDM; - mThreadRunning = false; - if (mHealthThread.joinable()) { - mHealthThread.join(); - } - deregister(); -} - -CURL* ServiceDiscovery::initCurl() -{ - CURLcode globalInitResult = curl_global_init(CURL_GLOBAL_ALL); - if (globalInitResult != CURLE_OK) { - throw std::runtime_error(std::string("cURL init") + curl_easy_strerror(globalInitResult)); - } - CURL* curl = curl_easy_init(); - curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 2); - curl_easy_setopt(curl, CURLOPT_TIMEOUT, 2); - curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT"); - curl_easy_setopt(curl, CURLOPT_TCP_KEEPIDLE, 120L); - curl_easy_setopt(curl, CURLOPT_TCP_KEEPINTVL, 60L); - FILE* devnull = fopen("/dev/null", "w+"); - curl_easy_setopt(curl, CURLOPT_WRITEDATA, devnull); - return curl; -} - -bool ServiceDiscovery::_register(const std::string& objects) -{ - boost::property_tree::ptree pt; - if (!objects.empty()) { - std::vector objectsVec; - boost::split(objectsVec, objects, boost::is_any_of(","), boost::token_compress_on); - boost::property_tree::ptree tag, tags; - for (auto& object : objectsVec) { - tag.put("", object); - tags.push_back(std::make_pair("", tag)); - } - pt.add_child("Tags", tags); - } - - boost::property_tree::ptree checks, check; - check.put("Name", "Health check " + mId); - check.put("Interval", "5s"); - check.put("DeregisterCriticalServiceAfter", "1m"); - // Wait until port is set by the thread - { - std::unique_lock lk(mHealthPortMutex); - mHealthPortCV.wait(lk, [this] { return mHealthPortAssigned == true; }); - } - check.put("TCP", mHealthUrl + ":" + std::to_string(mHealthPort)); - checks.push_back(std::make_pair("", check)); - - pt.put("Name", mName); - pt.put("ID", mId); - pt.add_child("Checks", checks); - - std::stringstream ss; - boost::property_tree::json_parser::write_json(ss, pt); - - ILOG(Debug, Devel) << "Registration to ServiceDiscovery: " << objects << ENDM; - return send("/v1/agent/service/register", ss.str()); -} - -void ServiceDiscovery::deregister() -{ - send("/v1/agent/service/deregister/" + mId, ""); - ILOG(Debug, Devel) << "Deregistration from ServiceDiscovery" << ENDM; -} - -void ServiceDiscovery::runHealthServer() -{ - using boost::asio::ip::tcp; - mThreadRunning = true; - - // InfoLogger is not thread safe, we create a new instance for this thread. - AliceO2::InfoLogger::InfoLogger threadInfoLogger; - infoContext context; - context.setField(infoContext::FieldName::Facility, "ServiceDiscovery"); - context.setField(infoContext::FieldName::System, "QC"); - threadInfoLogger.setContext(context); - - // Find a free port and create the endpoint and the acceptor - boost::asio::io_context io_context; - tcp::acceptor* acceptor = nullptr; - size_t port = 0; - std::random_device rd; // obtain a random number from hardware - std::mt19937 gen(rd()); // seed the generator - size_t rangeLength = HealthPortRangeEnd - HealthPortRangeStart + 1; - std::uniform_int_distribution<> distr(0, rangeLength - 1); // define the inclusive range - size_t index = distr(gen); // get a random index in the range - size_t cycle = 0; // count how many ports we tried - while (cycle < rangeLength) { // until we exhaust the range or we find a free port - try { - index = (index + 1) % rangeLength; // pick the next index - port = HealthPortRangeStart + index; - ILOG(Debug, Trace) << "ServiceDiscovery test port: " << port << ENDM; - cycle++; - - tcp::endpoint endpoint(tcp::v4(), port); - acceptor = new tcp::acceptor(io_context, endpoint); - } catch (boost::system::system_error& e) { - ILOG(Debug, Trace) << "ServiceDiscovery::runHealthServer - cound not bind to " << port << ENDM; - continue; // try the next one - } - break; - } - - // assign the port and unblock the main thread (we got a port or we failed) - { - ILOG(Debug, Devel) << "ServiceDiscovery selected port: " << mHealthPort << ENDM; - std::lock_guard lk(mHealthPortMutex); - mHealthPort = port; - mHealthPortAssigned = true; - } - mHealthPortCV.notify_one(); - - if (cycle == rangeLength) { - ILOG(Error, Support) << "Could not find a free port for the ServiceDiscovery, aborting the ServiceDiscovery health check" << ENDM; - return; - } - - // run the thread - try { - boost::asio::deadline_timer timer(io_context); - while (mThreadRunning) { - io_context.restart(); - timer.expires_from_now(boost::posix_time::seconds(1)); - acceptor->async_accept([this](boost::system::error_code ec, tcp::socket socket) { - }); - timer.async_wait([&](boost::system::error_code ec) { - io_context.stop(); - }); - io_context.run(); - } - } catch (std::exception& e) { - mThreadRunning = false; - threadInfoLogger << AliceO2::InfoLogger::InfoLogger::Severity::Warning << "ServiceDiscovery::runHealthServer - " << e.what() << ENDM; - } -} - -void ServiceDiscovery::deleteCurl(CURL* curl) -{ - curl_easy_cleanup(curl); - curl_global_cleanup(); -} - -bool ServiceDiscovery::send(const std::string& path, std::string&& post) -{ - std::string uri = mConsulUrl + path; - CURLcode response; - long responseCode; - CURL* curl = curlHandle.get(); - curl_easy_setopt(curl, CURLOPT_URL, uri.c_str()); - curl_easy_setopt(curl, CURLOPT_POSTFIELDS, post.c_str()); - response = curl_easy_perform(curl); - curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &responseCode); - static AliceO2::InfoLogger::InfoLogger::AutoMuteToken msgLimit(LogWarningDevel, 1, 600); // send it only every 10 minutes - if (response != CURLE_OK) { - std::string s = std::string("ServiceDiscovery::send(...) ") + curl_easy_strerror(response) + "\n URI: " + uri; - ILOG_INST.log(msgLimit, "%s", s.c_str()); - return false; - } - if (responseCode < 200 || responseCode > 206) { - std::string s = std::string("ServiceDiscovery::send(...) Response code: ") + std::to_string(responseCode); - ILOG_INST.log(msgLimit, "%s", s.c_str()); - return false; - } - return true; -} - -} // namespace o2::quality_control::core diff --git a/Framework/src/TaskRunner.cxx b/Framework/src/TaskRunner.cxx index 8450c7f580..2820b9c512 100644 --- a/Framework/src/TaskRunner.cxx +++ b/Framework/src/TaskRunner.cxx @@ -106,7 +106,7 @@ void TaskRunner::init(InitContext& iCtx) mCollector->addGlobalTag("DetectorName", mTaskConfig.detectorName); // setup publisher - mObjectsManager = std::make_shared(mTaskConfig.taskName, mTaskConfig.className, mTaskConfig.detectorName, mTaskConfig.consulUrl, mTaskConfig.parallelTaskID); + mObjectsManager = std::make_shared(mTaskConfig.taskName, mTaskConfig.className, mTaskConfig.detectorName, mTaskConfig.parallelTaskID); mObjectsManager->setMovingWindowsList(mTaskConfig.movingWindows); // setup timekeeping @@ -408,7 +408,6 @@ void TaskRunner::startOfActivity() mCollector->setRunNumber(mActivity.mId); mTask->startOfActivity(mActivity); - mObjectsManager->updateServiceDiscovery(); } void TaskRunner::endOfActivity() @@ -420,7 +419,6 @@ void TaskRunner::endOfActivity() mTimekeeper->setEndOfActivity(mActivity.mValidity.getMax(), mTaskConfig.fallbackActivity.mValidity.getMax(), now, activity_helpers::getCcdbEorTimeAccessor(mActivity.mId)); mTask->endOfActivity(mObjectsManager->getActivity()); - mObjectsManager->removeAllFromServiceDiscovery(); mObjectsManager->stopPublishing(PublicationPolicy::ThroughStop); double rate = mTotalNumberObjectsPublished / mTimerTotalDurationActivity.getTime(); @@ -480,7 +478,6 @@ void TaskRunner::finishCycle(DataAllocator& outputs) saveToFile(); publishCycleStats(); - mObjectsManager->updateServiceDiscovery(); mCycleNumber++; mCycleOn = false; diff --git a/Framework/test/testObjectsManager.cxx b/Framework/test/testObjectsManager.cxx index 0d9618eb0e..17dfec2080 100644 --- a/Framework/test/testObjectsManager.cxx +++ b/Framework/test/testObjectsManager.cxx @@ -43,7 +43,7 @@ BOOST_AUTO_TEST_CASE(invalid_url_test) Config config; config.taskName = "test"; config.consulUrl = "bad-url:1234"; - ObjectsManager objectsManager(config.taskName, config.taskClass, config.detectorName, config.consulUrl, 0, true); + ObjectsManager objectsManager(config.taskName, config.taskClass, config.detectorName, 0); } BOOST_AUTO_TEST_CASE(duplicate_object_test) @@ -51,7 +51,7 @@ BOOST_AUTO_TEST_CASE(duplicate_object_test) Config config; config.taskName = "test"; config.consulUrl = ""; - ObjectsManager objectsManager(config.taskName, config.taskClass, config.detectorName, config.consulUrl, 0, true); + ObjectsManager objectsManager(config.taskName, config.taskClass, config.detectorName, 0); TObjString s("content"); objectsManager.startPublishing(&s, PublicationPolicy::Forever); BOOST_CHECK_NO_THROW(objectsManager.startPublishing(&s, PublicationPolicy::Forever)); @@ -70,7 +70,7 @@ BOOST_AUTO_TEST_CASE(is_being_published_test) Config config; config.taskName = "test"; config.consulUrl = ""; - ObjectsManager objectsManager(config.taskName, config.taskClass, config.detectorName, config.consulUrl, 0, true); + ObjectsManager objectsManager(config.taskName, config.taskClass, config.detectorName, 0); TObjString s("content"); BOOST_CHECK(!objectsManager.isBeingPublished("content")); objectsManager.startPublishing(&s, PublicationPolicy::Forever); @@ -82,7 +82,7 @@ BOOST_AUTO_TEST_CASE(unpublish_test) { Config config; config.taskName = "test"; - ObjectsManager objectsManager(config.taskName, config.taskClass, config.detectorName, config.consulUrl, 0, true); + ObjectsManager objectsManager(config.taskName, config.taskClass, config.detectorName, 0); TObjString s("content"); objectsManager.startPublishing(&s, PublicationPolicy::Forever); BOOST_CHECK_EQUAL(objectsManager.getNumberPublishedObjects(), 1); @@ -140,7 +140,7 @@ BOOST_AUTO_TEST_CASE(getters_test) Config config; config.taskName = "test"; config.consulUrl = ""; - ObjectsManager objectsManager(config.taskName, config.taskClass, config.detectorName, config.consulUrl, 0, true); + ObjectsManager objectsManager(config.taskName, config.taskClass, config.detectorName, 0); TObjString s("content"); TH1F h("histo", "h", 100, 0, 99); @@ -170,7 +170,7 @@ BOOST_AUTO_TEST_CASE(metadata_test) Config config; config.taskName = "test"; config.consulUrl = ""; - ObjectsManager objectsManager(config.taskName, config.taskClass, config.detectorName, config.consulUrl, 0, true); + ObjectsManager objectsManager(config.taskName, config.taskClass, config.detectorName, 0); TObjString s("content"); TH1F h("histo", "h", 100, 0, 99); @@ -186,7 +186,7 @@ BOOST_AUTO_TEST_CASE(drawOptions_test) Config config; config.taskName = "test"; config.consulUrl = ""; - ObjectsManager objectsManager(config.taskName, config.taskClass, config.detectorName, config.consulUrl, 0, true); + ObjectsManager objectsManager(config.taskName, config.taskClass, config.detectorName, 0); TH1F h("histo", "h", 100, 0, 99); objectsManager.startPublishing(&h, PublicationPolicy::Forever); @@ -209,7 +209,7 @@ BOOST_AUTO_TEST_CASE(feed_with_nullptr) Config config; config.taskName = "test"; config.consulUrl = ""; - ObjectsManager objectsManager(config.taskName, config.taskClass, config.detectorName, config.consulUrl, 0, true); + ObjectsManager objectsManager(config.taskName, config.taskClass, config.detectorName, 0); BOOST_CHECK_NO_THROW(objectsManager.startPublishing(nullptr, PublicationPolicy::Forever)); BOOST_CHECK_NO_THROW(objectsManager.setDefaultDrawOptions(nullptr, "")); diff --git a/Framework/test/testPublisher.cxx b/Framework/test/testPublisher.cxx index 713a52da39..697ad01202 100644 --- a/Framework/test/testPublisher.cxx +++ b/Framework/test/testPublisher.cxx @@ -36,7 +36,7 @@ BOOST_AUTO_TEST_CASE(publisher_test) std::string taskName = "test"; std::string detectorName = "TST"; std::string consulUrl = "invalid"; - ObjectsManager objectsManager(taskName, "taskClass", detectorName, consulUrl, 0, true); + ObjectsManager objectsManager(taskName, "taskClass", detectorName, 0); TObjString s("content"); objectsManager.startPublishing(&s, PublicationPolicy::Forever); diff --git a/Framework/test/testTaskInterface.cxx b/Framework/test/testTaskInterface.cxx index 7b93b3ad21..3d7c8f03c4 100644 --- a/Framework/test/testTaskInterface.cxx +++ b/Framework/test/testTaskInterface.cxx @@ -132,7 +132,7 @@ TEST_CASE("test_invoke_all_TaskRunnerConfig_methods") { // This is maximum that we can do until we are able to test the DPL algorithms in isolation. TaskRunnerConfig taskConfig; - auto* objectsManager = new ObjectsManager(taskConfig.taskName, taskConfig.className, taskConfig.detectorName, taskConfig.consulUrl, 0, true); + auto* objectsManager = new ObjectsManager(taskConfig.taskName, taskConfig.className, taskConfig.detectorName, 0); test::TestTask testTask(objectsManager); CHECK(testTask.test == 0); @@ -182,7 +182,7 @@ TEST_CASE("test_task_factory") "" }; - auto objectsManager = make_shared(config.taskName, config.className, config.detectorName, config.consulUrl); + auto objectsManager = make_shared(config.taskName, config.className, config.detectorName, 0); TaskFactory taskFactory; auto task = taskFactory.create(config, objectsManager); @@ -205,7 +205,7 @@ TEST_CASE("retrieveCondition") // retrieve it TaskRunnerConfig taskConfig; - auto* objectsManager = new ObjectsManager(taskConfig.taskName, taskConfig.className, taskConfig.detectorName, taskConfig.consulUrl, 0, true); + auto* objectsManager = new ObjectsManager(taskConfig.taskName, taskConfig.className, taskConfig.detectorName, 0); test::TestTask testTask(objectsManager); testTask.setCcdbUrl("ccdb-test.cern.ch:8080"); o2::emcal::BadChannelMap* bcm = testTask.testRetrieveCondition(); diff --git a/Framework/test/testTrendingTask.cxx b/Framework/test/testTrendingTask.cxx index de9cd3c64f..e2b32216c3 100644 --- a/Framework/test/testTrendingTask.cxx +++ b/Framework/test/testTrendingTask.cxx @@ -163,7 +163,7 @@ TEST_CASE("test_trending_task") // Test "trendIfAllInputs". There should not be anything in DB so we don't have any input sources available { - auto objectManager = std::make_shared(taskName, "o2::quality_control::postprocessing::TrendingTask", "TST", ""); + auto objectManager = std::make_shared(taskName, "o2::quality_control::postprocessing::TrendingTask", "TST", 0); ServiceRegistry services; services.registerService(repository.get()); @@ -238,7 +238,7 @@ TEST_CASE("test_trending_task") // Running the task ServiceRegistry services; services.registerService(repository.get()); - auto objectManager = std::make_shared(taskName, "o2::quality_control::postprocessing::TrendingTask", "TST", ""); + auto objectManager = std::make_shared(taskName, "o2::quality_control::postprocessing::TrendingTask", "TST"); TrendingTask task; task.setName(trendingTaskName); diff --git a/Modules/Daq/test/testQcDaq.cxx b/Modules/Daq/test/testQcDaq.cxx index fc5785d3c7..e9333367ff 100644 --- a/Modules/Daq/test/testQcDaq.cxx +++ b/Modules/Daq/test/testQcDaq.cxx @@ -32,7 +32,7 @@ BOOST_AUTO_TEST_CASE(instantiate_task) config.consulUrl = ""; config.taskName = "qcDaqTest"; config.detectorName = "DAQ"; - auto manager = make_shared(config.taskName, "DaqTask", config.detectorName, config.consulUrl, 0, true); + auto manager = make_shared(config.taskName, "DaqTask", config.detectorName, 0); task.setObjectsManager(manager); // o2::framework::InitContext ctx; // task.initialize(ctx); // TODO diff --git a/Modules/Example/test/testFactory.cxx b/Modules/Example/test/testFactory.cxx index fb4faea2a5..1449e2bff4 100644 --- a/Modules/Example/test/testFactory.cxx +++ b/Modules/Example/test/testFactory.cxx @@ -30,7 +30,7 @@ BOOST_AUTO_TEST_CASE(Task_Factory) config.moduleName = "QcCommon"; config.className = "o2::quality_control_modules::example::ExampleTask"; config.detectorName = "DAQ"; - auto manager = make_shared(config.taskName, config.className, config.detectorName, config.consulUrl, 0, true); + auto manager = make_shared(config.taskName, config.className, config.detectorName, 0); try { gSystem->AddDynamicPath("lib:../../lib:../../../lib:.:"); // add local paths for the test factory.create(config, manager); @@ -45,7 +45,7 @@ BOOST_AUTO_TEST_CASE(Task_Factory_failures, *utf::depends_on("Task_Factory") /* { TaskFactory factory; TaskRunnerConfig config; - auto manager = make_shared(config.taskName, config.className, config.detectorName, config.consulUrl, 0, true); + auto manager = make_shared(config.taskName, config.className, config.detectorName, 0); config.taskName = "task"; config.moduleName = "WRONGNAME"; diff --git a/Modules/Example/test/testQcExample.cxx b/Modules/Example/test/testQcExample.cxx index ac06c9f24c..2193668e0b 100644 --- a/Modules/Example/test/testQcExample.cxx +++ b/Modules/Example/test/testQcExample.cxx @@ -27,7 +27,7 @@ BOOST_AUTO_TEST_CASE(insantiate_task) config.consulUrl = ""; config.taskName = "qcExampleTest"; config.detectorName = "TST"; - auto manager = make_shared(config.taskName, "ExampleTask", config.detectorName, config.consulUrl, 0, true); + auto manager = make_shared(config.taskName, "ExampleTask", config.detectorName, 0); task.setObjectsManager(manager); // task.initialize();// TODO diff --git a/Modules/Skeleton/test/testQcSkeleton.cxx b/Modules/Skeleton/test/testQcSkeleton.cxx index 16bab8f266..2345baa0ea 100644 --- a/Modules/Skeleton/test/testQcSkeleton.cxx +++ b/Modules/Skeleton/test/testQcSkeleton.cxx @@ -38,7 +38,7 @@ BOOST_AUTO_TEST_CASE(instantiate_task) config.consulUrl = ""; config.taskName = "qcSkeletonTest"; config.detectorName = "TST"; - auto manager = make_shared(config.taskName, "SkeletonTask", config.detectorName, config.consulUrl, 0, true); + auto manager = make_shared(config.taskName, "SkeletonTask", config.detectorName, 0); task.setObjectsManager(manager); // o2::framework::InitContext ctx; // task.initialize(ctx);