Skip to content

Commit ddd2380

Browse files
committed
feat(rest): support storage-credentials in PlanCompleted response (partial fix for #3165)
Adds storage-credentials support for CompletedPlanningResult (PlanCompleted). LoadCredentialsResponse support is pending.
1 parent 1a54e9c commit ddd2380

File tree

3 files changed: +136 −4 lines changed

pyiceberg/catalog/rest/__init__.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -528,6 +528,29 @@ def plan_scan(self, identifier: str | Identifier, request: PlanTableScanRequest)
528528
Returns:
529529
List of FileScanTask objects ready for execution.
530530
531+
Raises:
532+
RuntimeError: If planning fails, is cancelled, or returns an unexpected response.
533+
NotImplementedError: If async planning is required but not yet supported.
534+
"""
535+
tasks, _ = self._plan_scan_for_table(identifier, request)
536+
return tasks
537+
538+
def _plan_scan_for_table(
539+
self, identifier: str | Identifier, request: PlanTableScanRequest, table_location: str | None = None
540+
) -> tuple[list[FileScanTask], Properties]:
541+
"""Plan a table scan and return FileScanTasks along with resolved storage credentials.
542+
543+
Per Iceberg spec: storage-credentials in the PlanCompleted response take precedence over
544+
catalog config and should be applied to the FileIO used to read data files.
545+
546+
Args:
547+
identifier: Table identifier.
548+
request: The scan plan request parameters.
549+
table_location: The table's base location (e.g. ``table_metadata.location``, not the metadata file path), used for credential prefix matching.
550+
551+
Returns:
552+
Tuple of (list of FileScanTask objects, resolved credential Properties).
553+
531554
Raises:
532555
RuntimeError: If planning fails, is cancelled, or returns an unexpected response.
533556
NotImplementedError: If async planning is required but not yet supported.
@@ -548,6 +571,8 @@ def plan_scan(self, identifier: str | Identifier, request: PlanTableScanRequest)
548571
if not isinstance(response, PlanCompleted):
549572
raise RuntimeError(f"Invalid planStatus for response: {type(response).__name__}")
550573

574+
credential_config = self._resolve_storage_credentials(response.storage_credentials or [], table_location)
575+
551576
tasks: list[FileScanTask] = []
552577

553578
# Collect tasks from initial response
@@ -563,7 +588,7 @@ def plan_scan(self, identifier: str | Identifier, request: PlanTableScanRequest)
563588
tasks.append(FileScanTask.from_rest_response(task, batch.delete_files))
564589
pending_tasks.extend(batch.plan_tasks)
565590

566-
return tasks
591+
return tasks, credential_config
567592

568593
def _create_legacy_oauth2_auth_manager(self, session: Session) -> AuthManager:
569594
"""Create the LegacyOAuth2AuthManager by fetching required properties.

pyiceberg/table/__init__.py

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2054,7 +2054,16 @@ def _plan_files_server_side(self) -> Iterable[FileScanTask]:
20542054
case_sensitive=self.case_sensitive,
20552055
)
20562056

2057-
return self.catalog.plan_scan(self.table_identifier, request)
2057+
tasks, credential_config = self.catalog._plan_scan_for_table(
2058+
self.table_identifier, request, self.table_metadata.location
2059+
)
2060+
# Per Iceberg spec: storage-credentials from PlanCompleted take precedence over config.
2061+
# Update the FileIO so data files in this scan are read with the vended credentials.
2062+
if credential_config:
2063+
self.io = self.catalog._load_file_io(
2064+
{**self.io.properties, **credential_config}, self.table_metadata.location
2065+
)
2066+
return tasks
20582067

20592068
def _plan_files_local(self) -> Iterable[FileScanTask]:
20602069
"""Plan files locally by reading manifests."""
@@ -2112,9 +2121,13 @@ def to_arrow(self) -> pa.Table:
21122121
"""
21132122
from pyiceberg.io.pyarrow import ArrowScan
21142123

2124+
# plan_files() must be called before capturing self.io so that any vended credentials
2125+
# returned by server-side scan planning (PlanCompleted.storage_credentials) are applied
2126+
# to self.io before ArrowScan is constructed.
2127+
tasks = self.plan_files()
21152128
return ArrowScan(
21162129
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
2117-
).to_table(self.plan_files())
2130+
).to_table(tasks)
21182131

21192132
def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
21202133
"""Return an Arrow RecordBatchReader from this DataScan.
@@ -2132,9 +2145,11 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
21322145
from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow
21332146

21342147
target_schema = schema_to_pyarrow(self.projection())
2148+
# plan_files() must be called before capturing self.io (same reason as to_arrow).
2149+
tasks = self.plan_files()
21352150
batches = ArrowScan(
21362151
self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
2137-
).to_record_batches(self.plan_files())
2152+
).to_record_batches(tasks)
21382153

21392154
return pa.RecordBatchReader.from_batches(
21402155
target_schema,

tests/catalog/test_rest.py

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2621,3 +2621,95 @@ def test_load_table_without_storage_credentials(
26212621
)
26222622
assert actual.metadata.model_dump() == expected.metadata.model_dump()
26232623
assert actual == expected
2624+
2625+
2626+
2627+
2628+
def test_plan_scan_with_storage_credentials(
2629+
rest_mock: Mocker, example_table_metadata_with_snapshot_v1: dict[str, Any]
2630+
) -> None:
2631+
metadata_location = "s3://warehouse/database/table/metadata/00001.metadata.json"
2632+
rest_mock.get(
2633+
f"{TEST_URI}v1/namespaces/fokko/tables/table",
2634+
json={
2635+
"metadata-location": metadata_location,
2636+
"metadata": example_table_metadata_with_snapshot_v1,
2637+
"config": {},
2638+
},
2639+
status_code=200,
2640+
request_headers=TEST_HEADERS,
2641+
)
2642+
rest_mock.post(
2643+
f"{TEST_URI}v1/namespaces/fokko/tables/table/plan",
2644+
json={
2645+
"status": "completed",
2646+
"file-scan-tasks": [],
2647+
"delete-files": [],
2648+
"plan-tasks": [],
2649+
"storage-credentials": [
2650+
{
2651+
"prefix": "s3://warehouse/database/table",
2652+
"config": {
2653+
"s3.access-key-id": "plan-vended-key",
2654+
"s3.secret-access-key": "plan-vended-secret",
2655+
"s3.session-token": "plan-vended-token",
2656+
},
2657+
}
2658+
],
2659+
},
2660+
status_code=200,
2661+
request_headers=TEST_HEADERS,
2662+
)
2663+
from pyiceberg.catalog.rest.scan_planning import PlanTableScanRequest
2664+
2665+
catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{"rest-scan-planning-enabled": "true"})
2666+
tasks, credential_config = catalog._plan_scan_for_table(
2667+
("fokko", "table"),
2668+
PlanTableScanRequest(),
2669+
table_location="s3://warehouse/database/table",
2670+
)
2671+
2672+
assert tasks == []
2673+
assert credential_config == {
2674+
"s3.access-key-id": "plan-vended-key",
2675+
"s3.secret-access-key": "plan-vended-secret",
2676+
"s3.session-token": "plan-vended-token",
2677+
}
2678+
2679+
2680+
def test_plan_scan_without_storage_credentials(
2681+
rest_mock: Mocker, example_table_metadata_with_snapshot_v1: dict[str, Any]
2682+
) -> None:
2683+
metadata_location = "s3://warehouse/database/table/metadata/00001.metadata.json"
2684+
rest_mock.get(
2685+
f"{TEST_URI}v1/namespaces/fokko/tables/table",
2686+
json={
2687+
"metadata-location": metadata_location,
2688+
"metadata": example_table_metadata_with_snapshot_v1,
2689+
"config": {},
2690+
},
2691+
status_code=200,
2692+
request_headers=TEST_HEADERS,
2693+
)
2694+
rest_mock.post(
2695+
f"{TEST_URI}v1/namespaces/fokko/tables/table/plan",
2696+
json={
2697+
"status": "completed",
2698+
"file-scan-tasks": [],
2699+
"delete-files": [],
2700+
"plan-tasks": [],
2701+
},
2702+
status_code=200,
2703+
request_headers=TEST_HEADERS,
2704+
)
2705+
from pyiceberg.catalog.rest.scan_planning import PlanTableScanRequest
2706+
2707+
catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN, **{"rest-scan-planning-enabled": "true"})
2708+
tasks, credential_config = catalog._plan_scan_for_table(
2709+
("fokko", "table"),
2710+
PlanTableScanRequest(),
2711+
table_location="s3://warehouse/database/table",
2712+
)
2713+
2714+
assert tasks == []
2715+
assert credential_config == {}

0 commit comments

Comments
 (0)