21 changes: 21 additions & 0 deletions paimon-python/pypaimon/common/options/core_options.py
@@ -33,6 +33,16 @@ class ExternalPathStrategy(str, Enum):
SPECIFIC_FS = "specific-fs"


class MergeEngine(str, Enum):
"""
Specifies the merge engine for table with primary key.
"""
DEDUPLICATE = "deduplicate"
PARTIAL_UPDATE = "partial-update"
AGGREGATE = "aggregation"
FIRST_ROW = "first-row"


class CoreOptions:
"""Core options for Paimon tables."""
# File format constants
@@ -206,6 +216,14 @@ class CoreOptions:
.default_value(False)
.with_description("Whether to enable deletion vectors.")
)

MERGE_ENGINE: ConfigOption[MergeEngine] = (
ConfigOptions.key("merge-engine")
.enum_type(MergeEngine)
.default_value(MergeEngine.DEDUPLICATE)
.with_description("Specify the merge engine for table with primary key. "
"Options: deduplicate, partial-update, aggregation, first-row.")
)
# Commit options
COMMIT_USER_PREFIX: ConfigOption[str] = (
ConfigOptions.key("commit.user-prefix")
@@ -348,6 +366,9 @@ def data_evolution_enabled(self, default=None):
def deletion_vectors_enabled(self, default=None):
return self.options.get(CoreOptions.DELETION_VECTORS_ENABLED, default)

def merge_engine(self, default=None):
return self.options.get(CoreOptions.MERGE_ENGINE, default)

def data_file_external_paths(self, default=None):
external_paths_str = self.options.get(CoreOptions.DATA_FILE_EXTERNAL_PATHS, default)
if not external_paths_str:
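
Reviewer note: a quick sanity sketch of the new enum (it exercises only the class shown above; constructing a full CoreOptions instance is out of scope here and not shown in this diff):

from pypaimon.common.options.core_options import MergeEngine

# MergeEngine subclasses str, so members compare equal to their raw option strings.
assert MergeEngine("partial-update") is MergeEngine.PARTIAL_UPDATE
assert MergeEngine.AGGREGATE == "aggregation"  # member name differs from its value
assert [e.value for e in MergeEngine] == [
    "deduplicate", "partial-update", "aggregation", "first-row",
]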
35 changes: 35 additions & 0 deletions paimon-python/pypaimon/common/options/options_utils.py
@@ -16,6 +16,7 @@
limitations under the License.
"""

from enum import Enum
from typing import Any, Type

from pypaimon.common.memory_size import MemorySize
@@ -45,6 +46,12 @@ def convert_value(value: Any, target_type: Type) -> Any:
if isinstance(value, target_type):
return value

try:
if issubclass(target_type, Enum):
return OptionsUtils.convert_to_enum(value, target_type)
except TypeError:
pass

# Handle string conversions
if target_type == str:
return OptionsUtils.convert_to_string(value)
@@ -117,3 +124,31 @@ def convert_to_memory_size(value: Any) -> MemorySize:
if isinstance(value, str):
return MemorySize.parse(value)
raise ValueError(f"Cannot convert {type(value)} to MemorySize")

@staticmethod
def convert_to_enum(value: Any, enum_class: Type[Enum]) -> Enum:

if isinstance(value, enum_class):
return value

if isinstance(value, str):
value_lower = value.lower().strip()
for enum_member in enum_class:
if enum_member.value.lower() == value_lower:
return enum_member
try:
return enum_class[value.upper()]
except KeyError:
raise ValueError(
f"Cannot convert '{value}' to {enum_class.__name__}. "
f"Valid values: {[e.value for e in enum_class]}"
)
elif isinstance(value, Enum):
for enum_member in enum_class:
if enum_member.value == value.value:
return enum_member
raise ValueError(
f"Cannot convert {value} (from {type(value).__name__}) to {enum_class.__name__}"
)
else:
raise ValueError(f"Cannot convert {type(value)} to {enum_class.__name__}")
72 changes: 55 additions & 17 deletions paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -36,6 +36,7 @@
from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.table.bucket_mode import BucketMode
from pypaimon.manifest.simple_stats_evolutions import SimpleStatsEvolutions
from pypaimon.common.options.core_options import MergeEngine


class FullStartingScanner(StartingScanner):
@@ -471,6 +472,12 @@ def weight_func(f: DataFileMeta) -> int:
self._compute_split_start_end_row(splits, plan_start_row, plan_end_row)
return splits

def _without_delete_row(self, data_file_meta: DataFileMeta) -> bool:
# A missing (None) delete_row_count means no deletes, for compatibility with older versions
if data_file_meta.delete_row_count is None:
return True
return data_file_meta.delete_row_count == 0

def _create_primary_key_splits(
self, file_entries: List[ManifestEntry], deletion_files_map: dict = None) -> List['Split']:
if self.idx_of_this_subtask is not None:
@@ -479,28 +486,56 @@ def _create_primary_key_splits(
for entry in file_entries:
partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)

def single_weight_func(f: DataFileMeta) -> int:
return max(f.file_size, self.open_file_cost)

def weight_func(fl: List[DataFileMeta]) -> int:
return max(sum(f.file_size for f in fl), self.open_file_cost)

merge_engine = self.table.options.merge_engine()
merge_engine_first_row = merge_engine == MergeEngine.FIRST_ROW

splits = []
for key, file_entries in partitioned_files.items():
if not file_entries:
return []
continue

data_files: List[DataFileMeta] = [e.file for e in file_entries]
partition_sort_runs: List[List[SortedRun]] = IntervalPartition(data_files).partition()
sections: List[List[DataFileMeta]] = [
[file for s in sl for file in s.files]
for sl in partition_sort_runs
]

packed_files: List[List[List[DataFileMeta]]] = self._pack_for_ordered(sections, weight_func,
self.target_split_size)
flatten_packed_files: List[List[DataFileMeta]] = [
[file for sub_pack in pack for file in sub_pack]
for pack in packed_files
]
splits += self._build_split_from_pack(flatten_packed_files, file_entries, True, deletion_files_map)
raw_convertible = all(
f.level != 0 and self._without_delete_row(f)
for f in data_files
)

levels = {f.level for f in data_files}
one_level = len(levels) == 1

use_optimized_path = raw_convertible and (
self.deletion_vectors_enabled or merge_engine_first_row or one_level)
if use_optimized_path:
packed_files: List[List[DataFileMeta]] = self._pack_for_ordered(
data_files, single_weight_func, self.target_split_size
)
splits += self._build_split_from_pack(
packed_files, file_entries, True, deletion_files_map,
use_optimized_path)
else:
partition_sort_runs: List[List[SortedRun]] = IntervalPartition(data_files).partition()
sections: List[List[DataFileMeta]] = [
[file for s in sl for file in s.files]
for sl in partition_sort_runs
]

packed_files: List[List[List[DataFileMeta]]] = self._pack_for_ordered(sections, weight_func,
self.target_split_size)

flatten_packed_files: List[List[DataFileMeta]] = [
[file for sub_pack in pack for file in sub_pack]
for pack in packed_files
]
splits += self._build_split_from_pack(
flatten_packed_files, file_entries, True,
deletion_files_map, False)
return splits

def _create_data_evolution_splits(
@@ -595,12 +630,15 @@ def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]
return split_by_row_id

def _build_split_from_pack(self, packed_files, file_entries, for_primary_key_split: bool,
deletion_files_map: dict = None) -> List['Split']:
deletion_files_map: dict = None, use_optimized_path: bool = False) -> List['Split']:
splits = []
for file_group in packed_files:
raw_convertible = True
if for_primary_key_split:
raw_convertible = len(file_group) == 1
if use_optimized_path:
raw_convertible = True
elif for_primary_key_split:
raw_convertible = len(file_group) == 1 and self._without_delete_row(file_group[0])
else:
raw_convertible = True

file_paths = []
total_file_size = 0
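
Reviewer note: the new branch condition in _create_primary_key_splits, restated on plain values for readers skimming the diff (a standalone sketch, not code from this PR; the function name and its arguments are illustrative):

from typing import List, Optional


def can_use_optimized_path(levels: List[int],
                           delete_row_counts: List[Optional[int]],
                           deletion_vectors_enabled: bool,
                           first_row_engine: bool) -> bool:
    """Mirror of the condition used to pick the per-file packing path.

    Files qualify when none of them sit on level 0 and none carry delete rows
    (a missing count is treated as zero for older metadata); the bucket must
    additionally use deletion vectors, the first-row merge engine, or a single
    level only.
    """
    raw_convertible = all(
        level != 0 and (count is None or count == 0)
        for level, count in zip(levels, delete_row_counts)
    )
    one_level = len(set(levels)) == 1
    return raw_convertible and (
        deletion_vectors_enabled or first_row_engine or one_level)


# All files compacted to level 5 with no deletes: optimized per-file packing.
print(can_use_optimized_path([5, 5], [0, None], False, False))  # True
# A level-0 file forces the sorted-run / section packing path.
print(can_use_optimized_path([0, 5], [0, 0], True, True))       # False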