diff --git a/src/common/io.py b/src/common/io.py
index e889d2cb..9f8cb718 100644
--- a/src/common/io.py
+++ b/src/common/io.py
@@ -115,6 +115,17 @@ class ResultReader(abc.ABC):
     - (optional) get_trajectories(self, names) -> dict(name, Trajectory) for variables in .
       default: Loop over get_trajectory(name).
     """
+
+    def _get_name(self) -> list[str]:
+        warnings.warn(
+            "Getting variable names via the `name` attribute is deprecated and will be removed in a future version. "
+            "Use the `get_variable_names` function instead.",
+            DeprecationWarning,
+            stacklevel=2
+        )
+        return self.get_variable_names()
+
+    name = property(fget = _get_name)
 
     @abc.abstractmethod
     def get_variable_names(self) -> list[str]:
@@ -214,17 +225,6 @@ class ResultDymola(ResultReader):
     Base class for representation of a result file.
     """
-    def _get_name(self) -> list[str]:
-        warnings.warn(
-            "Getting variable names via the `name` attribute is deprecated and will be removed in a future version."
-            "Use the `get_variable_names` function instead.",
-            DeprecationWarning,
-            stacklevel=2
-        )
-        return self.get_variable_names()
-
-    name = property(fget = _get_name)
-
     def get_variable_names(self) -> list[str]:
         return [decode(n) for n in self.name_lookup.keys()]
 
@@ -1124,6 +1124,11 @@ def is_variable(self, name):
             name --
                 Name of the variable/parameter/constant
 
+        Raises::
+
+            VariableNotFoundError --
+                If the variable does not exist.
+
         Returns::
 
             True if the variable is time-varying.
@@ -1783,7 +1788,7 @@ def _get_calculated_diagnostics_cpu_time(self) -> np.ndarray:
         """Get trajectory values for cumulative CPU time."""
         return DynamicDiagnosticsUtils.get_cpu_time(self.get_trajectory(f'{DIAGNOSTICS_PREFIX}cpu_time_per_step').x)
 
-    def is_variable(self, name):
+    def is_variable(self, name: str) -> bool:
         """
         Returns True if the given name corresponds to a time-varying variable.
 
@@ -1792,6 +1797,11 @@ def is_variable(self, name):
             name --
                 Name of the variable/parameter/constant.
 
+        Raises::
+
+            VariableNotFoundError --
+                If the variable does not exist.
+
         Returns::
 
            True if the variable is time-varying.
@@ -3224,8 +3234,15 @@ def _data_1(self):
         return scipy.io.loadmat(self._fname, chars_as_strings=False, variable_names=["data_1"])["data_1"]
 
     def get_trajectory(self, name: str) -> Trajectory:
+        return self._get_trajectory(name)
+
+    def _get_trajectory(self, name: str, start_index: int = 0, stop_index: int | None = None) -> Trajectory:
         time = self._diagnostics_time_vector if self._contains_diagnostic_data else self._time_vector
+        if start_index != 0 or stop_index is not None:
+            start_index, stop_index = _clamp_indices(len(time), start_index, stop_index)
+            time = time[start_index: stop_index]
+
         if name in ("time", "Time"):
             return Trajectory(time, time)
 
@@ -3235,55 +3252,103 @@ def get_trajectory(self, name: str) -> Trajectory:
                 return Trajectory(self._data_1[0], self._data_1[data_index])
             case 2:
                 data = (
-                    self._get_interpolated_trajectory(data_index)
+                    self._get_interpolated_trajectory(data_index, start_index, stop_index)
                     if self._contains_diagnostic_data
-                    else self._get_trajectory(data_index)
+                    else self._get_primary_trajectory(data_index, start_index, stop_index)
                 )
                 return Trajectory(time, data)
             case 3:
-                return Trajectory(time, self._get_diagnostics_trajectory(data_index))
+                return Trajectory(time, self._get_diagnostics_trajectory(data_index, start_index, stop_index))
             case _:
                 raise ValueError(f"Invalid data matrix: {data_mat}")
 
     @cached_property
     def _time_vector(self) -> np.ndarray:
-        return self._get_trajectory(0)
+        return self._get_primary_trajectory(0)
 
     @cached_property
     def _diagnostics_time_vector(self) -> np.ndarray:
         return self._get_diagnostics_trajectory(0)
 
-    def _get_trajectory(self, data_index: int) -> np.ndarray:
-        return fmi_util.read_trajectory(
-            encode(self._fname),
-            data_index,
-            self._data_2_info["file_position"],
-            self._data_2_info["sizeof_type"],
-            int(self._data_2_info["nbr_points"]),
-            int(self._data_2_info["nbr_variables"])
-        )
+    def _get_primary_trajectory(self, data_index: int, start_index: int = 0, stop_index: int | None = None) -> np.ndarray:
+        return self._get_trajectory_from_data(self._data_2_info, data_index, start_index, stop_index)
+
+    def _get_diagnostics_trajectory(self, data_index: int, start_index: int = 0, stop_index: int | None = None) -> np.ndarray:
+        return self._get_trajectory_from_data(self._data_3_info, data_index, start_index, stop_index)
+
+    def _get_trajectory_from_data(self, data_info, data_index: int, start_index: int = 0, stop_index: int | None = None) -> np.ndarray:
+        file_position = data_info["file_position"]
+        sizeof_type = data_info["sizeof_type"]
+        nbr_points = data_info["nbr_points"]
+        nbr_variables = data_info["nbr_variables"]
+
+        start_index, stop_index = _clamp_indices(nbr_points, start_index, stop_index)
+        new_file_position = file_position + start_index * sizeof_type * nbr_variables
+        new_nbr_points = stop_index - start_index
 
-    def _get_diagnostics_trajectory(self, data_index: int) -> np.ndarray:
         return fmi_util.read_trajectory(
             encode(self._fname),
             data_index,
-            self._data_3_info["file_position"],
-            self._data_3_info["sizeof_type"],
-            int(self._data_3_info["nbr_points"]),
-            int(self._data_3_info["nbr_variables"])
+            new_file_position,
+            sizeof_type,
+            int(new_nbr_points),
+            int(nbr_variables)
         )
 
-    def _get_interpolated_trajectory(self, data_index: int) -> np.ndarray:
+    def _get_interpolated_trajectory(self, data_index: int, start_index: int = 0, stop_index: int | None = None) -> np.ndarray:
         time_vector = self._time_vector
-        diag_time_vector = self._diagnostics_time_vector
-        data = self._get_trajectory(data_index)
+        data = self._get_primary_trajectory(data_index)
         if len(data) == 1:
             return data
         f = scipy.interpolate.interp1d(time_vector, data, fill_value="extrapolate")
+
+        start_index, stop_index = _clamp_indices(self._data_3_info["nbr_points"], start_index, stop_index)
+        diag_time_vector = self._diagnostics_time_vector[start_index: stop_index]
         return f(diag_time_vector)
 
+    def is_variable(self, name: str) -> bool:
+        if name == 'time' or name == 'Time':
+            return True
+
+        data_index, _ = self._get_data_index_mat(name)
+        return data_index != 1
+
+    def get_variables_data(
+        self,
+        names: list[str],
+        start_index: int = 0,
+        stop_index: int | None = None,
+    ) -> tuple[dict[str, Trajectory], Union[int, None]]:
+        if isinstance(start_index, int) and isinstance(stop_index, int) and stop_index < start_index:
+            raise ValueError(f"Invalid values for {start_index=} and {stop_index=}, "
+                             "'start_index' needs to be less than or equal to 'stop_index'.")
+
+        trajectories = {name: self._get_trajectory(name, start_index, stop_index) for name in names}
+        largest_trajectory_length = self._find_max_trajectory_length(trajectories)
+        new_start_index = (start_index + largest_trajectory_length) if trajectories else start_index
+        return trajectories, new_start_index
+
+    def _find_max_trajectory_length(self, trajectories: dict[str, Trajectory]) -> int:
+        """
+        Given a dict of trajectories, find the length of the longest trajectory
+        among the set of continuous variables. Parameters/constants are disregarded since they are
+        not stored with the same number of data points as trajectories for continuous variables.
+        """
+        return max([0] + [len(t.x) for v, t in trajectories.items() if self.is_variable(v)])
+
+
+def _clamp_indices(nbr_points: int, start_index: int, stop_index: int | None) -> tuple[int, int]:
+    # Account for subsets of the data
+    start_index = max(0, start_index)
+    stop_index = max(0, nbr_points if stop_index is None else min(nbr_points, stop_index))
+
+    # start_index may still exceed stop_index here (e.g., start_index beyond the
+    # available points), so clamp it to stop_index
+    start_index = min(start_index, stop_index)
+
+    return start_index, stop_index
 
 class ResultReaderBinaryMat(ResultReader):
     def __init__(self, fname, allow_file_updates=False):
@@ -3331,14 +3396,94 @@ def _get_delegate(self, fname, allow_file_updates: bool):
         )
 
     def get_variable_names(self) -> list[str]:
+        """Retrieve the names of the variables stored in the result file."""
         return self._delegate.get_variable_names()
 
     def get_trajectory(self, name: str) -> Trajectory:
+        """Retrieve a single pyfmi.common.io.Trajectory by variable name.
+        Parameters::
+
+            name --
+                String of the variable name.
+        Returns::
+            pyfmi.common.io.Trajectory corresponding to 'name'.
+        """
         return self._delegate.get_trajectory(name)
 
     def get_trajectories(self, names: list[str]) -> dict[str, Trajectory]:
+        """Retrieve multiple trajectories as a dictionary of variable trajectories.
+            names --
+                List of strings of variable names.
+        Returns::
+            Dictionary: {variable_name: pyfmi.common.io.Trajectory}.
+        """
         return self._delegate.get_trajectories(names)
 
+    def is_variable(self, name: str) -> bool:
+        """
+        Returns True if the given name corresponds to a time-varying variable.
+
+        Parameters::
+
+            name --
+                Name of the variable/parameter/constant.
+
+        Raises::
+
+            VariableNotFoundError --
+                If the variable does not exist.
+
+        Returns::
+
+            True if the variable is time-varying.
+        """
+        return self._delegate.is_variable(name)
+
+    def get_variables_data(
+        self,
+        names: list[str],
+        start_index: int = 0,
+        stop_index: int | None = None,
+    ) -> tuple[dict[str, Trajectory], Union[int, None]]:
+        """
+        Returns trajectories for each variable in 'names' with lengths adjusted for the
+        interval [start_index, stop_index], i.e. partial trajectories.
+        Values of start_index and stop_index that are out of bounds are automatically corrected,
+        such that:
+            Negative values are always adjusted to 0 or larger.
+            An out-of-bounds stop_index is adjusted to the number of available data points, example:
+                If start_index = 0, stop_index = 5 and there are only 3 data points available,
+                then the returned trajectories are of length 3.
+            If start_index is larger than or equal to the number of available data points, empty trajectories
+            are returned, i.e. trajectories of length 0.
+        Note that trajectories for parameters are always of length 2 if indices 0 and 1 are
+        part of the requested trajectory since they reflect the values before and after initialization.
+        Therefore, if you request a trajectory for a parameter with start_index >= 2, the returned trajectory is empty.
+
+        By default, start_index = 0 and stop_index = None, which implies that the full trajectory is returned.
+
+        Parameters::
+
+            names --
+                List of variable names for which to fetch trajectories.
+
+            start_index --
+                The index from which the trajectory data starts.
+
+            stop_index --
+                The index at which the trajectory data ends. If stop_index is set to None,
+                it implies that all data in the slice [start_index:] is returned.
+
+        Raises::
+            ValueError -- If stop_index < start_index.
+
+        Returns::
+            Tuple: (dict of trajectories with keys corresponding to variable names, next start index (non-negative))
+        """
+        return self._delegate.get_variables_data(
+            names, start_index=start_index, stop_index=stop_index
+        )
+
 
 def verify_result_size(file_name, first_point, current_size, previous_size, max_size, ncp, time):
     free_space = get_available_disk_space(file_name)
diff --git a/tests/test_io.py b/tests/test_io.py
index 28c15788..ab095054 100644
--- a/tests/test_io.py
+++ b/tests/test_io.py
@@ -24,6 +24,7 @@
 import re
 from io import StringIO, BytesIO
 from collections import OrderedDict
+from dataclasses import dataclass
 from typing import Protocol
 
 from pyfmi import load_fmu
@@ -2510,6 +2511,202 @@ def test_result_does_not_exist_raises_error(self):
         with pytest.raises(NoResultError):
             ResultReaderBinaryMat("does-not-exists")
 
+    def test_deprecated_name_property(self, mat_file):
+        result = ResultReaderBinaryMat(mat_file)
+        with pytest.warns(
+            DeprecationWarning,
+            match="Use the `get_variable_names` function instead.",
+        ):
+            assert "spring.phi_nominal" in result.name
+
+    def test_is_variable(self, mat_file):
+        result = ResultReaderBinaryMat(mat_file)
+        assert result.is_variable("spring.phi_nominal")
+        assert result.is_variable("time")
+        assert not result.is_variable("spring.k_constant")
+
+        with pytest.raises(VariableNotFoundError):
+            result.is_variable("does.not.exists")
+
+@dataclass
+class IndexCase:
+    start: int
+    stop: int | None
+    expected_next: int
+
+
+class TestConsolidatedGetVariablesData:
+    """Tests for the get_variables_data method"""
+
+    def test_get_full_trajectories_default_params(self, mat_file):
+        """Test getting full trajectories with default parameters"""
+        result = ResultReaderBinaryMat(mat_file)
+        trajectories, _ = result.get_variables_data(["time", "torque.flange.phi"])
+
+        assert len(trajectories) == 2
+        assert len(trajectories["time"].x) == 3
+        assert len(trajectories["torque.flange.phi"].x) == 3
+
+    def test_get_partial_trajectories_with_indices(self, mat_file):
+        """Test getting partial trajectories with start and stop indices"""
+        result = ResultReaderBinaryMat(mat_file)
+        trajectories, next_index = result.get_variables_data(
+            ["time", "torque.flange.phi"],
+            start_index=1,
+            stop_index=2,
+        )
+
+        assert len(trajectories["time"].x) == 1
+        assert len(trajectories["torque.flange.phi"].x) == 1
+        assert np.allclose(trajectories["time"].x, [11.0])
+        assert np.allclose(trajectories["torque.flange.phi"].x, [1.1])
+        assert next_index == 2
+
+    def test_get_trajectories_start_index_only(self, mat_file):
+        """Test getting trajectories from start_index to the end"""
+        result = ResultReaderBinaryMat(mat_file)
+        trajectories, next_index = result.get_variables_data(
+            ["time", "torque.flange.phi"],
+            start_index=1
+        )
+
+        assert len(trajectories["time"].x) == 2
+        assert len(trajectories["torque.flange.phi"].x) == 2
+        assert next_index == 3
+
+    def test_negative_start_index_corrected_to_zero(self, mat_file):
+        """Test that a negative start_index is corrected to 0"""
+        result = ResultReaderBinaryMat(mat_file)
+        trajectories, _ = result.get_variables_data(
+            ["time"],
+            start_index=-5,
+            stop_index=2
+        )
+
+        assert len(trajectories["time"].x) == 2
+        assert np.allclose(trajectories["time"].x, [10.0, 11.0])
+
+    def test_out_of_bounds_stop_index_adjusted(self, mat_file):
+        """Test that an out-of-bounds stop_index is adjusted to the available data points"""
+        result = ResultReaderBinaryMat(mat_file)
+        trajectories, next_index = result.get_variables_data(
+            ["time"],
+            start_index=0,
+            stop_index=100
+        )
+
+        assert len(trajectories["time"].x) == 3
+        assert np.allclose(trajectories["time"].x, [10.0, 11.0, 12.0])
+        assert next_index == 3
+
+    def test_start_index_beyond_data_returns_empty(self, mat_file):
+        """Test that start_index >= data points returns empty trajectories"""
+        result = ResultReaderBinaryMat(mat_file)
+        trajectories, next_index = result.get_variables_data(
+            ["torque.flange.phi"],
+            start_index=10
+        )
+
+        assert len(trajectories["torque.flange.phi"].x) == 0
+        assert next_index == 10
+
+    def test_stop_index_less_than_start_raises_error(self, mat_file):
+        """Test that stop_index < start_index raises ValueError"""
+        result = ResultReaderBinaryMat(mat_file)
+        with pytest.raises(ValueError):
+            result.get_variables_data(
+                ["time"],
+                start_index=5,
+                stop_index=2
+            )
+
+    def test_parameter_with_start_index_two_or_more_returns_empty(self, mat_file):
+        """Test that parameters return an empty trajectory when start_index >= 2"""
+        result = ResultReaderBinaryMat(mat_file)
+        trajectories, _ = result.get_variables_data(
+            ["torque.flange.phi"],
+            start_index=3
+        )
+
+        assert len(trajectories["torque.flange.phi"].x) == 0
+
+    def test_multiple_variables_mixed_types(self, mat_file):
+        """Test getting multiple variables including both parameters and time-series"""
+        result = ResultReaderBinaryMat(mat_file)
+        trajectories, _ = result.get_variables_data(
+            ["spring.phi_nominal", "time", "torque.flange.phi", "spring.k_constant", "@Diagnostics.error_code"],
+            start_index=0,
+            stop_index=1
+        )
+
+        assert len(trajectories) == 5
+        # Parameters
+        assert len(trajectories["spring.phi_nominal"].x) == 2
+        assert len(trajectories["spring.k_constant"].x) == 2
+
+        # Time-series
+        assert len(trajectories["time"].x) == 1
+        assert len(trajectories["torque.flange.phi"].x) == 1
+        assert len(trajectories["@Diagnostics.error_code"].x) == 1
+
+    def test_empty_variable_list(self, mat_file):
+        """Test with an empty variable names list"""
+        result = ResultReaderBinaryMat(mat_file)
+        trajectories, next_index = result.get_variables_data([])
+
+        assert len(trajectories) == 0
+        assert next_index == 0
+
+    def test_diagnostic_variables(self, mat_file):
+        """Test getting diagnostic variables"""
+        result = ResultReaderBinaryMat(mat_file)
+        trajectories, _ = result.get_variables_data(
+            ["@Diagnostics.step_time", "@Diagnostics.error_code"],
+            start_index=0,
+            stop_index=2
+        )
+
+        assert len(trajectories["@Diagnostics.step_time"].x) == 2
+        assert len(trajectories["@Diagnostics.error_code"].x) == 2
+        assert np.allclose(trajectories["@Diagnostics.error_code"].x, [0.0, 1.0])
+
+    def test_nonexistent_variable_raises_error(self, mat_file):
+        """Test that requesting a non-existent variable raises VariableNotFoundError"""
+        result = ResultReaderBinaryMat(mat_file)
+        with pytest.raises(VariableNotFoundError):
+            result.get_variables_data(["does.not.exist"])
+
+    @pytest.mark.parametrize(
+        "index", [IndexCase(0, 2, 2), IndexCase(0, None, 3), IndexCase(10, 15, 10)]
+    )
+    def test_next_index_calculation(self, index: IndexCase, mat_file):
+        """Test that next_index is correctly calculated"""
+        result = ResultReaderBinaryMat(mat_file)
+
+        _, next_index = result.get_variables_data(
+            ["time"], start_index=index.start, stop_index=index.stop
+        )
+        assert next_index == index.expected_next
+
+    def test_interpolation_values_with_time(self, mat_file):
+        result = ResultReaderBinaryMat(mat_file)
+        def get_clipped_trajectory(name: str):
+            trajs, _ = result.get_variables_data([name], stop_index=2)
+            return trajs[name]
+
+        traj = get_clipped_trajectory("torque.flange.phi")
+        assert np.allclose(traj.t, [10.0, 11.0])
+        assert np.allclose(traj.x, [1.0, 1.1])
+
+    def test_interpolation_between_points(self, mat_file_interpolation):
+        result = ResultReaderBinaryMat(mat_file_interpolation)
+        trajs, _ = result.get_variables_data(["spring.phi_nominal"], start_index=1, stop_index=4)
+        traj = trajs["spring.phi_nominal"]
+
+        assert np.allclose(traj.t, [0.5, 1.0, 1.5])
+        assert traj.x[0] == pytest.approx(5.0)
+        assert traj.x[2] == pytest.approx(15.0)
+
 
 def test_interpolation_between_points(mat_file_interpolation):
     result = ResultReaderBinaryMat(mat_file_interpolation)
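For reviewers, a minimal sketch of how the chunked-read API added above could be consumed. This is illustrative only and not part of the patch; the import path, the file name "result.mat", and the variable names are assumptions borrowed from the tests.

# Hypothetical usage sketch (not part of this patch): read trajectories in chunks.
from pyfmi.common.io import ResultReaderBinaryMat  # import path is an assumption

reader = ResultReaderBinaryMat("result.mat")  # "result.mat" is a placeholder
names = ["time", "torque.flange.phi"]         # illustrative variable names

start_index = 0
while True:
    # Fetch at most 100 points per call; the second return value is the index
    # to resume from on the next call.
    trajectories, start_index = reader.get_variables_data(
        names, start_index=start_index, stop_index=start_index + 100
    )
    if len(trajectories["time"].x) == 0:
        break  # no data points left
    for name, traj in trajectories.items():
        print(name, traj.x)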