diff --git a/pyiceberg/catalog/glue.py b/pyiceberg/catalog/glue.py
index 5c09cdbd0f..2634f79682 100644
--- a/pyiceberg/catalog/glue.py
+++ b/pyiceberg/catalog/glue.py
@@ -16,6 +16,7 @@
 # under the License.
 
 
+import logging
 from typing import (
     TYPE_CHECKING,
     Any,
@@ -120,6 +121,8 @@
 ICEBERG_FIELD_OPTIONAL = "iceberg.field.optional"
 ICEBERG_FIELD_CURRENT = "iceberg.field.current"
 
+logger = logging.getLogger(__name__)
+
 GLUE_PROFILE_NAME = "glue.profile-name"
 GLUE_REGION = "glue.region"
 GLUE_ACCESS_KEY_ID = "glue.access-key-id"
@@ -127,6 +130,7 @@
 GLUE_SESSION_TOKEN = "glue.session-token"
 GLUE_MAX_RETRIES = "glue.max-retries"
 GLUE_RETRY_MODE = "glue.retry-mode"
+GLUE_CONNECTION_S3_TABLES = "aws:s3tables"
 
 MAX_RETRIES = 10
 STANDARD_RETRY_MODE = "standard"
@@ -417,6 +421,135 @@ def _get_glue_table(self, database_name: str, table_name: str) -> "TableTypeDef"
         except self.glue.exceptions.EntityNotFoundException as e:
             raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e
 
+    def _is_s3tables_database(self, database_name: str) -> bool:
+        """Check if a Glue database is federated with S3 Tables.
+
+        S3 Tables databases have a FederatedDatabase property with
+        ConnectionType set to aws:s3tables.
+
+        Args:
+            database_name: The name of the Glue database.
+
+        Returns:
+            True if the database is an S3 Tables federated database. False otherwise,
+            including when the database does not exist.
+        """
+        try:
+            database_response = self.glue.get_database(Name=database_name)
+        except self.glue.exceptions.EntityNotFoundException:
+            return False
+        database = database_response["Database"]
+        # `or {}` / `or ""` tolerate keys that are present but null, not only absent ones.
+        federated = database.get("FederatedDatabase") or {}
+        return (federated.get("ConnectionType") or "").lower() == GLUE_CONNECTION_S3_TABLES
+
+    def _create_table_s3tables(
+        self,
+        identifier: str | Identifier,
+        schema: Union[Schema, "pa.Schema"],
+        location: str | None,
+        partition_spec: PartitionSpec,
+        sort_order: SortOrder,
+        properties: Properties,
+    ) -> Table:
+        """Create an Iceberg table in an S3 Tables federated database.
+
+        S3 Tables manages storage internally, so the table location is not known until the
+        table is created in the service. This method:
+        1. Creates a minimal table entry in Glue (format=ICEBERG), which causes S3 Tables
+           to allocate storage.
+        2. Retrieves the managed storage location via GetTable.
+        3. Writes Iceberg metadata to that location.
+        4. Updates the Glue table entry with the metadata pointer.
+
+        On failure, the table created in step 1 is deleted.
+
+        Args:
+            identifier: Table identifier.
+            schema: Table's schema.
+            location: Must be None; S3 Tables assigns the storage location.
+            partition_spec: PartitionSpec for the table.
+            sort_order: SortOrder for the table.
+            properties: Table properties that can be a string based dictionary.
+
+        Returns:
+            Table: the created table instance.
+
+        Raises:
+            ValueError: If a location is specified, or if no managed storage location is
+                reported for the newly created table.
+            CommitFailedException: If the Glue table version id is missing.
+        """
+        database_name, table_name = self.identifier_to_database_and_table(identifier)
+
+        if location is not None:
+            raise ValueError(
+                f"Cannot specify a location for S3 Tables table {database_name}.{table_name}. "
+                "S3 Tables manages the storage location automatically."
+            )
+
+        # Create a minimal table in Glue so S3 Tables allocates storage.
+        self._create_glue_table(
+            database_name=database_name,
+            table_name=table_name,
+            table_input={
+                "Name": table_name,
+                "Parameters": {"format": "ICEBERG"},
+            },
+        )
+
+        try:
+            # Retrieve the managed storage location.
+            glue_table = self._get_glue_table(database_name=database_name, table_name=table_name)
+            storage_descriptor = glue_table.get("StorageDescriptor") or {}
+            managed_location = storage_descriptor.get("Location")
+            if not managed_location:
+                raise ValueError(f"S3 Tables did not assign a storage location for {database_name}.{table_name}")
+            # Check the version id before writing metadata so a doomed create performs no writes.
+            version_id = glue_table.get("VersionId")
+            if not version_id:
+                raise CommitFailedException(
+                    f"Cannot commit {database_name}.{table_name} because Glue table version id is missing"
+                )
+
+            # Build the Iceberg metadata targeting the managed location.
+            staged_table = self._create_staged_table(
+                identifier=identifier,
+                schema=schema,
+                location=managed_location,
+                partition_spec=partition_spec,
+                sort_order=sort_order,
+                properties=properties,
+            )
+
+            # Write metadata and update the Glue table with the metadata pointer.
+            self._write_metadata(staged_table.metadata, staged_table.io, staged_table.metadata_location)
+            table_input = _construct_table_input(table_name, staged_table.metadata_location, properties, staged_table.metadata)
+            self._update_glue_table(
+                database_name=database_name,
+                table_name=table_name,
+                table_input=table_input,
+                version_id=version_id,
+            )
+        except Exception:
+            # Clean up the table created in step 1.
+            try:
+                self.glue.delete_table(DatabaseName=database_name, Name=table_name)
+            except Exception:
+                logger.warning(
+                    f"Failed to clean up S3 Tables table {database_name}.{table_name}",
+                    exc_info=logger.isEnabledFor(logging.DEBUG),
+                )
+            raise
+
+        return Table(
+            identifier=self.identifier_to_tuple(identifier),
+            metadata=staged_table.metadata,
+            metadata_location=staged_table.metadata_location,
+            io=self._load_file_io(staged_table.metadata.properties, staged_table.metadata_location),
+            catalog=self,
+        )
+
     def create_table(
         self,
         identifier: str | Identifier,
@@ -433,6 +566,7 @@ def create_table(
             identifier: Table identifier.
             schema: Table's schema.
             location: Location for the table. Optional Argument.
+                Must not be set for S3 Tables, which manage their own storage.
             partition_spec: PartitionSpec for the table.
             sort_order: SortOrder for the table.
             properties: Table properties that can be a string based dictionary.
@@ -442,9 +576,22 @@ def create_table(
 
         Raises:
             AlreadyExistsError: If a table with the name already exists.
-            ValueError: If the identifier is invalid, or no path is given to store metadata.
+            ValueError: If the identifier is invalid, no path is given to store metadata,
+                or a location is specified for an S3 Tables table.
 
         """
+        database_name, table_name = self.identifier_to_database_and_table(identifier)
+
+        if self._is_s3tables_database(database_name):
+            return self._create_table_s3tables(
+                identifier=identifier,
+                schema=schema,
+                location=location,
+                partition_spec=partition_spec,
+                sort_order=sort_order,
+                properties=properties,
+            )
+
         staged_table = self._create_staged_table(
             identifier=identifier,
             schema=schema,
@@ -453,13 +600,18 @@ def create_table(
             sort_order=sort_order,
             properties=properties,
         )
-        database_name, table_name = self.identifier_to_database_and_table(identifier)
 
         self._write_metadata(staged_table.metadata, staged_table.io, staged_table.metadata_location)
         table_input = _construct_table_input(table_name, staged_table.metadata_location, properties, staged_table.metadata)
         self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input)
 
-        return self.load_table(identifier=identifier)
+        return Table(
+            identifier=self.identifier_to_tuple(identifier),
+            metadata=staged_table.metadata,
+            metadata_location=staged_table.metadata_location,
+            io=self._load_file_io(staged_table.metadata.properties, staged_table.metadata_location),
+            catalog=self,
+        )
 
     def register_table(self, identifier: str | Identifier, metadata_location: str) -> Table:
         """Register a new table using existing metadata.
diff --git a/tests/catalog/test_glue.py b/tests/catalog/test_glue.py
index 5273db22f8..c8da49a87e 100644
--- a/tests/catalog/test_glue.py
+++ b/tests/catalog/test_glue.py
@@ -21,7 +21,7 @@
 import pytest
 from moto import mock_aws
 
-from pyiceberg.catalog.glue import GlueCatalog
+from pyiceberg.catalog.glue import GLUE_CONNECTION_S3_TABLES, GlueCatalog
 from pyiceberg.exceptions import (
     NamespaceAlreadyExistsError,
     NamespaceNotEmptyError,
@@ -43,6 +43,57 @@
     UNIFIED_AWS_SESSION_PROPERTIES,
 )
 
+S3TABLES_WAREHOUSE_LOCATION = "s3tables-warehouse-location"
+
+
+def _patch_moto_for_s3tables(monkeypatch: pytest.MonkeyPatch) -> None:
+    """Patch moto to simulate S3 Tables federated databases.
+
+    Moto does not support FederatedDatabase on GetDatabase responses or
+    auto-populating StorageDescriptor.Location for S3 Tables. These patches
+    simulate the S3 Tables service behavior so that the GlueCatalog S3 Tables
+    code path can be tested end-to-end with moto.
+    """
+    from moto.glue.models import FakeDatabase, FakeTable
+
+    # Patch 1: Make GetDatabase return FederatedDatabase from the stored input.
+    _original_db_as_dict = FakeDatabase.as_dict
+
+    def _db_as_dict_with_federated(self):  # type: ignore
+        result = _original_db_as_dict(self)
+        if federated := self.input.get("FederatedDatabase"):
+            result["FederatedDatabase"] = federated
+        return result
+
+    monkeypatch.setattr(FakeDatabase, "as_dict", _db_as_dict_with_federated)
+
+    # Patch 2: When a table is created with format=ICEBERG (the S3 Tables convention),
+    # inject a StorageDescriptor.Location to simulate S3 Tables vending a table
+    # warehouse location.
+    _original_table_init = FakeTable.__init__
+
+    def _table_init_with_location(self, database_name, table_name, table_input, catalog_id):  # type: ignore
+        if table_input.get("Parameters", {}).get("format") == "ICEBERG" and "StorageDescriptor" not in table_input:
+            table_input = {
+                **table_input,
+                "StorageDescriptor": {
+                    "Columns": [],
+                    "Location": f"s3://{S3TABLES_WAREHOUSE_LOCATION}/{database_name}/{table_name}/",
+                    "InputFormat": "",
+                    "OutputFormat": "",
+                    "SerdeInfo": {},
+                },
+            }
+        _original_table_init(self, database_name, table_name, table_input, catalog_id)
+
+    monkeypatch.setattr(FakeTable, "__init__", _table_init_with_location)
+
+    # Create a bucket backing the simulated table warehouse location. S3 Tables manages
+    # this storage internally, but in tests moto needs a real bucket for metadata file
+    # writes to succeed.
+    s3 = boto3.client("s3", region_name="us-east-1")
+    s3.create_bucket(Bucket=S3TABLES_WAREHOUSE_LOCATION)
+
 
 @mock_aws
 def test_create_table_with_database_location(
@@ -953,3 +1004,107 @@ def test_glue_client_override() -> None:
     test_client = boto3.client("glue", region_name="us-west-2")
     test_catalog = GlueCatalog(catalog_name, test_client)
     assert test_catalog.glue is test_client
+
+
+def _create_s3tables_database(catalog: GlueCatalog, database_name: str) -> None:
+    """Create a Glue database with S3 Tables federation metadata."""
+    catalog.glue.create_database(
+        DatabaseInput={
+            "Name": database_name,
+            "FederatedDatabase": {
+                "Identifier": "arn:aws:s3tables:us-east-1:123456789012:bucket/my-bucket",
+                "ConnectionType": GLUE_CONNECTION_S3_TABLES,
+            },
+        }
+    )
+
+
+@mock_aws
+def test_create_table_s3tables(
+    monkeypatch: pytest.MonkeyPatch,
+    _bucket_initialize: None,
+    moto_endpoint_url: str,
+    table_schema_nested: Schema,
+    database_name: str,
+    table_name: str,
+) -> None:
+    _patch_moto_for_s3tables(monkeypatch)
+
+    identifier = (database_name, table_name)
+    test_catalog = GlueCatalog("s3tables", **{"s3.endpoint": moto_endpoint_url})
+    _create_s3tables_database(test_catalog, database_name)
+
+    table = test_catalog.create_table(identifier, table_schema_nested)
+    assert table.name() == identifier
+    assert table.location() == f"s3://{S3TABLES_WAREHOUSE_LOCATION}/{database_name}/{table_name}"
+    assert table.metadata_location.startswith(f"s3://{S3TABLES_WAREHOUSE_LOCATION}/{database_name}/{table_name}/metadata/00000-")
+    assert table.metadata_location.endswith(".metadata.json")
+    assert test_catalog._parse_metadata_version(table.metadata_location) == 0
+
+
+@mock_aws
+def test_create_table_s3tables_rejects_location(
+    monkeypatch: pytest.MonkeyPatch,
+    _bucket_initialize: None,
+    moto_endpoint_url: str,
+    table_schema_nested: Schema,
+    database_name: str,
+    table_name: str,
+) -> None:
+    _patch_moto_for_s3tables(monkeypatch)
+
+    identifier = (database_name, table_name)
+    test_catalog = GlueCatalog("s3tables", **{"s3.endpoint": moto_endpoint_url})
+    _create_s3tables_database(test_catalog, database_name)
+
+    with pytest.raises(ValueError, match="Cannot specify a location for S3 Tables table"):
+        test_catalog.create_table(identifier, table_schema_nested, location="s3://some-bucket/some-path")
+
+
+@mock_aws
+def test_create_table_s3tables_duplicate(
+    monkeypatch: pytest.MonkeyPatch,
+    _bucket_initialize: None,
+    moto_endpoint_url: str,
+    table_schema_nested: Schema,
+    database_name: str,
+    table_name: str,
+) -> None:
+    _patch_moto_for_s3tables(monkeypatch)
+
+    identifier = (database_name, table_name)
+    test_catalog = GlueCatalog("s3tables", **{"s3.endpoint": moto_endpoint_url})
+    _create_s3tables_database(test_catalog, database_name)
+
+    test_catalog.create_table(identifier, table_schema_nested)
+    with pytest.raises(TableAlreadyExistsError):
+        test_catalog.create_table(identifier, table_schema_nested)
+
+
+@mock_aws
+def test_create_table_s3tables_cleans_up_on_failure(
+    monkeypatch: pytest.MonkeyPatch,
+    _bucket_initialize: None,
+    moto_endpoint_url: str,
+    table_schema_nested: Schema,
+    database_name: str,
+    table_name: str,
+) -> None:
+    _patch_moto_for_s3tables(monkeypatch)
+
+    identifier = (database_name, table_name)
+    test_catalog = GlueCatalog("s3tables", **{"s3.endpoint": moto_endpoint_url})
+    _create_s3tables_database(test_catalog, database_name)
+
+    def _fail_write(*args: object, **kwargs: object) -> None:
+        raise RuntimeError("simulated metadata write failure")
+
+    # Fail after the placeholder Glue table exists so the rollback path runs.
+    monkeypatch.setattr(test_catalog, "_write_metadata", _fail_write)
+
+    with pytest.raises(RuntimeError, match="simulated metadata write failure"):
+        test_catalog.create_table(identifier, table_schema_nested)
+
+    # The placeholder table created in step 1 must have been deleted.
+    with pytest.raises(test_catalog.glue.exceptions.EntityNotFoundException):
+        test_catalog.glue.get_table(DatabaseName=database_name, Name=table_name)