92 changes: 82 additions & 10 deletions pyiceberg/manifest.py
@@ -50,7 +50,9 @@
PrimitiveType,
StringType,
StructType,
strtobool,
)
from pyiceberg.utils.config import Config

UNASSIGNED_SEQ = -1
DEFAULT_BLOCK_SIZE = 67108864 # 64 * 1024 * 1024
@@ -891,13 +893,79 @@ def __hash__(self) -> int:
return hash(self.manifest_path)


# Global cache for ManifestFile objects, keyed by manifest_path.
# This deduplicates ManifestFile objects across manifest lists, which commonly
# share manifests after append operations.
_manifest_cache: LRUCache[str, ManifestFile] = LRUCache(maxsize=128)
class _ManifestCacheManager:
"""Manages the manifest cache with lazy initialization from config."""

# Lock for thread-safe cache access
_manifest_cache_lock = threading.RLock()
_DEFAULT_SIZE = 128

def __init__(self) -> None:
self._cache: LRUCache[str, ManifestFile] | None = None
self._initialized = False
self._lock = threading.RLock()

def get_cache(self) -> LRUCache[str, ManifestFile] | None:
"""Return the cache if enabled, else None. Initializes from config on first call."""
with self._lock:
if self._initialized:
return self._cache

config = Config().config

# Extract nested config
manifest_val = config.get("manifest")
manifest_config: dict[str, Any] = manifest_val if isinstance(manifest_val, dict) else {}
cache_val = manifest_config.get("cache")
cache_config: dict[str, Any] = cache_val if isinstance(cache_val, dict) else {}

# Parse and validate enabled flag
enabled_raw = cache_config.get("enabled")
enabled = True
if enabled_raw is not None:
try:
enabled = bool(strtobool(str(enabled_raw)))
except (ValueError, AttributeError) as err:
raise ValueError(
f"manifest.cache.enabled should be a boolean or left unset. Current value: {enabled_raw}"
) from err

# Parse and validate cache size
size_raw = cache_config.get("size")
size = self._DEFAULT_SIZE
if size_raw is not None:
try:
size = int(str(size_raw))
except (ValueError, TypeError) as err:
raise ValueError(
f"manifest.cache.size should be a positive integer or left unset. Current value: {size_raw}"
) from err
if size < 1:
raise ValueError(f"manifest.cache.size must be >= 1. Current value: {size}")

if enabled:
self._cache = LRUCache(maxsize=size)
self._initialized = True
return self._cache

def clear(self) -> None:
"""Clear the cache contents. No-op if cache is disabled."""
cache = self.get_cache()
if cache is not None:
with self._lock:
cache.clear()


# Module-level cache manager instance
_manifest_cache_manager = _ManifestCacheManager()


def _get_manifest_cache() -> LRUCache[str, ManifestFile] | None:
"""Return the manifest cache if enabled, else None. Initializes from config on first call."""
return _manifest_cache_manager.get_cache()


def clear_manifest_cache() -> None:
"""Clear the manifest cache. No-op if cache is disabled."""
_manifest_cache_manager.clear()


def _manifests(io: FileIO, manifest_list: str) -> tuple[ManifestFile, ...]:
@@ -927,14 +995,18 @@ def _manifests(io: FileIO, manifest_list: str) -> tuple[ManifestFile, ...]:
file = io.new_input(manifest_list)
manifest_files = list(read_manifest_list(file))

cache = _get_manifest_cache()
if cache is None:
return tuple(manifest_files)

result = []
with _manifest_cache_lock:
with _manifest_cache_manager._lock:
for manifest_file in manifest_files:
manifest_path = manifest_file.manifest_path
if manifest_path in _manifest_cache:
result.append(_manifest_cache[manifest_path])
if manifest_path in cache:
result.append(cache[manifest_path])
else:
_manifest_cache[manifest_path] = manifest_file
cache[manifest_path] = manifest_file
result.append(manifest_file)

return tuple(result)
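
For context, a minimal usage sketch of the new settings (not part of the diff): it assumes the PYICEBERG_MANIFEST__CACHE__* environment variables map to the manifest.cache.* keys through Config, which is what the tests further down exercise.

import os

# Assumption: these variable names are taken from the test parametrization below;
# they map to manifest.cache.enabled / manifest.cache.size via pyiceberg's Config.
os.environ["PYICEBERG_MANIFEST__CACHE__ENABLED"] = "true"
os.environ["PYICEBERG_MANIFEST__CACHE__SIZE"] = "256"

from pyiceberg.manifest import _get_manifest_cache, clear_manifest_cache

# The manager reads the config once, on the first call, so the settings must be
# in place before anything touches the cache.
cache = _get_manifest_cache()
assert cache is not None and cache.maxsize == 256  # LRUCache bounded at 256 entries

clear_manifest_cache()  # drops the entries but leaves the cache enabled

Setting PYICEBERG_MANIFEST__CACHE__ENABLED to "false" instead makes _get_manifest_cache() return None, and _manifests() then skips caching entirely.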
19 changes: 12 additions & 7 deletions tests/benchmark/test_memory_benchmark.py
@@ -33,7 +33,7 @@
import pytest

from pyiceberg.catalog.memory import InMemoryCatalog
from pyiceberg.manifest import _manifest_cache
from pyiceberg.manifest import _get_manifest_cache, clear_manifest_cache


def generate_test_dataframe() -> pa.Table:
@@ -64,7 +64,7 @@ def memory_catalog(tmp_path_factory: pytest.TempPathFactory) -> InMemoryCatalog:
@pytest.fixture(autouse=True)
def clear_caches() -> None:
"""Clear caches before each test."""
_manifest_cache.clear()
clear_manifest_cache()
gc.collect()


@@ -95,7 +95,8 @@ def test_manifest_cache_memory_growth(memory_catalog: InMemoryCatalog) -> None:
# Sample memory at intervals
if (i + 1) % 10 == 0:
current, _ = tracemalloc.get_traced_memory()
cache_size = len(_manifest_cache)
cache = _get_manifest_cache()
cache_size = len(cache) if cache is not None else 0

memory_samples.append((i + 1, current, cache_size))
print(f" Iteration {i + 1}: Memory={current / 1024:.1f} KB, Cache entries={cache_size}")
@@ -150,13 +151,14 @@ def test_memory_after_gc_with_cache_cleared(memory_catalog: InMemoryCatalog) ->

gc.collect()
before_clear_memory, _ = tracemalloc.get_traced_memory()
cache_size_before = len(_manifest_cache)
cache = _get_manifest_cache()
cache_size_before = len(cache) if cache is not None else 0
print(f" Memory before clear: {before_clear_memory / 1024:.1f} KB")
print(f" Cache size: {cache_size_before}")

# Phase 2: Clear cache and GC
print("\nPhase 2: Clearing cache and running GC...")
_manifest_cache.clear()
clear_manifest_cache()
gc.collect()
gc.collect() # Multiple GC passes for thorough cleanup

@@ -191,7 +193,9 @@ def test_manifest_cache_deduplication_efficiency() -> None:
FileFormat,
ManifestEntry,
ManifestEntryStatus,
_get_manifest_cache,
_manifests,
clear_manifest_cache,
write_manifest,
write_manifest_list,
)
@@ -245,7 +249,7 @@ def test_manifest_cache_deduplication_efficiency() -> None:
num_lists = 10
print(f"Creating {num_lists} manifest lists with overlapping manifests...")

_manifest_cache.clear()
clear_manifest_cache()

for i in range(num_lists):
list_path = f"{tmp_dir}/manifest-list_{i}.avro"
@@ -265,7 +269,8 @@ def test_manifest_cache_deduplication_efficiency() -> None:
_manifests(io, list_path)

# Analyze cache efficiency
cache_entries = len(_manifest_cache)
cache = _get_manifest_cache()
cache_entries = len(cache) if cache is not None else 0
# List i contains manifests 0..i, so only the first num_lists manifests are actually used
manifests_actually_used = num_lists

138 changes: 129 additions & 9 deletions tests/utils/test_manifest.py
@@ -16,10 +16,12 @@
# under the License.
# pylint: disable=redefined-outer-name,arguments-renamed,fixme
from tempfile import TemporaryDirectory
from unittest import mock

import fastavro
import pytest

import pyiceberg.manifest as manifest_module
from pyiceberg.avro.codecs import AvroCompressionCodec
from pyiceberg.io import load_file_io
from pyiceberg.io.pyarrow import PyArrowFileIO
@@ -32,8 +34,9 @@
ManifestEntryStatus,
ManifestFile,
PartitionFieldSummary,
_manifest_cache,
_get_manifest_cache,
_manifests,
clear_manifest_cache,
read_manifest_list,
write_manifest,
write_manifest_list,
@@ -46,9 +49,10 @@


@pytest.fixture(autouse=True)
def clear_global_manifests_cache() -> None:
# Clear the global cache before each test
_manifest_cache.clear()
def reset_global_manifests_cache() -> None:
# Reset cache state before each test so config is re-read
manifest_module._manifest_cache_manager._cache = None
manifest_module._manifest_cache_manager._initialized = False


def _verify_metadata_with_fastavro(avro_file: str, expected_metadata: dict[str, str]) -> None:
Expand Down Expand Up @@ -804,9 +808,9 @@ def test_manifest_cache_deduplicates_manifest_files() -> None:

# Verify cache size - should only have 3 unique ManifestFile objects
# instead of 1 + 2 + 3 = 6 objects as with the old approach
assert len(_manifest_cache) == 3, (
f"Cache should contain exactly 3 unique ManifestFile objects, but has {len(_manifest_cache)}"
)
cache = _get_manifest_cache()
assert cache is not None, "Manifest cache should be enabled for this test"
assert len(cache) == 3, f"Cache should contain exactly 3 unique ManifestFile objects, but has {len(cache)}"


def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None:
Expand Down Expand Up @@ -879,9 +883,11 @@ def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None:
# With the new approach, we should have exactly N objects

# Verify cache has exactly N unique entries
assert len(_manifest_cache) == num_manifests, (
cache = _get_manifest_cache()
assert cache is not None, "Manifest cache should be enabled for this test"
assert len(cache) == num_manifests, (
f"Cache should contain exactly {num_manifests} ManifestFile objects, "
f"but has {len(_manifest_cache)}. "
f"but has {len(cache)}. "
f"Old approach would have {num_manifests * (num_manifests + 1) // 2} objects."
)

@@ -897,3 +903,117 @@ def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None:
if len(references) > 1:
for ref in references[1:]:
assert ref is references[0], f"All references to manifest {i} should be the same object instance"


def test_clear_manifest_cache() -> None:
"""Test that clear_manifest_cache() clears cache entries while keeping cache enabled."""
io = PyArrowFileIO()

with TemporaryDirectory() as tmp_dir:
schema = Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=True))
spec = UNPARTITIONED_PARTITION_SPEC

# Create a manifest file
manifest_path = f"{tmp_dir}/manifest.avro"
with write_manifest(
format_version=2,
spec=spec,
schema=schema,
output_file=io.new_output(manifest_path),
snapshot_id=1,
avro_compression="zstandard",
) as writer:
data_file = DataFile.from_args(
content=DataFileContent.DATA,
file_path=f"{tmp_dir}/data.parquet",
file_format=FileFormat.PARQUET,
partition=Record(),
record_count=100,
file_size_in_bytes=1000,
)
writer.add_entry(
ManifestEntry.from_args(
status=ManifestEntryStatus.ADDED,
snapshot_id=1,
data_file=data_file,
)
)
manifest_file = writer.to_manifest_file()

# Create a manifest list
list_path = f"{tmp_dir}/manifest-list.avro"
with write_manifest_list(
format_version=2,
output_file=io.new_output(list_path),
snapshot_id=1,
parent_snapshot_id=None,
sequence_number=1,
avro_compression="zstandard",
) as list_writer:
list_writer.add_manifests([manifest_file])

# Populate the cache
_manifests(io, list_path)

# Verify cache has entries
cache = _get_manifest_cache()
assert cache is not None, "Cache should be enabled"
assert len(cache) > 0, "Cache should have entries after reading manifests"

# Clear the cache
clear_manifest_cache()

# Verify cache is empty but still enabled
cache_after = _get_manifest_cache()
assert cache_after is not None, "Cache should still be enabled after clear"
assert len(cache_after) == 0, "Cache should be empty after clear"


@pytest.mark.parametrize(
"env_vars,expected_enabled,expected_size",
[
({}, True, 128), # defaults
({"PYICEBERG_MANIFEST__CACHE__ENABLED": "true"}, True, 128),
({"PYICEBERG_MANIFEST__CACHE__ENABLED": "false"}, False, 128),
({"PYICEBERG_MANIFEST__CACHE__SIZE": "64"}, True, 64),
({"PYICEBERG_MANIFEST__CACHE__SIZE": "256"}, True, 256),
({"PYICEBERG_MANIFEST__CACHE__ENABLED": "false", "PYICEBERG_MANIFEST__CACHE__SIZE": "64"}, False, 64),
],
)
def test_manifest_cache_config_valid_values(env_vars: dict[str, str], expected_enabled: bool, expected_size: int) -> None:
"""Test that valid config values are applied correctly."""
import os

with mock.patch.dict(os.environ, env_vars, clear=False):
# Reset cache state so config is re-read
manifest_module._manifest_cache_manager._cache = None
manifest_module._manifest_cache_manager._initialized = False
cache = _get_manifest_cache()

if expected_enabled:
assert cache is not None, "Cache should be enabled"
assert cache.maxsize == expected_size, f"Cache size should be {expected_size}"
else:
assert cache is None, "Cache should be disabled"


@pytest.mark.parametrize(
"env_vars,expected_error_substring",
[
({"PYICEBERG_MANIFEST__CACHE__ENABLED": "maybe"}, "manifest.cache.enabled should be a boolean"),
({"PYICEBERG_MANIFEST__CACHE__ENABLED": "invalid"}, "manifest.cache.enabled should be a boolean"),
({"PYICEBERG_MANIFEST__CACHE__SIZE": "abc"}, "manifest.cache.size should be a positive integer"),
({"PYICEBERG_MANIFEST__CACHE__SIZE": "0"}, "manifest.cache.size must be >= 1"),
({"PYICEBERG_MANIFEST__CACHE__SIZE": "-5"}, "manifest.cache.size must be >= 1"),
],
)
def test_manifest_cache_config_invalid_values(env_vars: dict[str, str], expected_error_substring: str) -> None:
"""Test that invalid config values raise ValueError with appropriate message."""
import os

with mock.patch.dict(os.environ, env_vars, clear=False):
# Reset cache state so config is re-read
manifest_module._manifest_cache_manager._cache = None
manifest_module._manifest_cache_manager._initialized = False
with pytest.raises(ValueError, match=expected_error_substring):
_get_manifest_cache()