Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 47 additions & 6 deletions src/pyfmi/master.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,31 @@ cdef perform_do_step_serial(list models, dict time_spent, double cur_time, doubl
if status != 0:
raise FMUException("The step failed for model %s at time %f. See the log for more information. Return flag %d."%(model.get_name(), cur_time, status))

cdef perform_do_step_serial_with_downsampling(
long step_number,
list models,
list downsampling_rates,
dict time_spent,
double cur_time,
double final_time,
double step_size,
bool new_step):
"""
Perform a do step on all the models.
"""
cdef double time_start = 0.0
cdef int status = 0

for model, ds_rate in zip(models, downsampling_rates):
# Note: step_number is 0 based
if (step_number % ds_rate) == 0:
time_start = timer()
h = min(ds_rate*step_size, abs(final_time - cur_time)) # TODO: eps adjustments here?
status = model.do_step(cur_time, h, new_step)
time_spent[model] += timer() - time_start
if status != 0:
raise FMUException("The step failed for model %s at time %f. See the log for more information. Return flag %d."%(model.get_name(), cur_time, status))

cdef perform_do_step_parallel(list models, FMIL2.fmi2_import_t** model_addresses, int n, double cur_time, double step_size, int new_step):
"""
Perform a do step on all the models.
Expand Down Expand Up @@ -358,6 +383,9 @@ class MasterAlgOptions(OptionBase):
It is not required to have all models as keys,
missing ones take the default value of 1.
Default: {m: 1 for m in models} (no downsampling)

_experimental_serial_downsampling --
TODO
"""
def __init__(self, master, *args, **kw):
_defaults= {
Expand Down Expand Up @@ -389,6 +417,7 @@ class MasterAlgOptions(OptionBase):
"num_threads":None,
"result_downsampling_factor": dict((model, 1) for model in master.models),
"step_size_downsampling_factor" : dict((model, 1) for model in master.models),
"_experimental_serial_downsampling": False,
}
super(MasterAlgOptions,self).__init__(_defaults)
# Exceptions to the above types need to handled here, e.g., allowing both
Expand Down Expand Up @@ -440,6 +469,7 @@ cdef class Master:
cdef public long long _step_number
cdef public bool _last_step
cdef public dict step_size_downsampling_factor
cdef public bool _uses_step_size_downsampling

def __init__(self, models, connections):
"""
Expand Down Expand Up @@ -1388,8 +1418,18 @@ cdef class Master:
step_size = final_time - tcur
self.set_current_step_size(step_size)
self._last_step = True

perform_do_step(self.models, self.elapsed_time, self.fmu_adresses, tcur, step_size, True, calling_setting)
if opts["_experimental_serial_downsampling"]:
perform_do_step_serial_with_downsampling(
self._step_number,
self.models,
list(self.step_size_downsampling_factor.values()),
self.elapsed_time,
tcur,
final_time,
step_size,
True)
else:
perform_do_step(self.models, self.elapsed_time, self.fmu_adresses, tcur, step_size, True, calling_setting)

if self.opts["store_step_before_update"]:
time_start = timer()
Expand Down Expand Up @@ -1617,7 +1657,9 @@ cdef class Master:
f"got: '{val}'.")
self.step_size_downsampling_factor[m] = val

if set(self.step_size_downsampling_factor.values()) != {1} and options["logging"]:
self._uses_step_size_downsampling = set(self.step_size_downsampling_factor.values()) != {1}

if self._uses_step_size_downsampling and options["logging"]:
warnings.warn("Both 'step_size_downsampling_factor' and 'logging' are used. " \
"Logging of A, B, C, and D matrices will be done on the global step-size." \
"Actual values may no longer be sensible.")
Expand All @@ -1627,10 +1669,9 @@ cdef class Master:
warnings.warn("Extrapolation of inputs only supported if the individual FMUs support interpolation of inputs.")
options["extrapolation_order"] = 0

uses_step_size_downsampling = set(self.step_size_downsampling_factor.values()) != {1}
if uses_step_size_downsampling and options["extrapolation_order"] > 0:
if self._uses_step_size_downsampling and options["extrapolation_order"] > 0:
raise FMUException("Use of 'step_size_downsampling_factor' with 'extrapolation_order' > 0 not supported.")
if uses_step_size_downsampling and self.linear_correction:
if self._uses_step_size_downsampling and self.linear_correction:
raise FMUException("Use of 'step_size_downsampling_factor' with 'linear_correction' not supported.")

if options["num_threads"] and options["execution"] == "parallel":
Expand Down
191 changes: 185 additions & 6 deletions tests/test_fmi_master.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import warnings
import re
import scipy.sparse as sps
import dataclasses
from typing import List
from pathlib import Path

from pyfmi import Master
Expand Down Expand Up @@ -600,6 +602,7 @@ def test_with_error_control(self):
msg = "Step-size downsampling not supported for error controlled simulation, no downsampling will be performed."
with pytest.warns(UserWarning, match = re.escape(msg)):
self.master.simulate(options = opts)
assert not self.master._uses_step_size_downsampling

def test_extrapolation_order(self):
"""Test combination with the 'extrapolation_order' option."""
Expand All @@ -617,6 +620,7 @@ def test_extrapolation_order(self):
err_msg = "Use of 'step_size_downsampling_factor' with 'extrapolation_order' > 0 not supported."
with pytest.raises(FMUException, match = re.escape(err_msg)):
master.simulate(options = opts)
assert not self.master._uses_step_size_downsampling

def test_linear_correction(self):
"""Test combination with the 'linear_correction' option."""
Expand All @@ -634,6 +638,7 @@ def test_linear_correction(self):
err_msg = "Use of 'step_size_downsampling_factor' with 'linear_correction' not supported."
with pytest.raises(FMUException, match = re.escape(err_msg)):
master.simulate(options = opts)
assert not self.master._uses_step_size_downsampling

def test_check_invalid_option_input_non_dict_via_simulate_options(self):
"""Test invalid inputs to the option, not a dictionary."""
Expand Down Expand Up @@ -676,44 +681,50 @@ def test_partial_input(self):
opts["step_size_downsampling_factor"] = {self.fmu1: 2}
opts["step_size"] = 0.25
self.master.simulate(options = opts)
assert self.master._uses_step_size_downsampling

@pytest.mark.parametrize("input_var_name, output_var_name",
[
("Float64_continuous_input", "Float64_continuous_output"),
("Float64_discrete_input", "Float64_discrete_output"),
]
)
@pytest.mark.parametrize("rate1, rate2, expected_res1, expected_res2",
@pytest.mark.parametrize("rate1, rate2, expected_res1, expected_res2, uses",
[
# Sanity check
(1, 1,
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
False,
),
# external input only sampled every other step
(2, 1,
[1, 1, 3, 3, 5, 5, 7, 7, 9, 9, 11],
[1, 1, 3, 3, 5, 5, 7, 7, 9, 9, 11]
[1, 1, 3, 3, 5, 5, 7, 7, 9, 9, 11],
True,
),
# connection only updated every other step
(1, 2,
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
[1, 1, 3, 3, 5, 5, 7, 7, 9, 9, 11]
[1, 1, 3, 3, 5, 5, 7, 7, 9, 9, 11],
True,
),
# non-aligned test 1
(2, 3,
[1, 1, 3, 3, 5, 5, 7, 7, 9, 9, 11],
[1, 1, 1, 3, 3, 3, 7, 7, 7, 9, 9]
[1, 1, 1, 3, 3, 3, 7, 7, 7, 9, 9],
True,
),
# non-aligned test 2
(3, 2,
[1, 1, 1, 4, 4, 4, 7, 7, 7, 10, 10],
[1, 1, 1, 1, 4, 4, 7, 7, 7, 7, 10]
[1, 1, 1, 1, 4, 4, 7, 7, 7, 7, 10],
True,
),
]
)
def test_two_feedthrough_system(self, input_var_name, output_var_name,
rate1, rate2, expected_res1, expected_res2):
rate1, rate2, expected_res1, expected_res2, uses):
"""Test 'step_size_downsampling_factor' for 2 Feedthrough models + 1 external input."""
t_start, t_final = 0, 10
opts = self.master.simulate_options()
Expand All @@ -729,6 +740,7 @@ def test_two_feedthrough_system(self, input_var_name, output_var_name,
res = self.master.simulate(t_start, t_final, options = opts, input = input_object)
np.testing.assert_array_equal(res[0][output_var_name], expected_res1)
np.testing.assert_array_equal(res[1][output_var_name], expected_res2)
assert self.master._uses_step_size_downsampling == uses

@pytest.mark.parametrize("input_var_name, output_var_name",
[
Expand Down Expand Up @@ -761,6 +773,7 @@ def test_three_feedthrough_system(self, input_var_name, output_var_name):
]

res = master.simulate(t_start, t_final, options = opts, input = input_object)
assert self.master._uses_step_size_downsampling
expected_res1 = np.array([1, 1, 3, 3, 5, 5, 7, 7, 9, 9, 11])
expected_res2 = np.array([1, 1, 1, 3, 3, 3, 7, 7, 7, 9, 9])
expected_res3 = np.array([1, 1, 1, 1, 5, 5, 5, 5, 9, 9, 9])
Expand All @@ -783,6 +796,7 @@ def test_with_store_step_before_update(self):
]

res = self.master.simulate(t_start, t_final, options = opts, input = input_object)
assert self.master._uses_step_size_downsampling
np.testing.assert_array_equal(
res[0]["Float64_continuous_output"],
# [1, 1, 1, X, 4, 4, 4, X, 7, 7, 7, X, 10, last = 10], # X = before update value
Expand Down Expand Up @@ -825,3 +839,168 @@ def compute_global_D(self):
"Actual values may no longer be sensible."
with pytest.warns(UserWarning, match = re.escape(msg)):
master.simulate(t_start, t_final, options = opts)
assert self.master._uses_step_size_downsampling


class FMUModelCS2DoStepLogging(FMUModelCS2):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.do_step_history = []

def do_step(self, current_t, step_size, new_step = True):
self.do_step_history.append(step_size)
return super().do_step(current_t, step_size, new_step)


@dataclasses.dataclass
class SerialDownsamplingTestCase():
# inputs
downsampling_rate_1: int
downsampling_rate_2: int
# expected outputs
uses_downsampling: bool
expected_outputs_1: List[float]
expected_outputs_2: List[float]
expected_stepsizes_1: List[float]
expected_stepsizes_2: List[float]


class Test_Master_serial_downsampling():
@pytest.mark.parametrize("test_case", [
SerialDownsamplingTestCase( # trivial case
downsampling_rate_1 = 1,
downsampling_rate_2 = 1,
uses_downsampling = False,
expected_outputs_1 = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
expected_outputs_2 = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
expected_stepsizes_1 = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
expected_stepsizes_2 = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
),
SerialDownsamplingTestCase( # external input only sampled every other step
downsampling_rate_1 = 2,
downsampling_rate_2 = 1,
uses_downsampling = True,
expected_outputs_1 = [1, 1, 3, 3, 5, 5, 7, 7, 9, 9, 11],
expected_outputs_2 = [1, 1, 3, 3, 5, 5, 7, 7, 9, 9, 11],
expected_stepsizes_1 = [2, 2, 2, 2, 2],
expected_stepsizes_2 = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
),
SerialDownsamplingTestCase( # connection only updated every other step
downsampling_rate_1 = 1,
downsampling_rate_2 = 2,
uses_downsampling = True,
expected_outputs_1 = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
expected_outputs_2 = [1, 1, 3, 3, 5, 5, 7, 7, 9, 9, 11],
expected_stepsizes_1 = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1],
expected_stepsizes_2 = [2, 2, 2, 2, 2],
),
SerialDownsamplingTestCase( # non-aligned test 1
downsampling_rate_1 = 2,
downsampling_rate_2 = 3,
uses_downsampling = True,
expected_outputs_1 = [1, 1, 3, 3, 5, 5, 7, 7, 9, 9, 11],
expected_outputs_2 = [1, 1, 1, 3, 3, 3, 7, 7, 7, 9, 9],
expected_stepsizes_1 = [2, 2, 2, 2, 2],
expected_stepsizes_2 = [3, 3, 3, 1],
),
SerialDownsamplingTestCase( # non-aligned test 2
downsampling_rate_1 = 3,
downsampling_rate_2 = 2,
uses_downsampling = True,
expected_outputs_1 = [1, 1, 1, 4, 4, 4, 7, 7, 7, 10, 10],
expected_outputs_2 = [1, 1, 1, 1, 4, 4, 7, 7, 7, 7, 10],
expected_stepsizes_1 = [3, 3, 3, 1],
expected_stepsizes_2 = [2, 2, 2, 2, 2],
),
])
def test_serial_downsampling(self, test_case: SerialDownsamplingTestCase):
"""Test the warning one gets when using 'logging' + 'step_size_downsampling_factor'."""
fmu1 = FMUModelCS2DoStepLogging(os.path.join(FMI2_REF_FMU_PATH, "Feedthrough.fmu"))
fmu2 = FMUModelCS2DoStepLogging(os.path.join(FMI2_REF_FMU_PATH, "Feedthrough.fmu"))

input_variable_name = "Float64_continuous_input"
output_variable_name = "Float64_continuous_output"

models = [fmu1, fmu2]
connections = [
(fmu1, output_variable_name,
fmu2, input_variable_name),
]
master = Master(models, connections)

t_start, t_final = 0, 10
opts = master.simulate_options()
opts["step_size"] = 1
opts["step_size_downsampling_factor"] = {
fmu1: test_case.downsampling_rate_1,
fmu2: test_case.downsampling_rate_2
}
opts["execution"] = "serial"
opts["_experimental_serial_downsampling"] = True

# Generate input
input_object = [
[(fmu1, input_variable_name)],
lambda t: [t + 1] # not starting at zero to test correct values taken with initialization
]

res = master.simulate(t_start, t_final, options = opts, input = input_object)

assert master._uses_step_size_downsampling == test_case.uses_downsampling
# verify output values
np.testing.assert_array_equal(
res[0][output_variable_name],
test_case.expected_outputs_1)
np.testing.assert_array_equal(
res[1][output_variable_name],
test_case.expected_outputs_2)

# verify used step sizes
np.testing.assert_array_equal(
fmu1.do_step_history,
test_case.expected_stepsizes_1)
np.testing.assert_array_equal(
fmu2.do_step_history,
test_case.expected_stepsizes_2)

@pytest.mark.parametrize("factor", [1, 2, 5, 10])
def test_rescale_step_size_and_downsampling(self, factor):
"""Test that rescaling both the step-size and downsampling gives identical results."""
fmu1 = FMUModelCS2DoStepLogging(os.path.join(FMI2_REF_FMU_PATH, "Feedthrough.fmu"))
fmu2 = FMUModelCS2DoStepLogging(os.path.join(FMI2_REF_FMU_PATH, "Feedthrough.fmu"))

input_variable_name = "Float64_continuous_input"
output_variable_name = "Float64_continuous_output"

models = [fmu1, fmu2]
connections = [
(fmu1, output_variable_name,
fmu2, input_variable_name),
]
master = Master(models, connections)

t_start, t_final = 0, 10
opts = master.simulate_options()
opts["step_size"] = 1/factor
ds_1, ds_2 = 2*factor, 3*factor
opts["step_size_downsampling_factor"] = {fmu1: ds_1, fmu2: ds_2}
opts["execution"] = "serial"
opts["_experimental_serial_downsampling"] = True

# Generate input
input_object = [
[(fmu1, input_variable_name)],
lambda t: [t + 1] # not starting at zero to test correct values taken with initialization
]

master.simulate(t_start, t_final, options = opts, input = input_object)

assert master._uses_step_size_downsampling
# verify used step sizes
np.testing.assert_array_equal(
fmu1.do_step_history,
[2, 2, 2, 2, 2])
np.testing.assert_array_equal(
fmu2.do_step_history,
[3, 3, 3, 1])