Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions hed/errors/error_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,8 @@ def val_warning_capitalization(tag):

@hed_tag_error(ValidationErrors.UNITS_MISSING, default_severity=ErrorSeverity.WARNING)
def val_warning_default_units_used(tag, default_unit):
    """ Build the warning text for a value tag that was written without units.

    Parameters:
        tag: The tag being reported; it is interpolated into the message.
        default_unit: The unit that will be assumed, or None when no single default applies.

    Returns:
        str: The formatted warning message.
    """
    if default_unit is not None:
        return f"No unit specified. Using '{default_unit}' as the default - '{tag}'"
    # No default to fall back on: say so rather than printing 'None' as the unit.
    return f"No unit specified on - '{tag}'. Multiple default values exist and cannot be inferred"


Expand Down
40 changes: 36 additions & 4 deletions hed/models/expression_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class Token:
Wildcard = 10
ExactMatch = 11
ExactMatchEnd = 12
ExactMatchOptional = 14
NotInLine = 13 # Not currently a token. In development and may become one.

def __init__(self, text):
Expand All @@ -83,6 +84,7 @@ def __init__(self, text):
"???": Token.Wildcard, # Any Group
"{": Token.ExactMatch, # Nothing else
"}": Token.ExactMatchEnd, # Nothing else
":": Token.ExactMatchOptional,
"@": Token.NotInLine
}
self.kind = tokens.get(text, Token.Tag)
Expand Down Expand Up @@ -158,7 +160,11 @@ def handle_expr(self, hed_group, exact=False):
if not groups1:
return groups1
groups2 = self.right.handle_expr(hed_group, exact=exact)
# this is slow...

return self.merge_groups(groups1, groups2)

@staticmethod
def merge_groups(groups1, groups2):
return_list = []
for group in groups1:
for other_group in groups2:
Expand Down Expand Up @@ -308,6 +314,20 @@ def handle_expr(self, hed_group, exact=False):
if return_list:
return return_list

# Basically if we don't have an exact match above, do the more complex matching including optional
if self.left:
optional_groups = self.left.handle_expr(hed_group, exact=True)
found_groups = ExpressionAnd.merge_groups(found_groups, optional_groups)

if found_groups:
return_list = []
for group in found_groups:
if len(group.group.children) == len(group.tags):
return_list.append(group)

if return_list:
return return_list

return []


Expand Down Expand Up @@ -336,6 +356,11 @@ def __init__(self, expression_string):

'[[Event and Action]]' - Find a group with Event And Action at the same level.

Practical Complex Example:

[[{(Onset or Offset), (Def or [[Def-expand]]): ???}]] - A group with an onset tag,
a def tag or def-expand group, and an optional wildcard group

Parameters:
expression_string(str): The query string
"""
Expand Down Expand Up @@ -382,6 +407,9 @@ def _handle_negation(self):
next_token = self._next_token_is([Token.LogicalNegation])
if next_token == Token.LogicalNegation:
interior = self._handle_grouping_op()
if "?" in str(interior):
raise ValueError("Cannot negate wildcards, or expressions that contain wildcards."
"Use {required_expression : optional_expression}.")
expr = ExpressionNegation(next_token, right=interior)
return expr
else:
Expand Down Expand Up @@ -411,8 +439,12 @@ def _handle_grouping_op(self):
elif next_token == Token.ExactMatch:
interior = self._handle_and_op()
expr = ExpressionExactMatch(next_token, right=interior)
next_token = self._next_token_is([Token.ExactMatchEnd])
if next_token != Token.ExactMatchEnd:
next_token = self._next_token_is([Token.ExactMatchEnd, Token.ExactMatchOptional])
if next_token == Token.ExactMatchOptional:
optional_portion = self._handle_and_op()
expr.left = optional_portion
next_token = self._next_token_is([Token.ExactMatchEnd])
if next_token is None:
raise ValueError("Parse error: Missing closing curly bracket")
else:
next_token = self._get_next_token()
Expand All @@ -434,7 +466,7 @@ def _parse(self, expression_string):
return expr

def _tokenize(self, expression_string):
grouping_re = r"\[\[|\[|\]\]|\]|}|{"
grouping_re = r"\[\[|\[|\]\]|\]|}|{|:"
paren_re = r"\)|\(|~"
word_re = r"\?+|\band\b|\bor\b|,|[\"_\-a-zA-Z0-9/.^#\*@]+"
re_string = fr"({grouping_re}|{paren_re}|{word_re})"
Expand Down
6 changes: 4 additions & 2 deletions hed/models/hed_string.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def __init__(self, hed_string, hed_schema, def_dict=None, _contents=None):
super().__init__(hed_string, contents=contents, startpos=0, endpos=len(hed_string))
self._schema = hed_schema
self._from_strings = None
self._def_dict = def_dict

@classmethod
def from_hed_strings(cls, hed_strings):
Expand All @@ -55,7 +56,8 @@ def from_hed_strings(cls, hed_strings):
hed_string = ",".join([group._hed_string for group in hed_strings])
contents = [child for sub_string in hed_strings for child in sub_string.children]
first_schema = hed_strings[0]._schema
new_string.__init__(hed_string=hed_string, _contents=contents, hed_schema=first_schema)
first_dict = hed_strings[0]._def_dict
new_string.__init__(hed_string=hed_string, _contents=contents, hed_schema=first_schema, def_dict=first_dict)
new_string._from_strings = hed_strings
return new_string

Expand Down Expand Up @@ -344,7 +346,7 @@ def validate(self, allow_placeholders=True, error_handler=None):
"""
from hed.validator import HedValidator

validator = HedValidator(self._schema)
validator = HedValidator(self._schema, def_dicts=self._def_dict)
return validator.validate(self, allow_placeholders=allow_placeholders, error_handler=error_handler)

def find_top_level_tags(self, anchor_tags, include_groups=2):
Expand Down
50 changes: 38 additions & 12 deletions hed/models/hed_tag.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ def _calculate_to_canonical_forms(self, hed_schema):
return tag_issues

def get_stripped_unit_value(self):
""" Return the extension portion without units.
""" Return the extension divided into value and units, if the units are valid.

Returns:
stripped_unit_value (str): The extension portion with the units removed.
Expand All @@ -345,6 +345,32 @@ def get_stripped_unit_value(self):

return self.extension, None

def value_as_default_unit(self):
    """ Return the extension value converted to default units, if possible.

    When the extension contains no space it is treated as a bare value and the
    tag's default unit is used; otherwise the value/unit split is delegated to
    _get_tag_units_portion.

    Returns:
        value (float or None): The extension value scaled by the unit's conversion factor.
            None is returned when there is no usable value, no unit entry
            (e.g. no default unit), no conversion factor on the unit,
            or the value is not numeric.

    Examples:
        'Duration/300 ms' will return .3

    """
    value, _, units = self.extension.rpartition(" ")
    if not value:
        # No space in the extension: the whole thing is the value, so fall back to the default unit.
        stripped_value, unit = units, self.default_unit
    else:
        stripped_value, unit = self._get_tag_units_portion(self.unit_classes)

    # default_unit may be None (no single default), so guard before touching unit.attributes.
    if not stripped_value or unit is None:
        return None

    conversion_factor = unit.attributes.get("conversionFactor")
    if not conversion_factor:
        return None

    try:
        # The factor may be a float (set on derived units) or a schema string; strings may use '^',
        # which is normalised the same way get_conversion_factor does it.
        factor = float(str(conversion_factor).replace("^", "e"))
        return float(stripped_value) * factor
    except ValueError:
        # Non-numeric value or malformed factor: not convertible.
        return None

@property
def unit_classes(self):
""" Return a dict of all the unit classes this tag accepts.
Expand Down Expand Up @@ -476,20 +502,19 @@ def get_tag_unit_class_units(self):

return units

def get_unit_class_default_unit(self):
@property
def default_unit(self):
""" Get the default unit class unit for this tag.

Only a tag with a single unit class can have default units.
Returns:
str: The default unit class unit associated with the specific tag or an empty string.

unit(UnitEntry or None): the default unit entry for this tag, or None
"""
default_unit = ''
unit_classes = self.unit_classes.values()
if unit_classes:
if len(unit_classes) == 1:
first_unit_class_entry = list(unit_classes)[0]
default_unit = first_unit_class_entry.has_attribute(HedKey.DefaultUnits, return_value=True)

return default_unit
return first_unit_class_entry.units.get(default_unit, None)

def base_tag_has_attribute(self, tag_attribute):
""" Check to see if the tag has a specific attribute.
Expand Down Expand Up @@ -536,8 +561,9 @@ def _get_tag_units_portion(self, tag_unit_classes):
tag_unit_classes (dict): Dictionary of valid UnitClassEntry objects for this tag.

Returns:
stripped_value (str): The value with the units removed.

stripped_value (str or None): The value with the units removed.
This is filled in if there are no units as well.
unit (UnitEntry or None): The matching unit entry if one is found
"""
value, _, units = self.extension.rpartition(" ")
if not units:
Expand All @@ -548,12 +574,12 @@ def _get_tag_units_portion(self, tag_unit_classes):

possible_match = self._find_modifier_unit_entry(units, all_valid_unit_permutations)
if possible_match and not possible_match.has_attribute(HedKey.UnitPrefix):
return value, units
return value, possible_match

# Repeat the above, but as a prefix
possible_match = self._find_modifier_unit_entry(value, all_valid_unit_permutations)
if possible_match and possible_match.has_attribute(HedKey.UnitPrefix):
return units, value
return possible_match, value

return None, None

Expand Down
36 changes: 34 additions & 2 deletions hed/schema/hed_schema_entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from hed.schema.hed_schema_constants import HedKey

import inflect
import copy


pluralize = inflect.engine()
pluralize.defnoun("hertz", "hertz")
Expand Down Expand Up @@ -133,6 +135,20 @@ def get_known_attributes(self):
return {key: value for key, value in self.attributes.items()
if not self._unknown_attributes or key not in self._unknown_attributes}

# Give a default deep copy that excludes the _section attribute
def __deepcopy__(self, memo):
    """ Deep copy this entry while sharing (not copying) its _section attribute.

    Parameters:
        memo (dict): The memo dictionary passed along by copy.deepcopy.

    Returns:
        A new instance of the same class, created without calling __init__.
    """
    cls = self.__class__
    clone = cls.__new__(cls)
    # Register the clone before copying attributes so cyclic references resolve to it.
    memo[id(self)] = clone

    for attr_name, attr_value in self.__dict__.items():
        # _section is kept as a shared reference; everything else is deep copied.
        copied = attr_value if attr_name == "_section" else copy.deepcopy(attr_value, memo)
        setattr(clone, attr_name, copied)
    return clone


class UnitClassEntry(HedSchemaEntry):
""" A single unit class entry in the HedSchema. """
Expand Down Expand Up @@ -169,7 +185,11 @@ def finalize_entry(self, schema):
for derived_unit in new_derivative_units:
derivative_units[derived_unit] = unit_entry
for modifier in unit_entry.unit_modifiers:
derivative_units[modifier.name + derived_unit] = unit_entry
new_entry = copy.deepcopy(unit_entry)
derivative_units[modifier.name + derived_unit] = new_entry
new_entry.unit_class_name = derived_unit
new_entry.attributes["conversionFactor"] = new_entry.get_conversion_factor(modifier_entry=modifier)

self.derivative_units = derivative_units

def __eq__(self, other):
Expand All @@ -182,7 +202,6 @@ def __eq__(self, other):

class UnitEntry(HedSchemaEntry):
""" A single unit entry with modifiers in the HedSchema. """

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.unit_class_name = None
Expand All @@ -197,6 +216,19 @@ def finalize_entry(self, schema):
"""
self.unit_modifiers = schema._get_modifiers_for_unit(self.name)

def get_conversion_factor(self, modifier_entry=None):
    """Returns the conversion factor from combining this unit with the specified modifier

    Parameters:
        modifier_entry (HedSchemaEntry or None): The modifier to apply.
            When None, only this unit's own factor is used.

    Returns:
        conversion_factor(float): Defaults to 1.0 conversion factor if not present on unit and modifier.
    """
    def _factor_of(entry):
        # The attribute may already be a float (it is stored that way on derived unit entries),
        # so go through str() before the textual '^' -> 'e' rewrite instead of assuming a string.
        # NOTE(review): the rewrite turns '10^3' into '10e3' (10000.0) - confirm this matches
        # the notation actually used in the schema.
        raw_factor = entry.attributes.get("conversionFactor", "1.0")
        return float(str(raw_factor).replace("^", "e"))

    base_factor = _factor_of(self)
    modifier_factor = 1.0 if modifier_entry is None else _factor_of(modifier_entry)
    return base_factor * modifier_factor


class HedTagEntry(HedSchemaEntry):
""" A single tag entry in the HedSchema. """
Expand Down
22 changes: 1 addition & 21 deletions hed/validator/tag_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ def _check_units(self, original_tag, bad_units, report_as):
validation_issue = ErrorHandler.format_error(ValidationErrors.UNITS_INVALID,
tag=report_as, units=tag_unit_class_units)
else:
default_unit = original_tag.get_unit_class_default_unit()
default_unit = original_tag.default_unit
validation_issue = ErrorHandler.format_error(ValidationErrors.UNITS_MISSING,
tag=report_as, default_unit=default_unit)
return validation_issue
Expand Down Expand Up @@ -378,26 +378,6 @@ def check_tag_requires_child(self, original_tag):
tag=original_tag)
return validation_issues

def check_tag_unit_class_units_exist(self, original_tag):
""" Report warning if tag has a unit class tag with no units.

Parameters:
original_tag (HedTag): The original tag that is used to report the error.

Returns:
list: Validation issues. Each issue is a dictionary.

"""
validation_issues = []
if original_tag.is_unit_class_tag():
tag_unit_values = original_tag.extension
if tag_validator_util.validate_numeric_value_class(tag_unit_values):
default_unit = original_tag.get_unit_class_default_unit()
validation_issues += ErrorHandler.format_error(ValidationErrors.UNITS_MISSING,
tag=original_tag,
default_unit=default_unit)
return validation_issues

def check_for_invalid_extension_chars(self, original_tag):
"""Report invalid characters in extension/value.

Expand Down
5 changes: 3 additions & 2 deletions readthedocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ formats:
- pdf

build:
image: latest
os: "ubuntu-22.04"
tools:
python: "3.8"

# Build documentation in the docs/ directory with Sphinx
sphinx:
Expand All @@ -15,7 +17,6 @@ sphinx:


python:
version: 3.8
install:
- requirements: docs/requirements.txt
system_packages: true
37 changes: 36 additions & 1 deletion tests/models/test_expression_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -694,4 +694,39 @@ def test_not_in_line3(self):
"(A, B, (C)), D": True,
"(A, B, (C)), (D), E": True,
}
self.base_test("@C or B", test_strings)
self.base_test("@C or B", test_strings)

def test_optional_exact_group(self):
test_strings = {
"A, C": True,
}
self.base_test("{a and (b or c)}", test_strings)

test_strings = {
"A, B, C, D": True,
}
self.base_test("{a and b: c and d}", test_strings)

test_strings = {
"A, B, C": True,
"A, B, C, D": False,
}
self.base_test("{a and b: c or d}", test_strings)

test_strings = {
"A, C": True,
"A, D": True,
"A, B, C": False,
"A, B, C, D": False,
}
self.base_test("{a or b: c or d}", test_strings)

test_strings = {
"(Onset, (Def-expand/taco))": True,
"(Onset, (Def-expand/taco, (Label/DefContents)))": True,
"(Onset, (Def-expand/taco), (Label/OnsetContents))": True,
"(Onset, (Def-expand/taco), (Label/OnsetContents, Description/MoreContents))": True,
"Onset, (Def-expand/taco), (Label/OnsetContents)": False,
"(Onset, (Def-expand/taco), Label/OnsetContents)": False,
}
self.base_test("[[{(Onset or Offset), (Def or [[Def-expand]]): ???}]]", test_strings)
Loading