diff --git a/examples/29_multivariable_streams/29_multivariable_streams.yaml b/examples/29_multivariable_streams/29_multivariable_streams.yaml new file mode 100644 index 000000000..858da1819 --- /dev/null +++ b/examples/29_multivariable_streams/29_multivariable_streams.yaml @@ -0,0 +1,12 @@ +name: "H2Integrate_config" + +system_summary: > + This example demonstrates the multivariable streams feature in H2Integrate. + Multivariable streams allow users to connect multiple related variables with + a single connection specification. In this example, we connect a wellhead_gas + stream (containing gas_flow, hydrogen_fraction, oxygen_fraction, gas_temperature, + and gas_pressure) between a dummy gas producer and consumer. + +driver_config: "driver_config.yaml" +technology_config: "tech_config.yaml" +plant_config: "plant_config.yaml" diff --git a/examples/29_multivariable_streams/driver_config.yaml b/examples/29_multivariable_streams/driver_config.yaml new file mode 100644 index 000000000..2368ccea9 --- /dev/null +++ b/examples/29_multivariable_streams/driver_config.yaml @@ -0,0 +1,6 @@ +name: "driver_config" +description: "Driver configuration for multivariable streams example" + +general: + folder_output: outputs + create_om_reports: false diff --git a/examples/29_multivariable_streams/plant_config.yaml b/examples/29_multivariable_streams/plant_config.yaml new file mode 100644 index 000000000..0979f7588 --- /dev/null +++ b/examples/29_multivariable_streams/plant_config.yaml @@ -0,0 +1,14 @@ +name: "plant_config" +description: "Demonstrates multivariable streams with a gas combiner" + +plant: + plant_life: 30 + simulation: + n_timesteps: 8760 + timezone: -6 + +technology_interconnections: [ + ["gas_producer_1", "gas_combiner", "wellhead_gas", "pipe"], + ["gas_producer_2", "gas_combiner", "wellhead_gas", "pipe"], + ["gas_combiner", "gas_consumer", "wellhead_gas"], +] diff --git a/examples/29_multivariable_streams/run_multivariable_streams.py b/examples/29_multivariable_streams/run_multivariable_streams.py new file mode 100644 index 000000000..63e7032e1 --- /dev/null +++ b/examples/29_multivariable_streams/run_multivariable_streams.py @@ -0,0 +1,154 @@ +""" +Example 29: Multivariable Streams with Gas Combiner + +This example demonstrates: +1. Multivariable streams - connecting multiple related variables with a single connection +2. Gas stream combiner - combining multiple gas streams with mass-weighted averaging + +Two gas producers with different properties feed into a combiner, which outputs +a single combined stream to a consumer. + +The wellhead_gas stream includes: +- gas_flow (kg/h): Total mass flow rate +- hydrogen_fraction: Fraction of hydrogen +- oxygen_fraction: Fraction of oxygen +- gas_temperature (K): Temperature +- gas_pressure (bar): Pressure +""" + +import numpy as np +import matplotlib.pyplot as plt + +from h2integrate.core.h2integrate_model import H2IntegrateModel + + +# Create and setup the H2Integrate model +model = H2IntegrateModel("29_multivariable_streams.yaml") + +model.setup() + +model.run() + + +# Get outputs from gas producers +print("\nGas Producer 1 Outputs:") +flow1 = model.prob.get_val("gas_producer_1.gas_flow_out", units="kg/h") +temp1 = model.prob.get_val("gas_producer_1.gas_temperature_out", units="K") +pres1 = model.prob.get_val("gas_producer_1.gas_pressure_out", units="bar") +print(f" Flow: mean={flow1.mean():.2f} kg/h") +print(f" Temperature: mean={temp1.mean():.1f} K") +print(f" Pressure: mean={pres1.mean():.2f} bar") + +print("\nGas Producer 2 Outputs:") +flow2 = model.prob.get_val("gas_producer_2.gas_flow_out", units="kg/h") +temp2 = model.prob.get_val("gas_producer_2.gas_temperature_out", units="K") +pres2 = model.prob.get_val("gas_producer_2.gas_pressure_out", units="bar") +print(f" Flow: mean={flow2.mean():.2f} kg/h") +print(f" Temperature: mean={temp2.mean():.1f} K") +print(f" Pressure: mean={pres2.mean():.2f} bar") + +# Get outputs from combiner +print("\nGas Combiner Outputs (mass-weighted average):") +flow_out = model.prob.get_val("gas_combiner.gas_flow_out", units="kg/h") +temp_out = model.prob.get_val("gas_combiner.gas_temperature_out", units="K") +pres_out = model.prob.get_val("gas_combiner.gas_pressure_out", units="bar") +h2_out = model.prob.get_val("gas_combiner.hydrogen_fraction_out") +print(f" Total Flow: mean={flow_out.mean():.2f} kg/h (sum of inputs)") +print(f" Temperature: mean={temp_out.mean():.1f} K (weighted avg)") +print(f" Pressure: mean={pres_out.mean():.2f} bar (weighted avg)") +print(f" H2 Fraction: mean={h2_out.mean():.3f} (weighted avg)") + +# Get derived outputs from gas_consumer +print("\nGas Consumer Derived Outputs:") +h2_mass_flow = model.prob.get_val("gas_consumer.hydrogen_mass_flow", units="kg/h") +total_consumed = model.prob.get_val("gas_consumer.total_gas_consumed", units="kg") +avg_temp = model.prob.get_val("gas_consumer.avg_temperature", units="K") +avg_pressure = model.prob.get_val("gas_consumer.avg_pressure", units="bar") +print(f" H2 Mass Flow: mean={h2_mass_flow.mean():.2f} kg/h") +print(f" Total Gas Consumed: {total_consumed[0]:,.0f} kg") +print(f" Avg Temperature: {avg_temp[0]:.1f} K") +print(f" Avg Pressure: {avg_pressure[0]:.2f} bar") + +print("\n" + "=" * 60) +print("SUCCESS: Gas stream combiner with multivariable streams worked!") +print("=" * 60) + + +# Time axis in hours +n_timesteps = len(flow1) +time_hours = np.arange(n_timesteps) + +# Create figure with 3 subplots sharing x-axis +fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(12, 10), sharex=True) +fig.suptitle("Gas Stream Time History", fontsize=14, fontweight="bold") + +# Colors for the two streams +color1 = "#1f77b4" # Blue for stream 1 +color2 = "#ff7f0e" # Orange for stream 2 +color_total = "#2ca02c" # Green for total/combined + +# ------------------------------------------------------------ +# Plot 1: Mass Flow Rates (stacked area) +# ------------------------------------------------------------ +# Stack the flows - stream 1 on bottom, stream 2 on top +ax1.fill_between(time_hours, 0, flow1, color=color1, alpha=0.7, label="Stream 1") +ax1.fill_between(time_hours, flow1, flow1 + flow2, color=color2, alpha=0.7, label="Stream 2") +ax1.plot(time_hours, flow_out, color=color_total, linewidth=2, label="Total (Combined)") + +# Add in-area labels at the center of each region +mid_time = n_timesteps // 2 +ax1.text( + mid_time, + flow1.mean() / 2, + "Producer 1", + ha="center", + va="center", + fontsize=10, + fontweight="bold", + color="white", +) +ax1.text( + mid_time, + flow1.mean() + flow2.mean() / 2, + "Producer 2", + ha="center", + va="center", + fontsize=10, + fontweight="bold", + color="white", +) + +ax1.set_ylabel("Mass Flow Rate (kg/h)") +ax1.set_title("Mass Flow Rates") +ax1.legend(loc="upper right") +ax1.grid(True, alpha=0.3) +ax1.set_ylim(bottom=0) + +# ------------------------------------------------------------ +# Plot 2: Pressure +# ------------------------------------------------------------ +ax2.plot(time_hours, pres1, color=color1, linewidth=1.5, label="Stream 1", linestyle="--") +ax2.plot(time_hours, pres2, color=color2, linewidth=1.5, label="Stream 2", linestyle="--") +ax2.plot(time_hours, pres_out, color=color_total, linewidth=2, label="Combined") + +ax2.set_ylabel("Pressure (bar)") +ax2.set_title("Pressure") +ax2.legend(loc="upper right") +ax2.grid(True, alpha=0.3) + +# ------------------------------------------------------------ +# Plot 3: Temperature +# ------------------------------------------------------------ +ax3.plot(time_hours, temp1, color=color1, linewidth=1.5, label="Stream 1", linestyle="--") +ax3.plot(time_hours, temp2, color=color2, linewidth=1.5, label="Stream 2", linestyle="--") +ax3.plot(time_hours, temp_out, color=color_total, linewidth=2, label="Combined") + +ax3.set_xlabel("Time (hours)") +ax3.set_ylabel("Temperature (K)") +ax3.set_title("Temperature") +ax3.legend(loc="upper right") +ax3.grid(True, alpha=0.3) + +# Adjust layout and save +plt.tight_layout() +plt.show() diff --git a/examples/29_multivariable_streams/tech_config.yaml b/examples/29_multivariable_streams/tech_config.yaml new file mode 100644 index 000000000..37551e99e --- /dev/null +++ b/examples/29_multivariable_streams/tech_config.yaml @@ -0,0 +1,64 @@ +name: "technology_config" +description: > + Technology configuration for multivariable streams example. + Demonstrates two gas producers feeding into a combiner, which then + feeds a consumer. The combiner uses mass-weighted averaging. + +technologies: + gas_producer_1: + performance_model: + model: "dummy_gas_producer_performance" + cost_model: + model: "dummy_gas_producer_cost" + model_inputs: + performance_parameters: + base_flow_rate: 150.0 # kg/hr + base_temperature: 310.0 # K + base_pressure: 12.0 # bar + flow_variation: 0.3 # ±30% variation + temp_variation: 15.0 # ±15 K variation + pressure_variation: 2.0 # ±2 bar variation + random_seed: 42 # for reproducibility + cost_parameters: + capex: 1000000.0 # $1M + opex: 50000.0 # $50k/year + cost_year: 2024 + + gas_producer_2: + performance_model: + model: "dummy_gas_producer_performance" + cost_model: + model: "dummy_gas_producer_cost" + model_inputs: + performance_parameters: + base_flow_rate: 100.0 # kg/hr - different flow rate + base_temperature: 350.0 # K - hotter stream + base_pressure: 8.0 # bar - lower pressure + flow_variation: 0.25 # ±25% variation + temp_variation: 20.0 # ±20 K variation + pressure_variation: 1.5 # ±1.5 bar variation + random_seed: 123 # different seed for different random pattern + cost_parameters: + capex: 800000.0 # $800k + opex: 40000.0 # $40k/year + cost_year: 2024 + + gas_combiner: + performance_model: + model: "gas_stream_combiner" + model_inputs: + performance_parameters: + stream_type: "wellhead_gas" + in_streams: 2 + + gas_consumer: + performance_model: + model: "dummy_gas_consumer_performance" + cost_model: + model: "dummy_gas_consumer_cost" + model_inputs: + performance_parameters: {} + cost_parameters: + capex: 2000000.0 # $2M + opex: 100000.0 # $100k/year + cost_year: 2024 diff --git a/h2integrate/converters/natural_gas/dummy_gas_components.py b/h2integrate/converters/natural_gas/dummy_gas_components.py new file mode 100644 index 000000000..a62b01049 --- /dev/null +++ b/h2integrate/converters/natural_gas/dummy_gas_components.py @@ -0,0 +1,231 @@ +""" +Dummy components for demonstrating multivariable streams. + +These components are used in example 29 to showcase the multivariable stream +connection feature. They produce and consume wellhead_gas streams with +5 constituent variables. +""" + +import numpy as np +import openmdao.api as om +from attrs import field, define + +from h2integrate.core.utilities import BaseConfig, merge_shared_inputs +from h2integrate.core.validators import gt_zero, gte_zero +from h2integrate.core.model_baseclasses import CostModelBaseClass, CostModelBaseConfig +from h2integrate.core.commodity_stream_definitions import multivariable_streams + + +@define(kw_only=True) +class DummyGasProducerPerformanceConfig(BaseConfig): + """ + Configuration class for dummy gas producer performance model. + + Attributes: + base_flow_rate: Base gas flow rate in kg/h + base_temperature: Base gas temperature in K + base_pressure: Base gas pressure in bar + flow_variation: Fractional variation in flow rate (0-1) + temp_variation: Variation in temperature in K + pressure_variation: Variation in pressure in bar + random_seed: Seed for random number generator (for reproducibility) + """ + + base_flow_rate: float = field(default=100.0, validator=gt_zero) + base_temperature: float = field(default=300.0, validator=gt_zero) + base_pressure: float = field(default=10.0, validator=gt_zero) + flow_variation: float = field(default=0.2, validator=gte_zero) + temp_variation: float = field(default=10.0, validator=gte_zero) + pressure_variation: float = field(default=1.0, validator=gte_zero) + random_seed: int | None = field(default=None) + + +class DummyGasProducerPerformance(om.ExplicitComponent): + """ + A dummy gas producer component that outputs a 'wellhead_gas' multivariable stream. + + This component produces time-varying outputs for each constituent variable + of the wellhead_gas stream (gas_flow, hydrogen_fraction, oxygen_fraction, + gas_temperature, gas_pressure). + + The outputs use random variations around base values. + """ + + def initialize(self): + self.options.declare("driver_config", types=dict) + self.options.declare("plant_config", types=dict) + self.options.declare("tech_config", types=dict) + + def setup(self): + self.config = DummyGasProducerPerformanceConfig.from_dict( + merge_shared_inputs(self.options["tech_config"]["model_inputs"], "performance") + ) + n_timesteps = self.options["plant_config"]["plant"]["simulation"]["n_timesteps"] + + # Add all wellhead_gas stream outputs + for var_name, var_props in multivariable_streams["wellhead_gas"].items(): + self.add_output( + f"{var_name}_out", + val=0.0, + shape=n_timesteps, + units=var_props.get("units"), + desc=var_props.get("desc", ""), + ) + + def compute(self, inputs, outputs): + n_timesteps = self.options["plant_config"]["plant"]["simulation"]["n_timesteps"] + + # Set random seed for reproducibility if specified + rng = np.random.default_rng(self.config.random_seed) + + # Generate random variations around base values + base_flow = self.config.base_flow_rate + base_temp = self.config.base_temperature + base_pressure = self.config.base_pressure + + # Gas flow varies randomly within ±variation fraction + flow_noise = rng.uniform( + -self.config.flow_variation, self.config.flow_variation, n_timesteps + ) + outputs["gas_flow_out"] = base_flow * (1.0 + flow_noise) + + # Hydrogen fraction: 0.7 to 0.9 (random) + outputs["hydrogen_fraction_out"] = rng.uniform(0.7, 0.9, n_timesteps) + + # Oxygen fraction: 0.0 to 0.05 (random) + outputs["oxygen_fraction_out"] = rng.uniform(0.0, 0.05, n_timesteps) + + # Temperature varies randomly within ±temp_variation K + temp_noise = rng.uniform( + -self.config.temp_variation, self.config.temp_variation, n_timesteps + ) + outputs["gas_temperature_out"] = base_temp + temp_noise + + # Pressure varies randomly within ±pressure_variation bar + pres_noise = rng.uniform( + -self.config.pressure_variation, self.config.pressure_variation, n_timesteps + ) + outputs["gas_pressure_out"] = base_pressure + pres_noise + + +class DummyGasConsumerPerformance(om.ExplicitComponent): + """ + A dummy gas consumer component that takes in a 'wellhead_gas' multivariable stream. + + This component demonstrates receiving all constituent variables of a + wellhead_gas stream (gas_flow, hydrogen_fraction, oxygen_fraction, + gas_temperature, gas_pressure) and performing simple calculations. + + The component calculates some derived quantities from the input stream. + """ + + def initialize(self): + self.options.declare("driver_config", types=dict) + self.options.declare("plant_config", types=dict) + self.options.declare("tech_config", types=dict) + + def setup(self): + n_timesteps = self.options["plant_config"]["plant"]["simulation"]["n_timesteps"] + + # Add all wellhead_gas stream inputs + for var_name, var_props in multivariable_streams["wellhead_gas"].items(): + self.add_input( + f"{var_name}_in", + val=0.0, + shape=n_timesteps, + units=var_props.get("units"), + desc=var_props.get("desc", ""), + ) + + # Add some derived outputs + self.add_output( + "hydrogen_mass_flow", + val=0.0, + shape=n_timesteps, + units="kg/h", + desc="Mass flow rate of hydrogen component", + ) + self.add_output( + "total_gas_consumed", val=0.0, units="kg", desc="Total gas consumed over the simulation" + ) + self.add_output("avg_temperature", val=0.0, units="K", desc="Average gas temperature") + self.add_output("avg_pressure", val=0.0, units="bar", desc="Average gas pressure") + + def compute(self, inputs, outputs): + # Calculate derived quantities from the stream inputs + gas_flow = inputs["gas_flow_in"] + h2_fraction = inputs["hydrogen_fraction_in"] + temperature = inputs["gas_temperature_in"] + pressure = inputs["gas_pressure_in"] + + # Hydrogen mass flow is total flow times hydrogen fraction + outputs["hydrogen_mass_flow"] = gas_flow * h2_fraction + + # Total gas consumed (assuming hourly data, sum all flow rates) + outputs["total_gas_consumed"] = np.sum(gas_flow) + + # Average temperature and pressure + outputs["avg_temperature"] = np.mean(temperature) + outputs["avg_pressure"] = np.mean(pressure) + + +@define(kw_only=True) +class DummyGasProducerCostConfig(CostModelBaseConfig): + """ + Configuration class for dummy gas producer cost model. + + Attributes: + capex: Capital expenditure in USD + opex: Fixed operational expenditure in USD/year + """ + + capex: float = field(default=1_000_000.0, validator=gte_zero) + opex: float = field(default=50_000.0, validator=gte_zero) + + +class DummyGasProducerCost(CostModelBaseClass): + """ + Simple cost model for the dummy gas producer. + """ + + def setup(self): + self.config = DummyGasProducerCostConfig.from_dict( + merge_shared_inputs(self.options["tech_config"]["model_inputs"], "cost") + ) + + super().setup() + + def compute(self, inputs, outputs, discrete_inputs=None, discrete_outputs=None): + outputs["CapEx"] = self.config.capex + outputs["OpEx"] = self.config.opex + + +@define(kw_only=True) +class DummyGasConsumerCostConfig(CostModelBaseConfig): + """ + Configuration class for dummy gas consumer cost model. + + Attributes: + capex: Capital expenditure in USD + opex: Fixed operational expenditure in USD/year + """ + + capex: float = field(default=2_000_000.0, validator=gte_zero) + opex: float = field(default=100_000.0, validator=gte_zero) + + +class DummyGasConsumerCost(CostModelBaseClass): + """ + Simple cost model for the dummy gas consumer. + """ + + def setup(self): + self.config = DummyGasConsumerCostConfig.from_dict( + merge_shared_inputs(self.options["tech_config"]["model_inputs"], "cost") + ) + + super().setup() + + def compute(self, inputs, outputs, discrete_inputs=None, discrete_outputs=None): + outputs["CapEx"] = self.config.capex + outputs["OpEx"] = self.config.opex diff --git a/h2integrate/core/commodity_stream_definitions.py b/h2integrate/core/commodity_stream_definitions.py new file mode 100644 index 000000000..1322df3e9 --- /dev/null +++ b/h2integrate/core/commodity_stream_definitions.py @@ -0,0 +1,75 @@ +""" +Commodity stream definitions for H2Integrate. + +This module contains: +1. multivariable_streams: Definitions for streams that bundle multiple related variables +2. is_electricity_producer: Helper function to identify electricity-producing technologies +""" + +multivariable_streams = { + "wellhead_gas": { + "gas_flow": { + "units": "kg/h", + "desc": "Total mass flow rate of gas in the stream", + }, + "hydrogen_fraction": { + "units": "unitless", + "desc": "Fraction of hydrogen in the gas stream", + }, + "oxygen_fraction": { + "units": "unitless", + "desc": "Fraction of oxygen in the gas stream", + }, + "gas_temperature": { + "units": "K", + "desc": "Temperature of the gas stream", + }, + "gas_pressure": { + "units": "bar", + "desc": "Pressure of the gas stream", + }, + }, + "electricity_ac": { + "ac_power": { + "units": "kW", + "desc": "AC power of the electricity stream", + }, + "voltage": { + "units": "V", + "desc": "Voltage of the electricity stream", + }, + "frequency": { + "units": "Hz", + "desc": "Frequency of the electricity stream", + }, + }, +} + + +def is_electricity_producer(tech_name: str) -> bool: + """Check if a technology is an electricity producer. + + Args: + tech_name: The name of the technology to check. + Returns: + True if tech_name starts with any of the known electricity producing + tech prefixes (e.g., 'wind', 'solar', 'pv', 'grid_buy', etc.). + Note: + This uses prefix matching, so 'grid_buy_1' and 'grid_buy_2' would both + be considered electricity producers. Be careful when naming technologies + to avoid unintended matches (e.g., 'pv_battery' would be incorrectly + identified as an electricity producer). + """ + + # add any new electricity producing technologies to this list + electricity_producing_techs = [ + "wind", + "solar", + "pv", + "river", + "hopp", + "natural_gas_plant", + "grid_buy", + ] + + return any(tech_name.startswith(elem) for elem in electricity_producing_techs) diff --git a/h2integrate/core/h2integrate_model.py b/h2integrate/core/h2integrate_model.py index 2cbeb9540..1ae80cdd1 100644 --- a/h2integrate/core/h2integrate_model.py +++ b/h2integrate/core/h2integrate_model.py @@ -14,10 +14,14 @@ ) from h2integrate.finances.finances import AdjustedCapexOpexComp from h2integrate.core.resource_summer import ElectricitySumComp -from h2integrate.core.supported_models import supported_models, is_electricity_producer +from h2integrate.core.supported_models import supported_models from h2integrate.core.inputs.validation import load_tech_yaml, load_plant_yaml, load_driver_yaml from h2integrate.core.pose_optimization import PoseOptimization from h2integrate.postprocess.sql_to_csv import convert_sql_to_csv_summary +from h2integrate.core.commodity_stream_definitions import ( + multivariable_streams, + is_electricity_producer, +) try: @@ -330,6 +334,9 @@ def create_site_model(self): and resources models (if provided in the configuration) for that site. """ # Loop through each site defined in the plant config + # If no sites defined in plant_config, nothing to do + if "sites" not in self.plant_config or not self.plant_config["sites"]: + return for site_name, site_info in self.plant_config["sites"].items(): # Reorganize the plant config to be formatted as expected by the # resource models @@ -885,6 +892,42 @@ def connect_technologies(self): if len(connection) == 4: source_tech, dest_tech, transport_item, transport_type = connection + # Check if this is a multivariable stream connection + # Format: [source, dest, stream_name, transport_type] + if transport_item in multivariable_streams: + # Handle combiner connections with auto-counting + if "combiner" in dest_tech: + if dest_tech not in combiner_counts: + combiner_counts[dest_tech] = 1 + else: + combiner_counts[dest_tech] += 1 + stream_index = combiner_counts[dest_tech] + for var_name in multivariable_streams[transport_item]: + self.plant.connect( + f"{source_tech}.{var_name}_out", + f"{dest_tech}.{var_name}_in{stream_index}", + ) + # Handle splitter connections with auto-counting + elif "splitter" in source_tech: + if source_tech not in splitter_counts: + splitter_counts[source_tech] = 1 + else: + splitter_counts[source_tech] += 1 + stream_index = splitter_counts[source_tech] + for var_name in multivariable_streams[transport_item]: + self.plant.connect( + f"{source_tech}.{var_name}_out{stream_index}", + f"{dest_tech}.{var_name}_in", + ) + # Direct connection for non-combiner/splitter destinations + else: + for var_name in multivariable_streams[transport_item]: + self.plant.connect( + f"{source_tech}.{var_name}_out", + f"{dest_tech}.{var_name}_in", + ) + continue # Skip the rest of the 4-element handling + if transport_type in self.tech_names: # if the transport type is already a technology, skip creating a new component connection_name = f"{transport_type}" @@ -983,13 +1026,30 @@ def connect_technologies(self): source_tech, dest_tech, connected_parameter = connection if isinstance(connected_parameter, tuple | list): source_parameter, dest_parameter = connected_parameter - self.plant.connect( - f"{source_tech}.{source_parameter}", f"{dest_tech}.{dest_parameter}" - ) + # Check if this is a multivariable stream connection + if source_parameter in multivariable_streams: + # Expand the multivariable stream into individual connections + for var_name in multivariable_streams[source_parameter]: + self.plant.connect( + f"{source_tech}.{var_name}_out", f"{dest_tech}.{var_name}_in" + ) + else: + self.plant.connect( + f"{source_tech}.{source_parameter}", f"{dest_tech}.{dest_parameter}" + ) else: - self.plant.connect( - f"{source_tech}.{connected_parameter}", f"{dest_tech}.{connected_parameter}" - ) + # Check if the connected_parameter is a multivariable stream + if connected_parameter in multivariable_streams: + # Expand the multivariable stream into individual connections + for var_name in multivariable_streams[connected_parameter]: + self.plant.connect( + f"{source_tech}.{var_name}_out", f"{dest_tech}.{var_name}_in" + ) + else: + self.plant.connect( + f"{source_tech}.{connected_parameter}", + f"{dest_tech}.{connected_parameter}", + ) else: err_msg = f"Invalid connection: {connection}" @@ -1003,49 +1063,49 @@ def connect_technologies(self): for resource_key, resource_params in site_grp_inputs.get("resources", {}).items(): resource_models[f"{site_grp}-{resource_key}"] = resource_params - resource_source_connections = [c[0] for c in resource_to_tech_connections] - # Check if there is a missing resource to tech connection or missing resource model - if len(resource_models) != len(resource_source_connections): - if len(resource_models) > len(resource_source_connections): - # more resource models than resources connected to technologies - non_connected_resource = [ - k for k in resource_models if k not in resource_source_connections - ] - # check if theres a resource model that isn't connected to a technology - if len(non_connected_resource) > 0: - msg = ( - "Some resources are not connected to a technology. Resource models " - f"{non_connected_resource} are not included in " - "`resource_to_tech_connections`. Please connect these resources " - "to their technologies under `resource_to_tech_connections` in " - "the plant config file." - ) - raise ValueError(msg) - if len(resource_source_connections) > len(resource_models): - # more resources connected than resource models - missing_resource = [ - k for k in resource_source_connections if k not in resource_models - ] - # check if theres a resource model that isn't connected to a technology - if len(missing_resource) > 0: - msg = ( - "Missing resource(s) are not defined but are connected to a technology. " - f"Missing resource(s) are {missing_resource}. " - "Please check ``resource_to_tech_connections`` in the plant config file " - "or add the missing resources" - " to plant_config['site']['resources']." - ) - raise ValueError(msg) + resource_source_connections = [c[0] for c in resource_to_tech_connections] + # Check if there is a missing resource to tech connection or missing resource model + if len(resource_models) != len(resource_source_connections): + if len(resource_models) > len(resource_source_connections): + # more resource models than resources connected to technologies + non_connected_resource = [ + k for k in resource_models if k not in resource_source_connections + ] + # check if theres a resource model that isn't connected to a technology + if len(non_connected_resource) > 0: + msg = ( + "Some resources are not connected to a technology. Resource models " + f"{non_connected_resource} are not included in " + "`resource_to_tech_connections`. Please connect these resources " + "to their technologies under `resource_to_tech_connections` in " + "the plant config file." + ) + raise ValueError(msg) + if len(resource_source_connections) > len(resource_models): + # more resources connected than resource models + missing_resource = [ + k for k in resource_source_connections if k not in resource_models + ] + # check if theres a resource model that isn't connected to a technology + if len(missing_resource) > 0: + msg = ( + "Missing resource(s) are not defined but are connected to a" + f" technology. Missing resource(s) are {missing_resource}. " + "Please check ``resource_to_tech_connections`` in the plant" + " config file or add the missing resources" + " to plant_config['site']['resources']." + ) + raise ValueError(msg) - for connection in resource_to_tech_connections: - if len(connection) != 3: - err_msg = f"Invalid resource to tech connection: {connection}" - raise ValueError(err_msg) + for connection in resource_to_tech_connections: + if len(connection) != 3: + err_msg = f"Invalid resource to tech connection: {connection}" + raise ValueError(err_msg) - resource_name, tech_name, variable = connection + resource_name, tech_name, variable = connection - # Connect the resource output to the technology input - self.model.connect(f"{resource_name}.{variable}", f"{tech_name}.{variable}") + # Connect the resource output to the technology input + self.model.connect(f"{resource_name}.{variable}", f"{tech_name}.{variable}") # connect outputs of the technology models to the cost and finance models of the # same name if the cost and finance models are not None diff --git a/h2integrate/core/inputs/plant_schema.yaml b/h2integrate/core/inputs/plant_schema.yaml index d6d95064c..7b9d010ab 100644 --- a/h2integrate/core/inputs/plant_schema.yaml +++ b/h2integrate/core/inputs/plant_schema.yaml @@ -75,4 +75,3 @@ properties: required: - name - description - - sites diff --git a/h2integrate/core/resource_summer.py b/h2integrate/core/resource_summer.py index 00aa8541c..6520460a8 100644 --- a/h2integrate/core/resource_summer.py +++ b/h2integrate/core/resource_summer.py @@ -1,7 +1,7 @@ import numpy as np import openmdao.api as om -from h2integrate.core.supported_models import is_electricity_producer +from h2integrate.core.commodity_stream_definitions import is_electricity_producer class ElectricitySumComp(om.ExplicitComponent): diff --git a/h2integrate/core/supported_models.py b/h2integrate/core/supported_models.py index e030c2c45..b951cd292 100644 --- a/h2integrate/core/supported_models.py +++ b/h2integrate/core/supported_models.py @@ -54,6 +54,7 @@ SaltCavernStorageCostModel, LinedRockCavernStorageCostModel, ) +from h2integrate.transporters.gas_stream_combiner import GasStreamCombinerPerformanceModel from h2integrate.converters.ammonia.ammonia_synloop import ( AmmoniaSynLoopCostModel, AmmoniaSynLoopPerformanceModel, @@ -90,6 +91,12 @@ ) from h2integrate.converters.hydrogen.singlitico_cost_model import SingliticoCostModel from h2integrate.converters.co2.marine.direct_ocean_capture import DOCCostModel, DOCPerformanceModel +from h2integrate.converters.natural_gas.dummy_gas_components import ( + DummyGasConsumerCost, + DummyGasProducerCost, + DummyGasConsumerPerformance, + DummyGasProducerPerformance, +) from h2integrate.control.control_strategies.pyomo_controllers import ( HeuristicLoadFollowingController, ) @@ -228,6 +235,7 @@ "cable": CablePerformanceModel, "pipe": PipePerformanceModel, "combiner_performance": GenericCombinerPerformanceModel, + "gas_stream_combiner": GasStreamCombinerPerformanceModel, "splitter_performance": GenericSplitterPerformanceModel, "iron_transport_performance": IronTransportPerformanceComponent, "iron_transport_cost": IronTransportCostComponent, @@ -258,37 +266,13 @@ # Grid "grid_performance": GridPerformanceModel, "grid_cost": GridCostModel, + # Dummy components for multivariable stream demonstrations + "dummy_gas_producer_performance": DummyGasProducerPerformance, + "dummy_gas_producer_cost": DummyGasProducerCost, + "dummy_gas_consumer_performance": DummyGasConsumerPerformance, + "dummy_gas_consumer_cost": DummyGasConsumerCost, # Finance "ProFastComp": ProFastLCO, "ProFastNPV": ProFastNPV, "NumpyFinancialNPV": NumpyFinancialNPV, } - - -def is_electricity_producer(tech_name: str) -> bool: - """Check if a technology is an electricity producer. - - Args: - tech_name: The name of the technology to check. - Returns: - True if tech_name starts with any of the known electricity producing - tech prefixes (e.g., 'wind', 'solar', 'pv', 'grid_buy', etc.). - Note: - This uses prefix matching, so 'grid_buy_1' and 'grid_buy_2' would both - be considered electricity producers. Be careful when naming technologies - to avoid unintended matches (e.g., 'pv_battery' would be incorrectly - identified as an electricity producer). - """ - - # add any new electricity producing technologies to this list - electricity_producing_techs = [ - "wind", - "solar", - "pv", - "river", - "hopp", - "natural_gas_plant", - "grid_buy", - ] - - return any(tech_name.startswith(elem) for elem in electricity_producing_techs) diff --git a/h2integrate/core/test/test_supported_models.py b/h2integrate/core/test/test_supported_models.py index e42104098..bc58c0b44 100644 --- a/h2integrate/core/test/test_supported_models.py +++ b/h2integrate/core/test/test_supported_models.py @@ -1,4 +1,4 @@ -from h2integrate.core.supported_models import is_electricity_producer +from h2integrate.core.commodity_stream_definitions import is_electricity_producer def test_is_electricity_producer(subtests): diff --git a/h2integrate/transporters/gas_stream_combiner.py b/h2integrate/transporters/gas_stream_combiner.py new file mode 100644 index 000000000..5d5ae0a9e --- /dev/null +++ b/h2integrate/transporters/gas_stream_combiner.py @@ -0,0 +1,105 @@ +""" +Gas stream combiner for multivariable streams. + +Combines multiple gas streams using mass-weighted averaging for intensive properties +(temperature, pressure, composition) while summing extensive properties (mass flow). +""" + +import numpy as np +import openmdao.api as om +from attrs import field, define + +from h2integrate.core.utilities import BaseConfig, merge_shared_inputs +from h2integrate.core.commodity_stream_definitions import multivariable_streams + + +@define(kw_only=True) +class GasStreamCombinerConfig(BaseConfig): + """Configuration for the gas stream combiner. + + Attributes: + stream_type: Type of multivariable stream (e.g., 'wellhead_gas') + in_streams: Number of inflow streams to combine + """ + + stream_type: str = field(default="wellhead_gas") + in_streams: int = field(default=2) + + def __attrs_post_init__(self): + if self.stream_type not in multivariable_streams: + raise ValueError( + f"Unknown stream type '{self.stream_type}'. " + f"Available: {list(multivariable_streams.keys())}" + ) + + +class GasStreamCombinerPerformanceModel(om.ExplicitComponent): + """ + Combine multiple gas streams into one using mass-weighted averaging. + + Total mass flow is summed. Temperature, pressure, and compositions are + mass-weighted averages of the input streams. + """ + + def initialize(self): + self.options.declare("driver_config", types=dict) + self.options.declare("plant_config", types=dict) + self.options.declare("tech_config", types=dict) + + def setup(self): + self.config = GasStreamCombinerConfig.from_dict( + merge_shared_inputs(self.options["tech_config"]["model_inputs"], "performance") + ) + + n_timesteps = int(self.options["plant_config"]["plant"]["simulation"]["n_timesteps"]) + stream_def = multivariable_streams[self.config.stream_type] + + # Add inputs for each stream + for i in range(1, self.config.in_streams + 1): + for var_name, var_props in stream_def.items(): + self.add_input( + f"{var_name}_in{i}", + val=0.0, + shape=n_timesteps, + units=var_props.get("units"), + desc=f"Stream {i}: {var_props.get('desc', '')}", + ) + + # Add outputs + for var_name, var_props in stream_def.items(): + self.add_output( + f"{var_name}_out", + val=0.0, + shape=n_timesteps, + units=var_props.get("units"), + desc=f"Combined: {var_props.get('desc', '')}", + ) + + # Identify the flow variable for weighting + self._flow_var = next((v for v in stream_def.keys() if "flow" in v.lower()), None) + if self._flow_var is None: + raise ValueError(f"No flow variable found in '{self.config.stream_type}'") + + def compute(self, inputs, outputs): + n_streams = self.config.in_streams + stream_def = multivariable_streams[self.config.stream_type] + flow_var = self._flow_var + + # Collect mass flows + mass_flows = [inputs[f"{flow_var}_in{i}"] for i in range(1, n_streams + 1)] + total_mass_flow = sum(mass_flows) + outputs[f"{flow_var}_out"] = total_mass_flow + + # Mass-weighted average for other variables + for var_name in stream_def.keys(): + if var_name == flow_var: + continue + + weighted_sum = sum( + inputs[f"{var_name}_in{i}"] * mass_flows[i - 1] for i in range(1, n_streams + 1) + ) + + with np.errstate(divide="ignore", invalid="ignore"): + outputs[f"{var_name}_out"] = np.where( + total_mass_flow > 0, weighted_sum / total_mass_flow, 0.0 + )