diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py index 0ed50918253c..5975fcbc9fa9 100644 --- a/paimon-python/pypaimon/manifest/manifest_file_manager.py +++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py @@ -53,17 +53,6 @@ def read_entries_parallel(self, manifest_files: List[ManifestFileMeta], manifest def _process_single_manifest(manifest_file: ManifestFileMeta) -> List[ManifestEntry]: return self.read(manifest_file.file_name, manifest_entry_filter, drop_stats) - def _entry_identifier(e: ManifestEntry) -> tuple: - return ( - tuple(e.partition.values), - e.bucket, - e.file.level, - e.file.file_name, - tuple(e.file.extra_files) if e.file.extra_files else (), - e.file.embedded_index, - e.file.external_path, - ) - deleted_entry_keys = set() added_entries = [] with ThreadPoolExecutor(max_workers=max_workers) as executor: @@ -73,11 +62,11 @@ def _entry_identifier(e: ManifestEntry) -> tuple: if entry.kind == 0: # ADD added_entries.append(entry) else: # DELETE - deleted_entry_keys.add(_entry_identifier(entry)) + deleted_entry_keys.add(entry.identifier()) final_entries = [ entry for entry in added_entries - if _entry_identifier(entry) not in deleted_entry_keys + if entry.identifier() not in deleted_entry_keys ] return final_entries diff --git a/paimon-python/pypaimon/manifest/schema/file_entry.py b/paimon-python/pypaimon/manifest/schema/file_entry.py new file mode 100644 index 000000000000..5429f25cb65c --- /dev/null +++ b/paimon-python/pypaimon/manifest/schema/file_entry.py @@ -0,0 +1,127 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Entry representing a file. +""" + + +class FileEntry: + """Entry representing a file. + + The same Identifier indicates that the FileEntry refers to the same data file. + """ + + class Identifier: + """Unique identifier for a file entry. + + Uses partition, bucket, level, fileName, extraFiles, + embeddedIndex and externalPath to identify a file. + """ + + def __init__(self, partition, bucket, level, file_name, + extra_files, embedded_index, external_path): + self.partition = partition + self.bucket = bucket + self.level = level + self.file_name = file_name + self.extra_files = extra_files + self.embedded_index = embedded_index + self.external_path = external_path + self._hash = None + + def __eq__(self, other): + if self is other: + return True + if other is None or not isinstance(other, FileEntry.Identifier): + return False + return (self.bucket == other.bucket + and self.level == other.level + and self.partition == other.partition + and self.file_name == other.file_name + and self.extra_files == other.extra_files + and self.embedded_index == other.embedded_index + and self.external_path == other.external_path) + + def __hash__(self): + if self._hash is None: + self._hash = hash(( + self.partition, + self.bucket, + self.level, + self.file_name, + self.extra_files, + self.embedded_index, + self.external_path, + )) + return self._hash + + def identifier(self): + """Build a unique Identifier for this file entry. + + Returns: + An Identifier instance. 
+ """ + extra_files = (tuple(self.file.extra_files) + if self.file.extra_files else ()) + return FileEntry.Identifier( + partition=self.partition, + bucket=self.bucket, + level=self.file.level, + file_name=self.file.file_name, + extra_files=extra_files, + embedded_index=self.file.embedded_index, + external_path=self.file.external_path, + ) + + @staticmethod + def merge_entries(entries): + """Merge file entries: ADD and DELETE of the same file cancel each other. + + - ADD: if identifier already in map, raise error; otherwise add to map. + - DELETE: if identifier already in map, remove both (cancel); + otherwise add to map. + + Args: + entries: Iterable of FileEntry. + + Returns: + List of merged FileEntry values, preserving insertion order. + + Raises: + RuntimeError: If trying to add a file that is already in the map. + """ + entry_map = {} + + for entry in entries: + entry_identifier = entry.identifier() + if entry.kind == 0: # ADD + if entry_identifier in entry_map: + raise RuntimeError( + "Trying to add file {} which is already added.".format( + entry.file.file_name)) + entry_map[entry_identifier] = entry + elif entry.kind == 1: # DELETE + if entry_identifier in entry_map: + del entry_map[entry_identifier] + else: + entry_map[entry_identifier] = entry + else: + raise RuntimeError( + "Unknown entry kind: {}".format(entry.kind)) + + return list(entry_map.values()) diff --git a/paimon-python/pypaimon/manifest/schema/manifest_entry.py b/paimon-python/pypaimon/manifest/schema/manifest_entry.py index b1fd244dafc0..eba241786387 100644 --- a/paimon-python/pypaimon/manifest/schema/manifest_entry.py +++ b/paimon-python/pypaimon/manifest/schema/manifest_entry.py @@ -20,11 +20,12 @@ from pypaimon.manifest.schema.data_file_meta import (DATA_FILE_META_SCHEMA, DataFileMeta) +from pypaimon.manifest.schema.file_entry import FileEntry from pypaimon.table.row.generic_row import GenericRow @dataclass -class ManifestEntry: +class ManifestEntry(FileEntry): kind: int partition: 
GenericRow bucket: int diff --git a/paimon-python/pypaimon/schema/data_types.py b/paimon-python/pypaimon/schema/data_types.py index 318ddfe02fcf..6befaa4d4068 100755 --- a/paimon-python/pypaimon/schema/data_types.py +++ b/paimon-python/pypaimon/schema/data_types.py @@ -73,6 +73,16 @@ def __init__(self, type: str, nullable: bool = True): super().__init__(nullable) self.type = type + def __eq__(self, other): + if self is other: + return True + if not isinstance(other, AtomicType): + return False + return self.type == other.type and self.nullable == other.nullable + + def __hash__(self): + return hash((self.type, self.nullable)) + def to_dict(self) -> str: if not self.nullable: return self.type + " NOT NULL" @@ -95,6 +105,16 @@ def __init__(self, nullable: bool, element_type: DataType): super().__init__(nullable) self.element = element_type + def __eq__(self, other): + if self is other: + return True + if not isinstance(other, ArrayType): + return False + return self.element == other.element and self.nullable == other.nullable + + def __hash__(self): + return hash((self.element, self.nullable)) + def to_dict(self) -> Dict[str, Any]: return { "type": "ARRAY" + (" NOT NULL" if not self.nullable else ""), @@ -119,6 +139,16 @@ def __init__(self, nullable: bool, element_type: DataType): super().__init__(nullable) self.element = element_type + def __eq__(self, other): + if self is other: + return True + if not isinstance(other, MultisetType): + return False + return self.element == other.element and self.nullable == other.nullable + + def __hash__(self): + return hash((self.element, self.nullable)) + def to_dict(self) -> Dict[str, Any]: return { "type": "MULTISET{}{}".format('<' + str(self.element) + '>' if self.element else '', @@ -150,6 +180,18 @@ def __init__( self.key = key_type self.value = value_type + def __eq__(self, other): + if self is other: + return True + if not isinstance(other, MapType): + return False + return (self.key == other.key + and self.value == 
other.value + and self.nullable == other.nullable) + + def __hash__(self): + return hash((self.key, self.value, self.nullable)) + def to_dict(self) -> Dict[str, Any]: return { "type": "MAP<{}, {}>".format(self.key, self.value), @@ -199,6 +241,21 @@ def __init__( def from_dict(cls, data: Dict[str, Any]) -> "DataField": return DataTypeParser.parse_data_field(data) + def __eq__(self, other): + if self is other: + return True + if not isinstance(other, DataField): + return False + return (self.id == other.id + and self.name == other.name + and self.type == other.type + and self.description == other.description + and self.default_value == other.default_value) + + def __hash__(self): + return hash((self.id, self.name, self.type, + self.description, self.default_value)) + def to_dict(self) -> Dict[str, Any]: result = { self.FIELD_ID: self.id, @@ -223,6 +280,16 @@ def __init__(self, nullable: bool, fields: List[DataField]): super().__init__(nullable) self.fields = fields or [] + def __eq__(self, other): + if self is other: + return True + if not isinstance(other, RowType): + return False + return self.fields == other.fields and self.nullable == other.nullable + + def __hash__(self): + return hash((tuple(self.fields), self.nullable)) + def to_dict(self) -> Dict[str, Any]: return { "type": "ROW" + ("" if self.nullable else " NOT NULL"), @@ -587,7 +654,7 @@ def to_avro_type(field_type: pyarrow.DataType, field_name: str, parent_name: str = "record") -> Union[str, Dict[str, Any]]: if pyarrow.types.is_integer(field_type): if (pyarrow.types.is_signed_integer(field_type) and field_type.bit_width <= 32) or \ - (pyarrow.types.is_unsigned_integer(field_type) and field_type.bit_width < 32): + (pyarrow.types.is_unsigned_integer(field_type) and field_type.bit_width < 32): return "int" else: return "long" diff --git a/paimon-python/pypaimon/table/row/generic_row.py b/paimon-python/pypaimon/table/row/generic_row.py index be5c1ec80f39..4aa740de7219 100644 --- 
a/paimon-python/pypaimon/table/row/generic_row.py +++ b/paimon-python/pypaimon/table/row/generic_row.py @@ -51,6 +51,16 @@ def get_row_kind(self) -> RowKind: def __len__(self) -> int: return len(self.values) + def __eq__(self, other): + if self is other: + return True + if not isinstance(other, GenericRow): + return False + return self.values == other.values and self.row_kind == other.row_kind + + def __hash__(self): + return hash((tuple(self.values), tuple(self.fields), self.row_kind)) + def __str__(self): field_strs = [f"{field.name}={repr(value)}" for field, value in zip(self.fields, self.values)] return f"GenericRow(row_kind={self.row_kind.name}, {', '.join(field_strs)})" diff --git a/paimon-python/pypaimon/tests/table_update_test.py b/paimon-python/pypaimon/tests/table_update_test.py index ad9158e9febf..bf46f7bbac27 100644 --- a/paimon-python/pypaimon/tests/table_update_test.py +++ b/paimon-python/pypaimon/tests/table_update_test.py @@ -859,6 +859,199 @@ def test_update_partial_rows_across_two_files(self): 'Seattle', 'Boston', 'Denver', 'Miami', 'Atlanta'] self.assertEqual(expected_cities, cities, "Cities should remain unchanged") + def test_concurrent_updates_with_retry(self): + """Test data evolution with multiple threads performing concurrent updates. + + Each thread updates different rows of the same column. If a conflict occurs, + the thread retries until the update succeeds. After all threads complete, + the final result is verified to ensure all updates were applied correctly. 
+ """ + import threading + table = self._create_table() + + # Table has 5 rows (row_id 0-4) after _create_table: + # row 0: age=25, row 1: age=30, row 2: age=35, row 3: age=40, row 4: age=45 + + # Thread 1 updates rows 0 and 1 + # Thread 2 updates rows 2 and 3 + # Thread 3 updates row 4 + thread_updates = [ + {'row_ids': [0, 1], 'ages': [100, 200]}, + {'row_ids': [2, 3], 'ages': [300, 400]}, + {'row_ids': [4], 'ages': [500]}, + ] + + errors = [] + success_counts = [0] * len(thread_updates) + + def update_worker(thread_index, update_spec): + max_retries = 20 + for attempt in range(max_retries): + try: + write_builder = table.new_batch_write_builder() + table_update = write_builder.new_update().with_update_type(['age']) + + update_data = pa.Table.from_pydict({ + '_ROW_ID': update_spec['row_ids'], + 'age': update_spec['ages'], + }) + + commit_messages = table_update.update_by_arrow_with_row_id(update_data) + + table_commit = write_builder.new_commit() + table_commit.commit(commit_messages) + table_commit.close() + + success_counts[thread_index] = attempt + 1 + return + except Exception as e: + import traceback + self.assertIn( + "For Data Evolution table, multiple 'MERGE INTO' operations have encountered conflicts", + str(e), + "Thread-{} attempt {} unexpected error: {}\n{}".format( + thread_index, attempt + 1, e, traceback.format_exc() + ) + ) + if attempt == max_retries - 1: + errors.append( + "Thread-{} failed after {} retries: {}".format( + thread_index, max_retries, e + ) + ) + + threads = [] + for idx, spec in enumerate(thread_updates): + thread = threading.Thread(target=update_worker, args=(idx, spec)) + threads.append(thread) + + for thread in threads: + thread.start() + + for thread in threads: + thread.join(timeout=120) + + if errors: + self.fail("Some threads failed:\n" + "\n".join(errors)) + + for idx, count in enumerate(success_counts): + self.assertGreater( + count, 0, + "Thread-{} did not succeed".format(idx) + ) + + # Verify the final data + 
read_builder = table.new_read_builder() + table_read = read_builder.new_read() + splits = read_builder.new_scan().plan().splits() + result = table_read.to_arrow(splits) + + ages = result['age'].to_pylist() + expected_ages = [100, 200, 300, 400, 500] + self.assertEqual(expected_ages, ages, + "Concurrent updates did not produce correct final result") + + def test_concurrent_updates_same_rows_with_retry(self): + """Test data evolution with multiple threads updating overlapping rows. + + Multiple threads compete to update the same rows. Each thread retries + on conflict until success. The final result should reflect one of the + successful updates for each row (last-writer-wins). + """ + import threading + + table = self._create_table() + + # All threads update the same rows but with different values + thread_updates = [ + {'row_ids': [0, 1, 2], 'ages': [101, 201, 301], 'thread_name': 'A'}, + {'row_ids': [0, 1, 2], 'ages': [102, 202, 302], 'thread_name': 'B'}, + {'row_ids': [0, 1, 2], 'ages': [103, 203, 303], 'thread_name': 'C'}, + ] + + errors = [] + completion_order = [] + order_lock = threading.Lock() + + def update_worker(thread_index, update_spec): + max_retries = 30 + for attempt in range(max_retries): + try: + write_builder = table.new_batch_write_builder() + table_update = write_builder.new_update().with_update_type(['age']) + + update_data = pa.Table.from_pydict({ + '_ROW_ID': update_spec['row_ids'], + 'age': update_spec['ages'], + }) + + commit_messages = table_update.update_by_arrow_with_row_id(update_data) + + table_commit = write_builder.new_commit() + table_commit.commit(commit_messages) + table_commit.close() + + with order_lock: + completion_order.append(thread_index) + return + except Exception as e: + import traceback + self.assertIn( + "For Data Evolution table, multiple 'MERGE INTO' operations have encountered conflicts", + str(e), + "Thread-{} attempt {} unexpected error: {}\n{}".format( + thread_index, attempt + 1, e, traceback.format_exc() + ) + 
) + if attempt == max_retries - 1: + errors.append( + "Thread-{} ({}) failed after {} retries: {}".format( + thread_index, update_spec['thread_name'], + max_retries, e + ) + ) + + threads = [] + for idx, spec in enumerate(thread_updates): + thread = threading.Thread(target=update_worker, args=(idx, spec)) + threads.append(thread) + + for thread in threads: + thread.start() + + for thread in threads: + thread.join(timeout=120) + + if errors: + self.fail("Some threads failed:\n" + "\n".join(errors)) + + self.assertEqual( + len(completion_order), len(thread_updates), + "Not all threads completed successfully" + ) + + # Verify the final data: the last thread to commit wins + read_builder = table.new_read_builder() + table_read = read_builder.new_read() + splits = read_builder.new_scan().plan().splits() + result = table_read.to_arrow(splits) + + ages = result['age'].to_pylist() + + # The last thread to successfully commit determines rows 0-2 + last_winner = completion_order[-1] + winner_ages = thread_updates[last_winner]['ages'] + self.assertEqual(winner_ages[0], ages[0], + "Row 0 should reflect last writer's value") + self.assertEqual(winner_ages[1], ages[1], + "Row 1 should reflect last writer's value") + self.assertEqual(winner_ages[2], ages[2], + "Row 2 should reflect last writer's value") + + # Rows 3 and 4 should remain unchanged + self.assertEqual(40, ages[3], "Row 3 should remain unchanged") + self.assertEqual(45, ages[4], "Row 4 should remain unchanged") + if __name__ == '__main__': unittest.main() diff --git a/paimon-python/pypaimon/utils/range_helper.py b/paimon-python/pypaimon/utils/range_helper.py new file mode 100644 index 000000000000..703d01859925 --- /dev/null +++ b/paimon-python/pypaimon/utils/range_helper.py @@ -0,0 +1,133 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +A helper class to handle ranges. +""" + + +class RangeHelper: + """A helper class to handle ranges. + + Provides methods to check if all ranges are the same and to merge + overlapping ranges into groups, preserving original order within groups. + + Args: + range_function: A callable that extracts a Range from an element T. + """ + + def __init__(self, range_function): + self._range_function = range_function + + def are_all_ranges_same(self, items): + """Check if all items have the same range. + + Args: + items: List of items to check. + + Returns: + True if all items have the same range, False otherwise. + """ + if not items: + return True + + first = items[0] + first_range = self._range_function(first) + if first_range is None: + return False + + for item in items[1:]: + if item is None: + return False + current_range = self._range_function(item) + if current_range is None: + return False + if current_range.from_ != first_range.from_ or current_range.to != first_range.to: + return False + + return True + + def merge_overlapping_ranges(self, items): + """Merge items with overlapping ranges into groups. + + Sorts items by range start, then merges overlapping groups. + Within each group, items are sorted by their original index. + + Args: + items: List of items with non-null ranges. + + Returns: + List of groups, where each group is a list of items + with overlapping ranges. 
+ """ + if not items: + return [] + + # Create indexed values to track original indices + indexed = [] + for original_index, item in enumerate(items): + item_range = self._range_function(item) + if item_range is not None: + indexed.append(_IndexedValue(item, item_range, original_index)) + + if not indexed: + return [] + + # Sort by range start, then by range end + indexed.sort(key=lambda iv: (iv.start(), iv.end())) + + groups = [] + current_group = [indexed[0]] + current_end = indexed[0].end() + + # Iterate through sorted ranges and merge overlapping ones + for i in range(1, len(indexed)): + current = indexed[i] + if current.start() <= current_end: + current_group.append(current) + if current.end() > current_end: + current_end = current.end() + else: + groups.append(current_group) + current_group = [current] + current_end = current.end() + + # Add the last group + groups.append(current_group) + + # Convert groups to result, sorting each group by original index + result = [] + for group in groups: + group.sort(key=lambda iv: iv.original_index) + result.append([iv.value for iv in group]) + + return result + + +class _IndexedValue: + """A helper class to track original indices during range merging.""" + + def __init__(self, value, item_range, original_index): + self.value = value + self.range = item_range + self.original_index = original_index + + def start(self): + return self.range.from_ + + def end(self): + return self.range.to diff --git a/paimon-python/pypaimon/write/commit/__init__.py b/paimon-python/pypaimon/write/commit/__init__.py new file mode 100644 index 000000000000..a67d5ea255b2 --- /dev/null +++ b/paimon-python/pypaimon/write/commit/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/paimon-python/pypaimon/write/commit/commit_rollback.py b/paimon-python/pypaimon/write/commit/commit_rollback.py new file mode 100644 index 000000000000..66106efae7bd --- /dev/null +++ b/paimon-python/pypaimon/write/commit/commit_rollback.py @@ -0,0 +1,62 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +""" +Commit rollback to rollback 'COMPACT' commits for resolving conflicts. 
+""" + +from pypaimon.table.instant import Instant + + +class CommitRollback: + """Rollback COMPACT commits to resolve conflicts. + + When a conflict is detected during commit, if the latest snapshot is a + COMPACT commit, it can be rolled back via TableRollback. + """ + + def __init__(self, table_rollback): + """Initialize CommitRollback. + + Args: + table_rollback: A TableRollback instance used to perform the rollback. + """ + self._table_rollback = table_rollback + + def try_to_rollback(self, latest_snapshot): + """Try to rollback a COMPACT commit to resolve conflicts. + + Only rolls back COMPACT type commits. Delegates to TableRollback + to rollback to the previous snapshot (latest - 1), passing the + latest snapshot ID as from_snapshot. + + Args: + latest_snapshot: The latest snapshot that may need to be rolled back. + + Returns: + True if rollback succeeded, False otherwise. + """ + if latest_snapshot.commit_kind == "COMPACT": + latest_id = latest_snapshot.id + try: + self._table_rollback.rollback_to( + Instant.snapshot(latest_id - 1), latest_id) + return True + except Exception: + pass + return False diff --git a/paimon-python/pypaimon/write/commit/commit_scanner.py b/paimon-python/pypaimon/write/commit/commit_scanner.py new file mode 100644 index 000000000000..6158c86f5c81 --- /dev/null +++ b/paimon-python/pypaimon/write/commit/commit_scanner.py @@ -0,0 +1,127 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Manifest entries scanner for commit operations. +""" +from typing import Optional, List + +from pypaimon.common.predicate_builder import PredicateBuilder +from pypaimon.manifest.manifest_list_manager import ManifestListManager +from pypaimon.manifest.schema.manifest_entry import ManifestEntry +from pypaimon.read.scanner.file_scanner import FileScanner +from pypaimon.snapshot.snapshot import Snapshot + + +class CommitScanner: + """Manifest entries scanner for commit operations. + + This class provides methods to scan manifest entries for commit operations + """ + + def __init__(self, table, manifest_list_manager: ManifestListManager): + """Initialize CommitScanner. + + Args: + table: The FileStoreTable instance. + manifest_list_manager: Manager for reading manifest lists. + """ + self.table = table + self.manifest_list_manager = manifest_list_manager + + def read_all_entries_from_changed_partitions(self, latest_snapshot: Optional[Snapshot], + commit_entries: List[ManifestEntry]): + """Read all entries from the latest snapshot for partitions that are changed. + + Builds a partition predicate from delta entries and passes it to FileScanner, + so that manifest files and entries are filtered during reading rather than + after a full scan. + + Args: + latest_snapshot: The latest snapshot to read entries from. + commit_entries: The delta entries being committed, used to determine + which partitions have changed. + + Returns: + List of ManifestEntry from the latest snapshot for changed partitions. 
+ """ + if latest_snapshot is None: + return [] + + partition_filter = self._build_partition_filter_from_entries(commit_entries) + + all_manifests = self.manifest_list_manager.read_all(latest_snapshot) + return FileScanner( + self.table, lambda: [], partition_filter + ).read_manifest_entries(all_manifests) + + def read_incremental_entries_from_changed_partitions(self, snapshot: Snapshot, + commit_entries: List[ManifestEntry]): + """Read incremental manifest entries from a snapshot's delta manifest list. + + Builds a partition predicate from delta entries and passes it to FileScanner, + so that manifest files and entries are filtered during reading rather than + after a full scan. + + Args: + snapshot: The snapshot to read incremental entries from. + commit_entries: The delta entries being committed, used to determine + which partitions have changed. + + Returns: + List of ManifestEntry matching the partition filter. + """ + delta_manifests = self.manifest_list_manager.read_delta(snapshot) + if not delta_manifests: + return [] + + partition_filter = self._build_partition_filter_from_entries(commit_entries) + + return FileScanner( + self.table, lambda: [], partition_filter + ).read_manifest_entries(delta_manifests) + + def _build_partition_filter_from_entries(self, entries: List[ManifestEntry]): + """Build a partition predicate that matches all partitions present in the given entries. + + Args: + entries: List of ManifestEntry whose partitions should be matched. + + Returns: + A Predicate matching any of the changed partitions, or None if + partition keys are empty. 
+ """ + partition_keys = self.table.partition_keys + if not partition_keys: + return None + + changed_partitions = set() + for entry in entries: + changed_partitions.add(tuple(entry.partition.values)) + + if not changed_partitions: + return None + + predicate_builder = PredicateBuilder(self.table.fields) + partition_predicates = [] + for partition_values in changed_partitions: + sub_predicates = [] + for i, key in enumerate(partition_keys): + sub_predicates.append(predicate_builder.equal(key, partition_values[i])) + partition_predicates.append(predicate_builder.and_predicates(sub_predicates)) + + return predicate_builder.or_predicates(partition_predicates) diff --git a/paimon-python/pypaimon/write/commit/conflict_detection.py b/paimon-python/pypaimon/write/commit/conflict_detection.py new file mode 100644 index 000000000000..8e62c946e04d --- /dev/null +++ b/paimon-python/pypaimon/write/commit/conflict_detection.py @@ -0,0 +1,203 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Conflict detection for commit operations. 
+""" + + +from pypaimon.manifest.manifest_list_manager import ManifestListManager +from pypaimon.manifest.schema.data_file_meta import DataFileMeta +from pypaimon.manifest.schema.file_entry import FileEntry +from pypaimon.utils.range import Range +from pypaimon.utils.range_helper import RangeHelper +from pypaimon.write.commit.commit_scanner import CommitScanner + + +class ConflictDetection: + """Detects conflicts between base and delta files during commit. + + This class provides row ID range conflict checks and row ID from snapshot conflict checks + for Data Evolution tables. + """ + + def __init__(self, data_evolution_enabled, snapshot_manager, + manifest_list_manager: ManifestListManager, table, commit_scanner: CommitScanner): + """Initialize ConflictDetection. + + Args: + data_evolution_enabled: Whether data evolution feature is enabled. + snapshot_manager: Manager for reading snapshot metadata. + manifest_list_manager: Manager for reading manifest lists. + table: The FileStoreTable instance. + """ + self.data_evolution_enabled = data_evolution_enabled + self.snapshot_manager = snapshot_manager + self.manifest_list_manager = manifest_list_manager + self.table = table + self._row_id_check_from_snapshot = None + self.commit_scanner = commit_scanner + + def should_be_overwrite_commit(self): + """Check if the commit should be treated as an overwrite commit. + + returns True if rowIdCheckFromSnapshot is set. + + Returns: + True if the commit should be treated as OVERWRITE. + """ + return self._row_id_check_from_snapshot is not None + + def check_conflicts(self, latest_snapshot, base_entries, delta_entries, commit_kind): + """Run all conflict checks and return the first detected conflict. + + merges base_entries and delta_entries, then runs conflict checks + on the merged result. + + Args: + latest_snapshot: The latest snapshot at commit time. + base_entries: All entries read from the latest snapshot. + delta_entries: The delta entries being committed. 
+            commit_kind: The kind of commit (e.g. "APPEND", "COMPACT", "OVERWRITE").
+
+        Returns:
+            A RuntimeError if a conflict is detected, otherwise None.
+        """
+        all_entries = list(base_entries) + list(delta_entries)
+
+        try:
+            merged_entries = FileEntry.merge_entries(all_entries)
+        except Exception as e:
+            return RuntimeError(
+                "File deletion conflicts detected! Give up committing. " + str(e))
+
+        conflict = self.check_row_id_range_conflicts(commit_kind, merged_entries)
+        if conflict is not None:
+            return conflict
+
+        return self.check_row_id_from_snapshot(latest_snapshot, delta_entries)
+
+    def check_row_id_range_conflicts(self, commit_kind, commit_entries):
+        """Check for row ID range conflicts among merged entries.
+
+        Only runs for data-evolution tables on COMPACT commits or row-id checked
+        commits; overlapping row ID ranges in non-blob data files must be identical.
+
+        Args:
+            commit_kind: The kind of commit (e.g. "APPEND", "COMPACT").
+            commit_entries: The entries being committed.
+
+        Returns:
+            A RuntimeError if conflict is detected, otherwise None.
+ """ + if not self.data_evolution_enabled: + return None + if self._row_id_check_from_snapshot is None and commit_kind != "COMPACT": + return None + + entries_with_row_id = [ + entry for entry in commit_entries + if entry.file.first_row_id is not None + ] + + if not entries_with_row_id: + return None + + range_helper = RangeHelper(lambda entry: entry.file.row_id_range()) + merged_groups = range_helper.merge_overlapping_ranges(entries_with_row_id) + + for group in merged_groups: + data_files = [ + entry for entry in group + if not DataFileMeta.is_blob_file(entry.file.file_name) + ] + if not range_helper.are_all_ranges_same(data_files): + file_descriptions = [ + "{name}(rowId={row_id}, count={count})".format( + name=entry.file.file_name, + row_id=entry.file.first_row_id, + count=entry.file.row_count, + ) + for entry in data_files + ] + return RuntimeError( + "For Data Evolution table, multiple 'MERGE INTO' and 'COMPACT' " + "operations have encountered conflicts, data files: " + + str(file_descriptions)) + + return None + + def check_row_id_from_snapshot(self, latest_snapshot, commit_entries): + """Check for row ID conflicts from a specific snapshot onwards. + + collects row ID ranges from delta entries, then checks if any + incremental changes between the check snapshot and latest snapshot + have overlapping row ID ranges. + + Args: + latest_snapshot: The latest snapshot at commit time. + commit_entries: The delta entries being committed. + + Returns: + A RuntimeError if conflict is detected, otherwise None. 
+ """ + if not self.data_evolution_enabled: + return None + if self._row_id_check_from_snapshot is None: + return None + + history_id_ranges = [] + for entry in commit_entries: + first_row_id = entry.file.first_row_id + row_count = entry.file.row_count + if first_row_id is not None: + history_id_ranges.append( + Range(first_row_id, first_row_id + row_count - 1)) + + check_snapshot = self.snapshot_manager.get_snapshot_by_id( + self._row_id_check_from_snapshot) + if check_snapshot is None or check_snapshot.next_row_id is None: + raise RuntimeError( + "Next row id cannot be null for snapshot " + "{snapshot}.".format(snapshot=self._row_id_check_from_snapshot)) + check_next_row_id = check_snapshot.next_row_id + + for snapshot_id in range( + self._row_id_check_from_snapshot + 1, + latest_snapshot.id + 1): + snapshot = self.snapshot_manager.get_snapshot_by_id(snapshot_id) + if snapshot is None: + continue + if snapshot.commit_kind == "COMPACT": + continue + + incremental_entries = self.commit_scanner.read_incremental_entries_from_changed_partitions( + snapshot, commit_entries) + for entry in incremental_entries: + file_range = entry.file.row_id_range() + if file_range is None: + continue + if file_range.from_ < check_next_row_id: + for history_range in history_id_ranges: + if history_range.overlaps(file_range): + return RuntimeError( + "For Data Evolution table, multiple 'MERGE INTO' " + "operations have encountered conflicts, updating " + "the same file, which can render some updates " + "ineffective.") + + return None diff --git a/paimon-python/pypaimon/write/commit_message.py b/paimon-python/pypaimon/write/commit_message.py index b36a1b1bbf4f..d560c5a2479f 100644 --- a/paimon-python/pypaimon/write/commit_message.py +++ b/paimon-python/pypaimon/write/commit_message.py @@ -17,7 +17,7 @@ ################################################################################ from dataclasses import dataclass -from typing import List, Tuple +from typing import List, Tuple, 
Optional from pypaimon.manifest.schema.data_file_meta import DataFileMeta @@ -27,6 +27,7 @@ class CommitMessage: partition: Tuple bucket: int new_files: List[DataFileMeta] + check_from_snapshot: Optional[int] = -1 def is_empty(self): return not self.new_files diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py index 80eb858087ef..ee7d7a9694aa 100644 --- a/paimon-python/pypaimon/write/file_store_commit.py +++ b/paimon-python/pypaimon/write/file_store_commit.py @@ -27,6 +27,7 @@ from pypaimon.manifest.manifest_list_manager import ManifestListManager from pypaimon.manifest.schema.data_file_meta import DataFileMeta from pypaimon.manifest.schema.manifest_entry import ManifestEntry + from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta from pypaimon.manifest.schema.simple_stats import SimpleStats from pypaimon.read.scanner.file_scanner import FileScanner @@ -36,6 +37,9 @@ from pypaimon.snapshot.snapshot_manager import SnapshotManager from pypaimon.table.row.generic_row import GenericRow from pypaimon.table.row.offset_row import OffsetRow +from pypaimon.write.commit.commit_rollback import CommitRollback +from pypaimon.write.commit.commit_scanner import CommitScanner +from pypaimon.write.commit.conflict_detection import ConflictDetection from pypaimon.write.commit_message import CommitMessage logger = logging.getLogger(__name__) @@ -93,11 +97,30 @@ def __init__(self, snapshot_commit: SnapshotCommit, table, commit_user: str): self.commit_min_retry_wait = table.options.commit_min_retry_wait() self.commit_max_retry_wait = table.options.commit_max_retry_wait() + self.commit_scanner = CommitScanner(table, self.manifest_list_manager) + + self.conflict_detection = ConflictDetection( + data_evolution_enabled=table.options.data_evolution_enabled(), + snapshot_manager=self.snapshot_manager, + manifest_list_manager=self.manifest_list_manager, + table=table, + commit_scanner=self.commit_scanner + ) + + 
table_rollback = table.catalog_environment.catalog_table_rollback() + self.rollback = CommitRollback(table_rollback) if table_rollback is not None else None + def commit(self, commit_messages: List[CommitMessage], commit_identifier: int): """Commit the given commit messages in normal append mode.""" if not commit_messages: return + # Extract the minimum check_from_snapshot from commit messages + valid_snapshots = [msg.check_from_snapshot for msg in commit_messages + if msg.check_from_snapshot != -1] + if valid_snapshots: + self.conflict_detection._row_id_check_from_snapshot = min(valid_snapshots) + logger.info( "Ready to commit to table %s, number of commit messages: %d", self.table.identifier, @@ -116,9 +139,20 @@ def commit(self, commit_messages: List[CommitMessage], commit_identifier: int): )) logger.info("Finished collecting changes, including: %d entries", len(commit_entries)) - self._try_commit(commit_kind="APPEND", + + commit_kind = "APPEND" + detect_conflicts = False + allow_rollback = False + if self.conflict_detection.should_be_overwrite_commit(): + commit_kind = "OVERWRITE" + detect_conflicts = True + allow_rollback = True + + self._try_commit(commit_kind=commit_kind, commit_identifier=commit_identifier, - commit_entries_plan=lambda snapshot: commit_entries) + commit_entries_plan=lambda snapshot: commit_entries, + detect_conflicts=detect_conflicts, + allow_rollback=allow_rollback) def overwrite(self, overwrite_partition, commit_messages: List[CommitMessage], commit_identifier: int): """Commit the given commit messages in overwrite mode.""" @@ -149,7 +183,9 @@ def overwrite(self, overwrite_partition, commit_messages: List[CommitMessage], c commit_kind="OVERWRITE", commit_identifier=commit_identifier, commit_entries_plan=lambda snapshot: self._generate_overwrite_entries( - snapshot, partition_filter, commit_messages) + snapshot, partition_filter, commit_messages), + detect_conflicts=True, + allow_rollback=False, ) def drop_partitions(self, partitions: 
List[Dict[str, str]], commit_identifier: int) -> None: @@ -187,10 +223,13 @@ def drop_partitions(self, partitions: List[Dict[str, str]], commit_identifier: i commit_kind="OVERWRITE", commit_identifier=commit_identifier, commit_entries_plan=lambda snapshot: self._generate_overwrite_entries( - snapshot, partition_filter, []) + snapshot, partition_filter, []), + detect_conflicts=True, + allow_rollback=False, ) - def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan): + def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan, + detect_conflicts=False, allow_rollback=False): import threading retry_count = 0 @@ -211,7 +250,9 @@ def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan): commit_kind=commit_kind, commit_entries=commit_entries, commit_identifier=commit_identifier, - latest_snapshot=latest_snapshot + latest_snapshot=latest_snapshot, + detect_conflicts=detect_conflicts, + allow_rollback=allow_rollback, ) if result.is_success(): @@ -267,11 +308,13 @@ def _try_commit(self, commit_kind, commit_identifier, commit_entries_plan): def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str, commit_entries: List[ManifestEntry], commit_identifier: int, - latest_snapshot: Optional[Snapshot]) -> CommitResult: + latest_snapshot: Optional[Snapshot], + detect_conflicts: bool = False, + allow_rollback: bool = False) -> CommitResult: start_millis = int(time.time() * 1000) if self._is_duplicate_commit(retry_result, latest_snapshot, commit_identifier, commit_kind): return SuccessResult() - + unique_id = uuid.uuid4() base_manifest_list = f"manifest-list-{unique_id}-0" delta_manifest_list = f"manifest-list-{unique_id}-1" @@ -296,8 +339,20 @@ def _try_commit_once(self, retry_result: Optional[RetryResult], commit_kind: str # Assign row IDs to new files and get the next row ID for the snapshot commit_entries, next_row_id = self._assign_row_tracking_meta(first_row_id_start, commit_entries) + # Conflict 
detection: read base entries from latest snapshot, then check conflicts + if detect_conflicts and latest_snapshot is not None: + base_entries = self.commit_scanner.read_all_entries_from_changed_partitions( + latest_snapshot, commit_entries) + conflict_exception = self.conflict_detection.check_conflicts( + latest_snapshot, base_entries, commit_entries, commit_kind) + + if conflict_exception is not None: + if allow_rollback and self.rollback is not None: + if self.rollback.try_to_rollback(latest_snapshot): + return RetryResult(latest_snapshot, conflict_exception) + raise conflict_exception + try: - # TODO: implement noConflictsOrFail logic new_manifest_file_meta = self._write_manifest_file(commit_entries, new_manifest_file) self.manifest_list_manager.write(delta_manifest_list, [new_manifest_file_meta]) @@ -452,12 +507,12 @@ def _is_duplicate_commit(self, retry_result, latest_snapshot, commit_identifier, return True return False - def _generate_overwrite_entries(self, latestSnapshot, partition_filter, commit_messages): + def _generate_overwrite_entries(self, latest_snapshot, partition_filter, commit_messages): """Generate commit entries for OVERWRITE mode based on latest snapshot.""" entries = [] - current_entries = [] if latestSnapshot is None \ + current_entries = [] if latest_snapshot is None \ else (FileScanner(self.table, lambda: [], partition_filter). 
- read_manifest_entries(self.manifest_list_manager.read_all(latestSnapshot))) + read_manifest_entries(self.manifest_list_manager.read_all(latest_snapshot))) for entry in current_entries: entry.kind = 1 # DELETE entries.append(entry) diff --git a/paimon-python/pypaimon/write/table_update_by_row_id.py b/paimon-python/pypaimon/write/table_update_by_row_id.py index b027564ff398..48f61751ec43 100644 --- a/paimon-python/pypaimon/write/table_update_by_row_id.py +++ b/paimon-python/pypaimon/write/table_update_by_row_id.py @@ -47,7 +47,8 @@ def __init__(self, table, commit_user: str): self.commit_user = commit_user # Load existing first_row_ids and build partition map - (self.first_row_ids, + (self.snapshot_id, + self.first_row_ids, self.first_row_id_to_partition_map, self.first_row_id_to_row_count_map, self.total_row_count, @@ -76,7 +77,9 @@ def _load_existing_files_info(self): total_row_count = sum(first_row_id_to_row_count_map.values()) - return (sorted(list(set(first_row_ids))), + snapshot_id = self.table.snapshot_manager().get_latest_snapshot().id + return (snapshot_id, + sorted(list(set(first_row_ids))), first_row_id_to_partition_map, first_row_id_to_row_count_map, total_row_count, @@ -308,6 +311,7 @@ def _write_group(self, partition: GenericRow, first_row_id: int, # Assign first_row_id to the new files for msg in commit_messages: + msg.check_from_snapshot = self.snapshot_id for file in msg.new_files: # Assign the same first_row_id as the original file file.first_row_id = first_row_id