1 change: 1 addition & 0 deletions mkdocs/docs/configuration.md
@@ -129,6 +129,7 @@ For the FileIO there are several configuration options available:
| s3.force-virtual-addressing | False | Whether to use virtual addressing of buckets. If true, then virtual addressing is always enabled. If false, then virtual addressing is only enabled if endpoint_override is empty. This can be used for non-AWS backends that only support virtual hosted-style access. |
| s3.retry-strategy-impl | None | Ability to set a custom S3 retry strategy. A full path to a class needs to be given that extends the [S3RetryStrategy](https://github.com/apache/arrow/blob/639201bfa412db26ce45e73851432018af6c945e/python/pyarrow/_s3fs.pyx#L110) base class. |
| s3.anonymous | True | Configure whether to use anonymous connection. If False (default), uses key/secret if configured or boto's credential resolver. |
| s3.access-point.\<bucket\> | my-ap-alias-s3alias | Configure an S3 access point alias for a specific bucket, enabling cross-account access via access points. The alias (format: `<name>-<account-id>-s3alias`) replaces the bucket name in S3 paths. |

<!-- markdown-link-check-enable-->

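For illustration, a minimal sketch of how the new property could be supplied together with the other `s3.*` settings when loading a catalog; the catalog URI, bucket name, and access point alias below are hypothetical placeholders.

```python
from pyiceberg.catalog import load_catalog

# Hypothetical values: replace the URI, bucket, and alias with your own.
catalog = load_catalog(
    "default",
    **{
        "uri": "http://localhost:8181",
        "s3.region": "us-east-1",
        # Route I/O against "my-bucket" through the configured access point alias.
        "s3.access-point.my-bucket": "my-ap-alias-abc123-s3alias",
    },
)
```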
7 changes: 3 additions & 4 deletions pyiceberg/io/__init__.py
@@ -31,10 +31,7 @@
from abc import ABC, abstractmethod
from io import SEEK_SET
from types import TracebackType
from typing import (
Protocol,
runtime_checkable,
)
from typing import Protocol, runtime_checkable
from urllib.parse import urlparse

from pyiceberg.typedef import EMPTY_DICT, Properties
@@ -67,6 +64,8 @@
S3_ROLE_SESSION_NAME = "s3.role-session-name"
S3_FORCE_VIRTUAL_ADDRESSING = "s3.force-virtual-addressing"
S3_RETRY_STRATEGY_IMPL = "s3.retry-strategy-impl"
# Prefix for per-bucket access point config: s3.access-point.<bucket> = <access-point-alias>
S3_ACCESS_POINT_PREFIX = "s3.access-point."
HDFS_HOST = "hdfs.host"
HDFS_PORT = "hdfs.port"
HDFS_USER = "hdfs.user"
48 changes: 45 additions & 3 deletions pyiceberg/io/fsspec.py
@@ -69,6 +69,7 @@
HF_ENDPOINT,
HF_TOKEN,
S3_ACCESS_KEY_ID,
S3_ACCESS_POINT_PREFIX,
S3_ANONYMOUS,
S3_CONNECT_TIMEOUT,
S3_ENDPOINT,
@@ -419,6 +420,29 @@ def __init__(self, properties: Properties):
self._thread_locals = threading.local()
super().__init__(properties=properties)

def _resolve_s3_access_point(self, scheme: str, bucket: str) -> str | None:
"""Resolve S3 access point alias for a bucket if configured.

For cross-account access, S3 paths need to use access point aliases instead of bucket names.
Access point aliases work like bucket names and follow the format <name>-<account-id>-s3alias.
Config format: s3.access-point.<bucket-name> = <access-point-alias>

Args:
scheme: The URI scheme (s3, s3a, s3n)
bucket: The bucket name from the original URI

Returns:
The access point alias if configured, None otherwise
"""
if scheme not in {"s3", "s3a", "s3n"}:
return None

access_point_key = f"{S3_ACCESS_POINT_PREFIX}{bucket}"
if access_point_alias := self.properties.get(access_point_key):
logger.debug("Resolving bucket '%s' to access point alias: %s", bucket, access_point_alias)
return access_point_alias
return None

def new_input(self, location: str) -> FsspecInputFile:
"""Get an FsspecInputFile instance to read bytes from the file at the given location.

@@ -430,7 +454,13 @@ def new_input(self, location: str) -> FsspecInputFile:
"""
uri = urlparse(location)
fs = self.get_fs(uri.scheme)
return FsspecInputFile(location=location, fs=fs)

# Resolve S3 access point if configured
resolved_location = location
if access_point_alias := self._resolve_s3_access_point(uri.scheme, uri.netloc):
resolved_location = f"{uri.scheme}://{access_point_alias}{uri.path}"

return FsspecInputFile(location=resolved_location, fs=fs)

def new_output(self, location: str) -> FsspecOutputFile:
"""Get an FsspecOutputFile instance to write bytes to the file at the given location.
@@ -443,7 +473,13 @@ def new_output(self, location: str) -> FsspecOutputFile:
"""
uri = urlparse(location)
fs = self.get_fs(uri.scheme)
return FsspecOutputFile(location=location, fs=fs)

# Resolve S3 access point if configured
resolved_location = location
if access_point_alias := self._resolve_s3_access_point(uri.scheme, uri.netloc):
resolved_location = f"{uri.scheme}://{access_point_alias}{uri.path}"

return FsspecOutputFile(location=resolved_location, fs=fs)

def delete(self, location: str | InputFile | OutputFile) -> None:
"""Delete the file at the given location.
@@ -460,7 +496,13 @@ def delete(self, location: str | InputFile | OutputFile) -> None:

uri = urlparse(str_location)
fs = self.get_fs(uri.scheme)
fs.rm(str_location)

# Resolve S3 access point if configured
resolved_location = str_location
if access_point_alias := self._resolve_s3_access_point(uri.scheme, uri.netloc):
resolved_location = f"{uri.scheme}://{access_point_alias}{uri.path}"

fs.rm(resolved_location)

def get_fs(self, scheme: str) -> AbstractFileSystem:
"""Get a filesystem for a specific scheme, cached per thread."""
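For context, a small sketch of how the fsspec changes above are expected to behave, mirroring the tests added later in this PR; the bucket and alias values are hypothetical, and it assumes the `s3fs` extra is installed.

```python
from pyiceberg.io.fsspec import FsspecFileIO

properties = {
    "s3.access-point.my-bucket": "my-ap-alias-abc123-s3alias",
    "s3.region": "us-east-1",
}
fileio = FsspecFileIO(properties=properties)

# The helper returns the alias only for S3 schemes with a matching config entry.
assert fileio._resolve_s3_access_point("s3", "my-bucket") == "my-ap-alias-abc123-s3alias"
assert fileio._resolve_s3_access_point("gs", "my-bucket") is None

# new_input/new_output rewrite the bucket portion of the location to the alias.
input_file = fileio.new_input("s3://my-bucket/path/to/file.parquet")
assert input_file.location == "s3://my-ap-alias-abc123-s3alias/path/to/file.parquet"
```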
77 changes: 48 additions & 29 deletions pyiceberg/io/pyarrow.py
@@ -42,13 +42,7 @@
from dataclasses import dataclass
from enum import Enum
from functools import lru_cache, singledispatch
from typing import (
TYPE_CHECKING,
Any,
Generic,
TypeVar,
cast,
)
from typing import TYPE_CHECKING, Any, Generic, TypeVar, cast
from urllib.parse import urlparse

import pyarrow as pa
@@ -58,22 +52,13 @@
import pyarrow.parquet as pq
from pyarrow import ChunkedArray
from pyarrow._s3fs import S3RetryStrategy
from pyarrow.fs import (
FileInfo,
FileSystem,
FileType,
)
from pyarrow.fs import FileInfo, FileSystem, FileType

from pyiceberg.conversions import to_bytes
from pyiceberg.exceptions import ResolveError
from pyiceberg.expressions import AlwaysTrue, BooleanExpression, BoundIsNaN, BoundIsNull, BoundTerm, Not, Or
from pyiceberg.expressions.literals import Literal
from pyiceberg.expressions.visitors import (
BoundBooleanExpressionVisitor,
bind,
extract_field_ids,
translate_column_names,
)
from pyiceberg.expressions.visitors import BoundBooleanExpressionVisitor, bind, extract_field_ids, translate_column_names
from pyiceberg.expressions.visitors import visit as boolean_expression_visit
from pyiceberg.io import (
ADLS_ACCOUNT_KEY,
@@ -101,6 +86,7 @@
HDFS_PORT,
HDFS_USER,
S3_ACCESS_KEY_ID,
S3_ACCESS_POINT_PREFIX,
S3_ANONYMOUS,
S3_CONNECT_TIMEOUT,
S3_ENDPOINT,
@@ -120,11 +106,7 @@
OutputFile,
OutputStream,
)
from pyiceberg.manifest import (
DataFile,
DataFileContent,
FileFormat,
)
from pyiceberg.manifest import DataFile, DataFileContent, FileFormat
from pyiceberg.partitioning import PartitionField, PartitionFieldValue, PartitionKey, PartitionSpec, partition_record_value
from pyiceberg.schema import (
PartnerAccessor,
@@ -607,6 +589,34 @@ def _initialize_gcs_fs(self) -> FileSystem:
def _initialize_local_fs(self) -> FileSystem:
return PyArrowLocalFileSystem()

def _resolve_s3_access_point(self, scheme: str, netloc: str, path_suffix: str, original_path: str) -> tuple[str, str]:
"""Resolve S3 access point alias for a bucket if configured.

For cross-account access, S3 paths need to use access point aliases instead of bucket names.
Access point aliases work like bucket names and follow the format <name>-<account-id>-s3alias.
Config format: s3.access-point.<bucket-name> = <access-point-alias>

Args:
scheme: The URI scheme (s3, s3a, s3n)
netloc: The bucket name from the original URI
path_suffix: The path within the bucket (without bucket name)
original_path: The original path from parse_location (fallback for non-S3)

Returns:
Tuple of (resolved_netloc, resolved_path), where the netloc may be replaced with the access point alias.
"""
if scheme not in {"s3", "s3a", "s3n"}:
return netloc, original_path

# Check for access point alias configuration for this bucket
access_point_key = f"{S3_ACCESS_POINT_PREFIX}{netloc}"
if access_point_alias := self.properties.get(access_point_key):
logger.debug("Resolving bucket '%s' to access point alias: %s", netloc, access_point_alias)
# Replace bucket with access point alias in the path
return access_point_alias, f"{access_point_alias}{path_suffix}"

return netloc, original_path

def new_input(self, location: str) -> PyArrowFile:
"""Get a PyArrowFile instance to read bytes from the file at the given location.

@@ -617,10 +627,13 @@ def new_input(self, location: str) -> PyArrowFile:
PyArrowFile: A PyArrowFile instance for the given location.
"""
scheme, netloc, path = self.parse_location(location, self.properties)
# For S3, resolve access point if configured
uri = urlparse(location)
resolved_netloc, resolved_path = self._resolve_s3_access_point(scheme, netloc, uri.path, path)
return PyArrowFile(
fs=self.fs_by_scheme(scheme, netloc),
fs=self.fs_by_scheme(scheme, resolved_netloc),
location=location,
path=path,
path=resolved_path,
buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
)

@@ -634,10 +647,13 @@ def new_output(self, location: str) -> PyArrowFile:
PyArrowFile: A PyArrowFile instance for the given location.
"""
scheme, netloc, path = self.parse_location(location, self.properties)
# For S3, resolve access point if configured
uri = urlparse(location)
resolved_netloc, resolved_path = self._resolve_s3_access_point(scheme, netloc, uri.path, path)
return PyArrowFile(
fs=self.fs_by_scheme(scheme, netloc),
fs=self.fs_by_scheme(scheme, resolved_netloc),
location=location,
path=path,
path=resolved_path,
buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
)

@@ -656,10 +672,13 @@ def delete(self, location: str | InputFile | OutputFile) -> None:
"""
str_location = location.location if isinstance(location, (InputFile, OutputFile)) else location
scheme, netloc, path = self.parse_location(str_location, self.properties)
fs = self.fs_by_scheme(scheme, netloc)
# For S3, resolve access point if configured
uri = urlparse(str_location)
resolved_netloc, resolved_path = self._resolve_s3_access_point(scheme, netloc, uri.path, path)
fs = self.fs_by_scheme(scheme, resolved_netloc)

try:
fs.delete_file(path)
fs.delete_file(resolved_path)
except FileNotFoundError:
raise
except PermissionError:
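Similarly, a minimal sketch of the PyArrow variant under the same hypothetical properties; unlike the fsspec path, the helper returns a (netloc, path) pair because PyArrowFile is constructed from a filesystem-relative path. It assumes pyarrow is installed, and the bucket, alias, and object key are placeholders.

```python
from pyiceberg.io.pyarrow import PyArrowFileIO

io = PyArrowFileIO(properties={"s3.access-point.my-bucket": "my-ap-alias-abc123-s3alias"})

# The bucket (netloc) is swapped for the alias, and the path is rebuilt from the alias.
netloc, path = io._resolve_s3_access_point(
    scheme="s3",
    netloc="my-bucket",
    path_suffix="/data/00000.parquet",
    original_path="my-bucket/data/00000.parquet",
)
assert netloc == "my-ap-alias-abc123-s3alias"
assert path == "my-ap-alias-abc123-s3alias/data/00000.parquet"

# Non-S3 schemes fall back to the original values unchanged.
assert io._resolve_s3_access_point("gs", "my-bucket", "/x", "my-bucket/x") == ("my-bucket", "my-bucket/x")
```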
101 changes: 101 additions & 0 deletions tests/io/test_fsspec.py
@@ -391,6 +391,107 @@ def test_fsspec_unified_session_properties() -> None:
)


def test_fsspec_s3_access_point_resolution_with_config() -> None:
"""Test that S3 bucket names are resolved to access point aliases when configured."""
from pyiceberg.io import S3_ACCESS_POINT_PREFIX

bucket_name = "my-bucket"
access_point_alias = "my-access-point-abc123-s3alias"
properties = {
f"{S3_ACCESS_POINT_PREFIX}{bucket_name}": access_point_alias,
}

fileio = FsspecFileIO(properties=properties)

# Test _resolve_s3_access_point directly
result = fileio._resolve_s3_access_point("s3", bucket_name)

assert result == access_point_alias


def test_fsspec_s3_access_point_resolution_without_config() -> None:
"""Test that S3 bucket names return None when no access point is configured."""
bucket_name = "my-bucket"
fileio = FsspecFileIO(properties={})

result = fileio._resolve_s3_access_point("s3", bucket_name)

assert result is None


def test_fsspec_s3_access_point_resolution_non_s3_scheme() -> None:
"""Test that non-S3 schemes are not affected by access point configuration."""
from pyiceberg.io import S3_ACCESS_POINT_PREFIX

bucket_name = "my-bucket"
access_point_alias = "my-access-point-abc123-s3alias"
properties = {
f"{S3_ACCESS_POINT_PREFIX}{bucket_name}": access_point_alias,
}

fileio = FsspecFileIO(properties=properties)

# Test with non-S3 scheme (should return None)
result = fileio._resolve_s3_access_point("gs", bucket_name)

assert result is None


def test_fsspec_s3_access_point_resolution_s3a_scheme() -> None:
"""Test that s3a scheme also resolves access points."""
from pyiceberg.io import S3_ACCESS_POINT_PREFIX

bucket_name = "my-bucket"
access_point_alias = "my-access-point-abc123-s3alias"
properties = {
f"{S3_ACCESS_POINT_PREFIX}{bucket_name}": access_point_alias,
}

fileio = FsspecFileIO(properties=properties)

result = fileio._resolve_s3_access_point("s3a", bucket_name)

assert result == access_point_alias


def test_fsspec_s3_access_point_new_input_uses_resolved_location() -> None:
"""Test that new_input uses the resolved access point location."""
from pyiceberg.io import S3_ACCESS_POINT_PREFIX

bucket_name = "my-bucket"
access_point_alias = "my-access-point-abc123-s3alias"
properties = {
f"{S3_ACCESS_POINT_PREFIX}{bucket_name}": access_point_alias,
"s3.region": "us-east-1",
}

with mock.patch("s3fs.S3FileSystem"):
fileio = FsspecFileIO(properties=properties)
input_file = fileio.new_input(f"s3://{bucket_name}/path/to/file.parquet")

# The location should be rewritten to use the access point alias
assert input_file.location == f"s3://{access_point_alias}/path/to/file.parquet"


def test_fsspec_s3_access_point_new_output_uses_resolved_location() -> None:
"""Test that new_output uses the resolved access point location."""
from pyiceberg.io import S3_ACCESS_POINT_PREFIX

bucket_name = "my-bucket"
access_point_alias = "my-access-point-abc123-s3alias"
properties = {
f"{S3_ACCESS_POINT_PREFIX}{bucket_name}": access_point_alias,
"s3.region": "us-east-1",
}

with mock.patch("s3fs.S3FileSystem"):
fileio = FsspecFileIO(properties=properties)
output_file = fileio.new_output(f"s3://{bucket_name}/path/to/file.parquet")

# The location should be rewritten to use the access point alias
assert output_file.location == f"s3://{access_point_alias}/path/to/file.parquet"


@pytest.mark.adls
def test_fsspec_new_input_file_adls(adls_fsspec_fileio: FsspecFileIO) -> None:
"""Test creating a new input file from an fsspec file-io"""