Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 136 additions & 3 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.


import logging
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -120,13 +121,16 @@
ICEBERG_FIELD_OPTIONAL = "iceberg.field.optional"
ICEBERG_FIELD_CURRENT = "iceberg.field.current"

logger = logging.getLogger(__name__)

GLUE_PROFILE_NAME = "glue.profile-name"
GLUE_REGION = "glue.region"
GLUE_ACCESS_KEY_ID = "glue.access-key-id"
GLUE_SECRET_ACCESS_KEY = "glue.secret-access-key"
GLUE_SESSION_TOKEN = "glue.session-token"
GLUE_MAX_RETRIES = "glue.max-retries"
GLUE_RETRY_MODE = "glue.retry-mode"
GLUE_CONNECTION_S3_TABLES = "aws:s3tables"

MAX_RETRIES = 10
STANDARD_RETRY_MODE = "standard"
Expand Down Expand Up @@ -417,6 +421,116 @@ def _get_glue_table(self, database_name: str, table_name: str) -> "TableTypeDef"
except self.glue.exceptions.EntityNotFoundException as e:
raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e

def _is_s3tables_database(self, database_name: str) -> bool:
"""Check if a Glue database is federated with S3 Tables.

S3 Tables databases have a FederatedDatabase property with
ConnectionType set to aws:s3tables.

Args:
database_name: The name of the Glue database.

Returns:
True if the database is an S3 Tables federated database.
"""
try:
database_response = self.glue.get_database(Name=database_name)
except self.glue.exceptions.EntityNotFoundException:
return False
database = database_response["Database"]
federated = database.get("FederatedDatabase", {})
return (federated.get("ConnectionType") or "").lower() == GLUE_CONNECTION_S3_TABLES

def _create_table_s3tables(
self,
identifier: str | Identifier,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we have some redundancy with these params passed in: ident, db_name, tbl_name maybe we can derive?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, done

schema: Union[Schema, "pa.Schema"],
location: str | None,
partition_spec: PartitionSpec,
sort_order: SortOrder,
properties: Properties,
) -> Table:
"""Create an Iceberg table in an S3 Tables federated database.

S3 Tables manages storage internally, so the table location is not known until the
table is created in the service. This method:
1. Creates a minimal table entry in Glue (format=ICEBERG), which causes S3 Tables
to allocate storage.
2. Retrieves the managed storage location via GetTable.
3. Writes Iceberg metadata to that location.
4. Updates the Glue table entry with the metadata pointer.

On failure, the table created in step 1 is deleted.
"""
database_name, table_name = self.identifier_to_database_and_table(identifier)

if location is not None:
raise ValueError(
f"Cannot specify a location for S3 Tables table {database_name}.{table_name}. "
"S3 Tables manages the storage location automatically."
)

# Create a minimal table in Glue so S3 Tables allocates storage.
self._create_glue_table(
database_name=database_name,
table_name=table_name,
table_input={
"Name": table_name,
"Parameters": {"format": "ICEBERG"},
},
)

try:
# Retrieve the managed storage location.
glue_table = self._get_glue_table(database_name=database_name, table_name=table_name)
storage_descriptor = glue_table.get("StorageDescriptor", {})
managed_location = storage_descriptor.get("Location")
if not managed_location:
raise ValueError(f"S3 Tables did not assign a storage location for {database_name}.{table_name}")

# Build the Iceberg metadata targeting the managed location.
staged_table = self._create_staged_table(
identifier=identifier,
schema=schema,
location=managed_location,
partition_spec=partition_spec,
sort_order=sort_order,
properties=properties,
)

# Write metadata and update the Glue table with the metadata pointer.
self._write_metadata(staged_table.metadata, staged_table.io, staged_table.metadata_location)
table_input = _construct_table_input(table_name, staged_table.metadata_location, properties, staged_table.metadata)
version_id = glue_table.get("VersionId")
if not version_id:
raise CommitFailedException(
f"Cannot commit {database_name}.{table_name} because Glue table version id is missing"
)
self._update_glue_table(
database_name=database_name,
table_name=table_name,
table_input=table_input,
version_id=version_id,
)
except Exception:
# Clean up the table created in step 1.
try:
self.glue.delete_table(DatabaseName=database_name, Name=table_name)
except Exception:
logger.warning(
f"Failed to clean up S3 Tables table {database_name}.{table_name}",
exc_info=logger.isEnabledFor(logging.DEBUG),
)
raise

return Table(
identifier=self.identifier_to_tuple(identifier),
metadata=staged_table.metadata,
metadata_location=staged_table.metadata_location,
io=self._load_file_io(staged_table.metadata.properties, staged_table.metadata_location),
catalog=self,
)

def create_table(
self,
identifier: str | Identifier,
Expand All @@ -433,6 +547,7 @@ def create_table(
identifier: Table identifier.
schema: Table's schema.
location: Location for the table. Optional Argument.
Must not be set for S3 Tables, which manage their own storage.
partition_spec: PartitionSpec for the table.
sort_order: SortOrder for the table.
properties: Table properties that can be a string based dictionary.
Expand All @@ -442,9 +557,22 @@ def create_table(

Raises:
AlreadyExistsError: If a table with the name already exists.
ValueError: If the identifier is invalid, or no path is given to store metadata.
ValueError: If the identifier is invalid, no path is given to store metadata,
or a location is specified for an S3 Tables table.

"""
database_name, table_name = self.identifier_to_database_and_table(identifier)

if self._is_s3tables_database(database_name):
return self._create_table_s3tables(
identifier=identifier,
schema=schema,
location=location,
partition_spec=partition_spec,
sort_order=sort_order,
properties=properties,
)

staged_table = self._create_staged_table(
identifier=identifier,
schema=schema,
Expand All @@ -453,13 +581,18 @@ def create_table(
sort_order=sort_order,
properties=properties,
)
database_name, table_name = self.identifier_to_database_and_table(identifier)

self._write_metadata(staged_table.metadata, staged_table.io, staged_table.metadata_location)
table_input = _construct_table_input(table_name, staged_table.metadata_location, properties, staged_table.metadata)
self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input)

return self.load_table(identifier=identifier)
return Table(
identifier=self.identifier_to_tuple(identifier),
metadata=staged_table.metadata,
metadata_location=staged_table.metadata_location,
io=self._load_file_io(staged_table.metadata.properties, staged_table.metadata_location),
catalog=self,
)

def register_table(self, identifier: str | Identifier, metadata_location: str) -> Table:
"""Register a new table using existing metadata.
Expand Down
128 changes: 127 additions & 1 deletion tests/catalog/test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import pytest
from moto import mock_aws

from pyiceberg.catalog.glue import GlueCatalog
from pyiceberg.catalog.glue import GLUE_CONNECTION_S3_TABLES, GlueCatalog
from pyiceberg.exceptions import (
NamespaceAlreadyExistsError,
NamespaceNotEmptyError,
Expand All @@ -43,6 +43,57 @@
UNIFIED_AWS_SESSION_PROPERTIES,
)

S3TABLES_WAREHOUSE_LOCATION = "s3tables-warehouse-location"


def _patch_moto_for_s3tables(monkeypatch: pytest.MonkeyPatch) -> None:
"""Patch moto to simulate S3 Tables federated databases.

Moto does not support FederatedDatabase on GetDatabase responses or
auto-populating StorageDescriptor.Location for S3 Tables. These patches
simulate the S3 Tables service behavior so that the GlueCatalog S3 Tables
code path can be tested end-to-end with moto.
"""
from moto.glue.models import FakeDatabase, FakeTable

# Patch 1: Make GetDatabase return FederatedDatabase from the stored input.
_original_db_as_dict = FakeDatabase.as_dict

def _db_as_dict_with_federated(self): # type: ignore
result = _original_db_as_dict(self)
if federated := self.input.get("FederatedDatabase"):
result["FederatedDatabase"] = federated
return result

monkeypatch.setattr(FakeDatabase, "as_dict", _db_as_dict_with_federated)

# Patch 2: When a table is created with format=ICEBERG (the S3 Tables convention),
# inject a StorageDescriptor.Location to simulate S3 Tables vending a table
# warehouse location.
_original_table_init = FakeTable.__init__

def _table_init_with_location(self, database_name, table_name, table_input, catalog_id): # type: ignore
if table_input.get("Parameters", {}).get("format") == "ICEBERG" and "StorageDescriptor" not in table_input:
table_input = {
**table_input,
"StorageDescriptor": {
"Columns": [],
"Location": f"s3://{S3TABLES_WAREHOUSE_LOCATION}/{database_name}/{table_name}/",
"InputFormat": "",
"OutputFormat": "",
"SerdeInfo": {},
},
}
_original_table_init(self, database_name, table_name, table_input, catalog_id)

monkeypatch.setattr(FakeTable, "__init__", _table_init_with_location)

# Create a bucket backing the simulated table warehouse location. S3 Tables manages
# this storage internally, but in tests moto needs a real bucket for metadata file
# writes to succeed.
s3 = boto3.client("s3", region_name="us-east-1")
s3.create_bucket(Bucket=S3TABLES_WAREHOUSE_LOCATION)


@mock_aws
def test_create_table_with_database_location(
Expand Down Expand Up @@ -953,3 +1004,78 @@ def test_glue_client_override() -> None:
test_client = boto3.client("glue", region_name="us-west-2")
test_catalog = GlueCatalog(catalog_name, test_client)
assert test_catalog.glue is test_client


def _create_s3tables_database(catalog: GlueCatalog, database_name: str) -> None:
"""Create a Glue database with S3 Tables federation metadata."""
catalog.glue.create_database(
DatabaseInput={
"Name": database_name,
"FederatedDatabase": {
"Identifier": "arn:aws:s3tables:us-east-1:123456789012:bucket/my-bucket",
"ConnectionType": GLUE_CONNECTION_S3_TABLES,
},
}
)


@mock_aws
def test_create_table_s3tables(
monkeypatch: pytest.MonkeyPatch,
_bucket_initialize: None,
moto_endpoint_url: str,
table_schema_nested: Schema,
database_name: str,
table_name: str,
) -> None:
_patch_moto_for_s3tables(monkeypatch)

identifier = (database_name, table_name)
test_catalog = GlueCatalog("s3tables", **{"s3.endpoint": moto_endpoint_url})
_create_s3tables_database(test_catalog, database_name)

table = test_catalog.create_table(identifier, table_schema_nested)
assert table.name() == identifier
assert table.location() == f"s3://{S3TABLES_WAREHOUSE_LOCATION}/{database_name}/{table_name}"
assert table.metadata_location.startswith(f"s3://{S3TABLES_WAREHOUSE_LOCATION}/{database_name}/{table_name}/metadata/00000-")
assert table.metadata_location.endswith(".metadata.json")
assert test_catalog._parse_metadata_version(table.metadata_location) == 0


@mock_aws
def test_create_table_s3tables_rejects_location(
monkeypatch: pytest.MonkeyPatch,
_bucket_initialize: None,
moto_endpoint_url: str,
table_schema_nested: Schema,
database_name: str,
table_name: str,
) -> None:
_patch_moto_for_s3tables(monkeypatch)

identifier = (database_name, table_name)
test_catalog = GlueCatalog("s3tables", **{"s3.endpoint": moto_endpoint_url})
_create_s3tables_database(test_catalog, database_name)

with pytest.raises(ValueError, match="Cannot specify a location for S3 Tables table"):
test_catalog.create_table(identifier, table_schema_nested, location="s3://some-bucket/some-path")


@mock_aws
def test_create_table_s3tables_duplicate(
monkeypatch: pytest.MonkeyPatch,
_bucket_initialize: None,
moto_endpoint_url: str,
table_schema_nested: Schema,
database_name: str,
table_name: str,
) -> None:
_patch_moto_for_s3tables(monkeypatch)

identifier = (database_name, table_name)
test_catalog = GlueCatalog("s3tables", **{"s3.endpoint": moto_endpoint_url})
_create_s3tables_database(test_catalog, database_name)

test_catalog.create_table(identifier, table_schema_nested)
with pytest.raises(TableAlreadyExistsError):
test_catalog.create_table(identifier, table_schema_nested)