-
Notifications
You must be signed in to change notification settings - Fork 450
BigQuery metastore: implement commit_table with commit status verification #3099
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,7 +26,14 @@ | |
| from google.oauth2 import service_account | ||
|
|
||
| from pyiceberg.catalog import WAREHOUSE_LOCATION, MetastoreCatalog, PropertiesUpdateSummary | ||
| from pyiceberg.exceptions import NamespaceAlreadyExistsError, NoSuchNamespaceError, NoSuchTableError, TableAlreadyExistsError | ||
| from pyiceberg.exceptions import ( | ||
| CommitFailedException, | ||
| CommitStateUnknownException, | ||
| NamespaceAlreadyExistsError, | ||
| NoSuchNamespaceError, | ||
| NoSuchTableError, | ||
| TableAlreadyExistsError, | ||
| ) | ||
| from pyiceberg.io import load_file_io | ||
| from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec | ||
| from pyiceberg.schema import Schema | ||
|
|
@@ -229,7 +236,88 @@ def drop_table(self, identifier: str | Identifier) -> None: | |
| def commit_table( | ||
| self, table: Table, requirements: tuple[TableRequirement, ...], updates: tuple[TableUpdate, ...] | ||
| ) -> CommitTableResponse: | ||
| raise NotImplementedError | ||
| table_identifier = table.name() | ||
| dataset_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError) | ||
| table_ref = TableReference( | ||
| dataset_ref=DatasetReference(project=self.project_id, dataset_id=dataset_name), | ||
| table_id=table_name, | ||
| ) | ||
|
|
||
| current_bq_table: BQTable | None | ||
| current_table: Table | None | ||
| try: | ||
| current_bq_table = self.client.get_table(table_ref) | ||
| except NotFound: | ||
| current_bq_table = None | ||
| current_table = None | ||
| else: | ||
| current_table = self._convert_bigquery_table_to_iceberg_table(table_identifier, current_bq_table) | ||
|
|
||
| updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates) | ||
| if current_table and updated_staged_table.metadata == current_table.metadata: | ||
| return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) | ||
|
|
||
| self._write_metadata( | ||
| metadata=updated_staged_table.metadata, | ||
| io=updated_staged_table.io, | ||
| metadata_path=updated_staged_table.metadata_location, | ||
| ) | ||
|
|
||
| commit_error: Exception | None = None | ||
| try: | ||
| if current_bq_table and current_table: | ||
| current_bq_table.external_catalog_table_options = self._create_external_catalog_table_options( | ||
| updated_staged_table.metadata.location, | ||
| self._create_table_parameters( | ||
| metadata_file_location=updated_staged_table.metadata_location, | ||
| table_metadata=updated_staged_table.metadata, | ||
| previous_metadata_location=current_table.metadata_location, | ||
| ), | ||
| ) | ||
| self.client.update_table(current_bq_table, ["external_catalog_table_options"]) | ||
| else: | ||
| self.client.create_table( | ||
| self._make_new_table( | ||
| updated_staged_table.metadata, | ||
| updated_staged_table.metadata_location, | ||
| table_ref, | ||
| ) | ||
| ) | ||
| except NotFound as e: | ||
| commit_error = ( | ||
| CommitFailedException(f"Table does not exist: {dataset_name}.{table_name}") | ||
| if current_table | ||
| else NoSuchNamespaceError(f"Namespace does not exist: {dataset_name}") | ||
| ) | ||
| commit_error.__cause__ = e | ||
| except Conflict as e: | ||
| commit_error = ( | ||
| CommitFailedException(f"Table has been updated by another process: {dataset_name}.{table_name}") | ||
| if current_table | ||
| else TableAlreadyExistsError(f"Table {table_name} already exists") | ||
| ) | ||
| commit_error.__cause__ = e | ||
| except Exception as e: | ||
| commit_error = e | ||
| finally: | ||
| if commit_error: | ||
| commit_status = self._check_bigquery_commit_status(table_ref, updated_staged_table.metadata_location) | ||
| if commit_status == "SUCCESS": | ||
| commit_error = None | ||
| elif commit_status == "UNKNOWN": | ||
| raise CommitStateUnknownException( | ||
| f"Commit state unknown for table {dataset_name}.{table_name}" | ||
| ) from commit_error | ||
|
|
||
| if commit_error: | ||
| raise commit_error | ||
|
|
||
| if current_table: | ||
| self._delete_old_metadata(updated_staged_table.io, current_table.metadata, updated_staged_table.metadata) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
https://github.com/apache/iceberg-python/blob/main/pyiceberg/table/__init__.py#L1507 |
||
|
|
||
| return CommitTableResponse( | ||
| metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location | ||
| ) | ||
|
|
||
| def rename_table(self, from_identifier: str | Identifier, to_identifier: str | Identifier) -> Table: | ||
| raise NotImplementedError | ||
|
|
@@ -381,11 +469,20 @@ def _convert_bigquery_table_to_iceberg_table(self, identifier: str | Identifier, | |
| catalog=self, | ||
| ) | ||
|
|
||
| def _create_table_parameters(self, metadata_file_location: str, table_metadata: TableMetadata) -> dict[str, Any]: | ||
| parameters: dict[str, Any] = table_metadata.properties | ||
| def _create_table_parameters( | ||
| self, | ||
| metadata_file_location: str, | ||
| table_metadata: TableMetadata, | ||
| previous_metadata_location: str | None = None, | ||
| ) -> dict[str, Any]: | ||
| parameters: dict[str, Any] = dict(table_metadata.properties) | ||
| if table_metadata.table_uuid: | ||
| parameters["uuid"] = str(table_metadata.table_uuid) | ||
| parameters[METADATA_LOCATION_PROP] = metadata_file_location | ||
| if previous_metadata_location: | ||
| parameters[PREVIOUS_METADATA_LOCATION_PROP] = previous_metadata_location | ||
| else: | ||
| parameters.pop(PREVIOUS_METADATA_LOCATION_PROP, None) | ||
| parameters[TABLE_TYPE_PROP] = ICEBERG_TABLE_TYPE_VALUE | ||
| parameters["EXTERNAL"] = True | ||
|
|
||
|
|
@@ -405,6 +502,35 @@ def _create_table_parameters(self, metadata_file_location: str, table_metadata: | |
|
|
||
| return parameters | ||
|
|
||
| def _check_bigquery_commit_status(self, table_ref: TableReference, new_metadata_location: str) -> str: | ||
| try: | ||
| bq_table = self.client.get_table(table_ref) | ||
| parameters = ( | ||
| bq_table.external_catalog_table_options.parameters | ||
| if bq_table.external_catalog_table_options and bq_table.external_catalog_table_options.parameters | ||
| else {} | ||
| ) | ||
| current_metadata_location = parameters.get(METADATA_LOCATION_PROP) | ||
| if current_metadata_location == new_metadata_location: | ||
| return "SUCCESS" | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: can we extract the status to enum? |
||
|
|
||
| if not current_metadata_location: | ||
| return "FAILURE" | ||
|
|
||
| io = self._load_file_io(location=current_metadata_location) | ||
| current_metadata = FromInputFile.table_metadata(io.new_input(current_metadata_location)) | ||
|
|
||
| previous_metadata_locations = {log.metadata_file for log in current_metadata.metadata_log} | ||
| previous_metadata_location = parameters.get(PREVIOUS_METADATA_LOCATION_PROP) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need this check? |
||
| if previous_metadata_location: | ||
| previous_metadata_locations.add(previous_metadata_location) | ||
|
|
||
| return "SUCCESS" if new_metadata_location in previous_metadata_locations else "FAILURE" | ||
| except NotFound: | ||
| return "FAILURE" | ||
| except Exception: | ||
| return "UNKNOWN" | ||
|
|
||
| def _default_storage_location(self, location: str | None, dataset_ref: DatasetReference) -> str | None: | ||
| if location: | ||
| return location | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: non blocking it's more of an implementation detail but with the java implementation they are preventing orphaned metadata on commit failure. This doesn't affect table state, just maybe a nice to have.
https://github.com/apache/iceberg/blob/96a59408b271881a596f74697c05adb2dbc44094/bigquery/src/main/java/org/apache/iceberg/gcp/bigquery/BigQueryTableOperations.java#L116-L118
I'll let @rambleraptor chime in here.