diff --git a/server/demo.conf b/server/demo.conf index c850ec5..e55d266 100644 --- a/server/demo.conf +++ b/server/demo.conf @@ -66,7 +66,7 @@ [cf] # cf-store application # a space-separated list of infotags to set as CF Properties -#infotags = archive foo bar blah +#infotags = archive, foo, bar, blah # Uncomment line below to turn off the feature to add CA/PVA port info for name server to channelfinder #iocConnectionInfo = False diff --git a/server/pyproject.toml b/server/pyproject.toml index 2b00e59..c577c92 100644 --- a/server/pyproject.toml +++ b/server/pyproject.toml @@ -12,6 +12,7 @@ version="1.5" readme = "README.md" requires-python = ">=3.6" dependencies = [ + "dataclasses; python_version < '3.7'", "requests", "twisted", "channelfinder @ https://github.com/ChannelFinder/pyCFClient/archive/refs/tags/v3.0.0.zip" diff --git a/server/recceiver/cfstore.py b/server/recceiver/cfstore.py index 1054900..e0cedef 100755 --- a/server/recceiver/cfstore.py +++ b/server/recceiver/cfstore.py @@ -1,14 +1,13 @@ # -*- coding: utf-8 -*- import datetime -import json +import enum import logging -import os import socket import time from collections import defaultdict -from operator import itemgetter -from typing import Dict, List, Set +from dataclasses import dataclass, field +from typing import Any, Callable, Dict, List, Optional, Set, Tuple from channelfinder import ChannelFinderClient from requests import ConnectionError, RequestException @@ -20,37 +19,248 @@ from twisted.internet.threads import deferToThread from . import interfaces +from .interfaces import CommitTransaction +from .processors import ConfigAdapter _log = logging.getLogger(__name__) -# ITRANSACTION FORMAT: -# -# source_address = source address -# records_to_add = records ein added ( recname, rectype, {key:val}) -# records_to_delete = a set() of records which are being removed -# client_infos = dictionary of client client_infos -# record_infos_to_add = additional client_infos being added to existing records -# "recid: {key:value}" -# - __all__ = ["CFProcessor"] -RECCEIVERID_KEY = "recceiverID" RECCEIVERID_DEFAULT = socket.gethostname() +DEFAULT_MAX_CHANNEL_NAME_QUERY_LENGTH = 600 +DEFAULT_QUERY_LIMIT = 10_000 + + +class PVStatus(enum.StrEnum): + """PV Status values.""" + + ACTIVE = "Active" + INACTIVE = "Inactive" + + +@dataclass +class CFConfig: + """Configuration options for the CF Processor""" + + alias_enabled: bool = False + record_type_enabled: bool = False + environment_variables: str = "" + info_tags: str = "" + ioc_connection_info: bool = True + record_description_enabled: bool = False + clean_on_start: bool = True + clean_on_stop: bool = True + username: str = "cfstore" + recceiver_id: str = RECCEIVERID_DEFAULT + timezone: Optional[str] = None + cf_query_limit: int = DEFAULT_QUERY_LIMIT + + @classmethod + def loads(cls, conf: ConfigAdapter) -> "CFConfig": + """Load configuration from a ConfigAdapter instance. + + Args: + conf: ConfigAdapter instance containing configuration data. + """ + return CFConfig( + alias_enabled=conf.get("alias", False), + record_type_enabled=conf.get("recordType", False), + environment_variables=conf.get("environment_vars", ""), + info_tags=conf.get("infotags", ""), + ioc_connection_info=conf.get("iocConnectionInfo", True), + record_description_enabled=conf.get("recordDesc", False), + clean_on_start=conf.get("cleanOnStart", True), + clean_on_stop=conf.get("cleanOnStop", True), + username=conf.get("username", "cfstore"), + recceiver_id=conf.get("recceiverId", RECCEIVERID_DEFAULT), + timezone=conf.get("timezone", ""), + cf_query_limit=conf.get("findSizeLimit", DEFAULT_QUERY_LIMIT), + ) + + +@dataclass +class CFProperty: + name: str + owner: str + value: Optional[str] = None + + def as_dict(self) -> Dict[str, str]: + """Convert to dictionary for Channelfinder API.""" + return {"name": self.name, "owner": self.owner, "value": self.value or ""} + + @classmethod + def from_dict(cls, prop_dict: Dict[str, str]) -> "CFProperty": + """Create CFProperty from Channelfinder json output. + + Args: + prop_dict: Dictionary representing a property from Channelfinder. + """ + return cls( + name=prop_dict.get("name", ""), + owner=prop_dict.get("owner", ""), + value=prop_dict.get("value"), + ) + + @classmethod + def record_type(cls, owner: str, record_type: str) -> "CFProperty": + """Create a Channelfinder recordType property. + + Args: + owner: The owner of the property. + recordType: The recordType of the property. + """ + return cls(CFPropertyName.RECORD_TYPE.value, owner, record_type) + + @classmethod + def alias(cls, owner: str, alias: str) -> "CFProperty": + """Create a Channelfinder alias property. + + Args: + owner: The owner of the property. + alias: The alias of the property. + """ + return cls(CFPropertyName.ALIAS.value, owner, alias) + + @classmethod + def pv_status(cls, owner: str, pv_status: PVStatus) -> "CFProperty": + """Create a Channelfinder pvStatus property. + + Args: + owner: The owner of the property. + pvStatus: The pvStatus of the property. + """ + return cls(CFPropertyName.PV_STATUS.value, owner, pv_status.value) + + @classmethod + def active(cls, owner: str) -> "CFProperty": + """Create a Channelfinder active property. + + Args: + owner: The owner of the property. + """ + return cls.pv_status(owner, PVStatus.ACTIVE) + + @classmethod + def inactive(cls, owner: str) -> "CFProperty": + """Create a Channelfinder inactive property. + + Args: + owner: The owner of the property. + """ + return cls.pv_status(owner, PVStatus.INACTIVE) + + @classmethod + def time(cls, owner: str, time: str) -> "CFProperty": + """Create a Channelfinder time property. + + Args: + owner: The owner of the property. + time: The time of the property. + """ + return cls(CFPropertyName.TIME.value, owner, time) + + +@dataclass +class RecordInfo: + """Information about a record to be stored in Channelfinder.""" + + pv_name: str + record_type: Optional[str] = None + info_properties: List[CFProperty] = field(default_factory=list) + aliases: List[str] = field(default_factory=list) + + +class CFPropertyName(enum.StrEnum): + """Standard property names used in Channelfinder.""" + + HOSTNAME = "hostName" + IOC_NAME = "iocName" + IOC_ID = "iocid" + IOC_IP = "iocIP" + PV_STATUS = "pvStatus" + TIME = "time" + RECCEIVER_ID = "recceiverID" + ALIAS = "alias" + RECORD_TYPE = "recordType" + RECORD_DESC = "recordDesc" + CA_PORT = "caPort" + PVA_PORT = "pvaPort" + + +@dataclass +class IocInfo: + """Information about an IOC instance.""" + + host: str + hostname: str + ioc_name: str + ioc_IP: str + owner: str + time: str + port: int + channelcount: int = 0 + + @property + def ioc_id(self): + """Generate a unique IOC ID based on hostname and port.""" + return self.host + ":" + str(self.port) + + +@dataclass +class CFChannel: + """Representation of a Channelfinder channel.""" + + name: str + owner: str + properties: List[CFProperty] + + def as_dict(self) -> Dict[str, Any]: + """Convert to dictionary for conversion to json in Channelfinder API.""" + return { + "name": self.name, + "owner": self.owner, + "properties": [p.as_dict() for p in self.properties], + } + + @classmethod + def from_dict(cls, channel_dict: Dict[str, Any]) -> "CFChannel": + """Create CFChannel from Channelfinder json output. + + Args: + channel_dict: Dictionary representing a channel from Channelfinder. + """ + return cls( + name=channel_dict.get("name", ""), + owner=channel_dict.get("owner", ""), + properties=[CFProperty.from_dict(p) for p in channel_dict.get("properties", [])], + ) @implementer(interfaces.IProcessor) class CFProcessor(service.Service): - def __init__(self, name, conf): - _log.info("CF_INIT {name}".format(name=name)) - self.name, self.conf = name, conf - self.channel_dict = defaultdict(list) - self.iocs = dict() - self.client = None - self.currentTime = getCurrentTime - self.lock = DeferredLock() + """Processor for committing IOC and Record information to Channelfinder.""" + + def __init__(self, name: Optional[str], conf: ConfigAdapter): + """Initialize the CFProcessor with configuration. + + Args: + name: The name of the processor. + conf: The configuration for the processor. + """ + self.cf_config = CFConfig.loads(conf) + _log.info("CF_INIT %s", self.cf_config) + self.name = name # Override name from service.Service + self.channel_ioc_ids: Dict[str, List[str]] = defaultdict(list) + self.iocs: Dict[str, IocInfo] = dict() + self.client: Optional[ChannelFinderClient] = None + self.current_time: Callable[[Optional[str]], str] = get_current_time + self.lock: DeferredLock = DeferredLock() def startService(self): + """Start the CFProcessor service. + + Overridden method of service.Service.startService() + """ service.Service.startService(self) # Returning a Deferred is not supported by startService(), # so instead attempt to acquire the lock synchonously! @@ -61,39 +271,40 @@ def startService(self): raise RuntimeError("Failed to acquired CF Processor lock for service start") try: - self._startServiceWithLock() + self._start_service_with_lock() except: service.Service.stopService(self) raise finally: self.lock.release() - def _startServiceWithLock(self): + def _start_service_with_lock(self): + """Start the CFProcessor service with lock held. + + Using the default python cf-client. The url, username, and + password are provided by the channelfinder._conf module. + """ _log.info("CF_START") if self.client is None: # For setting up mock test client - """ - Using the default python cf-client. The url, username, and - password are provided by the channelfinder._conf module. - """ self.client = ChannelFinderClient() try: - cf_properties = [cf_property["name"] for cf_property in self.client.getAllProperties()] + cf_properties = {cf_property["name"] for cf_property in self.client.getAllProperties()} required_properties = { - "hostName", - "iocName", - "pvStatus", - "time", - "iocid", - "iocIP", - RECCEIVERID_KEY, + CFPropertyName.HOSTNAME.value, + CFPropertyName.IOC_NAME.value, + CFPropertyName.IOC_ID.value, + CFPropertyName.IOC_IP.value, + CFPropertyName.PV_STATUS.value, + CFPropertyName.TIME.value, + CFPropertyName.RECCEIVER_ID.value, } - if self.conf.getboolean("alias"): - required_properties.add("alias") - if self.conf.getboolean("recordType"): - required_properties.add("recordType") - env_vars_setting = self.conf.get("environment_vars") + if self.cf_config.alias_enabled: + required_properties.add(CFPropertyName.ALIAS.value) + if self.cf_config.record_type_enabled: + required_properties.add(CFPropertyName.RECORD_TYPE.value) + env_vars_setting = self.cf_config.environment_variables self.env_vars = {} if env_vars_setting != "" and env_vars_setting is not None: env_vars_dict = dict(item.strip().split(":") for item in env_vars_setting.split(",")) @@ -103,73 +314,93 @@ def _startServiceWithLock(self): # Standard property names for CA/PVA name server connections. These are # environment variables from reccaster so take advantage of env_vars # iocConnectionInfo enabled by default - if self.conf.getboolean("iocConnectionInfo", True): + if self.cf_config.ioc_connection_info: self.env_vars["RSRV_SERVER_PORT"] = "caPort" self.env_vars["PVAS_SERVER_PORT"] = "pvaPort" - required_properties.add("caPort") - required_properties.add("pvaPort") - infotags_whitelist = self.conf.get("infotags", list()) - if infotags_whitelist: - record_property_names_list = [s.strip(", ") for s in infotags_whitelist.split()] - else: - record_property_names_list = [] - if self.conf.getboolean("recordDesc"): - record_property_names_list.append("recordDesc") + required_properties.add(CFPropertyName.CA_PORT.value) + required_properties.add(CFPropertyName.PVA_PORT.value) + + record_property_names_list = {s.strip(", ") for s in self.cf_config.info_tags.split()} + if self.cf_config.record_description_enabled: + record_property_names_list.add(CFPropertyName.RECORD_DESC.value) # Are any required properties not already present on CF? - properties = required_properties - set(cf_properties) + properties = required_properties - cf_properties # Are any whitelisted properties not already present on CF? # If so, add them too. - properties.update(set(record_property_names_list) - set(cf_properties)) + properties.update(record_property_names_list - cf_properties) - owner = self.conf.get("username", "cfstore") - for cf_property in properties: - self.client.set(property={"name": cf_property, "owner": owner}) + owner = self.cf_config.username + for cf_property_name in properties: + self.client.set(property={"name": cf_property_name, "owner": owner}) - self.record_property_names_list = set(record_property_names_list) + self.record_property_names_list = record_property_names_list self.managed_properties = required_properties.union(record_property_names_list) - _log.debug("record_property_names_list = {}".format(self.record_property_names_list)) + _log.debug("record_property_names_list = %s", self.record_property_names_list) except ConnectionError: _log.exception("Cannot connect to Channelfinder service") raise else: - if self.conf.getboolean("cleanOnStart", True): + if self.cf_config.clean_on_start: self.clean_service() def stopService(self): + """Stop the CFProcessor service. + + Overridden method of service.Service.stopService() + """ _log.info("CF_STOP") service.Service.stopService(self) - return self.lock.run(self._stopServiceWithLock) + return self.lock.run(self._stop_service_with_lock) + + def _stop_service_with_lock(self): + """Stop the CFProcessor service with lock held. - def _stopServiceWithLock(self): - # Set channels to inactive and close connection to client - if self.conf.getboolean("cleanOnStop", True): + If clean_on_stop is enabled, mark all channels as inactive. + """ + if self.cf_config.clean_on_stop: self.clean_service() _log.info("CF_STOP with lock") # @defer.inlineCallbacks # Twisted v16 does not support cancellation! - def commit(self, transaction_record): - return self.lock.run(self._commitWithLock, transaction_record) + def commit(self, transaction_record: interfaces.ITransaction) -> defer.Deferred: + """Commit a transaction to Channelfinder. - def _commitWithLock(self, transaction): + Args: + transaction_record: The transaction to commit. + """ + return self.lock.run(self._commit_with_lock, transaction_record) + + def _commit_with_lock(self, transaction: interfaces.ITransaction) -> defer.Deferred: + """Commit a transaction to Channelfinder with lock held. + + Args: + transaction: The transaction to commit. + """ self.cancelled = False - t = deferToThread(self._commitWithThread, transaction) + t = deferToThread(self._commit_with_thread, transaction) - def cancelCommit(d): + def cancel_commit(d: defer.Deferred): + """Cancel the commit operation.""" self.cancelled = True d.callback(None) - d = defer.Deferred(cancelCommit) + d: defer.Deferred = defer.Deferred(cancel_commit) - def waitForThread(_ignored): + def wait_for_thread(_ignored): + """Wait for the commit thread to finish.""" if self.cancelled: return t - d.addCallback(waitForThread) + d.addCallback(wait_for_thread) - def chainError(err): + def chain_error(err): + """Handle errors from the commit thread. + + Note this is not foolproof as the thread may still be running. + """ if not err.check(defer.CancelledError): - _log.error("CF_COMMIT FAILURE: {s}".format(s=err)) + _log.error("CF_COMMIT FAILURE: %s", err) if self.cancelled: if not err.check(defer.CancelledError): raise defer.CancelledError() @@ -177,170 +408,190 @@ def chainError(err): else: d.callback(None) - def chainResult(_ignored): + def chain_result(result): + """Handle successful completion of the commit thread. + + If the commit was cancelled, raise CancelledError. + """ if self.cancelled: - raise defer.CancelledError() + raise defer.CancelledError(f"CF Processor is cancelled, due to {result}") else: d.callback(None) - t.addCallbacks(chainResult, chainError) + t.addCallbacks(chain_result, chain_error) return d - def _commitWithThread(self, transaction): - if not self.running: - raise defer.CancelledError( - "CF Processor is not running (transaction: {host}:{port})", - host=transaction.source_address.host, - port=transaction.source_address.port, - ) + def transaction_to_record_infos(self, ioc_info: IocInfo, transaction: CommitTransaction) -> Dict[str, RecordInfo]: + """Convert a CommitTransaction and IocInfo to a dictionary of RecordInfo objects. - _log.info("CF_COMMIT: {transaction}".format(transaction=transaction)) - _log.debug("CF_COMMIT: transaction: {s}".format(s=repr(transaction))) - """ - a dictionary with a list of records with their associated property info - pvInfo - {record_id: { "pvName":"recordName", - "infoProperties":{propName:value, ...}}} - """ - - host = transaction.source_address.host - port = transaction.source_address.port - iocName = transaction.client_infos.get("IOCNAME") or transaction.source_address.port - hostName = transaction.client_infos.get("HOSTNAME") or transaction.source_address.host - owner = ( - transaction.client_infos.get("ENGINEER") - or transaction.client_infos.get("CF_USERNAME") - or self.conf.get("username", "cfstore") - ) - time = self.currentTime(timezone=self.conf.get("timezone")) - - """The unique identifier for a particular IOC""" - iocid = host + ":" + str(port) - _log.debug("transaction: {s}".format(s=repr(transaction))) + Combines record additions, info tags, aliases, and environment variables. - recordInfo = {} + Args: + ioc_info: Information from the IOC + transaction: transaction from reccaster + """ + record_infos: Dict[str, RecordInfo] = {} for record_id, (record_name, record_type) in transaction.records_to_add.items(): - recordInfo[record_id] = {"pvName": record_name} - if self.conf.getboolean("recordType"): - recordInfo[record_id]["recordType"] = record_type + record_infos[record_id] = RecordInfo(pv_name=record_name, record_type=None, info_properties=[], aliases=[]) + if self.cf_config.record_type_enabled: + record_infos[record_id].record_type = record_type + for record_id, (record_infos_to_add) in transaction.record_infos_to_add.items(): # find intersection of these sets - if record_id not in recordInfo: - _log.warning( - "IOC: {iocid}: PV not found for recinfo with RID: {record_id}".format( - iocid=iocid, record_id=record_id - ) - ) + if record_id not in record_infos: + _log.warning("IOC: %s: PV not found for recinfo with RID: {record_id}", ioc_info, record_id) continue recinfo_wl = [p for p in self.record_property_names_list if p in record_infos_to_add.keys()] if recinfo_wl: - recordInfo[record_id]["infoProperties"] = list() for infotag in recinfo_wl: - recordInfo[record_id]["infoProperties"].append( - create_property(owner, infotag, record_infos_to_add[infotag]) + record_infos[record_id].info_properties.append( + CFProperty(infotag, ioc_info.owner, record_infos_to_add[infotag]) ) - for record_id, alias in transaction.aliases.items(): - if record_id not in recordInfo: - _log.warning( - "IOC: {iocid}: PV not found for alias with RID: {record_id}".format( - iocid=iocid, record_id=record_id - ) - ) + for record_id, record_aliases in transaction.aliases.items(): + if record_id not in record_infos: + _log.warning("IOC: %s: PV not found for alias with RID: %s", ioc_info, record_id) continue - recordInfo[record_id]["aliases"] = alias + record_infos[record_id].aliases = record_aliases - for record_id in recordInfo: + for record_id in record_infos: for epics_env_var_name, cf_prop_name in self.env_vars.items(): if transaction.client_infos.get(epics_env_var_name) is not None: - if "infoProperties" not in recordInfo[record_id]: - recordInfo[record_id]["infoProperties"] = list() - recordInfo[record_id]["infoProperties"].append( - create_property(owner, cf_prop_name, transaction.client_infos.get(epics_env_var_name)) + record_infos[record_id].info_properties.append( + CFProperty(cf_prop_name, ioc_info.owner, transaction.client_infos.get(epics_env_var_name)) ) else: _log.debug( "EPICS environment var %s listed in environment_vars setting list not found in this IOC: %s", epics_env_var_name, - iocName, + ioc_info, ) + return record_infos - records_to_delete = list(transaction.records_to_delete) - _log.debug("Delete records: {s}".format(s=records_to_delete)) - - recordInfoByName = {} - for record_id, (info) in recordInfo.items(): - if info["pvName"] in recordInfoByName: - _log.warning( - "Commit contains multiple records with PV name: {pv} ({iocid})".format( - pv=info["pvName"], iocid=iocid - ) - ) - continue - recordInfoByName[info["pvName"]] = info + @staticmethod + def record_info_by_name(record_infos: Dict[str, RecordInfo], ioc_info: IocInfo) -> Dict[str, RecordInfo]: + """Create a dictionary of RecordInfo objects keyed by pvName. + Args: + record_infos: Dictionary of RecordInfo objects keyed by record_id. + ioc_info: Information from the IOC. + """ + record_info_by_name = {} + for record_id, (info) in record_infos.items(): + if info.pv_name in record_info_by_name: + _log.warning("Commit contains multiple records with PV name: %s (%s)", info.pv_name, ioc_info) + continue + record_info_by_name[info.pv_name] = info + return record_info_by_name + + def update_ioc_infos( + self, + transaction: CommitTransaction, + ioc_info: IocInfo, + records_to_delete: List[str], + record_info_by_name: Dict[str, RecordInfo], + ) -> None: + """Update the internal IOC information based on the transaction. + + Makes changed to self.iocs and self.channel_ioc_ids and records_to_delete. + + Args: + transaction: The CommitTransaction being processed. + ioc_info: The IocInfo for the IOC in the transaction. + records_to_delete: List of record names to delete. + record_info_by_name: Dictionary of RecordInfo objects keyed by pvName. + """ + iocid = ioc_info.ioc_id if transaction.initial: - """Add IOC to source list """ - self.iocs[iocid] = { - "iocname": iocName, - "hostname": hostName, - "iocIP": host, - "owner": owner, - "time": time, - "channelcount": 0, - } + # Add IOC to source list + self.iocs[iocid] = ioc_info if not transaction.connected: - records_to_delete.extend(self.channel_dict.keys()) - for record_name in recordInfoByName.keys(): - self.channel_dict[record_name].append(iocid) - self.iocs[iocid]["channelcount"] += 1 - """In case, alias exists""" - if self.conf.getboolean("alias"): - if record_name in recordInfoByName and "aliases" in recordInfoByName[record_name]: - for alias in recordInfoByName[record_name]["aliases"]: - self.channel_dict[alias].append(iocid) # add iocname to pvName in dict - self.iocs[iocid]["channelcount"] += 1 + records_to_delete.extend(self.channel_ioc_ids.keys()) + for record_name in record_info_by_name.keys(): + self.channel_ioc_ids[record_name].append(iocid) + self.iocs[iocid].channelcount += 1 + # In case, alias exists + if self.cf_config.alias_enabled: + if record_name in record_info_by_name: + for record_aliases in record_info_by_name[record_name].aliases: + self.channel_ioc_ids[record_aliases].append(iocid) # add iocname to pvName in dict + self.iocs[iocid].channelcount += 1 for record_name in records_to_delete: - if iocid in self.channel_dict[record_name]: + if iocid in self.channel_ioc_ids[record_name]: self.remove_channel(record_name, iocid) - """In case, alias exists""" - if self.conf.getboolean("alias"): - if record_name in recordInfoByName and "aliases" in recordInfoByName[record_name]: - for alias in recordInfoByName[record_name]["aliases"]: - self.remove_channel(alias, iocid) - poll( - __updateCF__, - self, - recordInfoByName, - records_to_delete, - hostName, - iocName, - host, - iocid, - owner, - time, + # In case, alias exists + if self.cf_config.alias_enabled: + if record_name in record_info_by_name: + for record_aliases in record_info_by_name[record_name].aliases: + self.remove_channel(record_aliases, iocid) + + def _commit_with_thread(self, transaction: CommitTransaction): + """Commit the transaction to Channelfinder. + + Collects the ioc info from the transaction. + Collects the record infos from the transaction. + Collects the records to delete from the transaction. + Calculates the records by names. + Updates the local IOC information. + Polls Channelfinder with the required updates until it passes. + + Args: + transaction: The transaction to commit. + """ + if not self.running: + host = transaction.source_address.host + port = transaction.source_address.port + raise defer.CancelledError(f"CF Processor is not running (transaction: {host}:{port})") + + _log.info("CF_COMMIT: %s", transaction) + _log.debug("CF_COMMIT: transaction: %s", repr(transaction)) + + ioc_info = IocInfo( + host=transaction.source_address.host, + hostname=transaction.client_infos.get("HOSTNAME") or transaction.source_address.host, + ioc_name=transaction.client_infos.get("IOCNAME") or str(transaction.source_address.port), + ioc_IP=transaction.source_address.host, + owner=( + transaction.client_infos.get("ENGINEER") + or transaction.client_infos.get("CF_USERNAME") + or self.cf_config.username + ), + time=self.current_time(self.cf_config.timezone), + port=transaction.source_address.port, ) - dict_to_file(self.channel_dict, self.iocs, self.conf) - def remove_channel(self, recordName, iocid): - self.channel_dict[recordName].remove(iocid) - if iocid in self.iocs: - self.iocs[iocid]["channelcount"] -= 1 - if self.iocs[iocid]["channelcount"] == 0: - self.iocs.pop(iocid, None) - elif self.iocs[iocid]["channelcount"] < 0: - _log.error("Channel count negative: {s}", s=iocid) - if len(self.channel_dict[recordName]) <= 0: # case: channel has no more iocs - del self.channel_dict[recordName] - - def clean_service(self): - """ - Marks all channels as "Inactive" until the recsync server is back up + record_infos = self.transaction_to_record_infos(ioc_info, transaction) + + records_to_delete = list(transaction.records_to_delete) + _log.debug("Delete records: %s", records_to_delete) + + record_info_by_name = CFProcessor.record_info_by_name(record_infos, ioc_info) + self.update_ioc_infos(transaction, ioc_info, records_to_delete, record_info_by_name) + poll(_update_channelfinder, self, record_info_by_name, records_to_delete, ioc_info) + + def remove_channel(self, recordName: str, iocid: str) -> None: + """Remove channel from self.iocs and self.channel_ioc_ids. + + Args: + recordName: The name of the record to remove. + iocid: The IOC ID of the record to remove from. """ + self.channel_ioc_ids[recordName].remove(iocid) + if iocid in self.iocs: + self.iocs[iocid].channelcount -= 1 + if self.iocs[iocid].channelcount == 0: + self.iocs.pop(iocid) + elif self.iocs[iocid].channelcount < 0: + _log.error("Channel count negative: %s", iocid) + if len(self.channel_ioc_ids[recordName]) <= 0: # case: channel has no more iocs + del self.channel_ioc_ids[recordName] + + def clean_service(self) -> None: + """Marks all channels belonging to this recceiver (as found by the recceiver id) as 'Inactive'.""" sleep = 1 retry_limit = 5 - owner = self.conf.get("username", "cfstore") - recceiverid = self.conf.get(RECCEIVERID_KEY, RECCEIVERID_DEFAULT) + owner = self.cf_config.username + recceiverid = self.cf_config.recceiver_id while 1: try: _log.info("CF Clean Started") @@ -355,460 +606,695 @@ def clean_service(self): _log.info("CF Clean Completed") return except RequestException as e: - _log.error("Clean service failed: {s}".format(s=e)) + _log.error("Clean service failed: %s", e) retry_seconds = min(60, sleep) - _log.info("Clean service retry in {retry_seconds} seconds".format(retry_seconds=retry_seconds)) + _log.info("Clean service retry in %s seconds", retry_seconds) time.sleep(retry_seconds) sleep *= 1.5 if self.running == 0 and sleep >= retry_limit: - _log.info("Abandoning clean after {retry_limit} seconds".format(retry_limit=retry_limit)) + _log.info("Abandoning clean after %s seconds", retry_limit) return - def get_active_channels(self, recceiverid): - return self.client.findByArgs( - prepareFindArgs(self.conf, [("pvStatus", "Active"), (RECCEIVERID_KEY, recceiverid)]) - ) + def get_active_channels(self, recceiverid: str) -> List[CFChannel]: + """Gets all the channels which are active for the given recceiver id. + + Args: + recceiverid: The current recceiver id. + """ + return [ + CFChannel.from_dict(ch) + for ch in self.client.findByArgs( + prepare_find_args( + cf_config=self.cf_config, + args=[ + (CFPropertyName.PV_STATUS.value, PVStatus.ACTIVE.value), + (CFPropertyName.RECCEIVER_ID.value, recceiverid), + ], + ) + ) + ] + + def clean_channels(self, owner: str, channels: List[CFChannel]) -> None: + """Set the pvStatus property to 'Inactive' for the given channels. - def clean_channels(self, owner, channels): + Args: + owner: The owner of the channels. + channels: The channels to set to 'Inactive'. + """ new_channels = [] for cf_channel in channels or []: - new_channels.append(cf_channel["name"]) - _log.info("Total channels to update: {nChannels}".format(nChannels=len(new_channels))) - _log.debug( - 'Update "pvStatus" property to "Inactive" for {n_channels} channels'.format(n_channels=len(new_channels)) - ) + new_channels.append(cf_channel.name) + _log.info("Cleaning %s channels.", len(new_channels)) + _log.debug('Update "pvStatus" property to "Inactive" for %s channels', len(new_channels)) self.client.update( - property=create_inactive_property(owner), + property=CFProperty.inactive(owner).as_dict(), channelNames=new_channels, ) -def dict_to_file(dict, iocs, conf): - filename = conf.get("debug_file_loc", None) - if filename: - if os.path.isfile(filename): - os.remove(filename) - list = [] - for key in dict: - list.append([key, iocs[dict[key][-1]]["hostname"], iocs[dict[key][-1]]["iocname"]]) - - list.sort(key=itemgetter(0)) - - with open(filename, "w+") as f: - json.dump(list, f) - - -def create_channel(name: str, owner: str, properties: List[Dict[str, str]]): - return { - "name": name, - "owner": owner, - "properties": properties, - } - - -def create_property(owner: str, name: str, value: str) -> Dict[str, str]: - return { - "name": name, - "owner": owner, - "value": value, - } +def handle_channel_is_old( + channel_ioc_ids: Dict[str, List[str]], + cf_channel: CFChannel, + iocs: Dict[str, IocInfo], + ioc_info: IocInfo, + recceiverid: str, + managed_properties: Set[str], + cf_config: CFConfig, + channels: List[CFChannel], + record_info_by_name: Dict[str, RecordInfo], +) -> None: + """Handle the case when the channel exists in channelfinder but not in the recceiver. + + Modifies: + channels + + Args: + channel_ioc_ids: mapping of channels to ioc ids + cf_channel: The channel that is old + iocs: List of all known iocs + ioc_info: Current ioc + recceiverid: id of current recceiver + managed_properties: List of managed properties + cf_config: Configuration used for processor + channels: list of the current channel changes + record_info_by_name: Input information from the transaction + """ + last_ioc_id = channel_ioc_ids[cf_channel.name][-1] + cf_channel.owner = iocs[last_ioc_id].owner + cf_channel.properties = __merge_property_lists( + create_default_properties(ioc_info, recceiverid, channel_ioc_ids, iocs, cf_channel), + cf_channel, + managed_properties, + ) + channels.append(cf_channel) + _log.debug("Add existing channel %s to previous IOC %s", cf_channel, last_ioc_id) + # In case alias exist, also delete them + if cf_config.alias_enabled: + if cf_channel.name in record_info_by_name: + for alias_name in record_info_by_name[cf_channel.name].aliases: + # TODO Remove? This code couldn't have been working.... + alias_channel = CFChannel(alias_name, "", []) + if alias_name in channel_ioc_ids: + last_alias_ioc_id = channel_ioc_ids[alias_name][-1] + alias_channel.owner = iocs[last_alias_ioc_id].owner + alias_channel.properties = __merge_property_lists( + create_default_properties( + ioc_info, + recceiverid, + channel_ioc_ids, + iocs, + cf_channel, + ), + alias_channel, + managed_properties, + ) + channels.append(alias_channel) + _log.debug("Add existing alias %s to previous IOC: %s", alias_channel, last_alias_ioc_id) + + +def orphan_channel( + cf_channel: CFChannel, + ioc_info: IocInfo, + channels: List[CFChannel], + cf_config: CFConfig, + record_info_by_name: Dict[str, RecordInfo], +) -> None: + """Handle a channel that exists in channelfinder but not on this recceiver. + + Modifies: + channels + + Args: + cf_channel: The channel to orphan + ioc_info: Info of the current ioc + channels: The current list of channel changes + cf_config: Configuration of the proccessor + record_info_by_name: information from the transaction + """ + cf_channel.properties = __merge_property_lists( + [ + CFProperty.inactive(ioc_info.owner), + CFProperty.time(ioc_info.owner, ioc_info.time), + ], + cf_channel, + ) + channels.append(cf_channel) + _log.debug("Add orphaned channel %s with no IOC: %s", cf_channel, ioc_info) + # Also orphan any alias + if cf_config.alias_enabled: + if cf_channel.name in record_info_by_name: + for alias_name in record_info_by_name[cf_channel.name].aliases: + alias_channel = CFChannel(alias_name, "", []) + alias_channel.properties = __merge_property_lists( + [ + CFProperty.inactive(ioc_info.owner), + CFProperty.time(ioc_info.owner, ioc_info.time), + ], + alias_channel, + ) + channels.append(alias_channel) + _log.debug("Add orphaned alias %s with no IOC: %s", alias_channel, ioc_info) + + +def handle_channel_old_and_new( + cf_channel: CFChannel, + iocid: str, + ioc_info: IocInfo, + managed_properties: Set[str], + channels: List[CFChannel], + new_channels: Set[str], + cf_config: CFConfig, + record_info_by_name: Dict[str, RecordInfo], + old_channels: List[CFChannel], +) -> None: + """ + Channel exists in Channelfinder with same iocid. + Update the status to ensure it is marked active and update the time. + + Modifies: + channels + new_channels + + Args: + cf_channel: The channel to update + iocid: The IOC ID of the channel + ioc_info: Info of the current ioc + managed_properties: List of managed properties + channels: The current list of channel changes + new_channels: The list of new channels + cf_config: Configuration of the processor + record_info_by_name: information from the transaction + old_channels: The list of old channels + """ + _log.debug("Channel %s exists in Channelfinder with same iocid %s", cf_channel.name, iocid) + cf_channel.properties = __merge_property_lists( + [ + CFProperty.active(ioc_info.owner), + CFProperty.time(ioc_info.owner, ioc_info.time), + ], + cf_channel, + managed_properties, + ) + channels.append(cf_channel) + _log.debug("Add existing channel with same IOC: %s", cf_channel) + new_channels.remove(cf_channel.name) + + # In case, alias exist + if cf_config.alias_enabled: + if cf_channel.name in record_info_by_name: + for alias_name in record_info_by_name[cf_channel.name].aliases: + if alias_name in old_channels: + # alias exists in old list + alias_channel = CFChannel(alias_name, "", []) + alias_channel.properties = __merge_property_lists( + [ + CFProperty.active(ioc_info.owner), + CFProperty.time(ioc_info.owner, ioc_info.time), + ], + alias_channel, + managed_properties, + ) + channels.append(alias_channel) + new_channels.remove(alias_name) + else: + # alias exists but not part of old list + aprops = __merge_property_lists( + [ + CFProperty.active(ioc_info.owner), + CFProperty.time(ioc_info.owner, ioc_info.time), + CFProperty.alias( + ioc_info.owner, + cf_channel.name, + ), + ], + cf_channel, + managed_properties, + ) + channels.append( + CFChannel( + alias_name, + ioc_info.owner, + aprops, + ) + ) + new_channels.remove(alias_name) + _log.debug("Add existing alias with same IOC: %s", cf_channel) -def create_recordType_property(owner: str, recordType: str): - return create_property(owner, "recordType", recordType) +def get_existing_channels( + new_channels: Set[str], client: ChannelFinderClient, cf_config: CFConfig +) -> Dict[str, CFChannel]: + """Get the channels existing in channelfinder from the list of new channels. + Args: + new_channels: The list of new channels. + client: The client to contact channelfinder + cf_config: The configuration for the processor. + """ + existing_channels: Dict[str, CFChannel] = {} -def create_alias_property(owner: str, alias: str): - return create_property(owner, "alias", alias) + # The list of pv's is searched keeping in mind the limitations on the URL length + search_strings = [] + search_string = "" + for channel_name in new_channels: + if not search_string: + search_string = channel_name + elif len(search_string) + len(channel_name) < 600: + search_string = search_string + "|" + channel_name + else: + search_strings.append(search_string) + search_string = channel_name + if search_string: + search_strings.append(search_string) + + for each_search_string in search_strings: + _log.debug("Find existing channels by name: %s", each_search_string) + for found_channel in client.findByArgs( + prepare_find_args(cf_config=cf_config, args=[("~name", each_search_string)]) + ): + existing_channels[found_channel["name"]] = CFChannel.from_dict(found_channel) + return existing_channels + + +def handle_channels( + old_channels: List[CFChannel], + new_channels: Set[str], + records_to_delete: List[str], + channel_ioc_ids: Dict[str, List[str]], + iocs: Dict[str, IocInfo], + ioc_info: IocInfo, + recceiverid: str, + managed_properties: Set[str], + cf_config: CFConfig, + channels: List[CFChannel], + record_info_by_name: Dict[str, RecordInfo], + iocid: str, +) -> None: + """Handle channels already present in Channelfinder for this IOC. + + Loops through all the old_channels, + if it is on another ioc clean up reference to old ioc + if it is not on another ioc set as Inactive + if it is on current ioc update the properties + + Modifies: + channels: The list of channels. + iocs: The dictionary of IOCs. + channel_ioc_ids: The dictionary of channel names to IOC IDs. + new_channels: The list of new channels. + + Args: + old_channels: The list of old channels. + new_channels: The list of new channels. + channel_ioc_ids: The dictionary of channel names to IOC IDs. + recceiver_id: The recceiver ID. + iocs: The dictionary of IOCs. + records_to_delete: The list of records to delete. + ioc_info: The IOC information. + managed_properties: The properites managed by this recceiver. + channels: The list of channels. + alias_enabled: Whether aliases are enabled. + record_type_enabled: Whether record types are enabled. + record_info_by_name: The dictionary of record names to information. + iocid: The IOC ID. + cf_config: The configuration for the processor. + """ + for cf_channel in old_channels: + if ( + not new_channels or cf_channel.name in records_to_delete + ): # case: empty commit/del, remove all reference to ioc + _log.debug("Channel %s exists in Channelfinder not in new_channels", cf_channel) + if cf_channel.name in channel_ioc_ids: + handle_channel_is_old( + channel_ioc_ids, + cf_channel, + iocs, + ioc_info, + recceiverid, + managed_properties, + cf_config, + channels, + record_info_by_name, + ) + else: + orphan_channel(cf_channel, ioc_info, channels, cf_config, record_info_by_name) + else: + if cf_channel.name in new_channels: # case: channel in old and new + handle_channel_old_and_new( + cf_channel, + iocid, + ioc_info, + managed_properties, + channels, + new_channels, + cf_config, + record_info_by_name, + old_channels, + ) -def create_pvStatus_property(owner: str, pvStatus: str): - return create_property(owner, "pvStatus", pvStatus) +def update_existing_channel_diff_iocid( + existing_channels: Dict[str, CFChannel], + channel_name: str, + new_properties: List[CFProperty], + managed_properties: Set[str], + channels: List[CFChannel], + cf_config: CFConfig, + record_info_by_name: Dict[str, RecordInfo], + ioc_info: IocInfo, + iocid: str, +) -> None: + """Update existing channel with the changed properties. + + Modifies: + channels + + Args: + existing_channels: The dictionary of existing channels. + channel_name: The name of the channel. + new_properties: The new properties. + managed_properties: The managed properties. + channels: The list of channels. + cf_config: configuration of processor + record_info_by_name: The dictionary of record names to information. + ioc_info: The IOC information. + iocid: The IOC ID. + """ + existing_channel = existing_channels[channel_name] + existing_channel.properties = __merge_property_lists( + new_properties, + existing_channel, + managed_properties, + ) + channels.append(existing_channel) + _log.debug("Add existing channel with different IOC: %s", existing_channel) + # in case, alias exists, update their properties too + if cf_config.alias_enabled: + if channel_name in record_info_by_name: + alias_properties = [CFProperty.alias(ioc_info.owner, channel_name)] + for p in new_properties: + alias_properties.append(p) + for alias_name in record_info_by_name[channel_name].aliases: + if alias_name in existing_channels: + ach = existing_channels[alias_name] + ach.properties = __merge_property_lists( + alias_properties, + ach, + managed_properties, + ) + channels.append(ach) + else: + channels.append(CFChannel(alias_name, ioc_info.owner, alias_properties)) + _log.debug("Add existing alias %s of %s with different IOC from %s", alias_name, channel_name, iocid) + + +def create_new_channel( + channels: List[CFChannel], + channel_name: str, + ioc_info: IocInfo, + new_properties: List[CFProperty], + cf_config: CFConfig, + record_info_by_name: Dict[str, RecordInfo], +) -> None: + """Create a new channel. + + Modifies: + channels + + Args: + channels: The list of channels. + channel_name: The name of the channel. + ioc_info: The IOC information. + new_properties: The new properties. + cf_config: configuration of processor + record_info_by_name: The dictionary of record names to information. + """ + channels.append(CFChannel(channel_name, ioc_info.owner, new_properties)) + _log.debug("Add new channel: %s", channel_name) + if cf_config.alias_enabled: + if channel_name in record_info_by_name: + alias_properties = [CFProperty.alias(ioc_info.owner, channel_name)] + for p in new_properties: + alias_properties.append(p) + for alias in record_info_by_name[channel_name].aliases: + channels.append(CFChannel(alias, ioc_info.owner, alias_properties)) + _log.debug("Add new alias: %s from %s", alias, channel_name) -def create_active_property(owner: str): - return create_pvStatus_property(owner, "Active") +class IOCMissingInfoError(Exception): + """Raised when an IOC is missing required information.""" -def create_inactive_property(owner: str): - return create_pvStatus_property(owner, "Inactive") + def __init__(self, ioc_info: IocInfo): + super().__init__(f"Missing hostName {ioc_info.hostname} or iocName {ioc_info.ioc_name}") + self.ioc_info = ioc_info -def create_time_property(owner: str, time: str): - return create_property(owner, "time", time) +def _update_channelfinder( + processor: CFProcessor, record_info_by_name: Dict[str, RecordInfo], records_to_delete, ioc_info: IocInfo +) -> None: + """Update Channelfinder with the provided IOC and Record information. + Calculates the changes required to the channels list and pushes the update the channelfinder. -def __updateCF__( - processor, - recordInfoByName, - records_to_delete, - hostName, - iocName, - iocIP, - iocid, - owner, - iocTime, -): - _log.info("CF Update IOC: {iocid}".format(iocid=iocid)) - _log.debug( - "CF Update IOC: {iocid} recordInfoByName {recordInfoByName}".format( - iocid=iocid, recordInfoByName=recordInfoByName - ) - ) + Args: + processor: The processor. + record_info_by_name: The dictionary of record names to information. + records_to_delete: The list of records to delete. + ioc_info: The IOC information. + """ + _log.info("CF Update IOC: %s", ioc_info) + _log.debug("CF Update IOC: %s record_info_by_name %s", ioc_info, record_info_by_name) # Consider making this function a class methed then 'processor' simply becomes 'self' client = processor.client - channels_dict = processor.channel_dict + channel_ioc_ids = processor.channel_ioc_ids iocs = processor.iocs - conf = processor.conf - recceiverid = conf.get(RECCEIVERID_KEY, RECCEIVERID_DEFAULT) - new_channels = set(recordInfoByName.keys()) - - if iocid in iocs: - hostName = iocs[iocid]["hostname"] - iocName = iocs[iocid]["iocname"] - owner = iocs[iocid]["owner"] - iocTime = iocs[iocid]["time"] - iocIP = iocs[iocid]["iocIP"] - else: - _log.warning("IOC Env Info not found: {iocid}".format(iocid=iocid)) + cf_config = processor.cf_config + recceiverid = processor.cf_config.recceiver_id + new_channels = set(record_info_by_name.keys()) + iocid = ioc_info.ioc_id - if hostName is None or iocName is None: - raise Exception("missing hostName or iocName") + if iocid not in iocs: + _log.warning("IOC Env Info %s not found in ioc list: %s", ioc_info, iocs) - if processor.cancelled: - raise defer.CancelledError() + if ioc_info.hostname is None or ioc_info.ioc_name is None: + raise IOCMissingInfoError(ioc_info) - channels = [] - """A list of channels in channelfinder with the associated hostName and iocName""" - _log.debug("Find existing channels by IOCID: {iocid}".format(iocid=iocid)) - old_channels = client.findByArgs(prepareFindArgs(conf, [("iocid", iocid)])) if processor.cancelled: - raise defer.CancelledError() + raise defer.CancelledError(f"Processor cancelled in _update_channelfinder for {ioc_info}") + + channels: List[CFChannel] = [] + # A list of channels in channelfinder with the associated hostName and iocName + _log.debug("Find existing channels by IOCID: %s", ioc_info) + old_channels: List[CFChannel] = [ + CFChannel.from_dict(ch) + for ch in client.findByArgs(prepare_find_args(cf_config=cf_config, args=[("iocid", iocid)])) + ] if old_channels is not None: - for cf_channel in old_channels: - if ( - len(new_channels) == 0 or cf_channel["name"] in records_to_delete - ): # case: empty commit/del, remove all reference to ioc - _log.debug("Channel {s} exists in Channelfinder not in new_channels".format(s=cf_channel["name"])) - if cf_channel["name"] in channels_dict: - cf_channel["owner"] = iocs[channels_dict[cf_channel["name"]][-1]]["owner"] - cf_channel["properties"] = __merge_property_lists( - create_default_properties(owner, iocTime, recceiverid, channels_dict, iocs, cf_channel), - cf_channel, - processor.managed_properties, - ) - if conf.getboolean("recordType"): - cf_channel["properties"] = __merge_property_lists( - cf_channel["properties"].append( - create_recordType_property( - owner, iocs[channels_dict[cf_channel["name"]][-1]]["recordType"] - ) - ), - cf_channel, - processor.managed_properties, - ) - channels.append(cf_channel) - _log.debug("Add existing channel to previous IOC: {s}".format(s=channels[-1])) - """In case alias exist, also delete them""" - if conf.getboolean("alias"): - if cf_channel["name"] in recordInfoByName and "aliases" in recordInfoByName[cf_channel["name"]]: - for alias in recordInfoByName[cf_channel["name"]]["aliases"]: - if alias["name"] in channels_dict: - alias["owner"] = iocs[channels_dict[alias["name"]][-1]]["owner"] - alias["properties"] = __merge_property_lists( - create_default_properties( - owner, - iocTime, - recceiverid, - channels_dict, - iocs, - cf_channel, - ), - alias, - processor.managed_properties, - ) - if conf.getboolean("recordType"): - cf_channel["properties"] = __merge_property_lists( - cf_channel["properties"].append( - create_recordType_property( - owner, - iocs[channels_dict[alias["name"]][-1]]["recordType"], - ) - ), - cf_channel, - processor.managed_properties, - ) - channels.append(alias) - _log.debug("Add existing alias to previous IOC: {s}".format(s=channels[-1])) - - else: - """Orphan the channel : mark as inactive, keep the old hostName and iocName""" - cf_channel["properties"] = __merge_property_lists( - [ - create_inactive_property(owner), - create_time_property(owner, iocTime), - ], - cf_channel, - ) - channels.append(cf_channel) - _log.debug("Add orphaned channel with no IOC: {s}".format(s=channels[-1])) - """Also orphan any alias""" - if conf.getboolean("alias"): - if cf_channel["name"] in recordInfoByName and "aliases" in recordInfoByName[cf_channel["name"]]: - for alias in recordInfoByName[cf_channel["name"]]["aliases"]: - alias["properties"] = __merge_property_lists( - [ - create_inactive_property(owner), - create_time_property(owner, iocTime), - ], - alias, - ) - channels.append(alias) - _log.debug("Add orphaned alias with no IOC: {s}".format(s=channels[-1])) - else: - if cf_channel["name"] in new_channels: # case: channel in old and new - """ - Channel exists in Channelfinder with same hostname and iocname. - Update the status to ensure it is marked active and update the time. - """ - _log.debug( - "Channel {s} exists in Channelfinder with same hostname and iocname".format( - s=cf_channel["name"] - ) - ) - cf_channel["properties"] = __merge_property_lists( - [ - create_active_property(owner), - create_time_property(owner, iocTime), - ], - cf_channel, - processor.managed_properties, - ) - channels.append(cf_channel) - _log.debug("Add existing channel with same IOC: {s}".format(s=channels[-1])) - new_channels.remove(cf_channel["name"]) - - """In case, alias exist""" - if conf.getboolean("alias"): - if cf_channel["name"] in recordInfoByName and "aliases" in recordInfoByName[cf_channel["name"]]: - for alias in recordInfoByName[cf_channel["name"]]["aliases"]: - if alias in old_channels: - """alias exists in old list""" - alias["properties"] = __merge_property_lists( - [ - create_active_property(owner), - create_time_property(owner, iocTime), - ], - alias, - processor.managed_properties, - ) - channels.append(alias) - new_channels.remove(alias["name"]) - else: - """alias exists but not part of old list""" - aprops = __merge_property_lists( - [ - create_active_property(owner), - create_time_property(owner, iocTime), - create_alias_property( - owner, - cf_channel["name"], - ), - ], - cf_channel, - processor.managed_properties, - ) - channels.append( - create_channel( - alias["name"], - owner, - aprops, - ) - ) - new_channels.remove(alias["name"]) - _log.debug("Add existing alias with same IOC: {s}".format(s=channels[-1])) + handle_channels( + old_channels, + new_channels, + records_to_delete, + channel_ioc_ids, + iocs, + ioc_info, + recceiverid, + processor.managed_properties, + cf_config, + channels, + record_info_by_name, + iocid, + ) # now pvNames contains a list of pv's new on this host/ioc - """A dictionary representing the current channelfinder information associated with the pvNames""" - existingChannels = {} + existing_channels = get_existing_channels(new_channels, client, cf_config) - """ - The list of pv's is searched keeping in mind the limitations on the URL length - The search is split into groups to ensure that the size does not exceed 600 characters - """ - searchStrings = [] - searchString = "" - for channel_name in new_channels: - if not searchString: - searchString = channel_name - elif len(searchString) + len(channel_name) < 600: - searchString = searchString + "|" + channel_name - else: - searchStrings.append(searchString) - searchString = channel_name - if searchString: - searchStrings.append(searchString) - - for eachSearchString in searchStrings: - _log.debug("Find existing channels by name: {search}".format(search=eachSearchString)) - for cf_channel in client.findByArgs(prepareFindArgs(conf, [("~name", eachSearchString)])): - existingChannels[cf_channel["name"]] = cf_channel - if processor.cancelled: - raise defer.CancelledError() + if processor.cancelled: + raise defer.CancelledError(f"CF Processor is cancelled, after fetching existing channels for {ioc_info}") for channel_name in new_channels: - newProps = create_properties(owner, iocTime, recceiverid, hostName, iocName, iocIP, iocid) - if conf.getboolean("recordType"): - newProps.append(create_recordType_property(owner, recordInfoByName[channel_name]["recordType"])) - if channel_name in recordInfoByName and "infoProperties" in recordInfoByName[channel_name]: - newProps = newProps + recordInfoByName[channel_name]["infoProperties"] - - if channel_name in existingChannels: - _log.debug( - f"""update existing channel{channel_name}: exists but with a different hostName and/or iocName""" - ) - - existingChannel = existingChannels[channel_name] - existingChannel["properties"] = __merge_property_lists( - newProps, - existingChannel, + new_properties = create_ioc_properties( + ioc_info.owner, + ioc_info.time, + recceiverid, + ioc_info.hostname, + ioc_info.ioc_name, + ioc_info.ioc_IP, + ioc_info.ioc_id, + ) + if ( + cf_config.record_type_enabled + and channel_name in record_info_by_name + and record_info_by_name[channel_name].record_type + ): + new_properties.append(CFProperty.record_type(ioc_info.owner, record_info_by_name[channel_name].record_type)) + if channel_name in record_info_by_name: + new_properties = new_properties + record_info_by_name[channel_name].info_properties + + if channel_name in existing_channels: + _log.debug("update existing channel %s: exists but with a different iocid from %s", channel_name, iocid) + update_existing_channel_diff_iocid( + existing_channels, + channel_name, + new_properties, processor.managed_properties, + channels, + cf_config, + record_info_by_name, + ioc_info, + iocid, ) - channels.append(existingChannel) - _log.debug("Add existing channel with different IOC: {s}".format(s=channels[-1])) - """in case, alias exists, update their properties too""" - if conf.getboolean("alias"): - if channel_name in recordInfoByName and "aliases" in recordInfoByName[channel_name]: - alProps = [create_alias_property(owner, channel_name)] - for p in newProps: - alProps.append(p) - for alias in recordInfoByName[channel_name]["aliases"]: - if alias in existingChannels: - ach = existingChannels[alias] - ach["properties"] = __merge_property_lists( - alProps, - ach, - processor.managed_properties, - ) - channels.append(ach) - else: - channels.append(create_channel(alias, owner, alProps)) - _log.debug("Add existing alias with different IOC: {s}".format(s=channels[-1])) - else: - """New channel""" - channels.append({"name": channel_name, "owner": owner, "properties": newProps}) - _log.debug("Add new channel: {s}".format(s=channels[-1])) - if conf.getboolean("alias"): - if channel_name in recordInfoByName and "aliases" in recordInfoByName[channel_name]: - alProps = [create_alias_property(owner, channel_name)] - for p in newProps: - alProps.append(p) - for alias in recordInfoByName[channel_name]["aliases"]: - channels.append({"name": alias, "owner": owner, "properties": alProps}) - _log.debug("Add new alias: {s}".format(s=channels[-1])) - _log.info("Total channels to update: {nChannels} {iocName}".format(nChannels=len(channels), iocName=iocName)) + create_new_channel(channels, channel_name, ioc_info, new_properties, cf_config, record_info_by_name) + _log.info("Total channels to update: %s for ioc: %s", len(channels), ioc_info) + if len(channels) != 0: - cf_set_chunked(client, channels, conf.get("findSizeLimit", 10000)) + cf_set_chunked(client, channels, cf_config.cf_query_limit) else: if old_channels and len(old_channels) != 0: - cf_set_chunked(client, channels, conf.get("findSizeLimit", 10000)) + cf_set_chunked(client, channels, cf_config.cf_query_limit) if processor.cancelled: - raise defer.CancelledError() + raise defer.CancelledError(f"Processor cancelled in _update_channelfinder for {ioc_info}") + +def cf_set_chunked(client: ChannelFinderClient, channels: List[CFChannel], chunk_size=DEFAULT_QUERY_LIMIT) -> None: + """Submit a list of channels to channelfinder in a chunked way. -def cf_set_chunked(client, channels, chunk_size=10000): + Args: + client: The channelfinder client. + channels: The list of channels. + chunk_size: The chunk size. + """ for i in range(0, len(channels), chunk_size): - chunk = channels[i : i + chunk_size] + chunk = [ch.as_dict() for ch in channels[i : i + chunk_size]] client.set(channels=chunk) -def create_properties(owner, iocTime, recceiverid, hostName, iocName, iocIP, iocid): +def create_ioc_properties( + owner: str, iocTime: str, recceiverid: str, hostName: str, iocName: str, iocIP: str, iocid: str +) -> List[CFProperty]: + """Create the properties from an IOC. + + Args: + owner: The owner of the properties. + iocTime: The time of the properties. + recceiverid: The recceiver ID of the properties. + hostName: The host name of the properties. + iocName: The IOC name of the properties. + iocIP: The IOC IP of the properties. + iocid: The IOC ID of the properties. + """ return [ - create_property(owner, "hostName", hostName), - create_property(owner, "iocName", iocName), - create_property(owner, "iocid", iocid), - create_property(owner, "iocIP", iocIP), - create_active_property(owner), - create_time_property(owner, iocTime), - create_property(owner, RECCEIVERID_KEY, recceiverid), + CFProperty(CFPropertyName.HOSTNAME.value, owner, hostName), + CFProperty(CFPropertyName.IOC_NAME.value, owner, iocName), + CFProperty(CFPropertyName.IOC_ID.value, owner, iocid), + CFProperty(CFPropertyName.IOC_IP.value, owner, iocIP), + CFProperty.active(owner), + CFProperty.time(owner, iocTime), + CFProperty(CFPropertyName.RECCEIVER_ID.value, owner, recceiverid), ] -def create_default_properties(owner, iocTime, recceiverid, channels_dict, iocs, cf_channel): - return create_properties( - owner, - iocTime, +def create_default_properties( + ioc_info: IocInfo, recceiverid: str, channels_iocs: Dict[str, List[str]], iocs: Dict[str, IocInfo], cf_channel +) -> List[CFProperty]: + """Create the default properties for an IOC. + + Args: + ioc_info: The IOC information. + recceiverid: The recceiver ID of the properties. + channels_iocs: The dictionary of channel names to IOC IDs. + iocs: The dictionary of IOCs. + cf_channel: The Channelfinder channel. + """ + channel_name = cf_channel.name + last_ioc_info = iocs[channels_iocs[channel_name][-1]] + return create_ioc_properties( + ioc_info.owner, + ioc_info.time, recceiverid, - iocs[channels_dict[cf_channel["name"]][-1]]["hostname"], - iocs[channels_dict[cf_channel["name"]][-1]]["iocname"], - iocs[channels_dict[cf_channel["name"]][-1]]["iocIP"], - channels_dict[cf_channel["name"]][-1], + last_ioc_info.hostname, + last_ioc_info.ioc_name, + last_ioc_info.ioc_IP, + last_ioc_info.ioc_id, ) def __merge_property_lists( - newProperties: List[Dict[str, str]], channel: Dict[str, List[Dict[str, str]]], managed_properties: Set[str] = set() -) -> List[Dict[str, str]]: - """ - Merges two lists of properties ensuring that there are no 2 properties with + new_properties: List[CFProperty], channel: CFChannel, managed_properties: Set[str] = set() +) -> List[CFProperty]: + """Merges two lists of properties. + + Ensures that there are no 2 properties with the same name In case of overlap between the new and old property lists the - new property list wins out + new property list wins out. + + Args: + new_properties: The new properties. + channel: The channel. + managed_properties: The managed properties """ - newPropNames = [p["name"] for p in newProperties] - for oldProperty in channel["properties"]: - if oldProperty["name"] not in newPropNames and (oldProperty["name"] not in managed_properties): - newProperties = newProperties + [oldProperty] - return newProperties + new_property_names = [p.name for p in new_properties] + for old_property in channel.properties: + if old_property.name not in new_property_names and (old_property.name not in managed_properties): + new_properties = new_properties + [old_property] + return new_properties + +def get_current_time(timezone: Optional[str] = None) -> str: + """Get the current time. -def getCurrentTime(timezone=False): + Args: + timezone: The timezone. + """ if timezone: return str(datetime.datetime.now().astimezone()) return str(datetime.datetime.now()) -def prepareFindArgs(conf, args, size=0): - size_limit = int(conf.get("findSizeLimit", size)) +def prepare_find_args(cf_config: CFConfig, args, size=0) -> List[Tuple[str, str]]: + """Prepare the find arguments. + + Args: + cf_config: The configuration. + args: The arguments. + size: The size. + """ + size_limit = int(cf_config.cf_query_limit) if size_limit > 0: args.append(("~size", size_limit)) return args def poll( - update_method, - processor, - recordInfoByName, + update_method: Callable[[CFProcessor, Dict[str, RecordInfo], List[str], IocInfo], None], + processor: CFProcessor, + record_info_by_name: Dict[str, RecordInfo], records_to_delete, - hostName, - iocName, - iocIP, - iocid, - owner, - iocTime, -): - _log.info("Polling {iocName} begins...".format(iocName=iocName)) - sleep = 1 + ioc_info: IocInfo, +) -> bool: + """Poll channelfinder with updates until it passes. + + Args: + update_method: The update method. + processor: The processor. + record_info_by_name: The record information by name. + records_to_delete: The records to delete. + ioc_info: The IOC information. + """ + _log.info("Polling for %s begins...", ioc_info) + sleep = 1.0 success = False while not success: try: - update_method( - processor, - recordInfoByName, - records_to_delete, - hostName, - iocName, - iocIP, - iocid, - owner, - iocTime, - ) + update_method(processor, record_info_by_name, records_to_delete, ioc_info) success = True return success except RequestException as e: - _log.error("ChannelFinder update failed: {s}".format(s=e)) + _log.error("ChannelFinder update failed: %s", e) retry_seconds = min(60, sleep) - _log.info("ChannelFinder update retry in {retry_seconds} seconds".format(retry_seconds=retry_seconds)) + _log.info("ChannelFinder update retry in %s seconds", retry_seconds) time.sleep(retry_seconds) sleep *= 1.5 - _log.info("Polling {iocName} complete".format(iocName=iocName)) + _log.info("Polling %s complete", ioc_info) + return success diff --git a/server/recceiver/interfaces.py b/server/recceiver/interfaces.py index 6a13910..1ac6e8f 100644 --- a/server/recceiver/interfaces.py +++ b/server/recceiver/interfaces.py @@ -1,5 +1,8 @@ # -*- coding: utf-8 -*- +from dataclasses import dataclass +from typing import Dict, List, Set, Tuple + from zope.interface import Attribute, Interface from twisted.application import service @@ -21,6 +24,24 @@ class ITransaction(Interface): """) +@dataclass +class SourceAddress: + host: str + port: int + + +@dataclass +class CommitTransaction: + source_address: SourceAddress + client_infos: Dict[str, str] + records_to_add: Dict[str, Tuple[str, str]] + records_to_delete: Set[str] + record_infos_to_add: Dict[str, Dict[str, str]] + aliases: Dict[str, List[str]] + initial: bool + connected: bool + + class IProcessor(service.IService): def commit(transaction): """Consume and process the provided ITransaction. diff --git a/server/recceiver_full.conf b/server/recceiver_full.conf index 894bf6b..2737d78 100644 --- a/server/recceiver_full.conf +++ b/server/recceiver_full.conf @@ -62,7 +62,7 @@ idkey = 42 # cf-store application # A space-separated list of infotags to set as CF Properties -infotags = archive +infotags = archive, archiver # Feature to add CA/PVA port info for name server to channelfinder iocConnectionInfo = True