diff --git a/packages/control/io_device.py b/packages/control/io_device.py index c752fd3e87..c351002901 100644 --- a/packages/control/io_device.py +++ b/packages/control/io_device.py @@ -1,7 +1,7 @@ from dataclasses import dataclass, field -from typing import Dict, Optional, Union +from typing import Dict, Optional, Tuple, Union from control import data -from control.limiting_value import LimitingValue +from control.limiting_value import LimitingValue, LoadmanagementLimit from helpermodules.constants import NO_ERROR from modules.common.utils.component_parser import get_io_name_by_id from modules.io_actions.controllable_consumers.dimming.api_eebus import DimmingEebus @@ -67,7 +67,7 @@ def _check_fault_state_io_device(self, io_device: int) -> None: if data.data.io_states[f"io_states{io_device}"].data.get.fault_state == 2: raise ValueError(LimitingValue.CONTROLLABLE_CONSUMERS_ERROR.value.format(get_io_name_by_id(io_device))) - def dimming_get_import_power_left(self, device: Dict) -> Optional[float]: + def dimming_get_import_power_left(self, device: Dict) -> Tuple[Optional[float], LoadmanagementLimit]: for action in self.actions.values(): if isinstance(action, (DimmingIo, DimmingEebus)): for d in action.config.configuration.devices: @@ -75,7 +75,7 @@ def dimming_get_import_power_left(self, device: Dict) -> Optional[float]: self._check_fault_state_io_device(action.config.configuration.io_device) return action.dimming_get_import_power_left() else: - return None + return None, LoadmanagementLimit(None, None) def dimming_set_import_power_left(self, device: Dict, used_power: float) -> Optional[float]: for action in self.actions.values(): @@ -84,7 +84,7 @@ def dimming_set_import_power_left(self, device: Dict, used_power: float) -> Opti if d == device: return action.dimming_set_import_power_left(used_power) - def dimming_via_direct_control(self, device: Dict) -> Optional[float]: + def dimming_via_direct_control(self, device: Dict) -> Tuple[Optional[float], LoadmanagementLimit]: for action in self.actions.values(): if isinstance(action, DimmingDirectControl): for d in action.config.configuration.devices: @@ -92,9 +92,9 @@ def dimming_via_direct_control(self, device: Dict) -> Optional[float]: self._check_fault_state_io_device(action.config.configuration.io_device) return action.dimming_via_direct_control() else: - return None + return None, LoadmanagementLimit(None, None) - def ripple_control_receiver(self, device: Dict) -> float: + def ripple_control_receiver(self, device: Dict) -> Tuple[float, LoadmanagementLimit]: for action in self.actions.values(): if isinstance(action, RippleControlReceiver): for d in action.config.configuration.devices: @@ -102,13 +102,13 @@ def ripple_control_receiver(self, device: Dict) -> float: self._check_fault_state_io_device(action.config.configuration.io_device) return action.ripple_control_receiver() else: - return 1 + return 1, LoadmanagementLimit(None, None) - def stepwise_control(self, device_id: int) -> Optional[float]: + def stepwise_control(self, device_id: int) -> Tuple[Optional[float], LoadmanagementLimit]: for action in self.actions.values(): if isinstance(action, (StepwiseControlEebus, StepwiseControlIo)): if device_id in [component["id"] for component in action.config.configuration.devices]: self._check_fault_state_io_device(action.config.configuration.io_device) return action.control_stepwise() else: - return None + return None, LoadmanagementLimit(None, None) diff --git a/packages/control/limiting_value.py b/packages/control/limiting_value.py index 8c19b321d3..558e76e44a 100644 --- a/packages/control/limiting_value.py +++ b/packages/control/limiting_value.py @@ -11,8 +11,10 @@ class LimitingValue(Enum): DIMMING_VIA_DIRECT_CONTROL = ", da die Dimmung per Direkt-Steuerung die Ladeleistung auf 4,2 kW begrenzt." RIPPLE_CONTROL_RECEIVER = (", da der Ladepunkt durch den RSE-Kontakt auf {}% der konfigurierten Anschlussleistung " "reduziert wird.") + CONTROL_STEPWISE = "Leistung begrenzt auf {}%" CONTROLLABLE_CONSUMERS_ERROR = (", da aufgrund eines Fehlers im IO-Gerät {} die steuerbaren Verbraucher nicht " "gesteuert werden können. Bitte prüfe die Status-Seite.") + MISSING_CONFIFGURATION = ", da die Konfiguration für die Aktion unvollständig ist." @dataclass diff --git a/packages/control/loadmanagement.py b/packages/control/loadmanagement.py index a1567b32ff..911d149e9f 100644 --- a/packages/control/loadmanagement.py +++ b/packages/control/loadmanagement.py @@ -137,35 +137,31 @@ def _limit_by_current(self, def _limit_by_dimming_via_direct_control(self, missing_currents: List[float], cp: Chargepoint) -> Tuple[List[float], LoadmanagementLimit]: - if data.data.io_actions.dimming_via_direct_control({"type": "cp", "id": cp.num}): + value, limit = data.data.io_actions.dimming_via_direct_control({"type": "cp", "id": cp.num}) + if value is not None: phases = 3-missing_currents.count(0) current_per_phase = 4200 / 230 / phases available_currents = [current_per_phase - cp.data.set.target_current if c > 0 else 0 for c in missing_currents] log.debug(f"Dimmung per Direkt-Steuerung: {available_currents}A") - limit = LoadmanagementLimit(LimitingValue.DIMMING_VIA_DIRECT_CONTROL.value, - LimitingValue.DIMMING_VIA_DIRECT_CONTROL) - return available_currents, limit - else: - return missing_currents, LoadmanagementLimit(None, None) + return available_currents, limit def _limit_by_dimming(self, available_currents: List[float], cp: Chargepoint) -> Tuple[List[float], LoadmanagementLimit]: - dimming_power_left = data.data.io_actions.dimming_get_import_power_left({"type": "cp", "id": cp.num}) + dimming_power_left, limit = data.data.io_actions.dimming_get_import_power_left({"type": "cp", "id": cp.num}) if dimming_power_left: if sum(available_currents)*230 > dimming_power_left: phases = 3-available_currents.count(0) overload_per_phase = (sum(available_currents) - dimming_power_left/230)/phases available_currents = [c - overload_per_phase if c > 0 else 0 for c in available_currents] log.debug(f"Reduzierung der Ströme durch die Dimmung: {available_currents}A") - return available_currents, LoadmanagementLimit(LimitingValue.DIMMING.value, LimitingValue.DIMMING) - return available_currents, LoadmanagementLimit(None, None) + return available_currents, limit def _limit_by_ripple_control_receiver(self, available_currents: List[float], cp: Chargepoint) -> Tuple[List[float], LoadmanagementLimit]: - value = data.data.io_actions.ripple_control_receiver({"type": "cp", "id": cp.num}) + value, limit = data.data.io_actions.ripple_control_receiver({"type": "cp", "id": cp.num}) if value != 1: phases = 3-available_currents.count(0) if phases > 1: @@ -177,9 +173,4 @@ def _limit_by_ripple_control_receiver(self, available_currents = [min(max_current*value - cp.data.set.target_current, c) if c > 0 else 0 for c in available_currents] log.debug(f"Reduzierung durch RSE-Kontakt auf {value*100}%, maximal {max_current*value}A") - limit = LoadmanagementLimit( - LimitingValue.RIPPLE_CONTROL_RECEIVER.value.format(value*100), - LimitingValue.RIPPLE_CONTROL_RECEIVER) - return available_currents, limit - else: - return available_currents, LoadmanagementLimit(None, None) + return available_currents, limit diff --git a/packages/control/pv_all.py b/packages/control/pv_all.py index e938636a4c..02666c43c1 100644 --- a/packages/control/pv_all.py +++ b/packages/control/pv_all.py @@ -69,15 +69,11 @@ def calc_power_for_all_components(self) -> None: else: if fault_state < module.data.get.fault_state: fault_state = module.data.get.fault_state - limit_value = data.data.io_actions.stepwise_control(module.num) - if limit_value is not None and module.data.get.fault_state == 0: - msg = ( - f"Leistung begrenzt auf {int(limit_value * 100)}%" - if limit_value < 1 - else "Keine Leistungsbegrenzung aktiv." - ) - module.data.get.fault_str = msg - Pub().pub(f"openWB/set/pv/{module.num}/get/fault_str", msg) + limit = data.data.io_actions.stepwise_control(module.num)[1] + if module.data.get.fault_state == 0: + # Fehlermeldung nicht überschreiben + module.data.get.fault_str = limit.message + Pub().pub(f"openWB/set/pv/{module.num}/get/fault_str", limit.message) except Exception: log.exception(f"Fehler im allgemeinen PV-Modul für pv{module.num}") if fault_state == 0: diff --git a/packages/modules/io_actions/common.py b/packages/modules/io_actions/common.py new file mode 100644 index 0000000000..126b773d2f --- /dev/null +++ b/packages/modules/io_actions/common.py @@ -0,0 +1,5 @@ +from control import data + + +def check_fault_state_io_device(io_device: int) -> bool: + return data.data.io_states[f"io_states{io_device}"].data.get.fault_state == 2 diff --git a/packages/modules/io_actions/controllable_consumers/dimming/api_eebus.py b/packages/modules/io_actions/controllable_consumers/dimming/api_eebus.py index 1c1dbadc10..8a1bd51579 100644 --- a/packages/modules/io_actions/controllable_consumers/dimming/api_eebus.py +++ b/packages/modules/io_actions/controllable_consumers/dimming/api_eebus.py @@ -1,11 +1,15 @@ import logging +from typing import Optional, Tuple from control import data +from control.limiting_value import LimitingValue, LoadmanagementLimit from helpermodules import timecheck from helpermodules.logger import ModifyLoglevelContext from helpermodules.pub import Pub from helpermodules.timecheck import create_timestamp from dataclass_utils import asdict from modules.common.abstract_io import AbstractIoAction +from modules.common.utils.component_parser import get_io_name_by_id +from modules.io_actions.common import check_fault_state_io_device from modules.io_actions.controllable_consumers.dimming.config import DimmingSetup from modules.io_devices.eebus.config import AnalogInputMapping, DigitalInputMapping @@ -42,9 +46,12 @@ def setup(self) -> None: log.debug(f"Dimmen: {self.import_power_left}W inkl. Überschuss") with ModifyLoglevelContext(control_command_log, logging.DEBUG): - if self.dimming_active(): + if self.dimming_active() or check_fault_state_io_device(self.config.configuration.io_device): if self.timestamp is None: Pub().pub(f"openWB/set/io/action/{self.config.id}/timestamp", create_timestamp()) + if check_fault_state_io_device(self.config.configuration.io_device): + control_command_log.info( + "Fehler des IO-Geräts: Dimmen aktiviert für Failsafe-Modus.") control_command_log.info(f"Dimmen aktiviert. Übermittelter LPC-Wert: {lpc_value/1000}kWh. " "Leistungswerte vor Ausführung des Steuerbefehls:") @@ -64,11 +71,16 @@ def setup(self) -> None: Pub().pub(f"openWB/set/io/action/{self.config.id}/timestamp", None) control_command_log.info("Dimmen deaktiviert.") - def dimming_get_import_power_left(self) -> None: + def dimming_get_import_power_left(self) -> Tuple[Optional[float], LoadmanagementLimit]: + if check_fault_state_io_device(self.config.configuration.io_device): + return (self.import_power_left, LoadmanagementLimit( + LimitingValue.CONTROLLABLE_CONSUMERS_ERROR.value.format(get_io_name_by_id( + self.config.configuration.io_device)), + LimitingValue.CONTROLLABLE_CONSUMERS_ERROR)) if self.dimming_active(): - return self.import_power_left + return self.import_power_left, LoadmanagementLimit(LimitingValue.DIMMING.value, LimitingValue.DIMMING) else: - return None + return None, LoadmanagementLimit(None, None) def dimming_set_import_power_left(self, used_power: float) -> None: self.import_power_left -= used_power diff --git a/packages/modules/io_actions/controllable_consumers/dimming/api_io.py b/packages/modules/io_actions/controllable_consumers/dimming/api_io.py index e94b7029f1..81a77b5859 100644 --- a/packages/modules/io_actions/controllable_consumers/dimming/api_io.py +++ b/packages/modules/io_actions/controllable_consumers/dimming/api_io.py @@ -1,11 +1,15 @@ import logging +from typing import Optional, Tuple from control import data +from control.limiting_value import LimitingValue, LoadmanagementLimit from helpermodules.logger import ModifyLoglevelContext from helpermodules.pub import Pub from helpermodules.timecheck import create_timestamp from dataclass_utils import asdict from modules.common.abstract_io import AbstractIoAction +from modules.common.utils.component_parser import get_io_name_by_id +from modules.io_actions.common import check_fault_state_io_device from modules.io_actions.controllable_consumers.dimming.config import DimmingSetup log = logging.getLogger(__name__) @@ -49,10 +53,11 @@ def setup(self) -> None: log.debug(f"Dimmen: {self.import_power_left}W inkl. Überschuss") with ModifyLoglevelContext(control_command_log, logging.DEBUG): - if data.data.io_states[f"io_states{self.config.configuration.io_device}"].data.get.digital_input[ - self.dimming_input] == self.dimming_value: + if self.dimming_active() or check_fault_state_io_device(self.config.configuration.io_device): if self.timestamp is None: Pub().pub(f"openWB/set/io/action/{self.config.id}/timestamp", create_timestamp()) + if check_fault_state_io_device(self.config.configuration.io_device): + control_command_log.info("Fehler des IO-Geräts: Dimmen aktiviert für Failsafe-Modus.") control_command_log.info("Dimmen aktiviert. Leistungswerte vor Ausführung des Steuerbefehls:") msg = (f"EVU-Zähler: " @@ -71,12 +76,17 @@ def setup(self) -> None: Pub().pub(f"openWB/set/io/action/{self.config.id}/timestamp", None) control_command_log.info("Dimmen deaktiviert.") - def dimming_get_import_power_left(self) -> None: + def dimming_get_import_power_left(self) -> Tuple[Optional[float], LoadmanagementLimit]: + if check_fault_state_io_device(self.config.configuration.io_device): + return (self.import_power_left, LoadmanagementLimit( + LimitingValue.CONTROLLABLE_CONSUMERS_ERROR.value.format(get_io_name_by_id( + self.config.configuration.io_device)), + LimitingValue.CONTROLLABLE_CONSUMERS_ERROR)) if self.dimming_active(): - return self.import_power_left + return self.import_power_left, LoadmanagementLimit(LimitingValue.DIMMING.value, LimitingValue.DIMMING) elif data.data.io_states[f"io_states{self.config.configuration.io_device}"].data.get.digital_input[ self.no_dimming_input] == self.no_dimming_value: - return None + return None, LoadmanagementLimit(None, None) else: raise Exception("Pattern passt nicht zur Dimmung.") diff --git a/packages/modules/io_actions/controllable_consumers/dimming_direct_control/api.py b/packages/modules/io_actions/controllable_consumers/dimming_direct_control/api.py index eca5d7c60e..6672e9d2c8 100644 --- a/packages/modules/io_actions/controllable_consumers/dimming_direct_control/api.py +++ b/packages/modules/io_actions/controllable_consumers/dimming_direct_control/api.py @@ -1,10 +1,14 @@ import logging +from typing import Optional, Tuple from control import data +from control.limiting_value import LimitingValue, LoadmanagementLimit from helpermodules.logger import ModifyLoglevelContext from helpermodules.pub import Pub from helpermodules.timecheck import create_timestamp from modules.common.abstract_device import DeviceDescriptor from modules.common.abstract_io import AbstractIoAction +from modules.common.utils.component_parser import get_io_name_by_id +from modules.io_actions.common import check_fault_state_io_device from modules.io_actions.controllable_consumers.dimming_direct_control.config import DimmingDirectControlSetup control_command_log = logging.getLogger("steuve_control_command") @@ -29,13 +33,16 @@ def __init__(self, config: DimmingDirectControlSetup): def setup(self) -> None: with ModifyLoglevelContext(control_command_log, logging.DEBUG): if data.data.io_states[f"io_states{self.config.configuration.io_device}"].data.get.digital_input[ - self.dimming_input] == self.dimming_value: + self.dimming_input] == self.dimming_value or check_fault_state_io_device(self.config.configuration.io_device): device = self.config.configuration.devices[0] if device["type"] == "cp": cp = f"cp{device['id']}" if self.timestamp is None: Pub().pub(f"openWB/set/io/action/{self.config.id}/timestamp", create_timestamp()) if device["type"] == "cp": + if check_fault_state_io_device(self.config.configuration.io_device): + control_command_log.info( + "Fehler des IO-Geräts: Direktsteuerung an Ladepunkt aktiviert für Failsafe-Modus.") control_command_log.info( f"Direktsteuerung an Ladepunkt " f"{data.data.cp_data[cp].data.config.name} aktiviert. " @@ -55,13 +62,19 @@ def setup(self) -> None: Pub().pub(f"openWB/set/io/action/{self.config.id}/timestamp", None) control_command_log.info("Direktsteuerung deaktiviert.") - def dimming_via_direct_control(self) -> None: - if data.data.io_states[f"io_states{self.config.configuration.io_device}"].data.get.digital_input[ + def dimming_via_direct_control(self) -> Tuple[Optional[float], LoadmanagementLimit]: + if check_fault_state_io_device(self.config.configuration.io_device): + return (4200, LoadmanagementLimit( + LimitingValue.CONTROLLABLE_CONSUMERS_ERROR.value.format(get_io_name_by_id( + self.config.configuration.io_device)), + LimitingValue.CONTROLLABLE_CONSUMERS_ERROR)) + elif data.data.io_states[f"io_states{self.config.configuration.io_device}"].data.get.digital_input[ self.dimming_input] == self.dimming_value: - return 4200 + return (4200, LoadmanagementLimit(LimitingValue.DIMMING_VIA_DIRECT_CONTROL.value, + LimitingValue.DIMMING_VIA_DIRECT_CONTROL)) elif data.data.io_states[f"io_states{self.config.configuration.io_device}"].data.get.digital_input[ self.no_dimming_input] == self.no_dimming_value: - return None + return None, LoadmanagementLimit(None, None) else: raise Exception("Pattern passt nicht zur Dimmung per Direktsteuerung.") diff --git a/packages/modules/io_actions/controllable_consumers/ripple_control_receiver/api.py b/packages/modules/io_actions/controllable_consumers/ripple_control_receiver/api.py index 6d92b3b40b..157e805882 100644 --- a/packages/modules/io_actions/controllable_consumers/ripple_control_receiver/api.py +++ b/packages/modules/io_actions/controllable_consumers/ripple_control_receiver/api.py @@ -1,10 +1,14 @@ import logging +from typing import Tuple from control import data +from control.limiting_value import LimitingValue, LoadmanagementLimit from helpermodules.logger import ModifyLoglevelContext from helpermodules.pub import Pub from helpermodules.timecheck import create_timestamp from modules.common.abstract_device import DeviceDescriptor from modules.common.abstract_io import AbstractIoAction +from modules.common.utils.component_parser import get_io_name_by_id +from modules.io_actions.common import check_fault_state_io_device from modules.io_actions.controllable_consumers.ripple_control_receiver.config import RippleControlReceiverSetup control_command_log = logging.getLogger("steuve_control_command") @@ -16,40 +20,53 @@ def __init__(self, config: RippleControlReceiverSetup): super().__init__() def setup(self) -> None: - with ModifyLoglevelContext(control_command_log, logging.DEBUG): - for pattern in self.config.configuration.input_pattern: - for digital_input, value in pattern["matrix"].items(): - if data.data.io_states[f"io_states{self.config.configuration.io_device}"].data.get.digital_input[ - digital_input] != value: - break - else: - # Alle digitalen Eingänge entsprechen dem Pattern - if pattern["value"] != 1: - if self.timestamp is None: - Pub().pub(f"openWB/set/io/action/{self.config.id}/timestamp", create_timestamp()) - control_command_log.info( - f"RSE-Sperre mit Wert {pattern['value']*100}" - "% aktiviert. Leistungswerte vor Ausführung des Steuerbefehls:") + def log_active_ripple_control_receiver(): + if self.timestamp is None: + Pub().pub(f"openWB/set/io/action/{self.config.id}/timestamp", create_timestamp()) + if check_fault_state_io_device(self.config.configuration.io_device): + control_command_log.info( + "Fehler des IO-Geräts: Dimmen aktiviert für Failsafe-Modus.") + control_command_log.info( + f"RSE-Sperre mit Wert {pattern['value']*100}" + "% aktiviert. Leistungswerte vor Ausführung des Steuerbefehls:") + + evu_counter = data.data.counter_data[data.data.counter_all_data.get_evu_counter_str()] + msg = f"EVU-Zähler: {evu_counter.data.get.powers}W" + for device in self.config.configuration.devices: + if device["type"] == "cp": + cp = f"cp{device['id']}" + msg += (f", Ladepunkt {data.data.cp_data[cp].data.config.name}: " + f"{data.data.cp_data[cp].data.get.powers}W") + if device["type"] == "io": + io = f"io{device['id']}" + msg += (f", IO-Gerät {data.data.io_data[io].data.config.name}: " + "Leistung unbekannt") + control_command_log.info(msg) - evu_counter = data.data.counter_data[data.data.counter_all_data.get_evu_counter_str()] - msg = f"EVU-Zähler: {evu_counter.data.get.powers}W" - for device in self.config.configuration.devices: - if device["type"] == "cp": - cp = f"cp{device['id']}" - msg += (f", Ladepunkt {data.data.cp_data[cp].data.config.name}: " - f"{data.data.cp_data[cp].data.get.powers}W") - if device["type"] == "io": - io = f"io{device['id']}" - msg += (f", IO-Gerät {data.data.io_data[io].data.config.name}: " - "Leistung unbekannt") - control_command_log.info(msg) - break + with ModifyLoglevelContext(control_command_log, logging.DEBUG): + if check_fault_state_io_device(self.config.configuration.io_device): + log_active_ripple_control_receiver() + for pattern in self.config.configuration.input_pattern: + for digital_input, value in pattern["matrix"].items(): + if data.data.io_states[f"io_states{self.config.configuration.io_device}"].data.get.digital_input[ + digital_input] != value: + break + else: + # Alle digitalen Eingänge entsprechen dem Pattern + if pattern["value"] != 1: + log_active_ripple_control_receiver() + break else: if self.timestamp: Pub().pub(f"openWB/set/io/action/{self.config.id}/timestamp", None) control_command_log.info("RSE-Sperre deaktiviert.") - def ripple_control_receiver(self) -> float: + def ripple_control_receiver(self) -> Tuple[float, LoadmanagementLimit]: + if check_fault_state_io_device(self.config.configuration.io_device): + return (0, LoadmanagementLimit( + LimitingValue.CONTROLLABLE_CONSUMERS_ERROR.value.format(get_io_name_by_id( + self.config.configuration.io_device)), + LimitingValue.CONTROLLABLE_CONSUMERS_ERROR)) for pattern in self.config.configuration.input_pattern: for digital_input, value in pattern["matrix"].items(): if data.data.io_states[f"io_states{self.config.configuration.io_device}" @@ -57,10 +74,21 @@ def ripple_control_receiver(self) -> float: break else: # Alle digitalen Eingänge entsprechen dem Pattern - return pattern["value"] + if pattern["value"] is None: + return 0, LoadmanagementLimit(LimitingValue.MISSING_CONFIFGURATION, + LimitingValue.MISSING_CONFIFGURATION) + if pattern["value"] != 1: + limit = LoadmanagementLimit( + LimitingValue.RIPPLE_CONTROL_RECEIVER.value.format(pattern["value"]*100), + LimitingValue.RIPPLE_CONTROL_RECEIVER) + else: + limit = LoadmanagementLimit(None, None) + return pattern["value"], limit else: # Zustand entspricht keinem Pattern - return 0 + return 0, LoadmanagementLimit( + LimitingValue.RIPPLE_CONTROL_RECEIVER.value.format(0), + LimitingValue.RIPPLE_CONTROL_RECEIVER) def create_action(config: RippleControlReceiverSetup, parent_device_type: str): diff --git a/packages/modules/io_actions/generator_systems/stepwise_control/api_eebus.py b/packages/modules/io_actions/generator_systems/stepwise_control/api_eebus.py index df60a75eef..e4971c7f6b 100644 --- a/packages/modules/io_actions/generator_systems/stepwise_control/api_eebus.py +++ b/packages/modules/io_actions/generator_systems/stepwise_control/api_eebus.py @@ -1,12 +1,14 @@ import logging from typing import Optional from control import data +from control.limiting_value import LimitingValue, LoadmanagementLimit from helpermodules.logger import ModifyLoglevelContext from helpermodules.pub import Pub from helpermodules.timecheck import create_timestamp from modules.common.abstract_device import DeviceDescriptor from modules.common.abstract_io import AbstractIoAction -from modules.common.utils.component_parser import get_component_name_by_id +from modules.common.utils.component_parser import get_component_name_by_id, get_io_name_by_id +from modules.io_actions.common import check_fault_state_io_device from modules.io_actions.generator_systems.stepwise_control.config import StepwiseControlSetup from modules.io_devices.eebus.config import AnalogInputMapping, DigitalInputMapping @@ -86,17 +88,33 @@ def setup(self) -> None: control_command_log.info("EZA-Begrenzung aufgehoben.") def control_stepwise(self) -> Optional[float]: + if check_fault_state_io_device(self.config.configuration.io_device): + return (0, LoadmanagementLimit( + LimitingValue.CONTROLLABLE_CONSUMERS_ERROR.value.format(get_io_name_by_id( + self.config.configuration.io_device)), + LimitingValue.CONTROLLABLE_CONSUMERS_ERROR)) for pattern in self.config.configuration.input_pattern: for digital_input, value in pattern["matrix"].items(): if data.data.io_states[f"io_states{self.config.configuration.io_device}" ].data.get.digital_input[digital_input] != value: break else: + if pattern["value"] is None: + return 0, LoadmanagementLimit(LimitingValue.MISSING_CONFIFGURATION, + LimitingValue.MISSING_CONFIFGURATION) # Alle digitalen Eingänge entsprechen dem Pattern - return pattern['value'] + elif pattern["value"] != 1: + limit = LoadmanagementLimit( + LimitingValue.CONTROL_STEPWISE.value.format(pattern["value"]*100), + LimitingValue.CONTROL_STEPWISE) + else: + limit = LoadmanagementLimit("Keine Leistungsbegrenzung aktiv.", "Keine Leistungsbegrenzung aktiv.") + return pattern["value"], limit else: - # Zustand entspricht keinem Pattern, Leistungsbegrenzung aufheben - return 1 + # Zustand entspricht keinem Pattern + return 0, LoadmanagementLimit( + LimitingValue.CONTROL_STEPWISE.value.format(0), + LimitingValue.CONTROL_STEPWISE) def create_action(config: StepwiseControlSetup, parent_device_type: str): diff --git a/packages/modules/io_actions/generator_systems/stepwise_control/api_io.py b/packages/modules/io_actions/generator_systems/stepwise_control/api_io.py index f1108eeb5d..10472bb170 100644 --- a/packages/modules/io_actions/generator_systems/stepwise_control/api_io.py +++ b/packages/modules/io_actions/generator_systems/stepwise_control/api_io.py @@ -1,12 +1,14 @@ import logging -from typing import Optional +from typing import Optional, Tuple from control import data +from control.limiting_value import LimitingValue, LoadmanagementLimit from helpermodules.logger import ModifyLoglevelContext from helpermodules.pub import Pub from helpermodules.timecheck import create_timestamp from modules.common.abstract_device import DeviceDescriptor from modules.common.abstract_io import AbstractIoAction -from modules.common.utils.component_parser import get_component_name_by_id +from modules.common.utils.component_parser import get_component_name_by_id, get_io_name_by_id +from modules.io_actions.common import check_fault_state_io_device from modules.io_actions.generator_systems.stepwise_control.config import StepwiseControlSetup control_command_log = logging.getLogger("steuve_control_command") @@ -79,18 +81,34 @@ def setup(self) -> None: Pub().pub(f"openWB/set/io/action/{self.config.id}/timestamp", None) control_command_log.info("EZA-Begrenzung aufgehoben.") - def control_stepwise(self) -> Optional[float]: + def control_stepwise(self) -> Tuple[Optional[float], LoadmanagementLimit]: + if check_fault_state_io_device(self.config.configuration.io_device): + return (0, LoadmanagementLimit( + LimitingValue.CONTROLLABLE_CONSUMERS_ERROR.value.format(get_io_name_by_id( + self.config.configuration.io_device)), + LimitingValue.CONTROLLABLE_CONSUMERS_ERROR)) for pattern in self.config.configuration.input_pattern: for digital_input, value in pattern["matrix"].items(): if data.data.io_states[f"io_states{self.config.configuration.io_device}" ].data.get.digital_input[digital_input] != value: break else: + if pattern["value"] is None: + return 0, LoadmanagementLimit(LimitingValue.MISSING_CONFIFGURATION, + LimitingValue.MISSING_CONFIFGURATION) # Alle digitalen Eingänge entsprechen dem Pattern - return pattern['value'] + elif pattern["value"] != 1: + limit = LoadmanagementLimit( + LimitingValue.CONTROL_STEPWISE.value.format(pattern["value"]*100), + LimitingValue.CONTROL_STEPWISE) + else: + limit = LoadmanagementLimit("Keine Leistungsbegrenzung aktiv.", "Keine Leistungsbegrenzung aktiv.") + return pattern["value"], limit else: - # Zustand entspricht keinem Pattern, Leistungsbegrenzung aufheben - return 1 + # Zustand entspricht keinem Pattern + return 0, LoadmanagementLimit( + LimitingValue.CONTROL_STEPWISE.value.format(0), + LimitingValue.CONTROL_STEPWISE) def create_action(config: StepwiseControlSetup, parent_device_type: str):