Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 2 additions & 13 deletions paimon-python/pypaimon/manifest/manifest_file_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,6 @@ def read_entries_parallel(self, manifest_files: List[ManifestFileMeta], manifest
def _process_single_manifest(manifest_file: ManifestFileMeta) -> List[ManifestEntry]:
    # Thread-pool worker: read all entries of one manifest file, applying
    # the filter and stats-dropping flag captured from the enclosing call.
    return self.read(manifest_file.file_name, manifest_entry_filter, drop_stats)

def _entry_identifier(e: ManifestEntry) -> tuple:
    # Build a hashable identity tuple for a manifest entry: partition
    # values, bucket, level, file name, extra files, embedded index and
    # external path together identify one data file.
    meta = e.file
    extras = tuple(meta.extra_files) if meta.extra_files else ()
    return (tuple(e.partition.values), e.bucket, meta.level,
            meta.file_name, extras, meta.embedded_index, meta.external_path)

deleted_entry_keys = set()
added_entries = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
Expand All @@ -73,11 +62,11 @@ def _entry_identifier(e: ManifestEntry) -> tuple:
if entry.kind == 0: # ADD
added_entries.append(entry)
else: # DELETE
deleted_entry_keys.add(_entry_identifier(entry))
deleted_entry_keys.add(entry.identifier())

final_entries = [
entry for entry in added_entries
if _entry_identifier(entry) not in deleted_entry_keys
if entry.identifier() not in deleted_entry_keys
]
return final_entries

Expand Down
127 changes: 127 additions & 0 deletions paimon-python/pypaimon/manifest/schema/file_entry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Entry representing a file.
"""


class FileEntry:
    """Entry representing a data file.

    Two entries with equal Identifiers refer to the same data file.
    """

    class Identifier:
        """Hashable identity of a file entry.

        Built from partition, bucket, level, file name, extra files,
        embedded index and external path.
        """

        def __init__(self, partition, bucket, level, file_name,
                     extra_files, embedded_index, external_path):
            self.partition = partition
            self.bucket = bucket
            self.level = level
            self.file_name = file_name
            self.extra_files = extra_files
            self.embedded_index = embedded_index
            self.external_path = external_path
            # Cached hash; computed lazily on first use (hash() never
            # returns None, so None is a safe sentinel).
            self._hash = None

        def __eq__(self, other):
            if other is self:
                return True
            if not isinstance(other, FileEntry.Identifier):
                return False
            mine = (self.bucket, self.level, self.partition,
                    self.file_name, self.extra_files,
                    self.embedded_index, self.external_path)
            theirs = (other.bucket, other.level, other.partition,
                      other.file_name, other.extra_files,
                      other.embedded_index, other.external_path)
            return mine == theirs

        def __hash__(self):
            cached = self._hash
            if cached is None:
                cached = hash((
                    self.partition,
                    self.bucket,
                    self.level,
                    self.file_name,
                    self.extra_files,
                    self.embedded_index,
                    self.external_path,
                ))
                self._hash = cached
            return cached

    def identifier(self):
        """Return the unique Identifier for this file entry."""
        meta = self.file
        # Normalize extra_files to a hashable tuple (empty when absent).
        extras = tuple(meta.extra_files) if meta.extra_files else ()
        return FileEntry.Identifier(
            partition=self.partition,
            bucket=self.bucket,
            level=meta.level,
            file_name=meta.file_name,
            extra_files=extras,
            embedded_index=meta.embedded_index,
            external_path=meta.external_path,
        )

    @staticmethod
    def merge_entries(entries):
        """Merge entries so an ADD and a DELETE of one file cancel out.

        ADD (kind 0): duplicate identifiers are an error; otherwise the
        entry is recorded. DELETE (kind 1): removes a previously recorded
        entry with the same identifier, or is recorded itself when none
        exists yet.

        Args:
            entries: Iterable of FileEntry.

        Returns:
            Merged FileEntry values in insertion order.

        Raises:
            RuntimeError: On a duplicate ADD or an unknown entry kind.
        """
        merged = {}
        for candidate in entries:
            key = candidate.identifier()
            kind = candidate.kind
            if kind == 0:  # ADD
                if key in merged:
                    raise RuntimeError(
                        "Trying to add file {} which is already added.".format(
                            candidate.file.file_name))
                merged[key] = candidate
            elif kind == 1:  # DELETE
                if key in merged:
                    del merged[key]
                else:
                    merged[key] = candidate
            else:
                raise RuntimeError(
                    "Unknown entry kind: {}".format(kind))
        return list(merged.values())
3 changes: 2 additions & 1 deletion paimon-python/pypaimon/manifest/schema/manifest_entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@

from pypaimon.manifest.schema.data_file_meta import (DATA_FILE_META_SCHEMA,
DataFileMeta)
from pypaimon.manifest.schema.file_entry import FileEntry
from pypaimon.table.row.generic_row import GenericRow


@dataclass
class ManifestEntry:
class ManifestEntry(FileEntry):
kind: int
partition: GenericRow
bucket: int
Expand Down
69 changes: 68 additions & 1 deletion paimon-python/pypaimon/schema/data_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ def __init__(self, type: str, nullable: bool = True):
super().__init__(nullable)
self.type = type

def __eq__(self, other):
    # Structural equality: same SQL type string and same nullability.
    if self is other:
        return True
    if not isinstance(other, AtomicType):
        return False
    return self.type == other.type and self.nullable == other.nullable

def __hash__(self):
    # Hashes exactly the fields compared in __eq__ (type, nullable),
    # keeping the hash/eq contract intact.
    return hash((self.type, self.nullable))

def to_dict(self) -> str:
if not self.nullable:
return self.type + " NOT NULL"
Expand All @@ -95,6 +105,16 @@ def __init__(self, nullable: bool, element_type: DataType):
super().__init__(nullable)
self.element = element_type

def __eq__(self, other):
    # Equal when the element type (compared recursively) and the
    # nullability flag both match.
    if self is other:
        return True
    if not isinstance(other, ArrayType):
        return False
    return self.element == other.element and self.nullable == other.nullable

def __hash__(self):
    # Consistent with __eq__: element type plus nullability.
    return hash((self.element, self.nullable))

def to_dict(self) -> Dict[str, Any]:
return {
"type": "ARRAY" + (" NOT NULL" if not self.nullable else ""),
Expand All @@ -119,6 +139,16 @@ def __init__(self, nullable: bool, element_type: DataType):
super().__init__(nullable)
self.element = element_type

def __eq__(self, other):
    # Equal when the element type (compared recursively) and the
    # nullability flag both match.
    if self is other:
        return True
    if not isinstance(other, MultisetType):
        return False
    return self.element == other.element and self.nullable == other.nullable

def __hash__(self):
    # Consistent with __eq__: element type plus nullability.
    return hash((self.element, self.nullable))

def to_dict(self) -> Dict[str, Any]:
return {
"type": "MULTISET{}{}".format('<' + str(self.element) + '>' if self.element else '',
Expand Down Expand Up @@ -150,6 +180,18 @@ def __init__(
self.key = key_type
self.value = value_type

def __eq__(self, other):
    # Equal when key type, value type and nullability all match.
    if self is other:
        return True
    if not isinstance(other, MapType):
        return False
    return (self.key == other.key
            and self.value == other.value
            and self.nullable == other.nullable)

def __hash__(self):
    # Consistent with __eq__: key type, value type, nullability.
    return hash((self.key, self.value, self.nullable))

def to_dict(self) -> Dict[str, Any]:
return {
"type": "MAP<{}, {}>".format(self.key, self.value),
Expand Down Expand Up @@ -199,6 +241,21 @@ def __init__(
def from_dict(cls, data: Dict[str, Any]) -> "DataField":
return DataTypeParser.parse_data_field(data)

def __eq__(self, other):
    # Fields compare structurally on id, name, type, description and
    # default value.
    if self is other:
        return True
    if not isinstance(other, DataField):
        return False
    return (self.id == other.id
            and self.name == other.name
            and self.type == other.type
            and self.description == other.description
            and self.default_value == other.default_value)

def __hash__(self):
    # Consistent with __eq__: hashes the same five attributes.
    return hash((self.id, self.name, self.type,
                 self.description, self.default_value))

def to_dict(self) -> Dict[str, Any]:
result = {
self.FIELD_ID: self.id,
Expand All @@ -223,6 +280,16 @@ def __init__(self, nullable: bool, fields: List[DataField]):
super().__init__(nullable)
self.fields = fields or []

def __eq__(self, other):
    # Equal when the field lists (element-wise DataField equality) and
    # the nullability flag both match.
    if self is other:
        return True
    if not isinstance(other, RowType):
        return False
    return self.fields == other.fields and self.nullable == other.nullable

def __hash__(self):
    # fields is a list (unhashable), so convert to a tuple first;
    # consistent with __eq__.
    return hash((tuple(self.fields), self.nullable))

def to_dict(self) -> Dict[str, Any]:
return {
"type": "ROW" + ("" if self.nullable else " NOT NULL"),
Expand Down Expand Up @@ -587,7 +654,7 @@ def to_avro_type(field_type: pyarrow.DataType, field_name: str,
parent_name: str = "record") -> Union[str, Dict[str, Any]]:
if pyarrow.types.is_integer(field_type):
if (pyarrow.types.is_signed_integer(field_type) and field_type.bit_width <= 32) or \
(pyarrow.types.is_unsigned_integer(field_type) and field_type.bit_width < 32):
(pyarrow.types.is_unsigned_integer(field_type) and field_type.bit_width < 32):
return "int"
else:
return "long"
Expand Down
10 changes: 10 additions & 0 deletions paimon-python/pypaimon/table/row/generic_row.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ def get_row_kind(self) -> RowKind:
def __len__(self) -> int:
    # Arity of the row: the number of field values it holds.
    return len(self.values)

def __eq__(self, other):
    # Rows are equal when their values and row kind match; the field
    # schema does not participate in equality.
    if self is other:
        return True
    if not isinstance(other, GenericRow):
        return False
    return self.values == other.values and self.row_kind == other.row_kind

def __hash__(self):
    # Hash must agree with __eq__, which compares only `values` and
    # `row_kind`. Including `fields` (as the previous version did) gives
    # equal rows different hashes and breaks dict/set membership, e.g.
    # the DELETE/ADD cancellation done via FileEntry identifiers.
    # `values` is a list, so convert to a tuple to make it hashable.
    return hash((tuple(self.values), self.row_kind))

def __str__(self):
    # Human-readable rendering: row kind name plus "name=value" pairs
    # built by zipping the field schema with the stored values.
    field_strs = [f"{field.name}={repr(value)}" for field, value in zip(self.fields, self.values)]
    return f"GenericRow(row_kind={self.row_kind.name}, {', '.join(field_strs)})"
Expand Down
Loading