From 9396250b34c05d41a298aaa88cdaa86366ace010 Mon Sep 17 00:00:00 2001 From: jniestroy Date: Fri, 18 Oct 2024 08:48:27 -0400 Subject: [PATCH 1/7] add schema inference to cli --- src/fairscape_cli/models/schema/tabular.py | 58 ++++++++++++++++++++++ src/fairscape_cli/models/schema/utils.py | 16 ++++++ src/fairscape_cli/schema/schema.py | 18 ++++++- 3 files changed, 91 insertions(+), 1 deletion(-) diff --git a/src/fairscape_cli/models/schema/tabular.py b/src/fairscape_cli/models/schema/tabular.py index 8a97f7b..e03a2f9 100644 --- a/src/fairscape_cli/models/schema/tabular.py +++ b/src/fairscape_cli/models/schema/tabular.py @@ -3,6 +3,7 @@ from functools import lru_cache import os import json +import pyarrow.parquet as pq from pydantic import ( BaseModel, @@ -24,6 +25,7 @@ GenerateSlice, PropertyNameException, ColumnIndexException, + map_arrow_type_to_json_schema ) from fairscape_cli.models.utils import ( @@ -351,6 +353,62 @@ def execute_validation(self, dataPath): }) return output_exceptions + + @classmethod + def infer_from_parquet(cls, name: str, description: str, guid: Optional[str], parquet_file: str) -> 'TabularValidationSchema': + try: + table = pq.read_table(parquet_file) + schema = table.schema + + properties = {} + for i, field in enumerate(schema): + field_name = field.name + field_type = map_arrow_type_to_json_schema(field.type) + + if field_type == 'string': + properties[field_name] = StringProperty( + datatype='string', + description=f"Column {field_name}", + index=i + ) + elif field_type == 'integer': + properties[field_name] = IntegerProperty( + datatype='integer', + description=f"Column {field_name}", + index=i + ) + elif field_type == 'number': + properties[field_name] = NumberProperty( + datatype='number', + description=f"Column {field_name}", + index=i + ) + elif field_type == 'boolean': + properties[field_name] = BooleanProperty( + datatype='boolean', + description=f"Column {field_name}", + index=i + ) + elif field_type == 'array': + item_type = map_arrow_type_to_json_schema(field.type.value_type) + properties[field_name] = ArrayProperty( + datatype='array', + description=f"Column {field_name}", + index=str(i), + items=Items(datatype=DatatypeEnum(item_type)) + ) + + return cls( + name=name, + description=description, + guid=guid, + properties=properties, + required=list(properties.keys()), + separator=",", + header=True + ) + except Exception as e: + raise ValueError(f"Error inferring schema: {str(e)}") diff --git a/src/fairscape_cli/models/schema/utils.py b/src/fairscape_cli/models/schema/utils.py index df177ee..8f90353 100644 --- a/src/fairscape_cli/models/schema/utils.py +++ b/src/fairscape_cli/models/schema/utils.py @@ -4,6 +4,7 @@ List ) from itertools import product +import pyarrow as pa class PropertyNameException(Exception): @@ -142,3 +143,18 @@ def CheckOverlap(property_index: Union[str,int], schema_indicies: List[Union[str # TODO check that slice indicies don't overlap any other slice indicies return None + +def map_arrow_type_to_json_schema(arrow_type): + """Converts pyarrow types to their matching type in json schema""" + if arrow_type.equals(pa.string()): + return 'string' + elif arrow_type.equals(pa.int64()) or arrow_type.equals(pa.int32()): + return 'integer' + elif arrow_type.equals(pa.float64()) or arrow_type.equals(pa.float32()): + return 'number' + elif arrow_type.equals(pa.bool_()): + return 'boolean' + elif isinstance(arrow_type, pa.ListType): + return 'array' + else: + return 'string' # Default to string for unsupported types diff --git 
a/src/fairscape_cli/schema/schema.py b/src/fairscape_cli/schema/schema.py index fb6258d..faa0347 100644 --- a/src/fairscape_cli/schema/schema.py +++ b/src/fairscape_cli/schema/schema.py @@ -6,7 +6,6 @@ ValidationError ) - from fairscape_cli.models.schema.tabular import ( TabularValidationSchema, ReadSchema, @@ -304,4 +303,21 @@ def validate(ctx, schema, data): print('Validation Success') ctx.exit(0) +@schema.command('infer') +@click.option('--name', required=True, type=str) +@click.option('--description', required=True, type=str) +@click.option('--guid', required=False, type=str, default="", show_default=False) +@click.argument('parquet_file', type=click.Path(exists=True)) +@click.argument('schema_file', type=str) +@click.pass_context +def infer_schema(ctx, name, description, guid, parquet_file, schema_file): + """Infer a Tabular Schema from a Parquet file.""" + try: + schema_model = TabularValidationSchema.infer_from_parquet(name, description, guid, parquet_file) + WriteSchema(schema_model, schema_file) + click.echo(f"Inferred Schema: {str(schema_file)}") + except Exception as e: + click.echo(f"Error inferring schema: {str(e)}") + ctx.exit(code=1) + From 3718f2a824539f110163233792a46f597a211a8c Mon Sep 17 00:00:00 2001 From: jniestroy Date: Fri, 18 Oct 2024 20:26:27 -0400 Subject: [PATCH 2/7] add min max --- src/fairscape_cli/models/schema/tabular.py | 53 ++++++++++++++++------ src/fairscape_cli/models/schema/utils.py | 2 +- src/fairscape_cli/schema/schema.py | 5 +- 3 files changed, 42 insertions(+), 18 deletions(-) diff --git a/src/fairscape_cli/models/schema/tabular.py b/src/fairscape_cli/models/schema/tabular.py index e03a2f9..a40974f 100644 --- a/src/fairscape_cli/models/schema/tabular.py +++ b/src/fairscape_cli/models/schema/tabular.py @@ -4,6 +4,7 @@ import os import json import pyarrow.parquet as pq +import pyarrow.compute as pc from pydantic import ( BaseModel, @@ -355,7 +356,7 @@ def execute_validation(self, dataPath): return output_exceptions @classmethod - def infer_from_parquet(cls, name: str, description: str, guid: Optional[str], parquet_file: str) -> 'TabularValidationSchema': + def infer_from_parquet(cls, name: str, description: str, guid: Optional[str], parquet_file: str, include_min_max: bool = False) -> 'TabularValidationSchema': try: table = pq.read_table(parquet_file) schema = table.schema @@ -371,18 +372,42 @@ def infer_from_parquet(cls, name: str, description: str, guid: Optional[str], pa description=f"Column {field_name}", index=i ) - elif field_type == 'integer': - properties[field_name] = IntegerProperty( - datatype='integer', - description=f"Column {field_name}", - index=i - ) - elif field_type == 'number': - properties[field_name] = NumberProperty( - datatype='number', - description=f"Column {field_name}", - index=i - ) + elif field_type in ['integer', 'number']: + if include_min_max: + column = table.column(field_name) + min_max = pc.min_max(column) + min_value = min_max['min'].as_py() + max_value = min_max['max'].as_py() + + if field_type == 'integer': + properties[field_name] = IntegerProperty( + datatype='integer', + description=f"Column {field_name}", + index=i, + minimum=min_value, + maximum=max_value + ) + else: + properties[field_name] = NumberProperty( + datatype='number', + description=f"Column {field_name}", + index=i, + minimum=min_value, + maximum=max_value + ) + else: + if field_type == 'integer': + properties[field_name] = IntegerProperty( + datatype='integer', + description=f"Column {field_name}", + index=i + ) + else: + properties[field_name] = 
NumberProperty( + datatype='number', + description=f"Column {field_name}", + index=i + ) elif field_type == 'boolean': properties[field_name] = BooleanProperty( datatype='boolean', @@ -411,8 +436,6 @@ def infer_from_parquet(cls, name: str, description: str, guid: Optional[str], pa raise ValueError(f"Error inferring schema: {str(e)}") - - def AppendProperty(schemaFilepath: str, propertyInstance, propertyName: str) -> None: # check that schemaFile exists schemaPath = pathlib.Path(schemaFilepath) diff --git a/src/fairscape_cli/models/schema/utils.py b/src/fairscape_cli/models/schema/utils.py index 8f90353..a67c4a8 100644 --- a/src/fairscape_cli/models/schema/utils.py +++ b/src/fairscape_cli/models/schema/utils.py @@ -157,4 +157,4 @@ def map_arrow_type_to_json_schema(arrow_type): elif isinstance(arrow_type, pa.ListType): return 'array' else: - return 'string' # Default to string for unsupported types + return 'string' \ No newline at end of file diff --git a/src/fairscape_cli/schema/schema.py b/src/fairscape_cli/schema/schema.py index faa0347..6d63263 100644 --- a/src/fairscape_cli/schema/schema.py +++ b/src/fairscape_cli/schema/schema.py @@ -307,13 +307,14 @@ def validate(ctx, schema, data): @click.option('--name', required=True, type=str) @click.option('--description', required=True, type=str) @click.option('--guid', required=False, type=str, default="", show_default=False) +@click.option('--include-min-max', is_flag=True, help="Include min and max values for numeric and integer fields") @click.argument('parquet_file', type=click.Path(exists=True)) @click.argument('schema_file', type=str) @click.pass_context -def infer_schema(ctx, name, description, guid, parquet_file, schema_file): +def infer_schema(ctx, name, description, guid, include_min_max, parquet_file, schema_file): """Infer a Tabular Schema from a Parquet file.""" try: - schema_model = TabularValidationSchema.infer_from_parquet(name, description, guid, parquet_file) + schema_model = TabularValidationSchema.infer_from_parquet(name, description, guid, parquet_file, include_min_max) WriteSchema(schema_model, schema_file) click.echo(f"Inferred Schema: {str(schema_file)}") except Exception as e: From 74690956c9128388f0487724a60bd50a79338aa8 Mon Sep 17 00:00:00 2001 From: jniestroy Date: Thu, 24 Oct 2024 14:22:59 -0400 Subject: [PATCH 3/7] refactor plus h5 --- src/fairscape_cli/models/schema/tabular.py | 591 +++++++++++---------- src/fairscape_cli/schema/schema.py | 167 +++--- 2 files changed, 410 insertions(+), 348 deletions(-) diff --git a/src/fairscape_cli/models/schema/tabular.py b/src/fairscape_cli/models/schema/tabular.py index a40974f..fafa3c5 100644 --- a/src/fairscape_cli/models/schema/tabular.py +++ b/src/fairscape_cli/models/schema/tabular.py @@ -3,9 +3,11 @@ from functools import lru_cache import os import json +import pandas as pd import pyarrow.parquet as pq import pyarrow.compute as pc - +import h5py +from enum import Enum from pydantic import ( BaseModel, ConfigDict, @@ -19,7 +21,8 @@ List, Optional, Union, - Literal + Literal, + Type ) from fairscape_cli.models.schema.utils import ( @@ -39,12 +42,26 @@ NAAN, ) -import datetime -from enum import Enum -import re - +class FileType(str, Enum): + CSV = "csv" + TSV = "tsv" + PARQUET = "parquet" + HDF5 = "h5" + + @classmethod + def from_extension(cls, filepath: str) -> 'FileType': + ext = pathlib.Path(filepath).suffix.lower()[1:] # Remove the dot + if ext == 'h5' or ext == 'hdf5': + return cls.HDF5 + elif ext == 'parquet': + return cls.PARQUET + elif ext == 'tsv': + return 
cls.TSV + elif ext == 'csv': + return cls.CSV + else: + raise ValueError(f"Unsupported file extension: {ext}") -# datatype enum class DatatypeEnum(str, Enum): NULL = "null" BOOLEAN = "boolean" @@ -60,19 +77,16 @@ class Items(BaseModel): ) datatype: DatatypeEnum = Field(alias="type") - class BaseProperty(BaseModel): description: str = Field(description="description of field") model_config = ConfigDict(populate_by_name = True) index: Union[int,str] = Field(description="index of the column for this value") - valueURL: Optional[str] = Field(default=None) - + valueURL: Optional[str] = Field(default=None) class NullProperty(BaseProperty): datatype: Literal['null'] = Field(alias="type", default='null') index: int - class StringProperty(BaseProperty): datatype: Literal['string'] = Field(alias="type") pattern: Optional[str] = Field(description="Regex pattern to execute against values", default=None) @@ -80,7 +94,6 @@ class StringProperty(BaseProperty): minLength: Optional[int] = Field(description="Inclusive minimum length for string values", default=None) index: int - class ArrayProperty(BaseProperty): datatype: Literal['array'] = Field(alias="type") maxItems: Optional[int] = Field(description="max items in array, validation fails if length is greater than this value", default=None) @@ -89,12 +102,10 @@ class ArrayProperty(BaseProperty): index: str items: Items - class BooleanProperty(BaseProperty): datatype: Literal['boolean'] = Field(alias="type") index: int - class NumberProperty(BaseProperty): datatype: Literal['number'] = Field(alias="type") maximum: Optional[float] = Field(description="Inclusive Upper Limit for Values", default=None) @@ -102,19 +113,17 @@ class NumberProperty(BaseProperty): index: int @model_validator(mode='after') - def check_max_min(self) -> 'IntegerProperty': + def check_max_min(self) -> 'NumberProperty': minimum = self.minimum maximum = self.maximum if maximum is not None and minimum is not None: - if maximum == minimum: - raise ValueError('IntegerProperty attribute minimum != maximum') + raise ValueError('NumberProperty attribute minimum != maximum') elif maximum < minimum: - raise ValueError('IntegerProperty attribute maximum !< minimum') + raise ValueError('NumberProperty attribute maximum !< minimum') return self - class IntegerProperty(BaseProperty): datatype: Literal['integer'] = Field(alias="type") maximum: Optional[int] = Field(description="Inclusive Upper Limit for Values", default=None) @@ -127,315 +136,321 @@ def check_max_min(self) -> 'IntegerProperty': maximum = self.maximum if maximum is not None and minimum is not None: - if maximum == minimum: raise ValueError('IntegerProperty attribute minimum != maximum') elif maximum < minimum: raise ValueError('IntegerProperty attribute maximum !< minimum') return self - - PropertyUnion = Union[StringProperty, ArrayProperty, BooleanProperty, NumberProperty, IntegerProperty, NullProperty] - - -class TabularValidationSchema(BaseModel): +class BaseSchema(BaseModel): guid: Optional[str] = Field(alias="@id", default=None) context: Optional[Dict] = Field(default=DEFAULT_CONTEXT, alias="@context") metadataType: Optional[str] = Field(default=DEFAULT_SCHEMA_TYPE, alias="@type") schema_version: str = Field(default="https://json-schema.org/draft/2020-12/schema", alias="schema") name: str description: str - properties: Dict[str, PropertyUnion] = Field(default={}) datatype: str = Field(default="object", alias="type") additionalProperties: bool = Field(default=True) required: List[str] = Field(description="list of required 
properties by name", default=[]) - separator: str = Field(description="Field seperator for the file") - header: bool = Field(description="Do files of this schema have a header row", default=False) - examples: Optional[List[Dict[str, str ]]] = Field(default=[]) + examples: Optional[List[Dict[str, str]]] = Field(default=[]) - # Computed Field implementation for guid generation def generate_guid(self) -> str: - """ Generate an ARK for the Schema - """ - # if guid is already set if self.guid is None: - prefix=f"schema-{self.name.lower().replace(' ', '-')}" + prefix = f"schema-{self.name.lower().replace(' ', '-')}" sq = GenerateDatetimeSquid() self.guid = f"ark:{NAAN}/{prefix}-{sq}" return self.guid + + @model_validator(mode='after') + def generate_all_guids(self) -> 'BaseSchema': + """Generate GUIDs for this schema and any nested schemas""" + self.generate_guid() + + # Generate GUIDs for any nested schemas in properties + if hasattr(self, 'properties'): + for prop in self.properties.values(): + if isinstance(prop, BaseSchema): + prop.generate_guid() + + return self + + def to_json_schema(self) -> dict: + """Convert the HDF5Schema to JSON Schema format""" + schema = self.model_dump( + by_alias=True, + exclude_unset=True, + exclude_none=True + ) + return schema + +class TabularValidationSchema(BaseSchema): + properties: Dict[str, PropertyUnion] = Field(default={}) + separator: str = Field(description="Field separator for the file") + header: bool = Field(description="Do files of this schema have a header row", default=False) + @classmethod + def infer_from_file(cls, filepath: str, name: str, description: str, include_min_max: bool = False) -> 'TabularValidationSchema': + """Infer schema from a file""" + file_type = FileType.from_extension(filepath) + + if file_type == FileType.PARQUET: + return cls.infer_from_parquet(name, description, None, filepath, include_min_max) + else: # csv or tsv + separator = '\t' if file_type == FileType.TSV else ',' + df = pd.read_csv(filepath, sep=separator) + return cls.infer_from_dataframe(df, name, description, include_min_max, separator) + + @classmethod + def infer_from_dataframe(cls, df: pd.DataFrame, name: str, description: str, include_min_max: bool = False, separator: str = ',') -> 'TabularValidationSchema': + """Infer schema from a pandas DataFrame""" + type_map = { + 'int16': ('integer', IntegerProperty, int), + 'int32': ('integer', IntegerProperty, int), + 'int64': ('integer', IntegerProperty, int), + 'uint8': ('integer', IntegerProperty, int), + 'uint16': ('integer', IntegerProperty, int), + 'uint32': ('integer', IntegerProperty, int), + 'uint64': ('integer', IntegerProperty, int), + 'float16': ('number', NumberProperty, float), + 'float32': ('number', NumberProperty, float), + 'float64': ('number', NumberProperty, float), + 'bool': ('boolean', BooleanProperty, None), + } + + properties = {} + for i, (column_name, dtype) in enumerate(df.dtypes.items()): + dtype_str = str(dtype) + datatype, property_class, converter = type_map.get(dtype_str, ('string', StringProperty, None)) + + kwargs = { + "datatype": datatype, + "description": f"Column {column_name}", + "index": i + } + + if include_min_max and converter: + kwargs.update({ + "minimum": converter(df[column_name].min()), + "maximum": converter(df[column_name].max()) + }) + + properties[column_name] = property_class(**kwargs) + + return cls( + name=name, + description=description, + properties=properties, + required=list(properties.keys()), + separator=separator, + header=True + ) - def load_data(self, 
dataPath: str) -> List[List[str]]: - """ Load data from a given path and return a list of rows split by the seperator character - """ - # wrong data for example - rows = [] - sep = self.separator - with open(dataPath, "r") as data_file: - data_row = data_file.readline() - - # if the file has a header move on to the next row - if self.header: - data_row = data_file.readline() - - # iterate through the file slicing each row into - while data_row != "": - data_line = data_row.replace(",,", ", ,").replace("\n", "").split(sep) - rows.append(data_line) - data_row = data_file.readline() - - return rows - - - def convert_json(self, dataPath: str): - """ Given a path to a Tabular File, load in the data and generate a list of JSON equivalent data structures - """ - - data_rows = self.load_data(dataPath) - - row_lengths = set([len(row) for row in data_rows]) - if len(row_lengths) == 1: - default_row_length = list(row_lengths)[0] - else: - # TODO set to most common row_length - default_row_length = list(row_lengths)[0] - #raise Exception - - - schema_definition = self.model_dump( - by_alias=True, - exclude_unset=True, - exclude_none=True + @classmethod + def infer_from_parquet(cls, name: str, description: str, guid: Optional[str], filepath: str, include_min_max: bool = False) -> 'TabularValidationSchema': + """Infer schema from a Parquet file""" + table = pq.read_table(filepath) + schema = table.schema + properties = {} + + for i, field in enumerate(schema): + field_name = field.name + field_type = map_arrow_type_to_json_schema(field.type) + + if field_type == 'string': + properties[field_name] = StringProperty( + datatype='string', + description=f"Column {field_name}", + index=i + ) + elif field_type == 'integer': + if include_min_max: + column = table.column(field_name) + min_max = pc.min_max(column) + properties[field_name] = IntegerProperty( + datatype='integer', + description=f"Column {field_name}", + index=i, + minimum=min_max['min'].as_py(), + maximum=min_max['max'].as_py() + ) + else: + properties[field_name] = IntegerProperty( + datatype='integer', + description=f"Column {field_name}", + index=i + ) + elif field_type == 'number': + if include_min_max: + column = table.column(field_name) + min_max = pc.min_max(column) + properties[field_name] = NumberProperty( + datatype='number', + description=f"Column {field_name}", + index=i, + minimum=min_max['min'].as_py(), + maximum=min_max['max'].as_py() + ) + else: + properties[field_name] = NumberProperty( + datatype='number', + description=f"Column {field_name}", + index=i + ) + elif field_type == 'boolean': + properties[field_name] = BooleanProperty( + datatype='boolean', + description=f"Column {field_name}", + index=i ) - # reorganize property data for forming json - properties_simplified = [ - { - "name": property_name, - "index": property_data.get("index"), - "type": property_data.get("type"), - "items": property_data.get("items", {}).get("type"), - "index_slice": None, - "access_function": None - } - for property_name, property_data in schema_definition.get("properties").items() - ] - - - # index slice is going to change on each iteration, as a local variable this causes - # problems for all of the - updated_properties = [] - for prop in properties_simplified: + return cls( + name=name, + description=description, + guid=guid, + properties=properties, + required=list(properties.keys()), + separator=",", # Not used for parquet but required + header=True # Not used for parquet but required + ) + + def validate_dataframe(self, df: pd.DataFrame) -> 
List[Dict]: + """Validate a dataframe against the schema with lenient string type checking. + Only reports string validation errors for pattern mismatches, not type mismatches.""" + json_schema = self.to_json_schema() + validator = jsonschema.Draft202012Validator(json_schema) + errors = [] + + for i, row in df.iterrows(): + row_dict = row.to_dict() + validation_errors = sorted(validator.iter_errors(row_dict), key=lambda e: e.path) - index_slice = prop.get("index") - datatype = prop.get("type") - item_datatype = prop.get('items') - - prop['index_slice'] = index_slice - - - if datatype == 'array': - generated_slice = GenerateSlice(index_slice, default_row_length) - prop['index_slice'] = generated_slice + for err in validation_errors: + # Skip type validation errors for string fields unless there's a pattern mismatch + if err.validator == "type": + field_name = list(err.path)[-1] if err.path else None + if field_name in self.properties: + prop = self.properties[field_name] + if prop.datatype == "string": + # Skip string type validation errors + continue - if item_datatype ==" boolean": - prop['access_function'] = lambda row, passed_slice: [ bool(item) for item in list(row[passed_slice])] + # Include all other validation errors + errors.append({ + "message": err.message, + "row": i, + "field": list(err.path)[-1] if err.path else None, + "type": "ValidationError", + "failed_keyword": err.validator + }) + + return errors + + def validate_file(self, filepath: str) -> List[Dict]: + """Validate a file against the schema""" + file_type = FileType.from_extension(filepath) + + if file_type == FileType.PARQUET: + df = pd.read_parquet(filepath) + else: # csv or tsv + sep = '\t' if file_type == FileType.TSV else self.separator + df = pd.read_csv(filepath, sep=sep, header=0 if self.header else None) + + return self.validate_dataframe(df) - if item_datatype == "integer": - prop['access_function'] = lambda row, passed_slice: [ int(item) for item in list(row[passed_slice])] - - if item_datatype == "number": - prop['access_function'] = lambda row, passed_slice: [ float(item) for item in row[passed_slice]] +HDF5Union = Union[TabularValidationSchema] +class HDF5Schema(BaseSchema): + properties: Dict[str, HDF5Union] = Field(default={}) - if item_datatype == "string": - prop['access_function'] = lambda row, passed_slice: [ str(item) for item in list(row[passed_slice])] + @staticmethod + def dataset_to_dataframe(dataset: h5py.Dataset) -> pd.DataFrame: + """Convert any HDF5 dataset to a pandas DataFrame""" + data = dataset[()] + + # If it's a structured array (compound dtype), pandas can handle it directly + if dataset.dtype.fields: + return pd.DataFrame(data) + + # For multi-dimensional arrays, create column names based on shape + elif len(dataset.shape) > 1: + n_cols = dataset.shape[1] if len(dataset.shape) > 1 else 1 + columns = [f"column_{i}" for i in range(n_cols)] + return pd.DataFrame(data, columns=columns) + + # For 1D arrays, convert to single column DataFrame + else: + return pd.DataFrame(data, columns=['value']) - if datatype == "boolean": - prop['access_function'] = lambda row, passed_slice: bool(row[passed_slice]) - - if datatype == "integer": - prop['access_function'] = lambda row, passed_slice: int(row[passed_slice]) + @classmethod + def infer_from_file(cls, filepath: str, name: str, description: str, include_min_max: bool = False) -> 'HDF5Schema': + """Infer schema from HDF5 file""" + schema = cls(name=name, description=description) + properties = {} + + with h5py.File(filepath, 'r') as f: + def 
process_group(group, parent_path=""): + for key, item in group.items(): + path = f"{parent_path}/{key}" if parent_path else key + + if isinstance(item, h5py.Dataset): + try: + df = cls.dataset_to_dataframe(item) + properties[path] = TabularValidationSchema.infer_from_dataframe( + df, + name=f"{name}_{path.replace('/', '_')}", + description=f"Dataset at {path}", + include_min_max=include_min_max + ) + except Exception as e: + print(f"Warning: Could not convert dataset {path} to DataFrame: {str(e)}") - if datatype == "number": - prop['access_function'] = lambda row, passed_slice: float(row[passed_slice]) + elif isinstance(item, h5py.Group): + # Recursively process group contents + process_group(item, path) - if datatype == "string": - prop['access_function'] = lambda row, passed_slice: str(row[passed_slice]) - - if datatype == "null": - prop['access_function'] = lambda row, passed_slice: None - - updated_properties.append(prop) - - - # coerce types and generate lists - json_output = [] - parsing_failures = [] - - for i, row in enumerate(data_rows): - json_row = {} - row_error = False - - for json_attribute in updated_properties: - attribute_name = json_attribute.get("name") - access_func = json_attribute.get("access_function") - attribute_slice = json_attribute.get("index_slice") - try: - json_row[attribute_name] = access_func(row, attribute_slice) - - except ValueError as e: - parsing_failures.append({ - "message": f"ValueError: Failed to Parse Attribute {attribute_name} for Row {i}", - "type": "ParsingError", - "row": i, - "exception": e - }) - row_error = True - - except IndexError as e: - parsing_failures.append({ - "message": f"IndexError: Failed to Parse Attribute {attribute_name} for Row {i}", - "type": "ParsingError", - "row": i, - "exception": e - }) - row_error = True - - except Exception as e: - parsing_failures.append({ - "message": f"Error: Failed to Parse Attribute {attribute_name} for Row {i}", - "type": "ParsingError", - "row": i, - "exception": e - }) - row_error = True - - # if there was an error parsing a row - if row_error: - pass - else: - json_output.append(json_row) - - return json_output, parsing_failures - - - def execute_validation(self, dataPath): - ''' Given a path to a tabular data file, execute the schema against the - ''' - - schema_definition = self.model_dump( - by_alias=True, - exclude_unset=True, - exclude_none=True - ) - - json_objects, parsing_failures = self.convert_json(dataPath) - + process_group(f) + schema.properties = properties + schema.required = list(properties.keys()) + + return schema - output_exceptions = parsing_failures + def validate_file(self, filepath: str) -> List[Dict]: + """Validate an HDF5 file against the schema""" + errors = [] - validator = jsonschema.Draft202012Validator(schema_definition) - for i, json_elem in enumerate(json_objects): - errors = sorted(validator.iter_errors(json_elem), key=lambda e: e.path) - - if len(errors) == 0: - pass - else: - for err in errors: - output_exceptions.append({ - "message": err.message, - "row": i, + with h5py.File(filepath, 'r') as f: + for path, schema in self.properties.items(): + try: + # Try to get the dataset using the path + dataset = f[path] + if isinstance(dataset, h5py.Dataset): + # Convert dataset to DataFrame + df = self.dataset_to_dataframe(dataset) + # Validate using the TabularValidationSchema's validate_dataframe method + dataset_errors = schema.validate_dataframe(df) + # Add path information to errors + for error in dataset_errors: + error['path'] = path + 
errors.extend(dataset_errors) + except KeyError: + errors.append({ + "message": f"Dataset {path} not found", + "path": path, "type": "ValidationError", - #"exception": e , - "failed_keyword": err.validator, - "schema_path": err.schema_path + "failed_keyword": "required" }) + except Exception as e: + errors.append({ + "message": f"Error validating dataset {path}: {str(e)}", + "path": path, + "type": "ValidationError", + "failed_keyword": "format" + }) + + return errors - return output_exceptions - @classmethod - def infer_from_parquet(cls, name: str, description: str, guid: Optional[str], parquet_file: str, include_min_max: bool = False) -> 'TabularValidationSchema': - try: - table = pq.read_table(parquet_file) - schema = table.schema - - properties = {} - for i, field in enumerate(schema): - field_name = field.name - field_type = map_arrow_type_to_json_schema(field.type) - - if field_type == 'string': - properties[field_name] = StringProperty( - datatype='string', - description=f"Column {field_name}", - index=i - ) - elif field_type in ['integer', 'number']: - if include_min_max: - column = table.column(field_name) - min_max = pc.min_max(column) - min_value = min_max['min'].as_py() - max_value = min_max['max'].as_py() - - if field_type == 'integer': - properties[field_name] = IntegerProperty( - datatype='integer', - description=f"Column {field_name}", - index=i, - minimum=min_value, - maximum=max_value - ) - else: - properties[field_name] = NumberProperty( - datatype='number', - description=f"Column {field_name}", - index=i, - minimum=min_value, - maximum=max_value - ) - else: - if field_type == 'integer': - properties[field_name] = IntegerProperty( - datatype='integer', - description=f"Column {field_name}", - index=i - ) - else: - properties[field_name] = NumberProperty( - datatype='number', - description=f"Column {field_name}", - index=i - ) - elif field_type == 'boolean': - properties[field_name] = BooleanProperty( - datatype='boolean', - description=f"Column {field_name}", - index=i - ) - elif field_type == 'array': - item_type = map_arrow_type_to_json_schema(field.type.value_type) - properties[field_name] = ArrayProperty( - datatype='array', - description=f"Column {field_name}", - index=str(i), - items=Items(datatype=DatatypeEnum(item_type)) - ) - - return cls( - name=name, - description=description, - guid=guid, - properties=properties, - required=list(properties.keys()), - separator=",", - header=True - ) - except Exception as e: - raise ValueError(f"Error inferring schema: {str(e)}") - - def AppendProperty(schemaFilepath: str, propertyInstance, propertyName: str) -> None: # check that schemaFile exists schemaPath = pathlib.Path(schemaFilepath) diff --git a/src/fairscape_cli/schema/schema.py b/src/fairscape_cli/schema/schema.py index 6d63263..e17b9d3 100644 --- a/src/fairscape_cli/schema/schema.py +++ b/src/fairscape_cli/schema/schema.py @@ -5,6 +5,11 @@ from pydantic import ( ValidationError ) +from typing import ( + Union, + Type +) + from fairscape_cli.models.schema.tabular import ( TabularValidationSchema, @@ -20,7 +25,9 @@ PropertyNameException, ColumnIndexException, DatatypeEnum, - Items + Items, + FileType, + HDF5Schema ) from fairscape_cli.config import ( @@ -52,7 +59,7 @@ def create_tabular_schema( separator, schema_file ): - """Initalize a Tabular Schema. + """Initialize a Tabular Schema. 
""" # create the model try: @@ -60,9 +67,9 @@ def create_tabular_schema( "name": name, "description":description, "guid":guid, - "propeties":{}, + "properties":{}, "required": [], - "header" :header, + "header":header, "separator": separator }) @@ -76,7 +83,6 @@ def create_tabular_schema( click.echo(f"Wrote Schema: {str(schema_file)}") - @schema.group('add-property') def add_property(): """Add a Property to an existing schema. @@ -95,7 +101,6 @@ def add_property(): def add_property_string(ctx, name, index, description, value_url, pattern, schema_file): """Add a String Property to an existing Schema. """ - # instantiate the StringProperty try: stringPropertyModel = StringProperty.model_validate({ "name": name, @@ -114,7 +119,6 @@ def add_property_string(ctx, name, index, description, value_url, pattern, schem ClickAppendProperty(ctx, schema_file, stringPropertyModel, name) - @add_property.command('number') @click.option('--name', type=str, required=True) @click.option('--index', type=int, required=True) @@ -125,9 +129,8 @@ def add_property_string(ctx, name, index, description, value_url, pattern, schem @click.argument('schema_file', type=click.Path(exists=True)) @click.pass_context def add_property_number(ctx, name, index, description, maximum, minimum, value_url, schema_file): - """Add a Numberic property to an existing Schema. + """Add a Numeric property to an existing Schema. """ - # instantiate the NumberPropertyModel try: numberPropertyModel = NumberProperty.model_validate({ "name": name, @@ -148,7 +151,6 @@ def add_property_number(ctx, name, index, description, maximum, minimum, value_u ClickAppendProperty(ctx, schema_file, numberPropertyModel, name) - @add_property.command('boolean') @click.option('--name', type=str, required=True) @click.option('--index', type=int, required=True) @@ -189,7 +191,6 @@ def add_property_boolean(ctx, name, index, description, value_url, schema_file): def add_property_integer(ctx, name, index, description, maximum, minimum, value_url, schema_file): """Add an Integer property to an existing Schema. 
""" - try: integerPropertyModel = IntegerProperty.model_validate({ "name": name, @@ -234,14 +235,14 @@ def add_property_array(ctx, name, index, description, value_url, items_datatype, try: arrayPropertyModel = ArrayProperty( - datatype = 'array', - index = index, - description = description, - valueURL = value_url, - maxItems = max_items, - minItems = min_items, - uniqueItems = unique_items, - items = Items(datatype=datatype_enum) + datatype='array', + index=index, + description=description, + valueURL=value_url, + maxItems=max_items, + minItems=min_items, + uniqueItems=unique_items, + items=Items(datatype=datatype_enum) ) except ValidationError as metadataError: @@ -251,74 +252,120 @@ def add_property_array(ctx, name, index, description, value_url, items_datatype, ctx.exit(code=1) ClickAppendProperty(ctx, schema_file, arrayPropertyModel, name) - +def determine_schema_type(filepath: str) -> Type[Union[TabularValidationSchema, HDF5Schema]]: + """Determine which schema type to use based on file extension""" + ext = pathlib.Path(filepath).suffix.lower()[1:] + if ext in ('h5', 'hdf5'): + return HDF5Schema + elif ext in ('csv', 'tsv', 'parquet'): + return TabularValidationSchema + else: + raise ValueError(f"Unsupported file extension: {ext}") + @schema.command('validate') @click.option('--schema', type=str, required=True) @click.option('--data', type=str, required=True) -#@click.option('--ro-crate', type=str, required=False, default=None) @click.pass_context def validate(ctx, schema, data): - """Execute validation of a Schema against the provided data. - """ - - - # if not a default schema + """Execute validation of a Schema against the provided data.""" + # Check if schema file exists (if not a default schema) if 'ark' not in schema: - schemaPath = pathlib.Path(schema) - if not schemaPath.exists(): + schema_path = pathlib.Path(schema) + if not schema_path.exists(): click.echo(f"ERROR: Schema file at path {schema} does not exist") ctx.exit(1) - dataPath = pathlib.Path(data) - if not dataPath.exists(): + data_path = pathlib.Path(data) + if not data_path.exists(): click.echo(f"ERROR: Data file at path {data} does not exist") ctx.exit(1) try: - tabular_schema = ReadSchema(schema) - except ValidationError as metadataError: + # Load the schema file + with open(schema) as f: + schema_json = json.load(f) + + # Determine schema type based on the data file + schema_class = determine_schema_type(data) + validation_schema = schema_class.model_validate(schema_json) + + # Validate the file + validation_errors = validation_schema.validate_file(data) + + if len(validation_errors) != 0: + # Create a pretty table of validation errors + error_table = PrettyTable() + if isinstance(validation_schema, HDF5Schema): + error_table.field_names = ['path', 'error_type', 'failed_keyword', 'message'] + else: + error_table.field_names = ['row', 'error_type', 'failed_keyword', 'message'] + + for err in validation_errors: + if isinstance(validation_schema, HDF5Schema): + error_table.add_row([ + err.get("path"), + err.get("type"), + err.get("failed_keyword"), + str(err.get('message')) + ]) + else: + error_table.add_row([ + err.get("row"), + err.get("type"), + err.get("failed_keyword"), + str(err.get('message')) + ]) + + print(error_table) + ctx.exit(1) + else: + print('Validation Success') + ctx.exit(0) + + except ValidationError as metadata_error: click.echo("Error with schema definition") - for validationFailure in metadataError.errors(): - click.echo(f"property: {validationFailure.get('loc')} \tmsg: 
{validationFailure.get('msg')}") + for validation_failure in metadata_error.errors(): + click.echo(f"property: {validation_failure.get('loc')} \tmsg: {validation_failure.get('msg')}") ctx.exit(1) - - validation_errors = tabular_schema.execute_validation(data) - - if len(validation_errors) !=0: - # print out all errors - - # create a pretty table of validation errors - errorTable = PrettyTable() - errorTable.field_names = ['row', 'error_type', 'failed_keyword', 'message'] - - for err in validation_errors: - errorTable.add_row([err.get("row"), err.get("type"), err.get("failed_keyword"), str(err.get('message'))]) - - print(errorTable) + except Exception as e: + click.echo(f"Error during validation: {str(e)}") ctx.exit(1) - else: - print('Validation Success') - ctx.exit(0) - @schema.command('infer') @click.option('--name', required=True, type=str) @click.option('--description', required=True, type=str) @click.option('--guid', required=False, type=str, default="", show_default=False) @click.option('--include-min-max', is_flag=True, help="Include min and max values for numeric and integer fields") -@click.argument('parquet_file', type=click.Path(exists=True)) +@click.argument('input_file', type=click.Path(exists=True)) @click.argument('schema_file', type=str) @click.pass_context -def infer_schema(ctx, name, description, guid, include_min_max, parquet_file, schema_file): - """Infer a Tabular Schema from a Parquet file.""" +def infer_schema(ctx, name, description, guid, include_min_max, input_file, schema_file): + """Infer a schema from a file (CSV, TSV, Parquet, or HDF5).""" try: - schema_model = TabularValidationSchema.infer_from_parquet(name, description, guid, parquet_file, include_min_max) + # Determine which schema type to use based on input file + schema_class = determine_schema_type(input_file) + + # Infer the schema + schema_model = schema_class.infer_from_file( + input_file, + name, + description, + include_min_max + ) + if guid: + schema_model.guid = guid + WriteSchema(schema_model, schema_file) - click.echo(f"Inferred Schema: {str(schema_file)}") + + # Get file type for display + ext = pathlib.Path(input_file).suffix.lower()[1:] + click.echo(f"Inferred Schema from {ext} file: {str(schema_file)}") + + except ValueError as e: + click.echo(f"Error with file type: {str(e)}") + ctx.exit(code=1) except Exception as e: click.echo(f"Error inferring schema: {str(e)}") - ctx.exit(code=1) - - + ctx.exit(code=1) \ No newline at end of file From c31dad8f7fd979d35e814729f075392905acc708 Mon Sep 17 00:00:00 2001 From: jniestroy Date: Thu, 24 Oct 2024 14:25:16 -0400 Subject: [PATCH 4/7] spacing --- src/fairscape_cli/models/schema/tabular.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/fairscape_cli/models/schema/tabular.py b/src/fairscape_cli/models/schema/tabular.py index fafa3c5..2aaffd6 100644 --- a/src/fairscape_cli/models/schema/tabular.py +++ b/src/fairscape_cli/models/schema/tabular.py @@ -143,7 +143,6 @@ def check_max_min(self) -> 'IntegerProperty': return self PropertyUnion = Union[StringProperty, ArrayProperty, BooleanProperty, NumberProperty, IntegerProperty, NullProperty] - class BaseSchema(BaseModel): guid: Optional[str] = Field(alias="@id", default=None) context: Optional[Dict] = Field(default=DEFAULT_CONTEXT, alias="@context") @@ -491,7 +490,6 @@ def AppendProperty(schemaFilepath: str, propertyInstance, propertyName: str) -> schemaFile.seek(0) schemaFile.write(schemaJson) - def ClickAppendProperty(ctx, schemaFile, propertyModel, name): try: # append the property to 
the @@ -508,15 +506,12 @@ def ClickAppendProperty(ctx, schemaFile, propertyModel, name): print(str(propertyException)) ctx.exit(code=1) - def ReadSchemaGithub(schemaURI: str) -> TabularValidationSchema: pass def ReadSchemaFairscape(schemaArk: str) -> TabularValidationSchema: pass - - def ReadSchemaLocal(schemaFile: str) -> TabularValidationSchema: """ Helper function for reading the schema and marshaling into the pydantic model """ @@ -531,7 +526,6 @@ def ReadSchemaLocal(schemaFile: str) -> TabularValidationSchema: tabularSchema = TabularValidationSchema.model_validate(schemaJson) return tabularSchema - def ReadSchema(schemaFile:str) -> TabularValidationSchema: ''' Read a schema specified by the argument schemaFile @@ -562,8 +556,6 @@ def ReadSchema(schemaFile:str) -> TabularValidationSchema: schemaInstance = ReadSchemaLocal(schemaFile) return schemaInstance - - def WriteSchema(tabular_schema: TabularValidationSchema, schema_file): """ Helper Function for writing files """ @@ -575,8 +567,6 @@ def WriteSchema(tabular_schema: TabularValidationSchema, schema_file): with open(schema_file, "w") as output_file: output_file.write(schema_json) - - @lru_cache def ImportDefaultSchemas()-> List[TabularValidationSchema]: defaultSchemaLocation = pathlib.Path(os.path.dirname(os.path.realpath(__file__))) / 'default_schemas' From 19957339649736deea8ef9906f7419e3df90140d Mon Sep 17 00:00:00 2001 From: jniestroy Date: Thu, 24 Oct 2024 14:28:21 -0400 Subject: [PATCH 5/7] move union --- src/fairscape_cli/models/schema/tabular.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/fairscape_cli/models/schema/tabular.py b/src/fairscape_cli/models/schema/tabular.py index 2aaffd6..5d12c1e 100644 --- a/src/fairscape_cli/models/schema/tabular.py +++ b/src/fairscape_cli/models/schema/tabular.py @@ -142,7 +142,6 @@ def check_max_min(self) -> 'IntegerProperty': raise ValueError('IntegerProperty attribute maximum !< minimum') return self -PropertyUnion = Union[StringProperty, ArrayProperty, BooleanProperty, NumberProperty, IntegerProperty, NullProperty] class BaseSchema(BaseModel): guid: Optional[str] = Field(alias="@id", default=None) context: Optional[Dict] = Field(default=DEFAULT_CONTEXT, alias="@context") @@ -184,6 +183,7 @@ def to_json_schema(self) -> dict: ) return schema +PropertyUnion = Union[StringProperty, ArrayProperty, BooleanProperty, NumberProperty, IntegerProperty, NullProperty] class TabularValidationSchema(BaseSchema): properties: Dict[str, PropertyUnion] = Field(default={}) separator: str = Field(description="Field separator for the file") @@ -358,9 +358,8 @@ def validate_file(self, filepath: str) -> List[Dict]: return self.validate_dataframe(df) -HDF5Union = Union[TabularValidationSchema] class HDF5Schema(BaseSchema): - properties: Dict[str, HDF5Union] = Field(default={}) + properties: Dict[str, TabularValidationSchema] = Field(default={}) @staticmethod def dataset_to_dataframe(dataset: h5py.Dataset) -> pd.DataFrame: From df869945fe5b2f3b21e09e3d98cb11b66eaf92dd Mon Sep 17 00:00:00 2001 From: jniestroy Date: Thu, 24 Oct 2024 14:30:10 -0400 Subject: [PATCH 6/7] move funcs around --- src/fairscape_cli/models/schema/tabular.py | 24 +++++++++++----------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/fairscape_cli/models/schema/tabular.py b/src/fairscape_cli/models/schema/tabular.py index 5d12c1e..523bc53 100644 --- a/src/fairscape_cli/models/schema/tabular.py +++ b/src/fairscape_cli/models/schema/tabular.py @@ -314,6 +314,18 @@ def infer_from_parquet(cls, 
name: str, description: str, guid: Optional[str], fi header=True # Not used for parquet but required ) + def validate_file(self, filepath: str) -> List[Dict]: + """Validate a file against the schema""" + file_type = FileType.from_extension(filepath) + + if file_type == FileType.PARQUET: + df = pd.read_parquet(filepath) + else: # csv or tsv + sep = '\t' if file_type == FileType.TSV else self.separator + df = pd.read_csv(filepath, sep=sep, header=0 if self.header else None) + + return self.validate_dataframe(df) + def validate_dataframe(self, df: pd.DataFrame) -> List[Dict]: """Validate a dataframe against the schema with lenient string type checking. Only reports string validation errors for pattern mismatches, not type mismatches.""" @@ -346,18 +358,6 @@ def validate_dataframe(self, df: pd.DataFrame) -> List[Dict]: return errors - def validate_file(self, filepath: str) -> List[Dict]: - """Validate a file against the schema""" - file_type = FileType.from_extension(filepath) - - if file_type == FileType.PARQUET: - df = pd.read_parquet(filepath) - else: # csv or tsv - sep = '\t' if file_type == FileType.TSV else self.separator - df = pd.read_csv(filepath, sep=sep, header=0 if self.header else None) - - return self.validate_dataframe(df) - class HDF5Schema(BaseSchema): properties: Dict[str, TabularValidationSchema] = Field(default={}) From 6217b63da23e3f36f1e70a29811a64a8ef56057b Mon Sep 17 00:00:00 2001 From: jniestroy Date: Thu, 24 Oct 2024 14:36:15 -0400 Subject: [PATCH 7/7] columns --- src/fairscape_cli/models/schema/tabular.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/fairscape_cli/models/schema/tabular.py b/src/fairscape_cli/models/schema/tabular.py index 523bc53..387444a 100644 --- a/src/fairscape_cli/models/schema/tabular.py +++ b/src/fairscape_cli/models/schema/tabular.py @@ -366,17 +366,17 @@ def dataset_to_dataframe(dataset: h5py.Dataset) -> pd.DataFrame: """Convert any HDF5 dataset to a pandas DataFrame""" data = dataset[()] - # If it's a structured array (compound dtype), pandas can handle it directly + # structured array convert directly if dataset.dtype.fields: return pd.DataFrame(data) - # For multi-dimensional arrays, create column names based on shape + # For multi-dimensional arrays make up column name elif len(dataset.shape) > 1: n_cols = dataset.shape[1] if len(dataset.shape) > 1 else 1 columns = [f"column_{i}" for i in range(n_cols)] return pd.DataFrame(data, columns=columns) - # For 1D arrays, convert to single column DataFrame + # For 1D arrays convert to single column DataFrame else: return pd.DataFrame(data, columns=['value'])
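
Editor's note: the series above adds file-based schema inference (CSV, TSV, Parquet, HDF5) plus a matching `schema infer` CLI command, with `determine_schema_type` routing by file extension. Below is a minimal sketch of how the new Python entry points might be exercised once these patches are applied. The sample file names and data are illustrative assumptions, as is the `fairscape-cli` console-script name (only the `schema` click group is defined in this series); the imported classes and function signatures are taken directly from the patched module.

    import pandas as pd
    from fairscape_cli.models.schema.tabular import (
        TabularValidationSchema,
        WriteSchema,
    )

    # Hypothetical sample data: one string column and two float columns.
    pd.DataFrame({
        "gene": ["BRCA1", "TP53", "EGFR"],
        "score": [0.91, 0.47, 0.66],
        "weight": [1.5, 2.25, 0.75],
    }).to_csv("sample.csv", index=False)

    # Infer a TabularValidationSchema from the file. include_min_max=True
    # (the --include-min-max flag on the CLI) adds minimum/maximum bounds
    # to numeric columns, as introduced in PATCH 2/7 and generalized to
    # CSV/TSV inference in PATCH 3/7.
    schema = TabularValidationSchema.infer_from_file(
        "sample.csv",
        name="sample-schema",
        description="Schema inferred from sample.csv",
        include_min_max=True,
    )
    WriteSchema(schema, "sample_schema.json")

    # Round-trip check: validate the same file against the inferred schema.
    # An empty list of error dicts indicates the data conformed; string
    # columns are checked leniently (pattern mismatches only, per PATCH 3/7).
    print(schema.validate_file("sample.csv"))

    # HDF5 files get one nested tabular schema per dataset (PATCH 3/7):
    import h5py
    import numpy as np
    from fairscape_cli.models.schema.tabular import HDF5Schema

    # Hypothetical HDF5 file with a single 2-D numeric dataset.
    with h5py.File("sample.h5", "w") as f:
        f.create_dataset("measurements", data=np.random.rand(8, 3))

    h5_schema = HDF5Schema.infer_from_file(
        "sample.h5",
        name="sample-h5-schema",
        description="Schema inferred from sample.h5",
    )
    print(list(h5_schema.properties))   # dataset paths, e.g. ['measurements']
    print(h5_schema.validate_file("sample.h5"))

    # Equivalent CLI usage, assuming a console script named fairscape-cli
    # exposes the schema group added in this series:
    #   fairscape-cli schema infer --name sample-schema \
    #       --description "Schema inferred from sample.csv" \
    #       --include-min-max sample.csv sample_schema.json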