diff --git a/.github/workflows/labeler.yml b/.github/workflows/labeler.yml index 246a0460..3e227106 100644 --- a/.github/workflows/labeler.yml +++ b/.github/workflows/labeler.yml @@ -12,8 +12,8 @@ jobs: steps: - name: Check out the repository uses: actions/checkout@v4.2.2 - - name: Run Labeler uses: crazy-max/ghaction-github-labeler@v5.1.0 with: + configuration-path: .github/labels.yml skip-delete: true diff --git a/custom_components/hilo/__init__.py b/custom_components/hilo/__init__.py index 607eb3b7..bdbf2541 100755 --- a/custom_components/hilo/__init__.py +++ b/custom_components/hilo/__init__.py @@ -232,16 +232,19 @@ def __init__(self, hass: HomeAssistant, entry: ConfigEntry, api: API) -> None: self.find_meter(self._hass) self.entry = entry self.devices: Devices = Devices(api) - self._websocket_reconnect_task: asyncio.Task | None = None - self._update_task: asyncio.Task | None = None - self.invocations = {0: self.subscribe_to_location} + self.challenge_id = 0 + self._websocket_reconnect_tasks: list[asyncio.Task | None] = [None, None] + self._update_task: list[asyncio.Task | None] = [None, None] + self.invocations = { + 0: self.subscribe_to_location, + 1: self.subscribe_to_challenge, + 2: self.subscribe_to_challengelist, + } self.hq_plan_name = entry.options.get(CONF_HQ_PLAN_NAME, DEFAULT_HQ_PLAN_NAME) self.appreciation = entry.options.get( CONF_APPRECIATION_PHASE, DEFAULT_APPRECIATION_PHASE ) - self.pre_cold = entry.options.get( - CONF_PRE_COLD_PHASE, DEFAULT_PRE_COLD_PHASE # this is new - ) + self.pre_cold = entry.options.get(CONF_PRE_COLD_PHASE, DEFAULT_PRE_COLD_PHASE) self.challenge_lock = entry.options.get( CONF_CHALLENGE_LOCK, DEFAULT_CHALLENGE_LOCK ) @@ -260,25 +263,93 @@ def __init__(self, hass: HomeAssistant, entry: ConfigEntry, api: API) -> None: self._events: dict = {} if self.track_unknown_sources: self._api._get_device_callbacks = [self._get_unknown_source_tracker] + self._websocket_listeners = [] def validate_heartbeat(self, event: WebsocketEvent) -> None: heartbeat_time = from_utc_timestamp(event.arguments[0]) # type: ignore if self._api.log_traces: LOG.debug(f"Heartbeat: {time_diff(heartbeat_time, event.timestamp)}") - @callback - async def on_websocket_event(self, event: WebsocketEvent) -> None: - """Define a callback for receiving a websocket event.""" - async_dispatcher_send(self._hass, DISPATCHER_TOPIC_WEBSOCKET_EVENT, event) - if event.event_type == "COMPLETE": - cb = self.invocations.get(event.invocation) - if cb: - async_call_later(self._hass, 3, cb(event.invocation)) - elif event.target == "Heartbeat": - self.validate_heartbeat(event) - elif event.target == "DevicesValuesReceived": - # When receiving attribute values for unknown devices, assume - # we have refresh the device list. + def register_websocket_listener(self, listener): + """Register a listener for websocket events.""" + LOG.debug(f"Registering websocket listener: {listener.__class__.__name__}") + self._websocket_listeners.append(listener) + + async def _handle_websocket_message(self, event): + """Process websocket messages and notify listeners.""" + + LOG.debug(f"Received websocket message type: {event}") + target = event.target + LOG.debug(f"handle_websocket_message_target {target}") + msg_data = event + LOG.debug(f"handle_websocket_message_ msg_data {msg_data}") + + if target == "ChallengeListInitialValuesReceived": + msg_type = "challenge_list_initial" + elif target == "ChallengeAdded": + msg_type = "challenge_added" + elif target == "ChallengeDetailsUpdated": + msg_type = "challenge_details_update" + elif target == "ChallengeConsumptionUpdatedValuesReceived": + msg_type = "challenge_details_update" + elif target == "ChallengeDetailsUpdatedValuesReceived": + msg_type = "challenge_details_update" + elif target == "ChallengeDetailsInitialValuesReceived": + msg_type = "challenge_details_update" + elif target == "ChallengeListUpdatedValuesReceived": + msg_type = "challenge_details_update" + + # ic-dev21 Notify listeners + for listener in self._websocket_listeners: + handler_name = f"handle_{msg_type}" + if hasattr(listener, handler_name): + handler = getattr(listener, handler_name) + try: + # ic-dev21 Extract the arguments from the WebsocketEvent object + if isinstance(msg_data, WebsocketEvent): + arguments = msg_data.arguments + if arguments: # ic-dev21 check if there are arguments + await handler(arguments[0]) + else: + LOG.warning(f"Received empty arguments for {msg_type}") + else: + await handler(msg_data) + except Exception as e: + LOG.error(f"Error in websocket handler {handler_name}: {e}") + + async def _handle_challenge_events(self, event: WebsocketEvent) -> None: + """Handle all challenge-related websocket events.""" + if event.target == "ChallengeDetailsInitialValuesRecei0ved": + challenge = event.arguments[0] + LOG.debug(f"ChallengeDetailsInitialValuesReceived, challenge = {challenge}") + self.challenge_id = challenge.get("id") + + elif event.target == "ChallengeDetailsUpdatedValuesReceived": + LOG.debug("ChallengeDetailsUpdatedValuesReceived") + + elif event.target == "ChallengeListUpdatedValuesReceived": + LOG.debug("ChallengeListUpdatedValuesReceived") + self.challenge_phase = event.arguments[0][0]["currentPhase"] + + elif event.target == "ChallengeAdded": + LOG.debug("ChallengeAdded") + challenge = event.arguments[0] + self.challenge_id = challenge.get("id") + await self.subscribe_to_challenge(1, self.challenge_id) + + elif event.target == "ChallengeListInitialValuesReceived": + LOG.debug("ChallengeListInitialValuesReceived") + challenges = event.arguments[0] + + for challenge in challenges: + challenge_id = challenge.get("id") + self.challenge_phase = challenge.get("currentPhase") + self.challenge_id = challenge.get("id") + await self.subscribe_to_challenge(1, challenge_id) + + async def _handle_device_events(self, event: WebsocketEvent) -> None: + """Handle all device-related websocket events.""" + if event.target == "DevicesValuesReceived": new_devices = any( self.devices.find_device(item["deviceId"]) is None for item in event.arguments[0] @@ -290,53 +361,61 @@ async def on_websocket_event(self, event: WebsocketEvent) -> None: await self.devices.update() updated_devices = self.devices.parse_values_received(event.arguments[0]) - # NOTE(dvd): If we don't do this, we need to wait until the coordinator - # runs (scan_interval) to have updated data in the dashboard. for device in updated_devices: async_dispatcher_send( self._hass, SIGNAL_UPDATE_ENTITY.format(device.id) ) + elif event.target == "DeviceListInitialValuesReceived": - # This websocket event only happens after calling SubscribeToLocation. - # This triggers an update without throwing an exception - new_devices = await self.devices.update_devicelist_from_signalr( - event.arguments[0] - ) + await self.devices.update_devicelist_from_signalr(event.arguments[0]) + elif event.target == "DeviceListUpdatedValuesReceived": - # This message only contains display information, such as the Device's name (as set in the app), it's groupid, icon, etc. - # Updating the device name causes issues in the integration, it detects it as a new device and creates a new entity. - # Ignore this call, for now... (update_devicelist_from_signalr does work, but causes the issue above) - # await self.devices.update_devicelist_from_signalr(event.arguments[0]) LOG.debug( "Received 'DeviceListUpdatedValuesReceived' message, not implemented yet." ) + elif event.target == "DevicesListChanged": - # This message only contains the location_id and is used to inform us that devices have been removed from the location. - # Device deletion is not implemented yet, so we just log the message for now. LOG.debug("Received 'DevicesListChanged' message, not implemented yet.") + elif event.target == "DeviceAdded": - # Same structure as DeviceList* but only one device instead of a list - devices = [] - devices.append(event.arguments[0]) - new_devices = await self.devices.update_devicelist_from_signalr(devices) + devices = [event.arguments[0]] + await self.devices.update_devicelist_from_signalr(devices) + elif event.target == "DeviceDeleted": - # Device deletion is not implemented yet, so we just log the message for now. LOG.debug("Received 'DeviceDeleted' message, not implemented yet.") + elif event.target == "GatewayValuesReceived": - # Gateway deviceId hardcoded to 1 as it is not returned by Gateways/Info. - # First time we encounter a GatewayValueReceived event, update device with proper deviceid. gateway = self.devices.find_device(1) if gateway: gateway.id = event.arguments[0][0]["deviceId"] LOG.debug(f"Updated Gateway's deviceId from default 1 to {gateway.id}") updated_devices = self.devices.parse_values_received(event.arguments[0]) - # NOTE(dvd): If we don't do this, we need to wait until the coordinator - # runs (scan_interval) to have updated data in the dashboard. for device in updated_devices: async_dispatcher_send( self._hass, SIGNAL_UPDATE_ENTITY.format(device.id) ) + + @callback + async def on_websocket_event(self, event: WebsocketEvent) -> None: + """Define a callback for receiving a websocket event.""" + async_dispatcher_send(self._hass, DISPATCHER_TOPIC_WEBSOCKET_EVENT, event) + + if event.event_type == "COMPLETE": + cb = self.invocations.get(event.invocation) + if cb: + async_call_later(self._hass, 3, cb(event.invocation)) + + elif event.target == "Heartbeat": + self.validate_heartbeat(event) + + elif "Challenge" in event.target: + await self._handle_challenge_events(event) + await self._handle_websocket_message(event) + + elif "Device" in event.target or event.target == "GatewayValuesReceived": + await self._handle_device_events(event) + else: LOG.warning(f"Unhandled websocket event: {event}") @@ -344,13 +423,48 @@ async def on_websocket_event(self, event: WebsocketEvent) -> None: async def subscribe_to_location(self, inv_id: int) -> None: """Sends the json payload to receive updates from the location.""" LOG.debug(f"Subscribing to location {self.devices.location_id}") - await self._api.websocket.async_invoke( + await self._api.websocket_devices.async_invoke( [self.devices.location_id], "SubscribeToLocation", inv_id ) + @callback + async def subscribe_to_challenge(self, inv_id: int, event_id: int = 0) -> None: + """Sends the json payload to receive updates from the challenge.""" + # ic-dev21 : data structure of the message was incorrect, needed the "fixed" strings + LOG.debug(f"ic-dev21 subscribe to challenge :{event_id} or {self.challenge_id}") + event_id = event_id or self.challenge_id + + LOG.debug( + f"Subscribing to challenge {event_id} at location {self.devices.location_id}" + ) + await self._api.websocket_challenges.async_invoke( + [{"locationId": self.devices.location_id, "eventId": event_id}], + "SubscribeToChallenge", + inv_id, + ) + + @callback + async def subscribe_to_challengelist(self, inv_id: int) -> None: + """Sends the json payload to receive updates from the challenge list.""" + # ic-dev21 this will be necessary to get the challenge list + LOG.debug( + f"Subscribing to challenge list at location {self.devices.location_id}" + ) + await self._api.websocket_challenges.async_invoke( + [{"locationId": self.devices.location_id}], + "SubscribeToChallengeList", + inv_id, + ) + @callback async def request_status_update(self) -> None: - await self._api.websocket.send_status() + await self._api.websocket_devices.send_status() + for inv_id, inv_cb in self.invocations.items(): + await inv_cb(inv_id) + + @callback + async def request_status_update_challenge(self) -> None: + await self._api.websocket_challenges.send_status() for inv_id, inv_cb in self.invocations.items(): await inv_cb(inv_id) @@ -424,20 +538,28 @@ async def async_init(self, scan_interval: int) -> None: self._hass, self.entry, self.unknown_tracker_device ) - self._api.websocket.add_connect_callback(self.request_status_update) - self._api.websocket.add_event_callback(self.on_websocket_event) - self._websocket_reconnect_task = asyncio.create_task( - self.start_websocket_loop() + self._api.websocket_devices.add_connect_callback(self.request_status_update) + self._api.websocket_devices.add_event_callback(self.on_websocket_event) + self._api.websocket_challenges.add_connect_callback( + self.request_status_update_challenge + ) + self._api.websocket_challenges.add_event_callback(self.on_websocket_event) + self._websocket_reconnect_tasks[0] = asyncio.create_task( + self.start_websocket_loop(self._api.websocket_devices, 0) + ) + self._websocket_reconnect_tasks[1] = asyncio.create_task( + self.start_websocket_loop(self._api.websocket_challenges, 1) ) - # asyncio.create_task(self._api.websocket.async_connect()) + + # asyncio.create_task(self._api.websocket_devices.async_connect()) async def websocket_disconnect_listener(_: Event) -> None: """Define an event handler to disconnect from the websocket.""" if TYPE_CHECKING: - assert self._api.websocket + assert self._api.websocket_devices - if self._api.websocket.connected: - await self._api.websocket.async_disconnect() + if self._api.websocket_devices.connected: + await self._api.websocket_devices.async_disconnect() self.entry.async_on_unload( self._hass.bus.async_listen_once( @@ -452,37 +574,37 @@ async def websocket_disconnect_listener(_: Event) -> None: update_method=self.async_update, ) - async def start_websocket_loop(self) -> None: + async def start_websocket_loop(self, websocket, id) -> None: """Start a websocket reconnection loop.""" if TYPE_CHECKING: - assert self._api.websocket + assert websocket should_reconnect = True try: - await self._api.websocket.async_connect() - await self._api.websocket.async_listen() + await websocket.async_connect() + await websocket.async_listen() except asyncio.CancelledError: LOG.debug("Request to cancel websocket loop received") raise except WebsocketError as err: LOG.error(f"Failed to connect to websocket: {err}", exc_info=err) - await self.cancel_websocket_loop() + await self.cancel_websocket_loop(websocket, id) except InvalidCredentialsError: LOG.warning("Invalid credentials? Refreshing websocket infos") - await self.cancel_websocket_loop() + await self.cancel_websocket_loop(websocket, id) await self._api.refresh_ws_token() except Exception as err: # pylint: disable=broad-except LOG.error( f"Unknown exception while connecting to websocket: {err}", exc_info=err ) - await self.cancel_websocket_loop() + await self.cancel_websocket_loop(websocket, id) if should_reconnect: LOG.info("Disconnected from websocket; reconnecting in 5 seconds.") await asyncio.sleep(5) - self._websocket_reconnect_task = self._hass.async_create_task( - self.start_websocket_loop() + self._websocket_reconnect_tasks[id] = self._hass.async_create_task( + self.start_websocket_loop(websocket, id) ) async def cancel_task(self, task) -> None: @@ -496,15 +618,15 @@ async def cancel_task(self, task) -> None: task = None return task - async def cancel_websocket_loop(self) -> None: + async def cancel_websocket_loop(self, websocket, id) -> None: """Stop any existing websocket reconnection loop.""" - self._websocket_reconnect_task = await self.cancel_task( - self._websocket_reconnect_task + self._websocket_reconnect_tasks[id] = await self.cancel_task( + self._websocket_reconnect_tasks[id] ) - self._update_task = await self.cancel_task(self._update_task) + self._update_task[id] = await self.cancel_task(self._update_task[id]) if TYPE_CHECKING: - assert self._api.websocket - await self._api.websocket.async_disconnect() + assert websocket + await websocket.async_disconnect() async def async_update(self) -> None: """Updates tarif periodically.""" diff --git a/custom_components/hilo/sensor.py b/custom_components/hilo/sensor.py index 5776ad78..cab8839c 100755 --- a/custom_components/hilo/sensor.py +++ b/custom_components/hilo/sensor.py @@ -1,6 +1,7 @@ """Support for various Hilo sensors.""" from __future__ import annotations +import asyncio from datetime import datetime, timedelta, timezone from os.path import isfile @@ -158,17 +159,16 @@ async def async_setup_entry( def create_energy_entity(hilo, device): device._energy_entity = EnergySensor(hilo, device) new_entities.append(device._energy_entity) - energy_unique_id = f"{slugify(device.identifier)}-energy" - if ( - energy_entity := hilo.async_get_entity_id_domain( - Platform.SENSOR, energy_unique_id + energy_entity = f"{slugify(device.name)}_hilo_energy" + if energy_entity == HILO_ENERGY_TOTAL: + LOG.error( + "An hilo entity can't be named 'total' because it conflicts " + "with the generated name for the smart energy meter" ) - ) is None: - energy_entity = f"sensor.{slugify(device.name)}_hilo_energy" - energy_entity = energy_entity.replace("sensor.", "") - + return tariff_list = default_tariff_list if device.type == "Meter": + energy_entity = HILO_ENERGY_TOTAL tariff_list = validate_tariff_list(tariff_config) net_consumption = device.net_consumption utility_manager.add_meter(energy_entity, tariff_list, net_consumption) @@ -287,14 +287,7 @@ def __init__(self, hilo, device): if device.type == "Meter": self._attr_name = HILO_ENERGY_TOTAL - power_unique_id = f"{slugify(device.identifier)}-power" - if ( - power_entity_id := hilo.async_get_entity_id_domain( - Platform.SENSOR, power_unique_id - ) - ) is None: - power_entity_id = f"{Platform.SENSOR}.{slugify(device.name)}_power" - self._source = power_entity_id + self._source = f"sensor.{slugify(device.name)}_power" # ic-dev21: Set initial state and last_valid_state, removes log errors and unavailable states initial_state = 0 self._attr_native_value = initial_state @@ -708,7 +701,7 @@ async def _save_history(self, history: list): await yaml_file.write(yaml.dump(history, Dumper=yaml.RoundTripDumper)) -class HiloChallengeSensor(HiloEntity, RestoreEntity, SensorEntity): +class HiloChallengeSensor(HiloEntity, SensorEntity): """Hilo challenge sensor. Its state will be either: - off: no ongoing or scheduled challenge @@ -732,19 +725,135 @@ def __init__(self, hilo, device, scan_interval): old_unique_id, self._attr_unique_id, Platform.SENSOR ) LOG.debug(f"Setting up ChallengeSensor entity: {self._attr_name}") - # note ic-dev21: scan time at 5 minutes (300s) will force local update self.scan_interval = timedelta(seconds=EVENT_SCAN_INTERVAL_REDUCTION) self._state = "off" self._next_events = [] + self._events = {} # Store active events self.async_update = Throttle(self.scan_interval)(self._async_update) + hilo.register_websocket_listener(self) + + async def handle_challenge_added(self, event_data): + """Handle new challenge event.""" + LOG.debug("ic-dev21 handle_challenge_added") + if event_data.get("progress") == "scheduled": + event_id = event_data.get("id") + if event_id: + event = Event(**event_data) + if self._hilo.appreciation > 0: + event.appreciation(self._hilo.appreciation) + if self._hilo.pre_cold > 0: + event.pre_cold(self._hilo.pre_cold) + self._events[event_id] = event + self._update_next_events() + + async def handle_challenge_list_initial(self, challenges): + """Handle initial challenge list.""" + LOG.debug(f"ic-dev21 handle_challenge_list_initial challenges: {challenges}") + self._events.clear() + LOG.debug(f"ic-dev21 handle_challenge_list_initial events: {self._events}") + for challenge in challenges: + event_id = challenge.get("id") + progress = challenge.get("progress") + LOG.debug(f"ic-dev21 handle_challenge_list_initial progress is {progress}") + if challenge.get("progress") in ["scheduled", "inProgress"]: + event_id = challenge.get("id") + if event_id: + event = Event(**challenge) + if self._hilo.appreciation > 0: + event.appreciation(self._hilo.appreciation) + if self._hilo.pre_cold > 0: + event.pre_cold(self._hilo.pre_cold) + self._events[event_id] = event + self._update_next_events() + + async def handle_challenge_list_update(self, challenges): + """Handle challenge list updates.""" + LOG.debug("ic-dev21 handle_challenge_list_update") + for challenge in challenges: + event_id = challenge.get("id") + progress = challenge.get("progress") + baselinewH = challenge.get("baselinewH") + LOG.debug(f"ic-dev21 handle_challenge_list_update progress is {progress}") + LOG.debug( + f"ic-dev21 handle_challenge_list_update baselinewH is {baselinewH}" + ) + if event_id in self._events: + if challenge.get("progress") == "completed": + # Find the oldest event based on recovery_end datetime + oldest_event_id = min( + self._events.keys(), + key=lambda key: self._events[key] + .as_dict() + .get("phases", {}) + .get("recovery_end", ""), + ) + await asyncio.sleep(300) + del self._events[oldest_event_id] + break + else: + current_event = self._events[event_id] + LOG.debug( + f"ic-dev21 handle_challenge_list_update current event is: {current_event}" + ) + updated_event = Event(**{**current_event.as_dict(), **challenge}) + if self._hilo.appreciation > 0: + updated_event.appreciation(self._hilo.appreciation) + if self._hilo.pre_cold > 0: + updated_event.pre_cold(self._hilo.pre_cold) + self._events[event_id] = updated_event + self._update_next_events() + + async def handle_challenge_details_update(self, challenge): + """Handle challenge detail updates.""" + LOG.debug(f"ic-dev21 handle_challenge_details_update {challenge}") + challenge = challenge[0] if isinstance(challenge, list) else challenge + event_id = challenge.get("id") + progress = challenge.get("progress", "unknown") + baselinewH = challenge.get("baselinewH", 0) + used_wH = challenge.get("currentWh", 0) + if used_wH is not None and used_wH > 0: + used_kWh = used_wH / 1000 + else: + used_kWh = 0 + LOG.debug(f"ic-dev21 handle_challenge_details_update progress is {progress}") + LOG.debug( + f"ic-dev21 handle_challenge_details_update baselinewH is {baselinewH}" + ) + LOG.debug(f"ic-dev21 handle_challenge_details_update used_kwh is {used_kWh}") + LOG.debug(f"ic-dev21 handle_challenge_details_update progress is {progress}") + if event_id in self._events: + if challenge.get("progress") == "completed": + del self._events[event_id] + else: + current_event = self._events[event_id] + updated_event = Event(**{**current_event.as_dict(), **challenge}) + if self._hilo.appreciation > 0: + updated_event.appreciation(self._hilo.appreciation) + if self._hilo.pre_cold > 0: + updated_event.pre_cold(self._hilo.pre_cold) + self._events[event_id] = updated_event + self._update_next_events() + + def _update_next_events(self): + """Update the next_events list based on current events.""" + LOG.debug("ic-dev21 sorting events") + # Sort events by start time + sorted_events = sorted(self._events.values(), key=lambda x: x.preheat_start) + + self._next_events = [ + event.as_dict() for event in sorted_events # if event.state != "completed" + ] + + # Force an update of the entity + self.async_write_ha_state() @property def state(self): + """Return the current state based on next events.""" if len(self._next_events) > 0: event = Event(**{**{"id": 0}, **self._next_events[0]}) return event.state - else: - return "off" + return "off" @property def icon(self): @@ -768,7 +877,8 @@ def icon(self): @property def should_poll(self): - return True + """No need to poll with websockets.""" + return False @property def extra_state_attributes(self): @@ -777,24 +887,10 @@ def extra_state_attributes(self): async def async_added_to_hass(self): """Handle entity about to be added to hass event.""" await super().async_added_to_hass() - last_state = await self.async_get_last_state() - if last_state: - self._last_update = dt_util.utcnow() - self._state = last_state.state - self._next_events = last_state.attributes.get("next_events", []) async def _async_update(self): - self._next_events = [] - events = await self._hilo._api.get_gd_events(self._hilo.devices.location_id) - LOG.debug(f"Events received from Hilo: {events}") - for raw_event in events: - details = await self._hilo.get_event_details(raw_event["id"]) - event = Event(**details) - if self._hilo.appreciation > 0: - event.appreciation(self._hilo.appreciation) - if self._hilo.pre_cold > 0: - event.pre_cold(self._hilo.pre_cold) - self._next_events.append(event.as_dict()) + """This method can be kept for fallback but shouldn't be needed with websockets.""" + pass class DeviceSensor(HiloEntity, SensorEntity): diff --git a/requirements.txt b/requirements.txt index 8c48e819..9481af44 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ colorlog==6.9.0 -homeassistant~=2025.1.0 +homeassistant~=2025.1.4 pip>=21.3.1 -ruff==0.9.1 +ruff==0.9.4