From 13376107d2dc342715698181a1afa6a9a7b1e1df Mon Sep 17 00:00:00 2001 From: petermeisrimelmodelon Date: Tue, 10 Feb 2026 13:12:36 +0000 Subject: [PATCH] testing downsampling of master algorithm in serial mode; executing maximally sized do_step calls --- src/pyfmi/master.pyx | 53 +++++++++-- tests/test_fmi_master.py | 191 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 232 insertions(+), 12 deletions(-) diff --git a/src/pyfmi/master.pyx b/src/pyfmi/master.pyx index 9d963f6c..0907ca9a 100644 --- a/src/pyfmi/master.pyx +++ b/src/pyfmi/master.pyx @@ -75,6 +75,31 @@ cdef perform_do_step_serial(list models, dict time_spent, double cur_time, doubl if status != 0: raise FMUException("The step failed for model %s at time %f. See the log for more information. Return flag %d."%(model.get_name(), cur_time, status)) +cdef perform_do_step_serial_with_downsampling( + long step_number, + list models, + list downsampling_rates, + dict time_spent, + double cur_time, + double final_time, + double step_size, + bool new_step): + """ + Perform a do step on all the models. + """ + cdef double time_start = 0.0 + cdef int status = 0 + + for model, ds_rate in zip(models, downsampling_rates): + # Note: step_number is 0 based + if (step_number % ds_rate) == 0: + time_start = timer() + h = min(ds_rate*step_size, abs(final_time - cur_time)) # TODO: eps adjustments here? + status = model.do_step(cur_time, h, new_step) + time_spent[model] += timer() - time_start + if status != 0: + raise FMUException("The step failed for model %s at time %f. See the log for more information. Return flag %d."%(model.get_name(), cur_time, status)) + cdef perform_do_step_parallel(list models, FMIL2.fmi2_import_t** model_addresses, int n, double cur_time, double step_size, int new_step): """ Perform a do step on all the models. @@ -358,6 +383,9 @@ class MasterAlgOptions(OptionBase): It is not required to have all models as keys, missing ones take the default value of 1. Default: {m: 1 for m in models} (no downsampling) + + _experimental_serial_downsampling -- + TODO """ def __init__(self, master, *args, **kw): _defaults= { @@ -389,6 +417,7 @@ class MasterAlgOptions(OptionBase): "num_threads":None, "result_downsampling_factor": dict((model, 1) for model in master.models), "step_size_downsampling_factor" : dict((model, 1) for model in master.models), + "_experimental_serial_downsampling": False, } super(MasterAlgOptions,self).__init__(_defaults) # Exceptions to the above types need to handled here, e.g., allowing both @@ -440,6 +469,7 @@ cdef class Master: cdef public long long _step_number cdef public bool _last_step cdef public dict step_size_downsampling_factor + cdef public bool _uses_step_size_downsampling def __init__(self, models, connections): """ @@ -1388,8 +1418,18 @@ cdef class Master: step_size = final_time - tcur self.set_current_step_size(step_size) self._last_step = True - - perform_do_step(self.models, self.elapsed_time, self.fmu_adresses, tcur, step_size, True, calling_setting) + if opts["_experimental_serial_downsampling"]: + perform_do_step_serial_with_downsampling( + self._step_number, + self.models, + list(self.step_size_downsampling_factor.values()), + self.elapsed_time, + tcur, + final_time, + step_size, + True) + else: + perform_do_step(self.models, self.elapsed_time, self.fmu_adresses, tcur, step_size, True, calling_setting) if self.opts["store_step_before_update"]: time_start = timer() @@ -1617,7 +1657,9 @@ cdef class Master: f"got: '{val}'.") self.step_size_downsampling_factor[m] = val - if set(self.step_size_downsampling_factor.values()) != {1} and options["logging"]: + self._uses_step_size_downsampling = set(self.step_size_downsampling_factor.values()) != {1} + + if self._uses_step_size_downsampling and options["logging"]: warnings.warn("Both 'step_size_downsampling_factor' and 'logging' are used. " \ "Logging of A, B, C, and D matrices will be done on the global step-size." \ "Actual values may no longer be sensible.") @@ -1627,10 +1669,9 @@ cdef class Master: warnings.warn("Extrapolation of inputs only supported if the individual FMUs support interpolation of inputs.") options["extrapolation_order"] = 0 - uses_step_size_downsampling = set(self.step_size_downsampling_factor.values()) != {1} - if uses_step_size_downsampling and options["extrapolation_order"] > 0: + if self._uses_step_size_downsampling and options["extrapolation_order"] > 0: raise FMUException("Use of 'step_size_downsampling_factor' with 'extrapolation_order' > 0 not supported.") - if uses_step_size_downsampling and self.linear_correction: + if self._uses_step_size_downsampling and self.linear_correction: raise FMUException("Use of 'step_size_downsampling_factor' with 'linear_correction' not supported.") if options["num_threads"] and options["execution"] == "parallel": diff --git a/tests/test_fmi_master.py b/tests/test_fmi_master.py index 99bc8154..f884ead8 100644 --- a/tests/test_fmi_master.py +++ b/tests/test_fmi_master.py @@ -21,6 +21,8 @@ import warnings import re import scipy.sparse as sps +import dataclasses +from typing import List from pathlib import Path from pyfmi import Master @@ -600,6 +602,7 @@ def test_with_error_control(self): msg = "Step-size downsampling not supported for error controlled simulation, no downsampling will be performed." with pytest.warns(UserWarning, match = re.escape(msg)): self.master.simulate(options = opts) + assert not self.master._uses_step_size_downsampling def test_extrapolation_order(self): """Test combination with the 'extrapolation_order' option.""" @@ -617,6 +620,7 @@ def test_extrapolation_order(self): err_msg = "Use of 'step_size_downsampling_factor' with 'extrapolation_order' > 0 not supported." with pytest.raises(FMUException, match = re.escape(err_msg)): master.simulate(options = opts) + assert not self.master._uses_step_size_downsampling def test_linear_correction(self): """Test combination with the 'linear_correction' option.""" @@ -634,6 +638,7 @@ def test_linear_correction(self): err_msg = "Use of 'step_size_downsampling_factor' with 'linear_correction' not supported." with pytest.raises(FMUException, match = re.escape(err_msg)): master.simulate(options = opts) + assert not self.master._uses_step_size_downsampling def test_check_invalid_option_input_non_dict_via_simulate_options(self): """Test invalid inputs to the option, not a dictionary.""" @@ -676,6 +681,7 @@ def test_partial_input(self): opts["step_size_downsampling_factor"] = {self.fmu1: 2} opts["step_size"] = 0.25 self.master.simulate(options = opts) + assert self.master._uses_step_size_downsampling @pytest.mark.parametrize("input_var_name, output_var_name", [ @@ -683,37 +689,42 @@ def test_partial_input(self): ("Float64_discrete_input", "Float64_discrete_output"), ] ) - @pytest.mark.parametrize("rate1, rate2, expected_res1, expected_res2", + @pytest.mark.parametrize("rate1, rate2, expected_res1, expected_res2, uses", [ # Sanity check (1, 1, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], + False, ), # external input only sampled every other step (2, 1, [1, 1, 3, 3, 5, 5, 7, 7, 9, 9, 11], - [1, 1, 3, 3, 5, 5, 7, 7, 9, 9, 11] + [1, 1, 3, 3, 5, 5, 7, 7, 9, 9, 11], + True, ), # connection only updated every other step (1, 2, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], - [1, 1, 3, 3, 5, 5, 7, 7, 9, 9, 11] + [1, 1, 3, 3, 5, 5, 7, 7, 9, 9, 11], + True, ), # non-aligned test 1 (2, 3, [1, 1, 3, 3, 5, 5, 7, 7, 9, 9, 11], - [1, 1, 1, 3, 3, 3, 7, 7, 7, 9, 9] + [1, 1, 1, 3, 3, 3, 7, 7, 7, 9, 9], + True, ), # non-aligned test 2 (3, 2, [1, 1, 1, 4, 4, 4, 7, 7, 7, 10, 10], - [1, 1, 1, 1, 4, 4, 7, 7, 7, 7, 10] + [1, 1, 1, 1, 4, 4, 7, 7, 7, 7, 10], + True, ), ] ) def test_two_feedthrough_system(self, input_var_name, output_var_name, - rate1, rate2, expected_res1, expected_res2): + rate1, rate2, expected_res1, expected_res2, uses): """Test 'step_size_downsampling_factor' for 2 Feedthrough models + 1 external input.""" t_start, t_final = 0, 10 opts = self.master.simulate_options() @@ -729,6 +740,7 @@ def test_two_feedthrough_system(self, input_var_name, output_var_name, res = self.master.simulate(t_start, t_final, options = opts, input = input_object) np.testing.assert_array_equal(res[0][output_var_name], expected_res1) np.testing.assert_array_equal(res[1][output_var_name], expected_res2) + assert self.master._uses_step_size_downsampling == uses @pytest.mark.parametrize("input_var_name, output_var_name", [ @@ -761,6 +773,7 @@ def test_three_feedthrough_system(self, input_var_name, output_var_name): ] res = master.simulate(t_start, t_final, options = opts, input = input_object) + assert self.master._uses_step_size_downsampling expected_res1 = np.array([1, 1, 3, 3, 5, 5, 7, 7, 9, 9, 11]) expected_res2 = np.array([1, 1, 1, 3, 3, 3, 7, 7, 7, 9, 9]) expected_res3 = np.array([1, 1, 1, 1, 5, 5, 5, 5, 9, 9, 9]) @@ -783,6 +796,7 @@ def test_with_store_step_before_update(self): ] res = self.master.simulate(t_start, t_final, options = opts, input = input_object) + assert self.master._uses_step_size_downsampling np.testing.assert_array_equal( res[0]["Float64_continuous_output"], # [1, 1, 1, X, 4, 4, 4, X, 7, 7, 7, X, 10, last = 10], # X = before update value @@ -825,3 +839,168 @@ def compute_global_D(self): "Actual values may no longer be sensible." with pytest.warns(UserWarning, match = re.escape(msg)): master.simulate(t_start, t_final, options = opts) + assert self.master._uses_step_size_downsampling + + +class FMUModelCS2DoStepLogging(FMUModelCS2): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.do_step_history = [] + + def do_step(self, current_t, step_size, new_step = True): + self.do_step_history.append(step_size) + return super().do_step(current_t, step_size, new_step) + + +@dataclasses.dataclass +class SerialDownsamplingTestCase(): + # inputs + downsampling_rate_1: int + downsampling_rate_2: int + # expected outputs + uses_downsampling: bool + expected_outputs_1: List[float] + expected_outputs_2: List[float] + expected_stepsizes_1: List[float] + expected_stepsizes_2: List[float] + + +class Test_Master_serial_downsampling(): + @pytest.mark.parametrize("test_case", [ + SerialDownsamplingTestCase( # trivial case + downsampling_rate_1 = 1, + downsampling_rate_2 = 1, + uses_downsampling = False, + expected_outputs_1 = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], + expected_outputs_2 = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], + expected_stepsizes_1 = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1], + expected_stepsizes_2 = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1], + ), + SerialDownsamplingTestCase( # external input only sampled every other step + downsampling_rate_1 = 2, + downsampling_rate_2 = 1, + uses_downsampling = True, + expected_outputs_1 = [1, 1, 3, 3, 5, 5, 7, 7, 9, 9, 11], + expected_outputs_2 = [1, 1, 3, 3, 5, 5, 7, 7, 9, 9, 11], + expected_stepsizes_1 = [2, 2, 2, 2, 2], + expected_stepsizes_2 = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1], + ), + SerialDownsamplingTestCase( # connection only updated every other step + downsampling_rate_1 = 1, + downsampling_rate_2 = 2, + uses_downsampling = True, + expected_outputs_1 = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], + expected_outputs_2 = [1, 1, 3, 3, 5, 5, 7, 7, 9, 9, 11], + expected_stepsizes_1 = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1], + expected_stepsizes_2 = [2, 2, 2, 2, 2], + ), + SerialDownsamplingTestCase( # non-aligned test 1 + downsampling_rate_1 = 2, + downsampling_rate_2 = 3, + uses_downsampling = True, + expected_outputs_1 = [1, 1, 3, 3, 5, 5, 7, 7, 9, 9, 11], + expected_outputs_2 = [1, 1, 1, 3, 3, 3, 7, 7, 7, 9, 9], + expected_stepsizes_1 = [2, 2, 2, 2, 2], + expected_stepsizes_2 = [3, 3, 3, 1], + ), + SerialDownsamplingTestCase( # non-aligned test 2 + downsampling_rate_1 = 3, + downsampling_rate_2 = 2, + uses_downsampling = True, + expected_outputs_1 = [1, 1, 1, 4, 4, 4, 7, 7, 7, 10, 10], + expected_outputs_2 = [1, 1, 1, 1, 4, 4, 7, 7, 7, 7, 10], + expected_stepsizes_1 = [3, 3, 3, 1], + expected_stepsizes_2 = [2, 2, 2, 2, 2], + ), + ]) + def test_serial_downsampling(self, test_case: SerialDownsamplingTestCase): + """Test the warning one gets when using 'logging' + 'step_size_downsampling_factor'.""" + fmu1 = FMUModelCS2DoStepLogging(os.path.join(FMI2_REF_FMU_PATH, "Feedthrough.fmu")) + fmu2 = FMUModelCS2DoStepLogging(os.path.join(FMI2_REF_FMU_PATH, "Feedthrough.fmu")) + + input_variable_name = "Float64_continuous_input" + output_variable_name = "Float64_continuous_output" + + models = [fmu1, fmu2] + connections = [ + (fmu1, output_variable_name, + fmu2, input_variable_name), + ] + master = Master(models, connections) + + t_start, t_final = 0, 10 + opts = master.simulate_options() + opts["step_size"] = 1 + opts["step_size_downsampling_factor"] = { + fmu1: test_case.downsampling_rate_1, + fmu2: test_case.downsampling_rate_2 + } + opts["execution"] = "serial" + opts["_experimental_serial_downsampling"] = True + + # Generate input + input_object = [ + [(fmu1, input_variable_name)], + lambda t: [t + 1] # not starting at zero to test correct values taken with initialization + ] + + res = master.simulate(t_start, t_final, options = opts, input = input_object) + + assert master._uses_step_size_downsampling == test_case.uses_downsampling + # verify output values + np.testing.assert_array_equal( + res[0][output_variable_name], + test_case.expected_outputs_1) + np.testing.assert_array_equal( + res[1][output_variable_name], + test_case.expected_outputs_2) + + # verify used step sizes + np.testing.assert_array_equal( + fmu1.do_step_history, + test_case.expected_stepsizes_1) + np.testing.assert_array_equal( + fmu2.do_step_history, + test_case.expected_stepsizes_2) + + @pytest.mark.parametrize("factor", [1, 2, 5, 10]) + def test_rescale_step_size_and_downsampling(self, factor): + """Test that rescaling both the step-size and downsampling gives identical results.""" + fmu1 = FMUModelCS2DoStepLogging(os.path.join(FMI2_REF_FMU_PATH, "Feedthrough.fmu")) + fmu2 = FMUModelCS2DoStepLogging(os.path.join(FMI2_REF_FMU_PATH, "Feedthrough.fmu")) + + input_variable_name = "Float64_continuous_input" + output_variable_name = "Float64_continuous_output" + + models = [fmu1, fmu2] + connections = [ + (fmu1, output_variable_name, + fmu2, input_variable_name), + ] + master = Master(models, connections) + + t_start, t_final = 0, 10 + opts = master.simulate_options() + opts["step_size"] = 1/factor + ds_1, ds_2 = 2*factor, 3*factor + opts["step_size_downsampling_factor"] = {fmu1: ds_1, fmu2: ds_2} + opts["execution"] = "serial" + opts["_experimental_serial_downsampling"] = True + + # Generate input + input_object = [ + [(fmu1, input_variable_name)], + lambda t: [t + 1] # not starting at zero to test correct values taken with initialization + ] + + master.simulate(t_start, t_final, options = opts, input = input_object) + + assert master._uses_step_size_downsampling + # verify used step sizes + np.testing.assert_array_equal( + fmu1.do_step_history, + [2, 2, 2, 2, 2]) + np.testing.assert_array_equal( + fmu2.do_step_history, + [3, 3, 3, 1]) +