diff --git a/Framework/include/QualityControl/CommonSpec.h b/Framework/include/QualityControl/CommonSpec.h index 7041c346fa..8a7cc90dd5 100644 --- a/Framework/include/QualityControl/CommonSpec.h +++ b/Framework/include/QualityControl/CommonSpec.h @@ -44,6 +44,8 @@ struct CommonSpec { LogDiscardParameters infologgerDiscardParameters; double postprocessingPeriod = 30.0; std::string bookkeepingUrl; + std::string kafkaBrokersUrl; + std::string kafkaTopicAliECSRun = "aliecs.run"; }; } // namespace o2::quality_control::core diff --git a/Framework/include/QualityControl/PostProcessingConfig.h b/Framework/include/QualityControl/PostProcessingConfig.h index 88a13d2e62..7e425ed76a 100644 --- a/Framework/include/QualityControl/PostProcessingConfig.h +++ b/Framework/include/QualityControl/PostProcessingConfig.h @@ -39,7 +39,7 @@ struct PostProcessingConfig : public o2::quality_control::core::UserCodeConfig { std::vector updateTriggers = {}; std::vector stopTriggers = {}; std::string kafkaBrokersUrl; - std::string kafkaTopic; + std::string kafkaTopicAliECSRun; core::Activity activity; bool matchAnyRunNumber = false; bool critical; diff --git a/Framework/src/InfrastructureSpecReader.cxx b/Framework/src/InfrastructureSpecReader.cxx index d22d140229..d72641e423 100644 --- a/Framework/src/InfrastructureSpecReader.cxx +++ b/Framework/src/InfrastructureSpecReader.cxx @@ -81,6 +81,8 @@ CommonSpec InfrastructureSpecReader::readSpecEntry(const std::string }; spec.postprocessingPeriod = commonTree.get("postprocessing.periodSeconds", spec.postprocessingPeriod); spec.bookkeepingUrl = commonTree.get("bookkeeping.url", spec.bookkeepingUrl); + spec.kafkaBrokersUrl = commonTree.get("kafka.url", spec.kafkaBrokersUrl); + spec.kafkaTopicAliECSRun = commonTree.get("kafka.topicAliecsRun", spec.kafkaTopicAliECSRun); return spec; } diff --git a/Framework/src/PostProcessingConfig.cxx b/Framework/src/PostProcessingConfig.cxx index 3c33ffda16..1857acb298 100644 --- a/Framework/src/PostProcessingConfig.cxx +++ b/Framework/src/PostProcessingConfig.cxx @@ -39,6 +39,9 @@ PostProcessingConfig::PostProcessingConfig(const std::string& id, const boost::p detectorName = config.get("qc.postprocessing." + id + ".detectorName", "MISC"); ccdbUrl = config.get("qc.config.conditionDB.url", ""); consulUrl = config.get("qc.config.consul.url", ""); + kafkaBrokersUrl = config.get("qc.config.kafka.url", ""); + kafkaTopicAliECSRun = config.get("qc.config.kafka.topicAliecsRun", "aliecs.run"); + // if available, use the source repo as defined in the postprocessing task, otherwise the general QCDB auto sourceRepo = config.get_child_optional("qc.postprocessing." + id + ".sourceRepo"); auto databasePath = sourceRepo ? "qc.postprocessing." + id + ".sourceRepo" : "qc.config.database"; diff --git a/Framework/src/TriggerHelpers.cxx b/Framework/src/TriggerHelpers.cxx index 42708a9c11..e56dd18462 100644 --- a/Framework/src/TriggerHelpers.cxx +++ b/Framework/src/TriggerHelpers.cxx @@ -95,9 +95,9 @@ TriggerFcn triggerFactory(const std::string& trigger, const PostProcessingConfig } else if (triggerLowerCase == "always") { return triggers::Always(activity); } else if (triggerLowerCase == "sor" || triggerLowerCase == "startofrun") { - return triggers::StartOfRun(config.kafkaBrokersUrl, config.kafkaTopic, config.detectorName, config.taskName, activity); + return triggers::StartOfRun(config.kafkaBrokersUrl, config.kafkaTopicAliECSRun, config.detectorName, config.taskName, activity); } else if (triggerLowerCase == "eor" || triggerLowerCase == "endofrun") { - return triggers::EndOfRun(config.kafkaBrokersUrl, config.kafkaTopic, config.detectorName, config.taskName, activity); + return triggers::EndOfRun(config.kafkaBrokersUrl, config.kafkaTopicAliECSRun, config.detectorName, config.taskName, activity); } else if (triggerLowerCase == "sof" || triggerLowerCase == "startoffill") { return triggers::StartOfFill(activity); } else if (triggerLowerCase == "eof" || triggerLowerCase == "endoffill") { diff --git a/doc/Advanced.md b/doc/Advanced.md index d71fd959a2..ada5199da4 100644 --- a/doc/Advanced.md +++ b/doc/Advanced.md @@ -1670,6 +1670,10 @@ should not be present in real configuration files. "bookkeeping": { "": "Configuration of the bookkeeping (optional)", "url": "localhost:4001", "": "Url of the bookkeeping API (port is usually different from web interface)" }, + "kafka": { + "url": "kafka-broker:123", "": "url of the kafka broker", + "topicAliecsRun":"aliecs.run", "": "the topic where AliECS publishes Run Events, 'aliecs.run' by default" + }, "postprocessing": { "": "Configuration parameters for post-processing", "periodSeconds": 10.0, "": "Sets the interval of checking all the triggers. One can put a very small value", "": "for async processing, but use 10 or more seconds for synchronous operations",