From cd844d3d142b81077a61e366ba2a73f711e9b7a6 Mon Sep 17 00:00:00 2001 From: Tanmay Rauth Date: Sat, 11 Apr 2026 08:49:36 -0700 Subject: [PATCH] feat(rest): support storage-credentials in PlanCompleted response (partial fix for #3165) Adds storage-credentials support for CompletedPlanningResult (PlanCompleted). LoadCredentialsResponse support is pending. --- pyiceberg/catalog/rest/__init__.py | 27 ++++++++- pyiceberg/table/__init__.py | 17 +++++- tests/catalog/test_rest.py | 88 ++++++++++++++++++++++++++++++ 3 files changed, 128 insertions(+), 4 deletions(-) diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index b617cfa7da..17e570b90b 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -528,6 +528,29 @@ def plan_scan(self, identifier: str | Identifier, request: PlanTableScanRequest) Returns: List of FileScanTask objects ready for execution. + Raises: + RuntimeError: If planning fails, is cancelled, or returns unexpected response. + NotImplementedError: If async planning is required but not yet supported. + """ + tasks, _ = self._plan_scan_for_table(identifier, request) + return tasks + + def _plan_scan_for_table( + self, identifier: str | Identifier, request: PlanTableScanRequest, table_location: str | None = None + ) -> tuple[list[FileScanTask], Properties]: + """Plan a table scan and return FileScanTasks along with resolved storage credentials. + + Per Iceberg spec: storage-credentials in the PlanCompleted response take precedence over + catalog config and should be applied to the FileIO used to read data files. + + Args: + identifier: Table identifier. + request: The scan plan request parameters. + table_location: The table's metadata location, used for credential prefix matching. + + Returns: + Tuple of (list of FileScanTask objects, resolved credential Properties). + Raises: RuntimeError: If planning fails, is cancelled, or returns unexpected response. NotImplementedError: If async planning is required but not yet supported. @@ -548,6 +571,8 @@ def plan_scan(self, identifier: str | Identifier, request: PlanTableScanRequest) if not isinstance(response, PlanCompleted): raise RuntimeError(f"Invalid planStatus for response: {type(response).__name__}") + credential_config = self._resolve_storage_credentials(response.storage_credentials or [], table_location) + tasks: list[FileScanTask] = [] # Collect tasks from initial response @@ -563,7 +588,7 @@ def plan_scan(self, identifier: str | Identifier, request: PlanTableScanRequest) tasks.append(FileScanTask.from_rest_response(task, batch.delete_files)) pending_tasks.extend(batch.plan_tasks) - return tasks + return tasks, credential_config def _create_legacy_oauth2_auth_manager(self, session: Session) -> AuthManager: """Create the LegacyOAuth2AuthManager by fetching required properties. diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index bb8765b651..66ab738c86 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -2054,7 +2054,12 @@ def _plan_files_server_side(self) -> Iterable[FileScanTask]: case_sensitive=self.case_sensitive, ) - return self.catalog.plan_scan(self.table_identifier, request) + tasks, credential_config = self.catalog._plan_scan_for_table(self.table_identifier, request, self.table_metadata.location) + # Per Iceberg spec: storage-credentials from PlanCompleted take precedence over config. + # Update the FileIO so data files in this scan are read with the vended credentials. + if credential_config: + self.io = self.catalog._load_file_io({**self.io.properties, **credential_config}, self.table_metadata.location) + return tasks def _plan_files_local(self) -> Iterable[FileScanTask]: """Plan files locally by reading manifests.""" @@ -2112,9 +2117,13 @@ def to_arrow(self) -> pa.Table: """ from pyiceberg.io.pyarrow import ArrowScan + # plan_files() must be called before capturing self.io so that any vended credentials + # returned by server-side scan planning (PlanCompleted.storage_credentials) are applied + # to self.io before ArrowScan is constructed. + tasks = self.plan_files() return ArrowScan( self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit - ).to_table(self.plan_files()) + ).to_table(tasks) def to_arrow_batch_reader(self) -> pa.RecordBatchReader: """Return an Arrow RecordBatchReader from this DataScan. @@ -2132,9 +2141,11 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader: from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow target_schema = schema_to_pyarrow(self.projection()) + # plan_files() must be called before capturing self.io (same reason as to_arrow). + tasks = self.plan_files() batches = ArrowScan( self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit - ).to_record_batches(self.plan_files()) + ).to_record_batches(tasks) return pa.RecordBatchReader.from_batches( target_schema, diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index 99d1ef947b..e3dbc7b003 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -2621,3 +2621,91 @@ def test_load_table_without_storage_credentials( ) assert actual.metadata.model_dump() == expected.metadata.model_dump() assert actual == expected + + +def test_plan_scan_with_storage_credentials(rest_mock: Mocker, example_table_metadata_with_snapshot_v1: dict[str, Any]) -> None: + metadata_location = "s3://warehouse/database/table/metadata/00001.metadata.json" + rest_mock.get( + f"{TEST_URI}v1/namespaces/fokko/tables/table", + json={ + "metadata-location": metadata_location, + "metadata": example_table_metadata_with_snapshot_v1, + "config": {}, + }, + status_code=200, + request_headers=TEST_HEADERS, + ) + rest_mock.post( + f"{TEST_URI}v1/namespaces/fokko/tables/table/plan", + json={ + "status": "completed", + "file-scan-tasks": [], + "delete-files": [], + "plan-tasks": [], + "storage-credentials": [ + { + "prefix": "s3://warehouse/database/table", + "config": { + "s3.access-key-id": "plan-vended-key", + "s3.secret-access-key": "plan-vended-secret", + "s3.session-token": "plan-vended-token", + }, + } + ], + }, + status_code=200, + request_headers=TEST_HEADERS, + ) + from pyiceberg.catalog.rest.scan_planning import PlanTableScanRequest + + catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{"rest-scan-planning-enabled": "true"}) + tasks, credential_config = catalog._plan_scan_for_table( + ("fokko", "table"), + PlanTableScanRequest(), + table_location="s3://warehouse/database/table", + ) + + assert tasks == [] + assert credential_config == { + "s3.access-key-id": "plan-vended-key", + "s3.secret-access-key": "plan-vended-secret", + "s3.session-token": "plan-vended-token", + } + + +def test_plan_scan_without_storage_credentials( + rest_mock: Mocker, example_table_metadata_with_snapshot_v1: dict[str, Any] +) -> None: + metadata_location = "s3://warehouse/database/table/metadata/00001.metadata.json" + rest_mock.get( + f"{TEST_URI}v1/namespaces/fokko/tables/table", + json={ + "metadata-location": metadata_location, + "metadata": example_table_metadata_with_snapshot_v1, + "config": {}, + }, + status_code=200, + request_headers=TEST_HEADERS, + ) + rest_mock.post( + f"{TEST_URI}v1/namespaces/fokko/tables/table/plan", + json={ + "status": "completed", + "file-scan-tasks": [], + "delete-files": [], + "plan-tasks": [], + }, + status_code=200, + request_headers=TEST_HEADERS, + ) + from pyiceberg.catalog.rest.scan_planning import PlanTableScanRequest + + catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{"rest-scan-planning-enabled": "true"}) + tasks, credential_config = catalog._plan_scan_for_table( + ("fokko", "table"), + PlanTableScanRequest(), + table_location="s3://warehouse/database/table", + ) + + assert tasks == [] + assert credential_config == {}