diff --git a/.gitignore b/.gitignore index bee8a64..8691be0 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,5 @@ __pycache__ +*~ +*# +build +catbird.egg-info diff --git a/README.md b/README.md index 8f54ca5..d662b6a 100644 --- a/README.md +++ b/README.md @@ -5,8 +5,6 @@ Catbird A code for generation of Python objects and input for various [MOOSE](https://moose.inl.gov/SitePages/Home.aspx). -*Caveats: This currently only parses data for the `MOOSE::Problem` type.* - Prerequisites ------------- @@ -29,63 +27,98 @@ To include packages for testsing, run pip install .[test] ``` -Example -------- +# Usage -Below is an example generating Python classes for the `OpenMCCellAverageProblem` and `NekRSProblem` types in the [Cardinal](https://cardinal.cels.anl.gov/). +## Load syntax into a Factory +First, import the package and load available syntax from a MOOSE application. ```python -In [1]: from catbird import app_from_exec +>>> from catbird import * +>>> factory=Factory('./heat_conduction-opt') +``` +This may take a few minutes, so don't panic if it hangs. The syntax first has to be dumped from the original application as json and then parsed; +the factory then converts "available" syntax into python object constructors. The more syntax we enable the longer it takes, so if you want +to speed things up you should only enable the syntax you intend to use. To write out what objects are currently enabled, try: +```python +>>> factory.write_config("config.json") +``` +We could then edit that file and load it via an optional constructor argument to `Factory`: +```python +>>> lightweight_factory=factory=Factory('./heat_conduction-opt', config="lighweight_config.json") +``` +However, modifying the config file directory is likely to be very laborious if done manually. Another option +is to derive your own Factory class and override the `set_defaults` method. To limit the enabled syntax, we can pass in dictionaries +of enabled syntax into the `enable_syntax` method, e.g.: +``` +executioner_enable_dict={ + "obj_type": ["Steady","Transient"] +} +self.enable_syntax("Mesh") +self.enable_syntax("Executioner", executioner_enable_dict) +``` -In [2]: cardinal = app_from_exec('./cardinal-opt') +## Create a Model +With our factory built, all we have in practice is bunch of constructor objects. We now need to create the objects and assemble them as a `Model`: +```python +>>> model=MooseModel(factory) +``` +This will have created a "boiler-plate" model with some sensible base objects to work with. We can now start to set their attributes in an object-oriented way. +``` +>>> model.mesh.dim=2 +``` +To see the available attributes for a given object, just use "help" to obtain useful documentation e.g.: +``` +>>> help(model.mesh) +``` +The help will also print the type and valid options. Type checking of attributes is performed, and if the type (i) incompatible with the expected type +and (ii) not castable as the expected type then a ValueError exception is raised, e.g. +``` +>>> model.mesh.dim='2' # This is fine, MOOSE expects string +>>> model.mesh.dim=2 # This is fine, str(2) is valid +>>> model.mesh.nx=1 # This is fine, MOOSE expects in +>>> model.mesh.nx="hello" # This raises a ValueError as int("hello") is invalid +``` -In [3]: openmc_prob = cardinal['problems']['OpenMCCellAverageProblem']() +Many of the objects are `Collection` types, i.e. a collection MOOSE objects, for example Variables. To add a variable, we must call: +``` +>>> model.add_variable("T", order="SECOND") +``` +Notice the use of key-word arguments in this function call. We can also act on the created object directly, via +``` +>>> model.add_variable("T") +>>> model.variables.objects["T"].order="SECOND" +``` -In [4]: openmc_prob.batches? -Type: property -String form: -Docstring: -Type: int -Number of batches to run in OpenMC; this overrides the setting in the XML files. +Even if the root-level syntax hasn't been added to the model (but is available in the factory) we can still add it to our model through `add_syntax` calls: +``` +model.add_syntax("VectorPostprocessors") +``` +Since this is a Collection type, to add a specific VectorPostprocessor, we call the add_to_collection method: +``` +model.add_to_collection("VectorPostprocessors","VectorPostprocessor","t_sampler") ``` -Features --------- +It is generally expected that the user will develop their own class derived from `MooseModel`, overriding the method `load_default_syntax`. +See for example TransientModel in `model.py`. -Every attribute comes with type and dimensionality checking (for array types). -This provides live feedback to users when setting problem parameters. In the example above, -setting `openmc_prob.batches` to a non-integer value results in the following +## Write to file +Finally if we are happy with our model we can write to file: ```python -In [5]: openmc_prob.batches = 100 - -In [6]: openmc_prob.batches = 'one hundred' ---------------------------------------------------------------------------- -ValueError Traceback (most recent call last) -Cell In[6], line 1 -----> 1 openmc_prob.batches = 'one hundred' - -File ~/soft/catbird/catbird/cbird.py:66, in Catbird.prop_set..fset(self, val) - 64 def fset(self, val): - 65 if dim == 0: ----> 66 self.check_type(name, val, attr_type) - 67 if allowed_vals is not None: - 68 self.check_vals(name, val, allowed_vals) - -File ~/soft/catbird/catbird/cbird.py:40, in Catbird.check_type(name, val, attr_type) - 38 val_type_str = val.__class__.__name__ - 39 exp_type_str = attr_type.__name__ ----> 40 raise ValueError(f'Incorrect type "{val_type_str}" for attribute "{name}". ' - 41 f'Expected type "{exp_type_str}".') - 42 return val +>>> model.write("catbird_input.i") +``` +You can run that input from the command line as you normally would or launch a subprocess. -ValueError: Incorrect type "str" for attribute "batches". Expected type "int". +Note, if you inspect our file `catbird_input.i` you may not see all attributes written out. The default behaviour is that parameters that are unchanged relative to +their default value are suppressed when writing to file. To see all attributes, instead run +```python +>>> model.write("catbird_input_full.i", print_default=True) ``` -Default values are also set on attributes automatically, so full problem -descriptions can be generated by setting only required parameters. +Example +------- +A fully worked heat conduction example may be found in: `catbird/examples/thermal.py`. Run with +```bash +$ python examples/thermal.py +``` -```python -In [7]: openmc_prob.initial_properties -Out[7]: 'moose' -``` \ No newline at end of file diff --git a/catbird/__init__.py b/catbird/__init__.py index 8821230..4a8314a 100644 --- a/catbird/__init__.py +++ b/catbird/__init__.py @@ -1,3 +1,3 @@ - - -from .cbird import * \ No newline at end of file +from .obj import * +from .model import * +from .collection import * diff --git a/catbird/action.py b/catbird/action.py new file mode 100644 index 0000000..6a90473 --- /dev/null +++ b/catbird/action.py @@ -0,0 +1,38 @@ +from .base import MooseBase + +class MooseAction(MooseBase): + params_name="_moose_action_params" + class_alias="Action" + + def __init__(self): + if hasattr(self,"_action_moose_params"): + # Dictionary of the attributes this class should have + moose_param_dict_local=getattr(self,"_moose_action_params") + + # Loop over and make into attributes + for attr_name, moose_param in moose_param_dict_local.items(): + # Crucially, acts on the instance, not the class. + setattr(self,attr_name,moose_param.val) + + @property + def moose_action_params(self): + """ + Return a unified list of all the parameters we've added. + """ + moose_param_list_local=[] + if hasattr(self,"_moose_action_params"): + moose_param_dict_local=getattr(self,"_moose_action_params") + moose_param_list_local=list(moose_param_dict_local.keys()) + return moose_param_list_local + + def inner_to_str(self,print_default=False): + inner_str="" + param_list=self.moose_action_params + + # We don't print the type of actions + if "type" in param_list: + param_list.remove("type") + + for attr_name in param_list: + inner_str+=self.attr_to_str(attr_name,print_default) + return inner_str diff --git a/catbird/base.py b/catbird/base.py new file mode 100644 index 0000000..6b223a0 --- /dev/null +++ b/catbird/base.py @@ -0,0 +1,148 @@ +from abc import ABC +from collections.abc import Iterable +from .param import MooseParam +from .string import MooseString + +class MooseBase(ABC,MooseString): + """ + Class that can add type-checked properties to itself. + """ + def __setattr__(self, attr_name, value_in): + value_to_set=value_in + if hasattr(self,attr_name): + attr_val_now=getattr(self,attr_name) + type_now=type(attr_val_now) + + sub_type_now=None + if issubclass(type_now, Iterable) and type_now != str : + sub_type_now=type(attr_val_now[0]) + + if not isinstance(value_in,type_now): + # If not the right type, try to cast + values=None + if isinstance(value_in,str): + values=value_in.split() + + try: + if sub_type_now is not None: + if values: + value_to_set=[ sub_type_now(v) for v in values ] + else: + value_to_set=[sub_type_now(value_in)] + else: + value_to_set=type_now(value_in) + except ValueError: + msg="Attribute {} should have type {}".format(attr_name,type_now) + raise ValueError(msg) + super().__setattr__(attr_name, value_to_set) + + + @staticmethod + def check_type(name, val, attr_type): + """Checks a value's type""" + if not isinstance(val, attr_type): + val_type_str = val.__class__.__name__ + exp_type_str = attr_type.__name__ + raise ValueError(f'Incorrect type "{val_type_str}" for attribute "{name}". ' + f'Expected type "{exp_type_str}".') + return val + + @staticmethod + def check_vals(name, val, allowed_vals): + """Checks that a value is in the set of allowed_values""" + if val not in allowed_vals: + raise ValueError(f'Value {val} for attribute {name} is not one of {allowed_vals}') + + @classmethod + def add_moose_param(cls,moose_param): + assert isinstance(moose_param,MooseParam) + + # Check name + attr_name=moose_param.name + if attr_name.find("_syntax_") != -1: + msg="'_syntax_' is reserved attribute string. Cannot create attibute {}".format(attr_name) + raise RuntimeError(msg) + + # Store attribute in dict + params_name=None + if cls.params_name: + if not hasattr(cls,cls.params_name): + setattr(cls,cls.params_name,{}) + moose_param_dict_local=getattr(cls,cls.params_name) + moose_param_dict_local[attr_name]=moose_param + setattr(cls,cls.params_name,moose_param_dict_local) + + def get_param(self,attr_name): + """Return MooseParam corresponding to attribute name. + + Raise KeyError if not found. + """ + dict_now=getattr(self,self.params_name) + return dict_now[attr_name] + + def is_default(self,attr_name): + # Get current value + attr_val = getattr(self, attr_name) + + # Look up the default value + param=self.get_param(attr_name) + default_val = param.default + if default_val is None: + + default_val = param.attr_type() + + # Compare and return + return attr_val == default_val + + def attr_to_str(self,attr_name,print_default=False): + attr_str="" + if self.is_default(attr_name) and not print_default: + return attr_str + + attr_val=getattr(self, attr_name) + if attr_val is None: + return attr_str + + type_now=type(attr_val) + attr_val_str="" + if issubclass(type_now, Iterable) and type_now != str : + str_list = [ str(v)+" " for v in attr_val ] + attr_val_str="".join(str_list) + attr_val_str=attr_val_str.rstrip() + attr_val_str="'"+attr_val_str+"'" + else: + attr_val_str=str(attr_val) + + attr_str=self.indent+'{}='.format(attr_name)+attr_val_str+'\n' + return attr_str + + @classmethod + def moose_doc(cls, param_list): + """Generate documentation for all the MOOSE parameters""" + # Obtain class info for header + class_string='' + if hasattr(cls,'class_alias'): + class_name_now=getattr(cls,'class_alias') + class_string=" "+class_name_now + param_string='MOOSE'+class_string+' Parameters' + dash_len=len(param_string) + dashes=''.ljust(dash_len,"-") + + # Write header + doc_now=param_string + doc_now+="\n"+dashes + + # Loop over parameters' documentation + for param in param_list: + assert isinstance(param,MooseParam) + # We don't document the type as it is fixed + if param.name == "type": + continue + doc_now=doc_now+param.doc + + # Footer + more_dashes=''.ljust(65,"-") + doc_now+=more_dashes + doc_now+="\n" + + return doc_now diff --git a/catbird/cbird.py b/catbird/cbird.py deleted file mode 100644 index 4d86d9d..0000000 --- a/catbird/cbird.py +++ /dev/null @@ -1,244 +0,0 @@ -from abc import ABC -from collections.abc import Iterable -import json -import numpy as np -from pathlib import Path -import subprocess - - -type_mapping = {'Integer' : int, - 'Boolean' : bool, - 'Float' : float, - 'Real' : float, - 'String' : str, - 'Array' : list} - - -# convenience function for converting types -def _convert_to_type(t, val): - if t == bool: - val = bool(int(val)) - else: - val = t(val) - return val - - -class Catbird(ABC): - """ - Class that can add type-checked properties to itself. - """ - - def __init__(self): - self.__moose_attrs__ = [] - - @staticmethod - def check_type(name, val, attr_type): - """Checks a value's type""" - if not isinstance(val, attr_type): - val_type_str = val.__class__.__name__ - exp_type_str = attr_type.__name__ - raise ValueError(f'Incorrect type "{val_type_str}" for attribute "{name}". ' - f'Expected type "{exp_type_str}".') - return val - - @staticmethod - def check_vals(name, val, allowed_vals): - """Checks that a value is in the set of allowed_values""" - if val not in allowed_vals: - raise ValueError(f'Value {val} for attribute {name} is not one of {allowed_vals}') - - @staticmethod - def prop_get(name, default=None): - """Returns function for getting an attribute""" - def fget(self): - # set to the default value if the internal attribute doesn't exist - if not hasattr(self, '_'+name): - setattr(self, '_'+name, default) - value = getattr(self, '_'+name) - return value - return fget - - @staticmethod - def prop_set(name, attr_type, dim=0, allowed_vals=None): - """Returns a function for setting an attribute""" - def fset(self, val): - if dim == 0: - self.check_type(name, val, attr_type) - if allowed_vals is not None: - self.check_vals(name, val, allowed_vals) - setattr(self, '_'+name, val) - else: - val = np.asarray(val) - self.check_type(name, val.flat[0].item(), attr_type) - if len(val.shape) != dim: - raise ValueError(f'Dimensionality is incorrect. Expects a {dim}-D array.') - for v in val.flatten(): - if allowed_vals is not None: - self.check_vals(name, v, allowed_vals) - setattr(self, '_'+name, val) - # self.__moose_attrs__ += [name] - return fset - - @classmethod - def newattr(cls, attr_name, attr_type=str, dim=0, default=None, allowed_vals=None, desc=None): - """Adds a property to the class""" - if not isinstance(attr_name, str): - raise ValueError('Attributes must be strings') - prop = property(fget=cls.prop_get(attr_name, default), - fset=cls.prop_set(attr_name, attr_type, dim, allowed_vals)) - setattr(cls, attr_name, prop) - - # set attribute docstring - doc_str = f'\nType: {attr_type.__name__}\n' - if desc is not None: - doc_str += desc - if allowed_vals is not None: - doc_str += f'\nValues: {allowed_vals}' - - if doc_str: - getattr(cls, attr_name).__doc__ = doc_str - - def to_node(self): - """ - Create a pyhit node for this MOOSE object - """ - import pyhit - - node = pyhit.Node(hitnode=self.__class__.__name__) - - for attr in self.__moose_attrs__: - val = getattr(self, attr) - if val is not None: - node[attr] = val - - return node - - -def app_from_json(json_file, problem_names=None): - """ - Returns the Python objects corresponding to the MOOSE application described - by the json file. - - Parameters - ---------- - json_file : dict, str, or Path - Either an open file handle, or a path to the json file. If `json` is a - dict, it is assumed this is a pre-parsed json object. - problems : Iterable of str - Set of problems to generate classes for - - Returns - ------- - dict - A dictionary of problem objects - """ - - if isinstance(json_file, dict): - json_obj = json_file - else: - json_obj = json.load(json_file) - - out = dict() - - out['problems'] = parse_problems(json_obj, problem_names=problem_names) - - return out - - -def parse_problems(json_obj, problem_names=None): - # get problems block - problems = json_obj['blocks']['Problem']['types'] - - instances_out = dict() - - for problem, block in problems.items(): - # skip any blocks that we aren't looking for - if problem_names is not None and problem not in problem_names: - continue - - params = block['parameters'] - - # create new subclass of Catbird with a name that matches the problem - new_cls = type(problem, (Catbird,), dict()) - - # loop over the problem parameters - for param_name, param_info in params.items(): - # determine the type of the parameter - attr_types = tuple(type_mapping[t] for t in param_info['basic_type'].split(':')) - attr_type = attr_types[-1] - - if len(attr_types) > 1: - for t in attr_types[:-1]: - assert issubclass(t, Iterable) - ndim = len(attr_types) - 1 - else: - ndim = 0 - - # set allowed values if present - allowed_values = None - if param_info['options']: - values = param_info['options'].split() - allowed_values = [_convert_to_type(attr_type, v) for v in values] - - # apply the default value if provided - # TODO: default values need to be handled differently. They are replacing - # properties in the type definition as they are now - default = None - if 'default' in param_info and param_info['default'] != 'none': - # only supporting defaults for one dimensional dim types - vals = [_convert_to_type(attr_type, v) for v in param_info['default'].split()] - if ndim == 0: - default = vals[0] - else: - default = np.array(vals) - - # add an attribute to the class instance for this parameter - new_cls.newattr(param_name, - attr_type, - desc=param_info.get('description'), - default=default, - dim=ndim, - allowed_vals=allowed_values) - - # insert new instance into the output dictionary - instances_out[problem] = new_cls - - return instances_out - - -def app_from_exec(exec, problem_names=None): - """ - Returns the Python objects corresponding to the MOOSE - application described by the json file. - - Parameters - ---------- - json : str or Path - Path to the MOOSE executable - problems : Iterable of str - Set of problems to generate classes for - - Returns - ------- - dict - A dictionary of problem objects - """ - - json_proc = subprocess.Popen([exec, '--json'], stdout=subprocess.PIPE) - json_str = '' - - # filter out the header and footer from the json data - while True: - line = json_proc.stdout.readline().decode() - if not line: - break - if '**START JSON DATA**' in line: - continue - if '**END JSON DATA**' in line: - continue - - json_str += line - - j_obj = json.loads(json_str) - - return app_from_json(j_obj, problem_names=problem_names) diff --git a/catbird/collection.py b/catbird/collection.py new file mode 100644 index 0000000..5c2dcd5 --- /dev/null +++ b/catbird/collection.py @@ -0,0 +1,47 @@ +from collections.abc import MutableSet +from .base import MooseBase +from .action import MooseAction +from .obj import MooseObject +from .string import MooseString + +class MooseCollection(MutableSet,MooseString): + """A collection of MOOSE objects""" + def __init__(self): + self.objects={} + + # Define mandatory methods + def __contains__(self,key): + return key in self.objects.keys() + + def __iter__(self): + return iter(self.objects) + + def __len__(self): + return len(self.objects) + + def _check_type(self,obj): + assert issubclass(type(obj),MooseBase) + + def add(self,obj,lookup_name): + # Type checking on object, raise an error if fails + self._check_type(obj) + + # Set the name of the object + obj.set_lookup_name(lookup_name) + + # Don't duplicate entries in collection + if lookup_name in self.objects.keys(): + msg="Collection already contains named block {}".format(block_name) + raise RuntimeError(msg) + + # Save + self.objects[lookup_name]=obj + + def discard(self,key): + self.objects.pop(key) + + def inner_to_str(self,print_default=False): + inner_str="" + for name, obj in self.objects.items(): + inner_str+=obj.to_str(print_default) + return inner_str diff --git a/catbird/factory.py b/catbird/factory.py new file mode 100644 index 0000000..9fb3adf --- /dev/null +++ b/catbird/factory.py @@ -0,0 +1,307 @@ +from copy import deepcopy +from .syntax import SyntaxRegistry, parse_block +from .utils import read_json, write_json, json_from_exec + +class Factory(): + """Class to contain constructors for MOOSE syntax objects""" + def __init__(self,exec_path,config_file=None): + print("Loading syntax from library...") + json_obj=json_from_exec(exec_path) + print("Done") + + print("Constructing syntax registry...") + self.registry=SyntaxRegistry(json_obj) + self.available_blocks=self.registry.get_available_blocks() + print("Done") + + print("Configuring objects to enable...") + self.set_defaults() + print("Done") + + if config_file is not None: + print("Loading configuration from file",config_file) + self.load_config(config_file) + print("Done") + + print("Loading enabled objects...") + self.load_enabled_objects(json_obj) + print("Done") + + + def _load_root_syntax(self,block_name, block): + """ + Retreive a tuple of abstract classes to mix to form our root syntax node. + """ + assert not block.is_leaf + self.root_syntax[block.longname]=block.get_mixins() + self.collection_syntax[block.longname]=block.get_collection_mixins() + + + def _load_leaf_syntax(self,block_name,block,json_obj): + """ + Retreive a class with attributes matching the available syntax for the block. + """ + assert block.is_leaf + + # Some details about the type of object + relation=block.relation_key + lookup_name=block.name + class_name=block.longname + parent_name=block.parent_longname + + # Convert string to SyntaxPath + syntax_path=self.registry.syntax_dict[block_name] + + # Fetch syntax for block and make a new object type + new_class=parse_block(json_obj,syntax_path,class_name) + + # Ensure dictionary initialised + if parent_name not in self.constructors.keys(): + self.constructors[parent_name]={} + if relation not in self.constructors[parent_name].keys(): + self.constructors[parent_name][relation]={} + + # Don't duplicate + if class_name in self.constructors[parent_name][relation].keys(): + raise RuntimeError("Duplicated class name {} in namespace {}.{}".format(class_name,parent_name,relation)) + + # Save class constructor + self.constructors[parent_name][relation][lookup_name]=new_class + + def load_enabled_objects(self,json_obj): + self.constructors={} + self.root_syntax={} + self.collection_syntax={} + + # Loop over enabled syntax blocks + for block_name, block in self.available_blocks.items(): + if not block.enabled: + continue + + if block.is_leaf: + self._load_leaf_syntax(block_name, block, json_obj) + else: + self._load_root_syntax(block_name, block) + + @staticmethod + def __get_init_method(mixins): + def __init__(self, *args, **kwargs): + for base in mixins: + # This mix-in does not have parameters at all + if not hasattr(base,"params_name"): + base.__init__(self, *args, **kwargs) + continue + + # Might not have enabled any parameters for this mix-in + if not hasattr(self,base.params_name): + continue + + # Dictionary of the attributes this base class should have + moose_param_dict_local=getattr(self,base.params_name) + + # Loop over and make into properties + duplicated_params=[] + for attr_name, moose_param in moose_param_dict_local.items(): + # Crucially, acts on the instance, not the class. + if hasattr(self,attr_name): + # MOOSE is stupid, sometimes parameters can come from more than one source. + # If this happens, warn and skip, then remove from list afterwards + duplicated_params.append(attr_name) + msg="Warning! Syntax collision for attribute {} in class {}. Skipping.".format(attr_name, self.__class__.__name__) + print(msg) + continue + setattr(self,attr_name,moose_param.val) + for attr_name in duplicated_params: + moose_param_dict_local.pop(attr_name) + + #Todo: apply kwargs here? + + return __init__ + + @staticmethod + def __get_inner_to_str_method(mixins): + # The returned _inner_to_str_ method should call each of the mix-in base class methods in turn and concatenate. + def inner_to_str(self,print_default=False): + inner_str="" + for base in mixins: + inner_str+=base.inner_to_str(self, print_default) + return inner_str + return inner_to_str + + @staticmethod + def __get_get_param_method(mixins): + def get_param(self,attr_name): + # Search each base in turn + param=None + for base in mixins: + # Skip mix-ins with no parameters + if not hasattr(base,"params_name"): + continue + try: + dict_now=getattr(self,base.params_name) + param=dict_now[attr_name] + break + except KeyError: + continue + + if param is None: + msg="Could not find MOOSE param for attribute {}".format(attr_name) + raise KeyError(msg) + return param + return get_param + + @staticmethod + def __get_docstring(mixins): + # Generate a docstring for new class from the docs of each mix-in + doc_now="" + for base in mixins: + if base.__doc__ is not None: + doc_now=doc_now+base.__doc__ + return doc_now + + def derive_class(self,root_name,obj_types,class_name,in_collection=False): + """ + Form a new mix-in class from a tuple of classes + + Parameters + ---------- + rootname : str + obj_types: dict + """ + # Get mixins boilerplate + if in_collection: + mixins=self.collection_syntax[root_name] + else: + mixins=self.root_syntax[root_name] + + # Update mixins list by comparing types + mixins_now=deepcopy(mixins) + for relation_type,derived_type in obj_types.items(): + if relation_type not in mixins.keys(): + raise RuntimeError("{} is not an available mix-in".format(relation_type)) + # Fetch derived class for mix-in + class_now=self.constructors[root_name][relation_type][derived_type] + + # Update + mixins_now[relation_type]=class_now + + # Finally, remove duplicates but preserve order + mixin_list=[] + for mixin_test in mixins_now.values(): + if mixin_test not in mixin_list: + mixin_list.append(mixin_test) + + # Convert to tuple + mixin_tuple=tuple(mixin_list) + + # Our fancy new mixin class + new_cls = type(class_name, mixin_tuple, + { + "__init__": self.__get_init_method(mixin_tuple), + "__doc__": self.__get_docstring(mixin_tuple), + "inner_to_str":self.__get_inner_to_str_method(mixin_tuple), + "get_param":self.__get_get_param_method(mixin_tuple), + }) + return new_cls + + def construct_root(self,root_name,obj_types,kwargs): + """ + Parameters + ---------- + rootname : str + obj_types: dict + kwargs: dict + """ + # Get class + obj_class=self.derive_class(root_name, obj_types,root_name) + obj=obj_class() + + # Handle keyword arguments + for key, value in kwargs.items(): + if not hasattr(obj,key): + msg="Object type {} does not have attribute {}".format(root_name,key) + raise RuntimeError() + setattr(obj, key, value) + + return obj + + def construct(self,root_name,obj_types,class_name,in_collection,**kwargs): + class_now=self.derive_class(root_name,obj_types,class_name,in_collection) + obj=class_now() + + # Handle keyword arguments + for key, value in kwargs.items(): + if not hasattr(obj,key): + msg="Object type {} does not have attribute {}".format(derived_type,key) + raise RuntimeError(msg) + setattr(obj, key, value) + return obj + + def enable_syntax(self,block_name,enable_dict=None): + """ + Configure what MOOSE syntax to enable. + + Objects with enabled syntax will be converted to Python classes. + """ + # Construct full name + syntax_name="blocks/"+block_name + + # Check syntax is known + if syntax_name not in self.available_blocks.keys(): + msg="Cannot enable unknown syntax {}".format(syntax_name) + raise RuntimeError(msg) + + syntax_to_enable=[syntax_name] + + while len(syntax_to_enable)>0: + # Get front of queue + syntax_name_now=syntax_to_enable.pop(0) + + # Enable top level block syntax + self.available_blocks[syntax_name_now].enabled=True + + # Get sub-block types + block_now=self.available_blocks[syntax_name_now] + + available_sub_syntax=self.registry.get_available_syntax(syntax_name_now) + if available_sub_syntax is not None: + for relation_shortname, syntax_list in available_sub_syntax.items(): + # If enable_dict is provided, only enable user-specified types + if enable_dict and relation_shortname not in enable_dict.keys(): + continue + + for syntax_item in syntax_list: + if enable_dict and syntax_item not in enable_dict[relation_shortname]: + continue + + # Get syntax lookup key and add to queue + new_syntax=block_now.path_to_child(relation_shortname,syntax_item) + syntax_to_enable.append(new_syntax) + + def write_config(self,filename,print_depth=3,verbose=False): + config_dict={} + for block_name, block in self.available_blocks.items(): + config_entry=block.to_dict(print_depth,verbose) + if config_entry is not None: + config_dict.update(config_entry) + write_json(config_dict,filename) + + def load_config(self,filename): + # Fetch enabled objects from filename + config_in=read_json(filename) + for block_name, block_dict in config_in.items(): + self.available_blocks[block_name].enabled=block_dict["enabled"] + + def set_defaults(self): + executioner_enable_dict={ + "obj_type": ["Steady","Transient"] + } + self.enable_syntax("Mesh") + self.enable_syntax("Executioner", executioner_enable_dict) + self.enable_syntax("Problem") + self.enable_syntax("Variables") + self.enable_syntax("Kernels") + self.enable_syntax("BCs") + self.enable_syntax("Materials") + self.enable_syntax("VectorPostprocessors") + self.enable_syntax("Outputs") diff --git a/catbird/legacy.py b/catbird/legacy.py new file mode 100644 index 0000000..b28ff8b --- /dev/null +++ b/catbird/legacy.py @@ -0,0 +1,332 @@ +from .obj import MooseObject +from .syntax import type_mapping, _convert_to_type +from .utils import json_from_exec, write_json + +def parse_blocks(json_obj): + """ + Returns the a dictionary of block types corresponding to the MOOSE application described + by the json file. + + Parameters + ---------- + json_obj : dict + Dictionary of full MOOSE object tree + + Returns + ------- + dict + Dictionary of available block types organised by category + """ + + # Get all top level categories of block + block_name_list = json_obj['blocks'].keys() + + #all_syntax=[] + parsed_blocks={} + + types_key='types' + wildcard_key='star' + nested_key='subblocks' + nested_block_key='subblock_types' + + for block_name in block_name_list: + block_dict_now = json_obj['blocks'][block_name] + if types_key in block_dict_now.keys(): + try : + # If dict + block_types_now = list(block_dict_now[types_key].keys()) + #fundamental_blocks[block_name]=block_types_now + parsed_blocks[block_name]=SyntaxBlock(block_name,"fundamental",block_types_now) + + #all_syntax.append(SyntaxBlock(block_name,"fundamental",block_types_now)) + except AttributeError : + # Otherwise + block_types_now = block_dict_now[types_key] + if block_types_now == None: + #systems.append(block_name) + parsed_blocks[block_name]=SyntaxBlock(block_name,"system",None) + #all_syntax.append(SyntaxBlock(block_name,"systems",None)) + continue + + #print(block_name," available types: ", block_types_now) + elif wildcard_key in block_dict_now.keys() and nested_block_key in block_dict_now[wildcard_key].keys(): + try: + types_now = list(block_dict_now[wildcard_key][nested_block_key].keys()) + #nested_blocks[block_name]=types_now + parsed_blocks[block_name]=SyntaxBlock(block_name,"nested",types_now) + #all_syntax.append(SyntaxBlock(block_name,"nested",types_now)) + + except AttributeError : + types_now = block_dict_now[wildcard_key][nested_block_key] + if types_now == None: + #nested_systems.append(block_name) + #all_syntax.append(SyntaxBlock(block_name,"nested_system",None)) + parsed_blocks[block_name]=SyntaxBlock(block_name,"nested_system",None) + continue + + elif nested_key in block_dict_now.keys(): + #nested_systems.append(block_name) + #all_syntax.append(SyntaxBlock(block_name,"nested_system",None)) + parsed_blocks[block_name]=SyntaxBlock(block_name,"nested_system",None) + + else: + print(block_name," has keys: ",block_dict_now.keys()) + raise RuntimeError("unhandled block category") + + + # parsed_block_list={} + # parsed_block_list["Systems"]=systems + # parsed_block_list["Nested systems"]=nested_systems + # parsed_block_list["Fundamental blocks"]=fundamental_blocks + # parsed_block_list["Nested blocks"]=nested_blocks + + #return parsed_block_list + return parsed_blocks + +def get_block_types(json_obj,block_name): + block_types=None + syntax_type="" + + blocks_dict=json_obj['blocks'] + + if block_name not in blocks_dict.keys(): + msg="Unknown block name {}".format(block_name) + raise RuntimeError(msg) + + current_block_dict=blocks_dict[block_name] + + syntax_type_to_block_types={ + "fundamental":{}, + "system":{}, + "nested":{}, + "nested_system":{}, + "action":{}, + "double_nested":{}, + } + + # 6 cases, but not limited to single type at once + # TODO this is awful... refactor + # Suggest recursing down until found a "parameter" key + if 'types' in current_block_dict.keys() and current_block_dict['types'] is not None: + block_types=current_block_dict['types'] + syntax_type_to_block_types["fundamental"].update(block_types) + + if 'star' in current_block_dict.keys() and current_block_dict['star'] is not None: + if 'subblock_types' in current_block_dict['star'].keys(): + block_types=current_block_dict['star']['subblock_types'] + if block_types is not None: + syntax_type_to_block_types["nested"].update(block_types) + + if 'actions' in current_block_dict['star'].keys(): + block_types=current_block_dict['star']['actions'] + if block_types is not None: + syntax_type_to_block_types["nested_action"].update(block_types) + + if 'subblocks' in current_block_dict.keys() and current_block_dict['subblocks'] is not None: + + system_type_dict={} + nested_type_dict={} + double_nested_type_dict={} + + for subblock_name in current_block_dict['subblocks'].keys(): + subblock_dict=current_block_dict['subblocks'][subblock_name] + + if 'types' in subblock_dict.keys() and subblock_dict['types'] is not None: + block_types=subblock_dict['types'] + system_type_dict[subblock_name]=block_types + + if 'star' in subblock_dict.keys() and subblock_dict['star'] is not None: + if 'subblock_types' in subblock_dict['star'].keys(): + block_types=subblock_dict['star']['subblock_types'] + if block_types is not None: + nested_type_dict[subblock_name]=block_types + + if 'subblocks' in subblock_dict.keys() and subblock_dict['subblocks'] is not None: + + double_nested_type_dict[subblock_name]={} + + for subsubblock_name in subblock_dict['subblocks'].keys(): + subsubblock_dict=subblock_dict['subblocks'][subsubblock_name] + + if 'actions' in subsubblock_dict.keys() and subsubblock_dict['actions'] is not None: + double_nested_type_dict[subblock_name][subsubblock_name]=subsubblock_dict['actions'] + + if 'star' in subsubblock_dict.keys() and subsubblock_dict['star'] is not None: + if 'actions' in subsubblock_dict['star'].keys() and subsubblock_dict['star']['actions'] is not None: + double_nested_type_dict[subblock_name][subsubblock_name]=subsubblock_dict['star']['actions'] + + + if len(system_type_dict) >0: + syntax_type_to_block_types["system"].update(system_type_dict) + if len(nested_type_dict) >0: + syntax_type_to_block_types["nested_system"].update(nested_type_dict) + if len(double_nested_type_dict) >0: + syntax_type_to_block_types["double_nested"].update(double_nested_type_dict) + + + if 'actions' in current_block_dict.keys() and current_block_dict['actions'] is not None: + block_types=current_block_dict['actions'] + syntax_type_to_block_types["action"].update(block_types) + + + count_types=0 + for syntax_type in syntax_type_to_block_types.keys(): + block_types=syntax_type_to_block_types[syntax_type] + if len(block_types) > 0: + count_types+=1 + + if count_types == 0: + msg="Block {} is undocumented".format(block_name) + print(msg) + #raise RuntimeError(msg) + #block_types=None + #syntax_type="Unknown" + + elif count_types > 1: + msg="Block {} is has {} types".format(block_name,count_types) + print(msg) + + #return block_types, syntax_type + return syntax_type_to_block_types + + +def parse_blocks_types(json_obj,category,category_names=None): + """ + Make python objects out of MOOSE syntax for a fundamental category of block + (E.g. Executioner, Problem) + + Parameters + ---------- + json_obj : dict + A dictionary of all MOOSE objects + + category: str + A string naming the category of fundamental MOOSE block + + category_names: list(str) + Optional field. If provided, only return objects for specified types. + + Returns + ------- + dict + A dictionary of pythonised MOOSE objects of the given category. + """ + + requested_blocks,syntax_type = get_block_types(json_obj,category) + + instances_out = dict() + + for block_type, block_attributes in requested_blocks.items(): + # skip any blocks that we aren't looking for + if category_names is not None and block_type not in category_names: + continue + + # Todo add auto-documntations + #dict_keys(['description', 'file_info', 'label', 'moose_base', 'parameters', 'parent_syntax', 'register_file', 'syntax_path']) + + params = block_attributes['parameters'] + + # Create new subclass of MooseObject with a name that matches the block_type + new_cls = type(block_type, (MooseObject,), dict()) + + # Set the block title + new_cls.set_syntax_type(syntax_type) + + if syntax_type != "nested": + new_cls.syntax_block_name=category + + # loop over the block_type parameters + for param_name, param_info in params.items(): + # determine the type of the parameter + attr_types = tuple(type_mapping[t] for t in param_info['basic_type'].split(':')) + attr_type = attr_types[-1] + + if len(attr_types) > 1: + for t in attr_types[:-1]: + assert issubclass(t, Iterable) + ndim = len(attr_types) - 1 + else: + ndim = 0 + + # set allowed values if present + allowed_values = None + if param_info['options']: + values = param_info['options'].split() + allowed_values = [_convert_to_type(attr_type, v) for v in values] + + # apply the default value if provided + # TODO: default values need to be handled differently. They are replacing + # properties in the type definition as they are now + default = None + if 'default' in param_info.keys() and param_info['default'] != None: + default = _convert_to_type(attr_type, param_info['default']) + # # only supporting defaults for one dimensional dim types + # vals = [_convert_to_type(attr_type, v) for v in param_info['default'].split()] + # if ndim == 0: + # default = vals[0] + # else: + # default = np.array(vals) + + # add an attribute to the class instance for this parameter + new_cls.newattr(param_name, + attr_type, + description=param_info.get('description'), + default=default, + dim=ndim, + allowed_vals=allowed_values) + + # insert new instance into the output dictionary + instances_out[block_type] = new_cls + + return instances_out + +def parse_problems(json_obj, problem_names=None): + return parse_blocks_types(json_obj,'Problem',category_names=problem_names) + +def problems_from_json(json_obj, problem_names=None): + """ + Returns the Python objects corresponding to the MOOSE application described + by the json file. + + Parameters + ---------- + json_obj : dict + Pre-parsed json object containing MOOSE syntax + problems : Iterable of str + Set of problems to generate classes for + + Returns + ------- + dict + A dictionary of problem objects + """ + + assert isinstance(json_obj, dict) + + out = dict() + out['problems'] = parse_problems(json_obj, problem_names=problem_names) + return out + +def problem_from_exec(exec, problem_names=None): + """ + Returns the Python objects corresponding to the MOOSE + application described by the json file. + + Parameters + ---------- + problems : Iterable of str + Set of problems to generate classes for + + Returns + ------- + dict + A dictionary of problem objects + """ + j_obj = json_from_exec(exec) + + return problems_from_json(j_obj, problem_names=problem_names) + +def export_all_blocks_from_exec(exec,name): + j_obj = json_from_exec(exec) + block_dict=parse_blocks(j_obj) + write_json(block_dict,name) diff --git a/catbird/model.py b/catbird/model.py new file mode 100644 index 0000000..12c5595 --- /dev/null +++ b/catbird/model.py @@ -0,0 +1,171 @@ +from copy import deepcopy +from .collection import MooseCollection +from .factory import Factory +from .syntax import get_relation_kwargs + +# TODO should this also be a MooseCollection? +class MooseModel(): + """Class to represent a MOOSE model""" + def __init__(self,factory_in): + assert isinstance(factory_in,Factory) + self.factory=factory_in + self.moose_objects=[] + + # Add attributes to this model with default assignments + self.load_default_syntax() + + # Envisage this being overridden downstream. + def load_default_syntax(self): + self.add_syntax("Executioner", obj_type="Steady") + #self.add_syntax("Executioner.Predictor",obj_type="AdamsPredictor") + self.add_syntax("Problem", obj_type="FEProblem") + self.add_syntax("Mesh", + obj_type="GeneratedMesh", + action="CreateDisplacedProblemAction") + self.add_syntax("Variables") + self.add_syntax("Kernels") + self.add_syntax("Materials") + self.add_syntax("BCs") + self.add_syntax("Outputs") + + + def add_syntax(self,syntax_name,**kwargs_in): + """ + Add an object corresponding to MOOSE syntax + """ + # First, pop out any relation key-word args + obj_types={} + relations=get_relation_kwargs() + kwargs=deepcopy(kwargs_in) + for keyword in kwargs_in.keys(): + if keyword in relations: + obj_type = kwargs.pop(keyword) + obj_types[keyword]=obj_type + + # Construct the object + obj=self.factory.construct_root(syntax_name,obj_types,kwargs) + + # Add to model + self._add_to_model(obj) + + def _add_to_model(self,obj): + """Add object to the model as an attribute""" + # Obtain sequence of objects + obj_classname=obj.__class__.__name__ + obj_path=obj_classname.split(sep=".") + + # Attribute name (non-capitalised) + obj_name=obj_path.pop(-1) + attr_name=obj_name.lower() + + # Recurse through parent attribute objects + parent_obj=self + while len(obj_path)>0: + new_parent_name=obj_path.pop(-1) + new_parent_name=new_parent_name.lower() + if not hasattr(parent_obj,new_parent_name): + msg="Cannot construct {}:\n".format(syntax_name) + msg=msg+"Class {} has no attribute {}".format(parent_obj.__class__.__name__,new_parent_name) + raise RuntimeError(msg) + new_parent_obj=getattr(parent_obj,new_parent_name) + parent_obj=new_parent_obj + + # Avoid overwriting + if hasattr(parent_obj,attr_name): + msg="Class {} already has attribute {}".format(parent_obj.__class__.__name__,attr_name) + raise RuntimeError(msg) + + # Add as attribute + setattr(parent_obj,attr_name,obj) + + # If the parent is a collection, add there for book-keeping purposes + # N.B. this is to support subblock syntax + if isinstance(parent_obj,MooseCollection): + parent_obj.add(obj,attr_name) + + # Book-keeping + if parent_obj == self: + self.moose_objects.append(attr_name) + + def add_to_collection(self, collection_name, class_name, object_name,**kwargs_in): + # First, pop out any relation key-word args + obj_types={} + relations=get_relation_kwargs() + kwargs=deepcopy(kwargs_in) + for keyword in kwargs_in.keys(): + if keyword in relations: + obj_type_value = kwargs.pop(keyword) + obj_types[keyword]=obj_type_value + + + # One basic type is mandatory + if len(obj_types) == 0: + msg="Must specify a relation type" + raise RuntimeError(msg) + + long_class_name=collection_name+"."+class_name + obj=self.factory.construct(collection_name,obj_types,long_class_name,in_collection=True,**kwargs) + + # Fetch collection and add + collection = getattr(self, collection_name.lower()) + collection.add(obj,object_name) + + # Some short-hands for common operations + def add_variable(self,variable_name,variable_type="MooseVariable",**kwargs_in): + collection_kwargs=deepcopy(kwargs_in) + collection_kwargs["collection_type"]=variable_type + collection_kwargs["collection_action"]="AddVariableAction" + + self.add_to_collection("Variables","Variable",variable_name,**collection_kwargs) + + def add_kernel(self,kernel_name,kernel_type,**kwargs_in): + collection_kwargs=deepcopy(kwargs_in) + collection_kwargs["collection_type"]=kernel_type + self.add_to_collection("Kernels","Kernel",kernel_name,**collection_kwargs) + + def add_bc(self,bc_name,bc_type,**kwargs_in): + collection_kwargs=deepcopy(kwargs_in) + collection_kwargs["collection_type"]=bc_type + self.add_to_collection("BCs","BC", bc_name,**collection_kwargs) + + def add_material(self,mat_name,mat_type,**kwargs_in): + collection_kwargs=deepcopy(kwargs_in) + collection_kwargs["collection_type"]=mat_type + self.add_to_collection("Materials","Material",mat_name,**collection_kwargs) + + def add_output(self,output_name,output_type,**kwargs_in): + collection_kwargs=deepcopy(kwargs_in) + collection_kwargs["collection_type"]=output_type + self.add_to_collection("Outputs","Output",output_name,**collection_kwargs) + + def add_mesh_generator(self,mesh_generator_name,mesh_generator_type,**kwargs_in): + collection_kwargs=deepcopy(kwargs_in) + collection_kwargs["collection_type"]=mesh_generator_type + self.add_to_collection("Mesh","MeshGenerator",mesh_generator_name,**collection_kwargs) + + def to_str(self,print_default=False): + model_str="" + for obj_type in self.moose_objects: + obj=getattr(self,obj_type) + model_str+=obj.to_str(print_default) + return model_str + + def write(self, filename, print_default=False): + file_handle = open(filename,'w') + file_handle.write(self.to_str(print_default)) + file_handle.close() + print("Wrote to ",filename) + +class TransientModel(MooseModel): + """Class to represent a MOOSE model""" + def __init__(self,factory_in): + super().__init__(factory_in) + + def load_default_syntax(self): + self.add_syntax("Executioner", obj_type="Transient") + self.add_syntax("Mesh") + self.add_syntax("Variables") + self.add_syntax("Kernels") + self.add_syntax("BCs") + self.add_syntax("Materials") + self.add_syntax("Outputs", action="CommonOutputAction") diff --git a/catbird/obj.py b/catbird/obj.py new file mode 100644 index 0000000..2404a0a --- /dev/null +++ b/catbird/obj.py @@ -0,0 +1,41 @@ +from copy import deepcopy +from .base import MooseBase + +class MooseObject(MooseBase): + params_name="_moose_params" + class_alias="Object" + + def __init__(self): + if hasattr(self,"_moose_params"): + # Dictionary of the attributes this class should have + moose_param_dict_local=getattr(self,"_moose_params") + + # Loop over and make into attributes + for attr_name, moose_param in moose_param_dict_local.items(): + # Crucially, acts on the instance, not the class. + setattr(self,attr_name,moose_param.val) + + + @property + def moose_object_params(self): + """ + Return a unified list of all the parameters we've added. + """ + moose_param_list_local=[] + if hasattr(self,"_moose_params"): + moose_param_dict_local=getattr(self,"_moose_params") + moose_param_list_local=list(moose_param_dict_local.keys()) + return moose_param_list_local + + def inner_to_str(self,print_default=False): + inner_str="" + param_list_local=deepcopy(self.moose_object_params) + + # Formatting convention, start with type + if "type" in param_list_local: + param_list_local.remove("type") + inner_str+=self.attr_to_str("type",True) + + for attr_name in param_list_local: + inner_str+=self.attr_to_str(attr_name,print_default) + return inner_str diff --git a/catbird/param.py b/catbird/param.py new file mode 100644 index 0000000..11dbdd6 --- /dev/null +++ b/catbird/param.py @@ -0,0 +1,42 @@ +class MooseParam(): + """ + Class to contain all information about a MOOSE parameter + """ + def __init__(self,attr_name, attr_type, is_array, default=None, allowed_vals=None, description=None): + + assert attr_type is not type(None) + + self.attr_type=attr_type + self.allowed_vals=allowed_vals + self.is_array=is_array + + # Set name + if not isinstance(attr_name, str): + raise ValueError('Attribute names must be strings') + self.name=attr_name + + # Set default value + if default is not None: + self.default=default + elif self.is_array and attr_type!=str: + self.default=[attr_type()] + else: + self.default=attr_type() + + # Initialise current value to the default + self.val=self.default + + # Set docstring + doc_str = '\n' + doc_str += attr_name+' : ' + doc_str += f'{attr_type.__name__}\n' + if description is not None and description != "": + doc_str += " " + doc_str += description + doc_str += "\n" + if allowed_vals is not None: + doc_str += f' Allowed values: {allowed_vals}\n' + if default is not None: + doc_str += f' Default value: {default}\n' + + self.doc=doc_str diff --git a/catbird/string.py b/catbird/string.py new file mode 100644 index 0000000..796b120 --- /dev/null +++ b/catbird/string.py @@ -0,0 +1,74 @@ +class MooseString(): + """Mixin to assist printing""" + def set_lookup_name(self,name_in): + self._lookup_name=name_in + + @property + def lookup_name(self): + """Optional property for externally set name""" + if hasattr(self,"_lookup_name"): + return self._lookup_name + else: + return None + + @property + def print_name(self): + """ + Return name for printing purposes + """ + if self.lookup_name: + return self.lookup_name + + class_name=self.__class__.__name__ + class_path=class_name.split(sep=".") + print_name=class_path[-1] + return print_name + + @property + def indent_level(self): + """ + Return level of indent for printing purposes + """ + class_name=self.__class__.__name__ + class_path=class_name.split(sep=".") + indent_level=len(class_path) + return indent_level + + @property + def indent(self): + """ + Inner indent string + """ + indent_str="" + for i_level in range(self.indent_level): + # Use two space indent + indent_str=indent_str+" " + return indent_str + + @property + def prepend_indent(self): + """ + Outer indent string + """ + indent_str="" + for i_level in range(self.indent_level-1): + # Use two space indent + indent_str=indent_str+" " + return indent_str + + def to_str(self,print_default=False): + """ + Return syntax as a string + """ + syntax_str='{}[{}]\n'.format(self.prepend_indent,self.print_name) + syntax_str+=self.inner_to_str(print_default) + syntax_str+='{}[]\n'.format(self.prepend_indent) + return syntax_str + + def inner_to_str(self,print_default=False): + # Override me + pass + + def moose_doc(self): + # Override me + return "" diff --git a/catbird/syntax.py b/catbird/syntax.py new file mode 100644 index 0000000..0e1667d --- /dev/null +++ b/catbird/syntax.py @@ -0,0 +1,517 @@ +"""Classes and functions to parse MOOSE syntax""" +from collections.abc import Iterable +from copy import deepcopy +from .obj import MooseObject +from .action import MooseAction +from .param import MooseParam +from .collection import * + +type_mapping = {'Integer' : int, + 'Boolean' : bool, + 'Float' : float, + 'Real' : float, + 'String' : str, + 'Array' : list} + +_relation_syntax=["blocks","subblocks","actions","star","types","subblock_types"] +_relation_shorthands={ + "types/":"obj_type", + "actions/": "action", + "subblocks/":"system", + "star/subblock_types/":"collection_type", + "star/actions/":"collection_action", + "star/subblocks/":"nested_system", + "star/star/actions/":"nested_collection_action", + "star/star/subblock_types/":"nested_collection_type", +} + +_mixin_map={ + "obj_type": MooseObject, + "action": MooseAction, + "system": MooseCollection, + "collection_type" : MooseCollection, + "collection_action": MooseCollection, + "nested_system": None, # The attribute should be added one layer down + "nested_collection_action": None, # Don't support this syntax yet + "nested_collection_type": None, # Don't support this syntax yet +} + +_child_type_map={ + "obj_type": MooseObject, + "action": MooseAction, + "system": None, + "collection_type" : MooseObject, + "collection_action": MooseAction, + "nested_system": None, # Don't support this syntax yet + "nested_collection_action": None, # Don't support this syntax yet + "nested_collection_type": None, # Don't support this syntax yet +} + +_collection_type_map={ + "collection_type" : MooseObject, + "collection_action": MooseAction, +} + + +def get_relation_kwargs(): + return _relation_shorthands.values() + +class SyntaxPath(): + """ + A helper class to store a path of keys from a nested dictionary structure. + """ + def __init__(self, syntax_path_in): + # Initial values + self.name="" + self.unique_key="" + self.has_params=False + self.is_root=False + self.parent_path=None + self.parent_relation=None + self.child_paths={} + self.path=deepcopy(syntax_path_in) + + + syntax_path=deepcopy(syntax_path_in) + + # Type assertions + assert isinstance(syntax_path,list) + assert len(syntax_path)>1 + for key in syntax_path: + assert isinstance(key,str) + + # Check for parameters + pos_now=len(syntax_path)-1 + key_now = syntax_path.pop(pos_now) + if key_now == "parameters": + self.has_params=True + self.path.pop(pos_now) + pos_now=pos_now-1 + key_now = syntax_path.pop(pos_now) + + # Set object name + self.name=key_now + self.unique_key=self._get_lookup_key(syntax_path,key_now) + + if len(syntax_path) > 1 : + relation_path=[] + found_parent=False + while not found_parent and len(syntax_path) > 0: + pos_now=len(syntax_path)-1 + test_key=syntax_path.pop(pos_now) + if test_key in _relation_syntax: + relation_path.insert(0,test_key) + else: + found_parent=True + syntax_path.append(test_key) + + if not found_parent: + raise RuntimeError("Should not get here") + + if relation_path: + _parent_relation=self._key_from_list(relation_path) + if _parent_relation not in _relation_shorthands.keys(): + raise RuntimeError("unknown relation type: {}",format(_parent_relation)) + self.parent_relation=_parent_relation + + self.parent_path=syntax_path + + else: + self.is_root=True + + + def _key_from_list(self,path_in): + path_str="" + for key in path_in: + path_str+=key + path_str+="/" + return path_str + + def _get_lookup_key(self,path_in,name_in): + lookup_path=self._key_from_list(path_in) + lookup_path+=name_in + return lookup_path + + def add_child(self, child_syntax): + assert isinstance(child_syntax,SyntaxPath) + + # Save mapping by relation type + relation_key=_relation_shorthands[child_syntax.parent_relation] + if relation_key not in self.child_paths.keys(): + self.child_paths[relation_key]=[] + self.child_paths[relation_key].append(child_syntax.unique_key) + + def has_child_type(self,relation): + return relation in self.child_paths.keys() and self.child_paths[relation] != None + + @property + def parent_key(self): + if not self.is_root: + parent_path_now=deepcopy(self.parent_path) + parent_len=len(parent_path_now) + parent_name=parent_path_now.pop(parent_len-1) + return self._get_lookup_key(parent_path_now,parent_name) + else: + return None + + @property + def relation_to_parent(self): + if self.parent_relation: + relation=_relation_shorthands[self.parent_relation] + return relation + else: + return None + +class SyntaxBlock(): + """ + A class to represent one block of MOOSE syntax + """ + def __init__(self): + self.name="" + self.path="" + self.has_params=False + self.enabled=False + self.available_syntax={} + self.relation_key=None + self.parent_blocks=[] + self.depth=0 + + def to_dict(self, print_depth=3, verbose=False): + config_entry=None + if ( self.enabled and self.depth < print_depth ) or verbose: + block_dict={ + "name": self.name, + "enabled": self.enabled, + "params": self.has_params, + } + if verbose: + if self.available_syntax: + block_dict["available syntax"] = self.available_syntax + # if self.parent_blocks: + # block_dict["parents"] = self.parent_blocks + + config_entry= { self.path : block_dict } + + return config_entry + + @property + def is_leaf(self): + return self.available_syntax==None + + @property + def is_root(self): + return self.depth==0 + + def path_to_child(self,relation_shortname,child_name): + # Invert dictionary + found=False + relation_type=None + for test_relation_type,test_shortname in _relation_shorthands.items(): + if relation_shortname == test_shortname: + found=True + relation_type=test_relation_type + + if not found: + msg="No known relation syntax maps onto shortname {}".format(relation_shortname) + raise RuntimeError(msg) + path=self.path+"/"+relation_type+child_name + return path + + def get_mixins(self): + mixin_dict={} + for relation_type in self.available_syntax.keys(): + mixin_now=_mixin_map[relation_type] + if mixin_now is not None: + mixin_dict[relation_type]=mixin_now + return mixin_dict + + def get_collection_mixins(self): + mixin_dict={} + for relation_type in self.available_syntax.keys(): + if relation_type in _collection_type_map.keys(): + mixin_now=_collection_type_map[relation_type] + if mixin_now is not None: + mixin_dict[relation_type]=mixin_now + return mixin_dict + + @property + def parent_longname(self): + _longname="" + for parent_name in self.parent_blocks: + _longname=_longname+parent_name+"." + _longname=_longname.rstrip(".") + return _longname + + @property + def longname(self): + _longname="" + for parent_name in self.parent_blocks: + _longname=_longname+parent_name+"." + _longname=_longname+self.name + return _longname + + # def __init__(self, _name, _syntax_type, _known_types): + # self.name=_name + # self.syntax_type=_syntax_type + # self.enabled=False + # self.enabled_types={} + # if _known_types is not None: + # for known_type_name in _known_types: + # self.enabled_types[known_type_name]=False + + # # Store what the default type should be + # self.default_type=None + + # def to_dict(self): + # syntax_dict={ + # "name": self.name, + # "syntax_type": self.syntax_type, + # "enabled": self.enabled, + # "enabled_types": self.enabled_types, + # } + # return syntax_dict + + # @property + # def enabled_subblocks(self): + # if self.enabled_types is not None: + # enabled_type_list=[ type_name for type_name, enabled in self.enabled_types.items() if enabled ] + # else: + # enabled_type_list=None + # return enabled_type_list + + +class SyntaxRegistry(): + """ + A class to store MOOSE syntax extracted from a highly nested dictionary in a flattened format. + + + Entries in the registry are stored in a dictionary with a unique key that maps onto a SyntaxPath object. A unique SyntaxPath may be parsed to identify relationships with child / parent paths to produce a SyntaxBlock that represents a block of syntax in a MOOSE input. + """ + def __init__(self, all_json): + # Flatten highly nested json dict + syntax_paths=key_search_recurse(all_json,[],"parameters",20) + assert isinstance(syntax_paths,list) + + self.syntax_dict={} + for path_now in syntax_paths: + self._recurse_path(path_now) + + def _recurse_path(self, path_in, children=None): + syntax=SyntaxPath(path_in) + self._add_syntax(syntax) + + # Add / update parents + if syntax.parent_key is not None: + if syntax.parent_key not in self.syntax_dict: + self._recurse_path(syntax.parent_path) + # Add current node to parent + self.syntax_dict[syntax.parent_key].add_child(syntax) + + def _add_syntax(self, syntax): + assert isinstance(syntax,SyntaxPath) + assert syntax.unique_key not in self.syntax_dict.keys() + self.syntax_dict[syntax.unique_key]=syntax + + def get_children_of_type(self,syntax_key,relation_type): + children=[] + parent=self.syntax_dict[syntax_key] + if parent.has_child_type(relation_type): + for child_path in parent.child_paths[relation_type]: + child=self.syntax_dict[child_path] + children.append(child.name) + return children + + def get_available_syntax(self,syntax_key): + available={} + for relation,shortname in _relation_shorthands.items(): + syntax_list=self.get_children_of_type(syntax_key,shortname) + if len(syntax_list)>0: + available[shortname]=syntax_list + if len(available.keys()) == 0: + available=None + return available + + def make_block(self,syntax_key): + syntax=self.syntax_dict[syntax_key] + + block=SyntaxBlock() + block.name=syntax.name + block.path=syntax_key + block.has_params=syntax.has_params + block.available_syntax=self.get_available_syntax(syntax_key) + block.relation_key=syntax.relation_to_parent + + # Set block depth and parents + if not syntax.is_root: + # Recurse until we find root + syntax_now=syntax + depth=0 + while not syntax_now.is_root: + depth=depth+1 + parent_key=syntax_now.parent_key + syntax_now=self.syntax_dict[parent_key] + parent_name=syntax_now.name + block.parent_blocks.insert(0,parent_name) + block.depth=depth + + return block + + def get_available_blocks(self): + available={} + for unique_key in self.syntax_dict.keys(): + available[unique_key]=self.make_block(unique_key) + return available + + # def get_available_blocks_sorted_by_depth(self): + # available_by_depth={} + # for unique_key in self.syntax_dict.keys(): + # block_now=self.make_block(unique_key) + # depth_now=block_now.depth + # if depth_now not in available_by_depth.keys(): + # available_by_depth[depth_now]={} + # available_by_depth[depth_now][unique_key]=block_now + # return available_by_depth + + # def blocks_by_depth(self, request_depth): + # return [ unique_key for unique_key, syntax in self.syntax_dict.items() if syntax.depth == request_depth ] + + # @property + # def root_keys(self): + # return [ unique_key for unique_key, syntax in self.syntax_dict.items() if syntax.is_root ] + + +def key_search_recurse(dict_in, test_path, key_test, level_stop=15): + """ + Parse blocks recursively until we hit the given key + """ + if not isinstance(dict_in,dict): + return list() + + if len(test_path) == level_stop: + return list() + + if key_test in dict_in.keys(): + # Success at leaf node! Found key, return path to here + success_path=deepcopy(test_path) + success_path.append(key_test) + return [ success_path ] + + success_paths=[] + for key_now, test_obj in dict_in.items(): + # Path to be tested + path_now=deepcopy(test_path) + path_now.append(key_now) + + paths_to_success=key_search_recurse(test_obj,path_now,key_test,level_stop) + + # If search fails, paths will be empty + # Otherwise add to our known list of success paths from this node + if len( paths_to_success ) != 0: + success_paths.extend(paths_to_success) + + return success_paths + +def fetch_syntax(json_dict,syntax): + assert isinstance(syntax,SyntaxPath) + assert syntax.has_params + + key_list=deepcopy(syntax.path) + assert len(key_list) > 0 + + dict_now=json_dict + while len(key_list) > 0: + key_now=key_list.pop(0) + obj_now=dict_now[key_now] + + assert isinstance(obj_now,dict) + dict_now=deepcopy(obj_now) + + return dict_now + +def parse_block(json_obj,syntax_path,class_name): + # Construct Moose parameters + moose_param_list=get_params_list(json_obj,syntax_path) + + # Deduce type of object by its relation to parent + relation=_relation_shorthands[syntax_path.parent_relation] + class_type=_child_type_map[relation] + + # Generate class documentation + doc_now=class_type.moose_doc(moose_param_list) + + # Create new class with a name that matches the block + new_cls = type(class_name, + (class_type,), + { + "__doc__": doc_now, + }) + + # Add the parameters to the class + for moose_param in moose_param_list: + new_cls.add_moose_param(moose_param) + + # Return our new class + return new_cls + +def get_params_list(json_obj,syntax_path): + # Available syntax for this block as dict + block=fetch_syntax(json_obj,syntax_path) + + # Lift parameters dictionary + params=block["parameters"] + moose_param_list=[] + + for param_name, param_info in params.items(): + attr_types = tuple(type_mapping[t] for t in param_info['basic_type'].split(':')) + attr_type = attr_types[-1] + + is_array=False + if len(attr_types) > 1: + for t in attr_types[:-1]: + assert issubclass(t, Iterable) + is_array=True + + # Set allowed values if present + allowed_values = None + if param_info['options']: + values = param_info['options'].split() + allowed_values = [_convert_to_type(attr_type, v) for v in values] + + # Apply the default value if provided + # TODO: default values need to be handled differently. They are replacing + # properties in the type definition as they are now + default = None + if 'default' in param_info.keys() and param_info['default'] != None and param_info['default'] != '': + if is_array: + defaults = param_info['default'] + if type(defaults) == str: + default = [_convert_to_type(attr_type, v) for v in defaults.split()] + elif issubclass(type(defaults), Iterable): + default = [_convert_to_type(attr_type, v) for v in defaults] + else: + default = [defaults] + else: + default = _convert_to_type(attr_type, param_info['default']) + + # Create and add a MOOSE parameter + moose_param=MooseParam(param_name, + attr_type, + is_array, + default=default, + allowed_vals=allowed_values, + description=param_info.get('description'), + ) + + moose_param_list.append(moose_param) + + # Return list + return moose_param_list + + +# convenience function for converting types +def _convert_to_type(t, val): + if t == bool: + val = bool(int(val)) + else: + val = t(val) + return val diff --git a/catbird/utils.py b/catbird/utils.py new file mode 100644 index 0000000..d26c54d --- /dev/null +++ b/catbird/utils.py @@ -0,0 +1,71 @@ +"""I/O utilities""" +import json +import subprocess + +def json_from_exec(exec): + """ + Returns the Python objects corresponding to the MOOSE application described + by the json file. + + Parameters + ---------- + json_file : str, or Path + Either an open file handle, or a path to the json file. If `json` is a + dict, it is assumed this is a pre-parsed json object. + + Returns + ------- + dict + A dictionary of all MOOSE objects + """ + json_proc = subprocess.Popen([exec, '--json'], stdout=subprocess.PIPE) + json_str = '' + + # filter out the header and footer from the json data + while json_proc.stdout.readline().decode().find('**START JSON DATA**') < 0: + pass + + while True: + line = json_proc.stdout.readline().decode() + if not line or line.find('**END JSON DATA**') >= 0: + break + json_str += line + + j_obj = json.loads(json_str) + + return j_obj + +def write_json(json_dict_out,name): + """ + Write a dictionary in JSON format + + Parameters + ---------- + json_dict_out : dict + name: str + Save as name.json + """ + json_output = json.dumps(json_dict_out, indent=4) + json_name=name + if json_name.find(".json") < 0 : + json_name = name+".json" + + with open(json_name, "w") as fh: + fh.write(json_output) + fh.write("\n") + print("Wrote to ",json_name) + + +def read_json(json_file): + """ + Load the contents of a JSON file into a dict. + + Parameters + ---------- + json_file: str + Name of JSON file + """ + json_dict = {} + with open(json_file) as handle: + json_dict = json.load(handle) + return json_dict diff --git a/examples/thermal.py b/examples/thermal.py new file mode 100644 index 0000000..1734413 --- /dev/null +++ b/examples/thermal.py @@ -0,0 +1,102 @@ +from catbird import * +import os +import subprocess +import sys + +def main(): + # Get path to MOOSE + moose_path=os.environ['MOOSE_DIR'] + + # Path to executable and inputs + module_name="heat_transfer" + app_name=module_name+"-opt" + app_path=os.path.join(moose_path,"modules",module_name) + app_exe=os.path.join(app_path,app_name) + + # Create a factory of available objects from our MOOSE executable + factory=Factory(app_exe) + + config_name="config_heat_conduction.json" + factory.write_config(config_name) + + # Create a boiler plate MOOSE model from a template + model=TransientModel(factory) + + # Set executioner attributes + model.executioner.end_time=5 + model.executioner.dt=1 # Default + + # Add a mesh generator + model.add_mesh_generator("generated","GeneratedMeshGenerator", + dim=2, + nx=10, + ny=10, + xmax=2, + ymax=1) + + # Add variables + var_name="T" + model.add_variable(var_name, initial_condition=300.0) + + # Add kernels + model.add_kernel("heat_conduction", kernel_type="HeatConduction", variable=var_name) + model.add_kernel("time_derivative", kernel_type="HeatConductionTimeDerivative", variable=var_name) + + # Add boundary conditions + model.add_bc("t_left", + bc_type="DirichletBC", + variable=var_name, + value = 300, + boundary='left') + + model.add_bc("t_right", + bc_type="FunctionDirichletBC", + variable=var_name, + function = "'300+5*t'", + boundary = 'right') + + # Add materials + model.add_material("thermal", + mat_type="HeatConductionMaterial", + thermal_conductivity=45.0, + specific_heat=0.5) + + model.add_material("density", + mat_type="GenericConstantMaterial", + prop_names='density', + prop_values=8000.0) + + model.outputs.exodus=True + model.add_output("csv",output_type="CSV", + file_base='thermal_out', + execute_on='final') + + # Add some input syntax that wasn't in the vanilla boilerplate model + model.add_syntax("VectorPostprocessors") + model.add_to_collection("VectorPostprocessors", + "VectorPostprocessor", + "t_sampler", + collection_type="LineValueSampler", + variable=var_name, + start_point='0 0.5 0', + end_point='2 0.5 0', + num_points=20, + sort_by='x') + + # Write out our input file + input_name="thermal.i" + model.write(input_name) + + # Run + args=[app_exe,'-i',input_name] + moose_process=subprocess.Popen(args) + stream_data=moose_process.communicate()[0] + retcode=moose_process.returncode + + # Return moose return code + sys.exit(retcode) + +if __name__ == "__main__": + main() + +