diff --git a/pyiceberg/manifest.py b/pyiceberg/manifest.py index 4c68f5e3d5..6cae4dee51 100644 --- a/pyiceberg/manifest.py +++ b/pyiceberg/manifest.py @@ -50,7 +50,9 @@ PrimitiveType, StringType, StructType, + strtobool, ) +from pyiceberg.utils.config import Config UNASSIGNED_SEQ = -1 DEFAULT_BLOCK_SIZE = 67108864 # 64 * 1024 * 1024 @@ -891,13 +893,79 @@ def __hash__(self) -> int: return hash(self.manifest_path) -# Global cache for ManifestFile objects, keyed by manifest_path. -# This deduplicates ManifestFile objects across manifest lists, which commonly -# share manifests after append operations. -_manifest_cache: LRUCache[str, ManifestFile] = LRUCache(maxsize=128) +class _ManifestCacheManager: + """Manages the manifest cache with lazy initialization from config.""" -# Lock for thread-safe cache access -_manifest_cache_lock = threading.RLock() + _DEFAULT_SIZE = 128 + + def __init__(self) -> None: + self._cache: LRUCache[str, ManifestFile] | None = None + self._initialized = False + self._lock = threading.RLock() + + def get_cache(self) -> LRUCache[str, ManifestFile] | None: + """Return the cache if enabled, else None. Initializes from config on first call.""" + with self._lock: + if self._initialized: + return self._cache + + config = Config().config + + # Extract nested config + manifest_val = config.get("manifest") + manifest_config: dict[str, Any] = manifest_val if isinstance(manifest_val, dict) else {} + cache_val = manifest_config.get("cache") + cache_config: dict[str, Any] = cache_val if isinstance(cache_val, dict) else {} + + # Parse and validate enabled flag + enabled_raw = cache_config.get("enabled") + enabled = True + if enabled_raw is not None: + try: + enabled = bool(strtobool(str(enabled_raw))) + except (ValueError, AttributeError) as err: + raise ValueError( + f"manifest.cache.enabled should be a boolean or left unset. Current value: {enabled_raw}" + ) from err + + # Parse and validate cache size + size_raw = cache_config.get("size") + size = self._DEFAULT_SIZE + if size_raw is not None: + try: + size = int(str(size_raw)) + except (ValueError, TypeError) as err: + raise ValueError( + f"manifest.cache.size should be a positive integer or left unset. Current value: {size_raw}" + ) from err + if size < 1: + raise ValueError(f"manifest.cache.size must be >= 1. Current value: {size}") + + if enabled: + self._cache = LRUCache(maxsize=size) + self._initialized = True + return self._cache + + def clear(self) -> None: + """Clear the cache contents. No-op if cache is disabled.""" + cache = self.get_cache() + if cache is not None: + with self._lock: + cache.clear() + + +# Module-level cache manager instance +_manifest_cache_manager = _ManifestCacheManager() + + +def _get_manifest_cache() -> LRUCache[str, ManifestFile] | None: + """Return the manifest cache if enabled, else None. Initializes from config on first call.""" + return _manifest_cache_manager.get_cache() + + +def clear_manifest_cache() -> None: + """Clear the manifest cache. No-op if cache is disabled.""" + _manifest_cache_manager.clear() def _manifests(io: FileIO, manifest_list: str) -> tuple[ManifestFile, ...]: @@ -927,14 +995,18 @@ def _manifests(io: FileIO, manifest_list: str) -> tuple[ManifestFile, ...]: file = io.new_input(manifest_list) manifest_files = list(read_manifest_list(file)) + cache = _get_manifest_cache() + if cache is None: + return tuple(manifest_files) + result = [] - with _manifest_cache_lock: + with _manifest_cache_manager._lock: for manifest_file in manifest_files: manifest_path = manifest_file.manifest_path - if manifest_path in _manifest_cache: - result.append(_manifest_cache[manifest_path]) + if manifest_path in cache: + result.append(cache[manifest_path]) else: - _manifest_cache[manifest_path] = manifest_file + cache[manifest_path] = manifest_file result.append(manifest_file) return tuple(result) diff --git a/tests/benchmark/test_memory_benchmark.py b/tests/benchmark/test_memory_benchmark.py index 82454c8574..1c2ccd5e13 100644 --- a/tests/benchmark/test_memory_benchmark.py +++ b/tests/benchmark/test_memory_benchmark.py @@ -33,7 +33,7 @@ import pytest from pyiceberg.catalog.memory import InMemoryCatalog -from pyiceberg.manifest import _manifest_cache +from pyiceberg.manifest import _get_manifest_cache, clear_manifest_cache def generate_test_dataframe() -> pa.Table: @@ -64,7 +64,7 @@ def memory_catalog(tmp_path_factory: pytest.TempPathFactory) -> InMemoryCatalog: @pytest.fixture(autouse=True) def clear_caches() -> None: """Clear caches before each test.""" - _manifest_cache.clear() + clear_manifest_cache() gc.collect() @@ -95,7 +95,8 @@ def test_manifest_cache_memory_growth(memory_catalog: InMemoryCatalog) -> None: # Sample memory at intervals if (i + 1) % 10 == 0: current, _ = tracemalloc.get_traced_memory() - cache_size = len(_manifest_cache) + cache = _get_manifest_cache() + cache_size = len(cache) if cache is not None else 0 memory_samples.append((i + 1, current, cache_size)) print(f" Iteration {i + 1}: Memory={current / 1024:.1f} KB, Cache entries={cache_size}") @@ -150,13 +151,14 @@ def test_memory_after_gc_with_cache_cleared(memory_catalog: InMemoryCatalog) -> gc.collect() before_clear_memory, _ = tracemalloc.get_traced_memory() - cache_size_before = len(_manifest_cache) + cache = _get_manifest_cache() + cache_size_before = len(cache) if cache is not None else 0 print(f" Memory before clear: {before_clear_memory / 1024:.1f} KB") print(f" Cache size: {cache_size_before}") # Phase 2: Clear cache and GC print("\nPhase 2: Clearing cache and running GC...") - _manifest_cache.clear() + clear_manifest_cache() gc.collect() gc.collect() # Multiple GC passes for thorough cleanup @@ -191,7 +193,9 @@ def test_manifest_cache_deduplication_efficiency() -> None: FileFormat, ManifestEntry, ManifestEntryStatus, + _get_manifest_cache, _manifests, + clear_manifest_cache, write_manifest, write_manifest_list, ) @@ -245,7 +249,7 @@ def test_manifest_cache_deduplication_efficiency() -> None: num_lists = 10 print(f"Creating {num_lists} manifest lists with overlapping manifests...") - _manifest_cache.clear() + clear_manifest_cache() for i in range(num_lists): list_path = f"{tmp_dir}/manifest-list_{i}.avro" @@ -265,7 +269,8 @@ def test_manifest_cache_deduplication_efficiency() -> None: _manifests(io, list_path) # Analyze cache efficiency - cache_entries = len(_manifest_cache) + cache = _get_manifest_cache() + cache_entries = len(cache) if cache is not None else 0 # List i contains manifests 0..i, so only the first num_lists manifests are actually used manifests_actually_used = num_lists diff --git a/tests/utils/test_manifest.py b/tests/utils/test_manifest.py index 3862d6b682..91ae266d45 100644 --- a/tests/utils/test_manifest.py +++ b/tests/utils/test_manifest.py @@ -16,10 +16,12 @@ # under the License. # pylint: disable=redefined-outer-name,arguments-renamed,fixme from tempfile import TemporaryDirectory +from unittest import mock import fastavro import pytest +import pyiceberg.manifest as manifest_module from pyiceberg.avro.codecs import AvroCompressionCodec from pyiceberg.io import load_file_io from pyiceberg.io.pyarrow import PyArrowFileIO @@ -32,8 +34,9 @@ ManifestEntryStatus, ManifestFile, PartitionFieldSummary, - _manifest_cache, + _get_manifest_cache, _manifests, + clear_manifest_cache, read_manifest_list, write_manifest, write_manifest_list, @@ -46,9 +49,10 @@ @pytest.fixture(autouse=True) -def clear_global_manifests_cache() -> None: - # Clear the global cache before each test - _manifest_cache.clear() +def reset_global_manifests_cache() -> None: + # Reset cache state before each test so config is re-read + manifest_module._manifest_cache_manager._cache = None + manifest_module._manifest_cache_manager._initialized = False def _verify_metadata_with_fastavro(avro_file: str, expected_metadata: dict[str, str]) -> None: @@ -804,9 +808,9 @@ def test_manifest_cache_deduplicates_manifest_files() -> None: # Verify cache size - should only have 3 unique ManifestFile objects # instead of 1 + 2 + 3 = 6 objects as with the old approach - assert len(_manifest_cache) == 3, ( - f"Cache should contain exactly 3 unique ManifestFile objects, but has {len(_manifest_cache)}" - ) + cache = _get_manifest_cache() + assert cache is not None, "Manifest cache should be enabled for this test" + assert len(cache) == 3, f"Cache should contain exactly 3 unique ManifestFile objects, but has {len(cache)}" def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None: @@ -879,9 +883,11 @@ def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None: # With the new approach, we should have exactly N objects # Verify cache has exactly N unique entries - assert len(_manifest_cache) == num_manifests, ( + cache = _get_manifest_cache() + assert cache is not None, "Manifest cache should be enabled for this test" + assert len(cache) == num_manifests, ( f"Cache should contain exactly {num_manifests} ManifestFile objects, " - f"but has {len(_manifest_cache)}. " + f"but has {len(cache)}. " f"Old approach would have {num_manifests * (num_manifests + 1) // 2} objects." ) @@ -897,3 +903,117 @@ def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None: if len(references) > 1: for ref in references[1:]: assert ref is references[0], f"All references to manifest {i} should be the same object instance" + + +def test_clear_manifest_cache() -> None: + """Test that clear_manifest_cache() clears cache entries while keeping cache enabled.""" + io = PyArrowFileIO() + + with TemporaryDirectory() as tmp_dir: + schema = Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=True)) + spec = UNPARTITIONED_PARTITION_SPEC + + # Create a manifest file + manifest_path = f"{tmp_dir}/manifest.avro" + with write_manifest( + format_version=2, + spec=spec, + schema=schema, + output_file=io.new_output(manifest_path), + snapshot_id=1, + avro_compression="zstandard", + ) as writer: + data_file = DataFile.from_args( + content=DataFileContent.DATA, + file_path=f"{tmp_dir}/data.parquet", + file_format=FileFormat.PARQUET, + partition=Record(), + record_count=100, + file_size_in_bytes=1000, + ) + writer.add_entry( + ManifestEntry.from_args( + status=ManifestEntryStatus.ADDED, + snapshot_id=1, + data_file=data_file, + ) + ) + manifest_file = writer.to_manifest_file() + + # Create a manifest list + list_path = f"{tmp_dir}/manifest-list.avro" + with write_manifest_list( + format_version=2, + output_file=io.new_output(list_path), + snapshot_id=1, + parent_snapshot_id=None, + sequence_number=1, + avro_compression="zstandard", + ) as list_writer: + list_writer.add_manifests([manifest_file]) + + # Populate the cache + _manifests(io, list_path) + + # Verify cache has entries + cache = _get_manifest_cache() + assert cache is not None, "Cache should be enabled" + assert len(cache) > 0, "Cache should have entries after reading manifests" + + # Clear the cache + clear_manifest_cache() + + # Verify cache is empty but still enabled + cache_after = _get_manifest_cache() + assert cache_after is not None, "Cache should still be enabled after clear" + assert len(cache_after) == 0, "Cache should be empty after clear" + + +@pytest.mark.parametrize( + "env_vars,expected_enabled,expected_size", + [ + ({}, True, 128), # defaults + ({"PYICEBERG_MANIFEST__CACHE__ENABLED": "true"}, True, 128), + ({"PYICEBERG_MANIFEST__CACHE__ENABLED": "false"}, False, 128), + ({"PYICEBERG_MANIFEST__CACHE__SIZE": "64"}, True, 64), + ({"PYICEBERG_MANIFEST__CACHE__SIZE": "256"}, True, 256), + ({"PYICEBERG_MANIFEST__CACHE__ENABLED": "false", "PYICEBERG_MANIFEST__CACHE__SIZE": "64"}, False, 64), + ], +) +def test_manifest_cache_config_valid_values(env_vars: dict[str, str], expected_enabled: bool, expected_size: int) -> None: + """Test that valid config values are applied correctly.""" + import os + + with mock.patch.dict(os.environ, env_vars, clear=False): + # Reset cache state so config is re-read + manifest_module._manifest_cache_manager._cache = None + manifest_module._manifest_cache_manager._initialized = False + cache = _get_manifest_cache() + + if expected_enabled: + assert cache is not None, "Cache should be enabled" + assert cache.maxsize == expected_size, f"Cache size should be {expected_size}" + else: + assert cache is None, "Cache should be disabled" + + +@pytest.mark.parametrize( + "env_vars,expected_error_substring", + [ + ({"PYICEBERG_MANIFEST__CACHE__ENABLED": "maybe"}, "manifest.cache.enabled should be a boolean"), + ({"PYICEBERG_MANIFEST__CACHE__ENABLED": "invalid"}, "manifest.cache.enabled should be a boolean"), + ({"PYICEBERG_MANIFEST__CACHE__SIZE": "abc"}, "manifest.cache.size should be a positive integer"), + ({"PYICEBERG_MANIFEST__CACHE__SIZE": "0"}, "manifest.cache.size must be >= 1"), + ({"PYICEBERG_MANIFEST__CACHE__SIZE": "-5"}, "manifest.cache.size must be >= 1"), + ], +) +def test_manifest_cache_config_invalid_values(env_vars: dict[str, str], expected_error_substring: str) -> None: + """Test that invalid config values raise ValueError with appropriate message.""" + import os + + with mock.patch.dict(os.environ, env_vars, clear=False): + # Reset cache state so config is re-read + manifest_module._manifest_cache_manager._cache = None + manifest_module._manifest_cache_manager._initialized = False + with pytest.raises(ValueError, match=expected_error_substring): + _get_manifest_cache()