Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions pyiceberg/catalog/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,11 @@ class ListViewsResponse(IcebergBaseModel):
_PLANNING_RESPONSE_ADAPTER = TypeAdapter(PlanningResponse)


def _is_hadoop_only_config(config: Properties) -> bool:
    """Check whether *config* contains nothing but Hadoop ``fs.*`` keys.

    pyiceberg ships no HadoopFileIO, so a credential bundle made up entirely
    of ``fs.*`` entries is unusable here. An empty config is NOT considered
    Hadoop-only.
    """
    if not config:
        return False
    return all(key.startswith("fs.") for key in config)


class RestCatalog(Catalog):
uri: str
_session: Session
Expand Down Expand Up @@ -440,22 +445,30 @@ def _create_session(self) -> Session:

@staticmethod
def _resolve_storage_credentials(storage_credentials: list[StorageCredential], location: str | None) -> Properties:
    """Pick the longest-prefix storage credential for ``location``.

    Mirrors Java ``S3FileIO.clientForStoragePath``. Hadoop-only (``fs.*``)
    credentials are filtered out since pyiceberg has no HadoopFileIO to
    consume them — otherwise a catalog vending both ``fs.*`` and ``s3.*``
    bundles per location could strand the FileIO with unusable keys.

    Args:
        storage_credentials: Credentials vended by the REST catalog, each
            scoped to a location prefix.
        location: The storage location being accessed, or ``None``.

    Returns:
        The config of the best-matching credential, or ``{}`` when there is
        no match (or no credentials / no location).
    """
    if not storage_credentials or not location:
        return {}

    # Drop bundles pyiceberg cannot consume before running the prefix match,
    # so a long-but-unusable fs.* prefix cannot shadow a usable one.
    consumable = [c for c in storage_credentials if not _is_hadoop_only_config(c.config)]

    best_match: StorageCredential | None = None
    for cred in consumable:
        if location.startswith(cred.prefix):
            if best_match is None or len(cred.prefix) > len(best_match.prefix):
                best_match = cred

    # Java S3FileIO falls back to the "s3" ROOT_PREFIX credential; scope
    # it to schemes pyarrow's S3FileSystem handles so gs:///abfs:// don't
    # get handed s3.* keys.
    if best_match is None and location.startswith(("s3://", "s3a://", "s3n://", "oss://")):
        best_match = next((c for c in consumable if c.prefix == "s3"), None)

    return best_match.config if best_match else {}

def _load_file_io(self, properties: Properties = EMPTY_DICT, location: str | None = None) -> FileIO:
Expand Down
61 changes: 61 additions & 0 deletions tests/catalog/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2600,6 +2600,67 @@ def test_resolve_storage_credentials_empty() -> None:
assert RestCatalog._resolve_storage_credentials([], None) == {}


def test_resolve_storage_credentials_skips_hadoop_only() -> None:
    from pyiceberg.catalog.rest.scan_planning import StorageCredential

    # Without the fs.* filter the longer "s3://warehouse/jindo" prefix would
    # win a blind longest-match; filtering leaves only the native bundle.
    creds = [
        StorageCredential(prefix="s3://warehouse/jindo", config={"fs.s3.access-key": "hadoop-k"}),
        StorageCredential(prefix="s3://warehouse", config={"s3.access-key-id": "native-k"}),
    ]
    resolved = RestCatalog._resolve_storage_credentials(creds, "s3://warehouse/jindo/table/data")
    assert resolved == {"s3.access-key-id": "native-k"}


def test_resolve_storage_credentials_mixed_prefix_namespaces_preserved() -> None:
    from pyiceberg.catalog.rest.scan_planning import StorageCredential

    # A gs:// location must resolve to the gs credential, not the s3 one.
    gs_cred = StorageCredential(prefix="gs", config={"gs.oauth2.token": "tok"})
    s3_cred = StorageCredential(prefix="s3", config={"s3.access-key-id": "native-k"})
    assert RestCatalog._resolve_storage_credentials([gs_cred, s3_cred], "gs://bucket/path") == {"gs.oauth2.token": "tok"}


def test_resolve_storage_credentials_all_hadoop_only_returns_empty() -> None:
    from pyiceberg.catalog.rest.scan_planning import StorageCredential

    # When every vended credential is Hadoop-only, nothing is consumable.
    only_hadoop = StorageCredential(prefix="custom", config={"fs.custom.access-key": "hadoop-k"})
    resolved = RestCatalog._resolve_storage_credentials([only_hadoop], "custom://bucket/path")
    assert resolved == {}


def test_resolve_storage_credentials_root_prefix_fallback_for_s3_compatible_scheme() -> None:
    from pyiceberg.catalog.rest.scan_planning import StorageCredential

    # pyarrow's S3FileSystem serves oss:// locations, so the ROOT_PREFIX "s3"
    # credential is an acceptable fallback even though no prefix matched.
    root_cred = StorageCredential(prefix="s3", config={"s3.access-key-id": "native-k"})
    resolved = RestCatalog._resolve_storage_credentials([root_cred], "oss://bucket/path")
    assert resolved == {"s3.access-key-id": "native-k"}


def test_resolve_storage_credentials_root_prefix_fallback_respects_consumable() -> None:
    from pyiceberg.catalog.rest.scan_planning import StorageCredential

    # Even the ROOT_PREFIX fallback must not hand out a Hadoop-only bundle.
    hadoop_root = StorageCredential(prefix="s3", config={"fs.s3.access-key": "hadoop-k"})
    assert RestCatalog._resolve_storage_credentials([hadoop_root], "s3://bucket/path") == {}


def test_resolve_storage_credentials_fallback_skipped_for_non_s3_scheme() -> None:
    from pyiceberg.catalog.rest.scan_planning import StorageCredential

    # A gs:// location must never be handed s3.* keys via the "s3" fallback.
    s3_root = StorageCredential(prefix="s3", config={"s3.access-key-id": "native-k"})
    assert RestCatalog._resolve_storage_credentials([s3_root], "gs://bucket/path") == {}


def test_load_table_with_storage_credentials(rest_mock: Mocker, example_table_metadata_with_snapshot_v1: dict[str, Any]) -> None:
metadata_location = "s3://warehouse/database/table/metadata/00001.metadata.json"
rest_mock.get(
Expand Down
Loading