Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 145 additions & 51 deletions pygeometa/schemas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,36 +47,117 @@
import logging
import os

from typing import Dict, Type
from pygeometa.schemas.base import BaseOutputSchema

LOGGER = logging.getLogger(__name__)
THISDIR = os.path.dirname(os.path.realpath(__file__))

SCHEMAS = {
'dcat': 'pygeometa.schemas.dcat.DCATOutputSchema',
'iso19139': 'pygeometa.schemas.iso19139.ISO19139OutputSchema',
'iso19139-2': 'pygeometa.schemas.iso19139_2.ISO19139_2OutputSchema',
'iso19139-hnap': 'pygeometa.schemas.iso19139_hnap.ISO19139HNAPOutputSchema', # noqa
'oarec-record': 'pygeometa.schemas.ogcapi_records.OGCAPIRecordOutputSchema', # noqa
'schema-org': 'pygeometa.schemas.schema_org.SchemaOrgOutputSchema',
'stac-item': 'pygeometa.schemas.stac.STACItemOutputSchema',
'wmo-cmp': 'pygeometa.schemas.wmo_cmp.WMOCMPOutputSchema',
'wmo-wcmp2': 'pygeometa.schemas.wmo_wcmp2.WMOWCMP2OutputSchema',
'wmo-wigos': 'pygeometa.schemas.wmo_wigos.WMOWIGOSOutputSchema',
'cwl': 'pygeometa.schemas.cwl.CWLOutputSchema'
}
# runtime mapping: schema_key -> class object
_DISCOVERED_SCHEMAS: Dict[str, Type[BaseOutputSchema]] = {}
_DISCOVERY_DONE = False


def _discover_schemas():
"""Discover local schema packages (folders with __init__.py) in ./schemas.

For each discovered package module, import it and choose the first class
that is a subclass of BaseOutputSchema (excluding BaseOutputSchema itself).
The mapping key will be:
- module attribute SCHEMA_NAME if defined, else
- the package (folder) name.

This function is idempotent and caches results in _DISCOVERED_SCHEMAS.
"""
global _DISCOVERY_DONE, _DISCOVERED_SCHEMAS # noqa
if _DISCOVERY_DONE:
return

LOGGER.debug("Discovering schema packages in %s", THISDIR)
pkg_name = __name__ # 'pygeometa.schemas'

try:
entries = sorted(os.listdir(THISDIR))
except OSError:
entries = []

for entry in entries:
path = os.path.join(THISDIR, entry)
# look for directories with __init__.py only
if not os.path.isdir(path):
continue
init_py = os.path.join(path, "__init__.py")
if not os.path.isfile(init_py):
continue

module_name = f"{pkg_name}.{entry}"
try:
# import normally so package semantics and relative imports work
module = importlib.import_module(module_name)
except Exception as exc:
LOGGER.exception(f"Failed to import schema package {module_name}: {exc}") # noqa
continue

# Key selection: prefer explicit module-provided `name`,
# else use dotted name
schema_key = getattr(module, "name", None) or module.__name__

# Collect candidate classes in module that are subclasses
# of BaseOutputSchema
candidates = []
for attr_name in dir(module):
try:
attr = getattr(module, attr_name)
except Exception:
# some modules raise on attribute access; skip those
continue
if not isinstance(attr, type):
continue
# Exclude BaseOutputSchema itself
if attr is BaseOutputSchema:
continue
try:
if issubclass(attr, BaseOutputSchema):
candidates.append(attr)
except TypeError:
# issubclass can raise if attr is not a class; ignore
continue

if not candidates:
LOGGER.warning(f"No BaseOutputSchema subclass found in {module_name}; skipping" ) # noqa
continue

# pick the most concrete subclass (leaf in the inheritance chain)
# the class with the longest method-resolution order (MRO)
chosen_cls = sorted(candidates, key=lambda c: len(getattr(c, "__mro__", ())), reverse=True)[0] # noqa

# Ensure uniqueness of keys
if schema_key in _DISCOVERED_SCHEMAS:
existing = _DISCOVERED_SCHEMAS[schema_key]
if existing is chosen_cls:
LOGGER.debug("Schema key {schema_key} already registered with same class; skipping") # noqa
continue
LOGGER.warning(f"""Duplicate schema key '{schema_key}' (module {module_name}, class {chosen_cls.__name__}). # noqa
Keeping first discovered: {existing.__module__}.{existing.__name__}""") # noqa
continue

_DISCOVERED_SCHEMAS[schema_key] = chosen_cls
LOGGER.info(f"Discovered schema '{schema_key}' -> {chosen_cls.__module__}.{chosen_cls.__name__}") # noqa

_DISCOVERY_DONE = True


def get_supported_schemas(details: bool = False,
include_autodetect: bool = False) -> list:
"""
Get supported schemas
Get supported schemas.

:param details: provide read/write details
:param include_autodetect: include magic auto detection mode

:returns: list of supported schemas
:returns: list of supported schemas (strings) or details matrix
"""
_discover_schemas()

def has_mode(plugin: BaseOutputSchema, mode: str) -> bool:
enabled = False
Expand All @@ -90,36 +171,47 @@ def has_mode(plugin: BaseOutputSchema, mode: str) -> bool:

return enabled

schema_matrix = []

LOGGER.debug('Generating list of supported schemas')

if not details:
schema_names = []
for cls in _DISCOVERED_SCHEMAS.values():
try:
schema_names.append(cls().name)
except Exception:
continue
if include_autodetect:
schemas_keys = list(SCHEMAS.keys())
schemas_keys.append('autodetect')
return schemas_keys
else:
return SCHEMAS.keys()
schema_names.append("autodetect")
return schema_names

for key in SCHEMAS.keys():
schema = load_schema(key)
can_read = has_mode(schema, 'import_')
can_write = has_mode(schema, 'write')
schema_matrix = []
for key, cls in _DISCOVERED_SCHEMAS.items():
nm = key
try:
schema_inst = cls()
nm = schema_inst.name
can_read = has_mode(schema_inst, "import_")
can_write = has_mode(schema_inst, "write")
description = getattr(schema_inst, "description", "")
except Exception:
LOGGER.exception(f"Error instantiating schema class for key {key}") # noqa
can_read = False
can_write = False
description = ""

schema_matrix.append({
'id': key,
'description': schema.description,
'read': can_read,
'write': can_write
"id": nm,
"description": description,
"read": can_read,
"write": can_write
})

if include_autodetect:
schema_matrix.append({
'id': 'autodetect',
'description': 'Auto schema detection',
'read': True,
'write': False
"id": "autodetect",
"description": "Auto schema detection",
"read": True,
"write": False
})

return schema_matrix
Expand All @@ -133,28 +225,30 @@ def load_schema(schema_name: str) -> BaseOutputSchema:

:returns: plugin object
"""
_discover_schemas()
LOGGER.debug("Available schemas: %s", list(_DISCOVERED_SCHEMAS.keys()))

LOGGER.debug(f'Schemas: {SCHEMAS.keys()}')
# allow 'autodetect' to be handled elsewhere (kept for parity)
if schema_name == "autodetect":
raise InvalidSchemaError("Autodetect is not a concrete schema")

if schema_name not in SCHEMAS.keys():
msg = f'Schema {schema_name} not found'
cls = None
for v in _DISCOVERED_SCHEMAS.values():
if v().name == schema_name:
cls = v
break

if not cls:
msg = f"Schema {schema_name} not found"
LOGGER.exception(msg)
raise InvalidSchemaError(msg)

name = SCHEMAS[schema_name]

if '.' in name: # dotted path
packagename, classname = name.rsplit('.', 1)
else:
raise InvalidSchemaError(f'Schema path {name} not found')

LOGGER.debug(f'package name: {packagename}')
LOGGER.debug(f'class name: {classname}')

module = importlib.import_module(packagename)
class_ = getattr(module, classname)

return class_()
try:
return cls()
except Exception as exc:
msg = f"Failed to instantiate schema {schema_name}: {exc}"
LOGGER.exception(msg)
raise InvalidSchemaError(msg)


class InvalidSchemaError(Exception):
Expand Down
5 changes: 3 additions & 2 deletions pygeometa/schemas/ogcapi_records/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import os
from typing import Union

from pygeometa import __version__

from pygeometa.core import get_charstring
from pygeometa.helpers import generate_datetime, json_dumps
from pygeometa.schemas.base import BaseOutputSchema
Expand Down Expand Up @@ -247,7 +247,8 @@ def write(self, mcf: dict, stringify: str = True) -> Union[dict, str]:
for value in mcf['distribution'].values():
record['links'].append(self.generate_link(value))

record['generated_by'] = f'pygeometa {__version__}'
from importlib.metadata import version
record['generated_by'] = f'pygeometa {version("pygeometa")}'

if stringify:
return json_dumps(record)
Expand Down
1 change: 1 addition & 0 deletions pygeometa/schemas/wmo_wcmp2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def __init__(self):
super().__init__()

self.description = description
self.name = 'wmo-wcmp2'

def write(self, mcf: dict, stringify: str = True) -> Union[dict, str]:
"""
Expand Down
2 changes: 1 addition & 1 deletion tests/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ def test_transform_metadata(self):
"""test metadata transform"""

with open(get_abspath('md-SMJP01RJTD-gmd.xml')) as fh:
m = transform_metadata('iso19139', 'oarec-record', fh.read())
m = transform_metadata('autodetect', 'oarec-record', fh.read())

m = json.loads(m)
self.assertEqual(
Expand Down