diff --git a/pydantic/_internal/_generate_schema.py b/pydantic/_internal/_generate_schema.py index 00245975b43..e6bf6fcfaa0 100644 --- a/pydantic/_internal/_generate_schema.py +++ b/pydantic/_internal/_generate_schema.py @@ -480,6 +480,53 @@ def ser_ip(ip: Any, info: core_schema.SerializationInfo) -> str | IpType: }, ) + def _path_schema(self, tp: Any, path_type: Any) -> CoreSchema: + if tp is os.PathLike and (path_type not in {str, bytes} and not _typing_extra.is_any(path_type)): + raise PydanticUserError( + '`os.PathLike` can only be used with `str`, `bytes` or `Any`', code='schema-for-unknown-type' + ) + + path_constructor = pathlib.PurePath if tp is os.PathLike else tp + constrained_schema = core_schema.bytes_schema() if (path_type is bytes) else core_schema.str_schema() + + def path_validator(input_value: str | bytes) -> os.PathLike[Any]: # type: ignore + try: + if path_type is bytes: + if isinstance(input_value, bytes): + try: + input_value = input_value.decode() + except UnicodeDecodeError as e: + raise PydanticCustomError('bytes_type', 'Input must be valid bytes') from e + else: + raise PydanticCustomError('bytes_type', 'Input must be bytes') + elif not isinstance(input_value, str): + raise PydanticCustomError('path_type', 'Input is not a valid path') + + return path_constructor(input_value) # type: ignore + except TypeError as e: + raise PydanticCustomError('path_type', 'Input is not a valid path') from e + + instance_schema = core_schema.json_or_python_schema( + json_schema=core_schema.no_info_after_validator_function(path_validator, constrained_schema), + python_schema=core_schema.is_instance_schema(tp), + ) + + schema = core_schema.lax_or_strict_schema( + lax_schema=core_schema.union_schema( + [ + instance_schema, + core_schema.no_info_after_validator_function(path_validator, constrained_schema), + ], + custom_error_type='path_type', + custom_error_message=f'Input is not a valid path for {tp}', + strict=True, + ), + strict_schema=instance_schema, + serialization=core_schema.to_string_ser_schema(), + metadata={'pydantic_js_functions': [lambda source, handler: {**handler(source), 'format': 'path'}]}, + ) + return schema + def _fraction_schema(self) -> CoreSchema: """Support for [`fractions.Fraction`][fractions.Fraction].""" from ._validators import fraction_validator @@ -944,6 +991,8 @@ def match_type(self, obj: Any) -> core_schema.CoreSchema: # noqa: C901 return self._sequence_schema(Any) elif obj in DICT_TYPES: return self._dict_schema(Any, Any) + elif obj in PATH_TYPES: + return self._path_schema(obj, Any) elif _typing_extra.is_type_alias_type(obj): return self._type_alias_type_schema(obj) elif obj is type: @@ -1022,6 +1071,8 @@ def _match_generic_type(self, obj: Any, origin: Any) -> CoreSchema: # noqa: C90 return self._frozenset_schema(self._get_first_arg_or_any(obj)) elif origin in DICT_TYPES: return self._dict_schema(*self._get_first_two_args_or_any(obj)) + elif origin in PATH_TYPES: + return self._path_schema(origin, self._get_first_arg_or_any(obj)) elif is_typeddict(origin): return self._typed_dict_schema(obj, origin) elif origin in (typing.Type, type): @@ -1999,7 +2050,6 @@ def _get_prepare_pydantic_annotations_for_known_type( from ._std_types_schema import ( deque_schema_prepare_pydantic_annotations, mapping_like_prepare_pydantic_annotations, - path_schema_prepare_pydantic_annotations, ) # Check for hashability @@ -2014,9 +2064,7 @@ def _get_prepare_pydantic_annotations_for_known_type( # not always called from match_type, but sometimes from _apply_annotations obj_origin = get_origin(obj) or obj - if obj_origin in PATH_TYPES: - return path_schema_prepare_pydantic_annotations(obj, annotations) - elif obj_origin in DEQUE_TYPES: + if obj_origin in DEQUE_TYPES: return deque_schema_prepare_pydantic_annotations(obj, annotations) elif obj_origin in MAPPING_TYPES: return mapping_like_prepare_pydantic_annotations(obj, annotations) diff --git a/pydantic/_internal/_std_types_schema.py b/pydantic/_internal/_std_types_schema.py index 84a7a4a4225..c3455349847 100644 --- a/pydantic/_internal/_std_types_schema.py +++ b/pydantic/_internal/_std_types_schema.py @@ -11,7 +11,6 @@ import collections import collections.abc import dataclasses -import os import typing from functools import partial from typing import Any, Callable, Iterable, Tuple, TypeVar, cast @@ -19,20 +18,17 @@ import typing_extensions from pydantic_core import ( CoreSchema, - PydanticCustomError, core_schema, ) from typing_extensions import get_args, get_origin from pydantic._internal._serializers import serialize_sequence_via_list from pydantic.errors import PydanticSchemaGenerationError -from pydantic.types import Strict -from ..json_schema import JsonSchemaValue from . import _known_annotated_metadata, _typing_extra from ._import_utils import import_cached_field_info from ._internal_dataclass import slots_true -from ._schema_generation_shared import GetCoreSchemaHandler, GetJsonSchemaHandler +from ._schema_generation_shared import GetCoreSchemaHandler FieldInfo = import_cached_field_info() @@ -42,110 +38,6 @@ StdSchemaFunction = Callable[[GenerateSchema, type[Any]], core_schema.CoreSchema] -@dataclasses.dataclass(**slots_true) -class InnerSchemaValidator: - """Use a fixed CoreSchema, avoiding interference from outward annotations.""" - - core_schema: CoreSchema - js_schema: JsonSchemaValue | None = None - js_core_schema: CoreSchema | None = None - js_schema_update: JsonSchemaValue | None = None - - def __get_pydantic_json_schema__(self, _schema: CoreSchema, handler: GetJsonSchemaHandler) -> JsonSchemaValue: - if self.js_schema is not None: - return self.js_schema - js_schema = handler(self.js_core_schema or self.core_schema) - if self.js_schema_update is not None: - js_schema.update(self.js_schema_update) - return js_schema - - def __get_pydantic_core_schema__(self, _source_type: Any, _handler: GetCoreSchemaHandler) -> CoreSchema: - return self.core_schema - - -def path_schema_prepare_pydantic_annotations( - source_type: Any, annotations: Iterable[Any] -) -> tuple[Any, list[Any]] | None: - import pathlib - - orig_source_type: Any = get_origin(source_type) or source_type - if ( - (source_type_args := get_args(source_type)) - and orig_source_type is os.PathLike - and source_type_args[0] not in {str, bytes, Any} - ): - return None - - if orig_source_type not in { - os.PathLike, - pathlib.Path, - pathlib.PurePath, - pathlib.PosixPath, - pathlib.PurePosixPath, - pathlib.PureWindowsPath, - }: - return None - - metadata, remaining_annotations = _known_annotated_metadata.collect_known_metadata(annotations) - _known_annotated_metadata.check_metadata(metadata, _known_annotated_metadata.STR_CONSTRAINTS, orig_source_type) - - is_first_arg_byte = source_type_args and source_type_args[0] is bytes - construct_path = pathlib.PurePath if orig_source_type is os.PathLike else orig_source_type - constrained_schema = ( - core_schema.bytes_schema(**metadata) if is_first_arg_byte else core_schema.str_schema(**metadata) - ) - - def path_validator(input_value: str | bytes) -> os.PathLike[Any]: # type: ignore - try: - if is_first_arg_byte: - if isinstance(input_value, bytes): - try: - input_value = input_value.decode() - except UnicodeDecodeError as e: - raise PydanticCustomError('bytes_type', 'Input must be valid bytes') from e - else: - raise PydanticCustomError('bytes_type', 'Input must be bytes') - elif not isinstance(input_value, str): - raise PydanticCustomError('path_type', 'Input is not a valid path') - - return construct_path(input_value) - except TypeError as e: - raise PydanticCustomError('path_type', 'Input is not a valid path') from e - - instance_schema = core_schema.json_or_python_schema( - json_schema=core_schema.no_info_after_validator_function(path_validator, constrained_schema), - python_schema=core_schema.is_instance_schema(orig_source_type), - ) - - strict: bool | None = None - for annotation in annotations: - if isinstance(annotation, Strict): - strict = annotation.strict - - schema = core_schema.lax_or_strict_schema( - lax_schema=core_schema.union_schema( - [ - instance_schema, - core_schema.no_info_after_validator_function(path_validator, constrained_schema), - ], - custom_error_type='path_type', - custom_error_message=f'Input is not a valid path for {orig_source_type}', - strict=True, - ), - strict_schema=instance_schema, - serialization=core_schema.to_string_ser_schema(), - strict=strict, - ) - - return ( - orig_source_type, - [ - InnerSchemaValidator(schema, js_core_schema=constrained_schema, js_schema_update={'format': 'path'}), - *remaining_annotations, - ], - ) - - def deque_validator( input_value: Any, handler: core_schema.ValidatorFunctionWrapHandler, maxlen: None | int ) -> collections.deque[Any]: diff --git a/tests/test_types.py b/tests/test_types.py index ff877d8d1b9..19f524e5aa3 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -3498,15 +3498,6 @@ class Model(BaseModel): assert Model.model_validate_json(json.dumps({'foo': str(value)})).foo == result -def test_path_validation_constrained(): - ta = TypeAdapter(Annotated[Path, Field(min_length=9, max_length=20)]) - with pytest.raises(ValidationError): - ta.validate_python('/short') - with pytest.raises(ValidationError): - ta.validate_python('/' + 'long' * 100) - assert ta.validate_python('/just/right/enough') == Path('/just/right/enough') - - def test_path_like(): class Model(BaseModel): foo: os.PathLike