Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion pyiceberg/catalog/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,29 @@ def plan_scan(self, identifier: str | Identifier, request: PlanTableScanRequest)
Returns:
List of FileScanTask objects ready for execution.

Raises:
RuntimeError: If planning fails, is cancelled, or returns unexpected response.
NotImplementedError: If async planning is required but not yet supported.
"""
tasks, _ = self._plan_scan_for_table(identifier, request)
return tasks

def _plan_scan_for_table(
self, identifier: str | Identifier, request: PlanTableScanRequest, table_location: str | None = None
) -> tuple[list[FileScanTask], Properties]:
"""Plan a table scan and return FileScanTasks along with resolved storage credentials.

Per Iceberg spec: storage-credentials in the PlanCompleted response take precedence over
catalog config and should be applied to the FileIO used to read data files.

Args:
identifier: Table identifier.
request: The scan plan request parameters.
table_location: The table's metadata location, used for credential prefix matching.

Returns:
Tuple of (list of FileScanTask objects, resolved credential Properties).

Raises:
RuntimeError: If planning fails, is cancelled, or returns unexpected response.
NotImplementedError: If async planning is required but not yet supported.
Expand All @@ -548,6 +571,8 @@ def plan_scan(self, identifier: str | Identifier, request: PlanTableScanRequest)
if not isinstance(response, PlanCompleted):
raise RuntimeError(f"Invalid planStatus for response: {type(response).__name__}")

credential_config = self._resolve_storage_credentials(response.storage_credentials or [], table_location)

tasks: list[FileScanTask] = []

# Collect tasks from initial response
Expand All @@ -563,7 +588,7 @@ def plan_scan(self, identifier: str | Identifier, request: PlanTableScanRequest)
tasks.append(FileScanTask.from_rest_response(task, batch.delete_files))
pending_tasks.extend(batch.plan_tasks)

return tasks
return tasks, credential_config

def _create_legacy_oauth2_auth_manager(self, session: Session) -> AuthManager:
"""Create the LegacyOAuth2AuthManager by fetching required properties.
Expand Down
17 changes: 14 additions & 3 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2054,7 +2054,12 @@ def _plan_files_server_side(self) -> Iterable[FileScanTask]:
case_sensitive=self.case_sensitive,
)

return self.catalog.plan_scan(self.table_identifier, request)
tasks, credential_config = self.catalog._plan_scan_for_table(self.table_identifier, request, self.table_metadata.location)
# Per Iceberg spec: storage-credentials from PlanCompleted take precedence over config.
# Update the FileIO so data files in this scan are read with the vended credentials.
if credential_config:
self.io = self.catalog._load_file_io({**self.io.properties, **credential_config}, self.table_metadata.location)
return tasks

def _plan_files_local(self) -> Iterable[FileScanTask]:
"""Plan files locally by reading manifests."""
Expand Down Expand Up @@ -2112,9 +2117,13 @@ def to_arrow(self) -> pa.Table:
"""
from pyiceberg.io.pyarrow import ArrowScan

# plan_files() must be called before capturing self.io so that any vended credentials
# returned by server-side scan planning (PlanCompleted.storage_credentials) are applied
# to self.io before ArrowScan is constructed.
tasks = self.plan_files()
return ArrowScan(
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
).to_table(self.plan_files())
).to_table(tasks)

def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
"""Return an Arrow RecordBatchReader from this DataScan.
Expand All @@ -2132,9 +2141,11 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow

target_schema = schema_to_pyarrow(self.projection())
# plan_files() must be called before capturing self.io (same reason as to_arrow).
tasks = self.plan_files()
batches = ArrowScan(
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
).to_record_batches(self.plan_files())
).to_record_batches(tasks)

return pa.RecordBatchReader.from_batches(
target_schema,
Expand Down
88 changes: 88 additions & 0 deletions tests/catalog/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2621,3 +2621,91 @@ def test_load_table_without_storage_credentials(
)
assert actual.metadata.model_dump() == expected.metadata.model_dump()
assert actual == expected


def test_plan_scan_with_storage_credentials(rest_mock: Mocker, example_table_metadata_with_snapshot_v1: dict[str, Any]) -> None:
    """Verify that _plan_scan_for_table surfaces storage credentials vended in a completed plan."""
    from pyiceberg.catalog.rest.scan_planning import PlanTableScanRequest

    table_prefix = "s3://warehouse/database/table"
    vended_credentials = {
        "s3.access-key-id": "plan-vended-key",
        "s3.secret-access-key": "plan-vended-secret",
        "s3.session-token": "plan-vended-token",
    }

    # Mock the table-load endpoint used to resolve the table before planning.
    rest_mock.get(
        f"{TEST_URI}v1/namespaces/fokko/tables/table",
        json={
            "metadata-location": f"{table_prefix}/metadata/00001.metadata.json",
            "metadata": example_table_metadata_with_snapshot_v1,
            "config": {},
        },
        status_code=200,
        request_headers=TEST_HEADERS,
    )
    # Mock a synchronous "completed" plan response that carries vended credentials.
    rest_mock.post(
        f"{TEST_URI}v1/namespaces/fokko/tables/table/plan",
        json={
            "status": "completed",
            "file-scan-tasks": [],
            "delete-files": [],
            "plan-tasks": [],
            "storage-credentials": [
                {
                    "prefix": table_prefix,
                    "config": vended_credentials,
                }
            ],
        },
        status_code=200,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{"rest-scan-planning-enabled": "true"})
    tasks, credential_config = catalog._plan_scan_for_table(
        ("fokko", "table"),
        PlanTableScanRequest(),
        table_location=table_prefix,
    )

    # No scan tasks were returned, and the credential config must match the vended set exactly.
    assert tasks == []
    assert credential_config == vended_credentials


def test_plan_scan_without_storage_credentials(
    rest_mock: Mocker, example_table_metadata_with_snapshot_v1: dict[str, Any]
) -> None:
    """Verify that _plan_scan_for_table returns an empty credential mapping when none are vended."""
    from pyiceberg.catalog.rest.scan_planning import PlanTableScanRequest

    table_prefix = "s3://warehouse/database/table"

    # Mock the table-load endpoint used to resolve the table before planning.
    rest_mock.get(
        f"{TEST_URI}v1/namespaces/fokko/tables/table",
        json={
            "metadata-location": f"{table_prefix}/metadata/00001.metadata.json",
            "metadata": example_table_metadata_with_snapshot_v1,
            "config": {},
        },
        status_code=200,
        request_headers=TEST_HEADERS,
    )
    # Mock a "completed" plan response that omits the storage-credentials field entirely.
    rest_mock.post(
        f"{TEST_URI}v1/namespaces/fokko/tables/table/plan",
        json={
            "status": "completed",
            "file-scan-tasks": [],
            "delete-files": [],
            "plan-tasks": [],
        },
        status_code=200,
        request_headers=TEST_HEADERS,
    )

    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{"rest-scan-planning-enabled": "true"})
    tasks, credential_config = catalog._plan_scan_for_table(
        ("fokko", "table"),
        PlanTableScanRequest(),
        table_location=table_prefix,
    )

    # With no vended credentials, the resolved config must be empty (not None).
    assert tasks == []
    assert credential_config == {}
Loading