Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions paimon-python/pypaimon/catalog/filesystem_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,11 @@ def _load_data_table(self, identifier: Identifier) -> FileStoreTable:
table_schema = self.get_table_schema(identifier)

# Create catalog environment for filesystem catalog
# Filesystem catalog doesn't support version management by default
from pypaimon.catalog.filesystem_catalog_loader import FileSystemCatalogLoader
catalog_environment = CatalogEnvironment(
identifier=identifier,
uuid=None, # Filesystem catalog doesn't track table UUIDs
catalog_loader=None, # No catalog loader for filesystem
uuid=None,
catalog_loader=FileSystemCatalogLoader(self.catalog_context),
supports_version_management=False
)

Expand Down
26 changes: 26 additions & 0 deletions paimon-python/pypaimon/common/options/core_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,17 @@ class CoreOptions:
.with_description("Comma-separated field names to treat as BLOB view fields.")
)

BLOB_VIEW_RESOLVE_ENABLED: ConfigOption[bool] = (
ConfigOptions.key("blob-view.resolve.enabled")
.boolean_type()
.default_value(True)
.with_description(
"Whether to resolve blob-view-field values from upstream tables at "
"read time. Set to false to preserve BlobViewStruct references when "
"forwarding blob view values to another blob-view table."
)
)

VECTOR_FIELD: ConfigOption[str] = (
ConfigOptions.key("vector-field")
.string_type()
Expand Down Expand Up @@ -729,6 +740,21 @@ def variant_shredding_schema(self) -> Optional[str]:

def blob_descriptor_fields(self, default=None):
value = self.options.get(CoreOptions.BLOB_DESCRIPTOR_FIELD, default)
return CoreOptions._parse_field_set(value)

def blob_view_fields(self, default=None):
value = self.options.get(CoreOptions.BLOB_VIEW_FIELD, default)
return CoreOptions._parse_field_set(value)

def blob_field(self, default=None):
value = self.options.get(CoreOptions.BLOB_FIELD, default)
return CoreOptions._parse_field_set(value)

def blob_view_resolve_enabled(self, default=True):
return self.options.get(CoreOptions.BLOB_VIEW_RESOLVE_ENABLED, default)

@staticmethod
def _parse_field_set(value):
if value is None:
return set()
if isinstance(value, str):
Expand Down
179 changes: 150 additions & 29 deletions paimon-python/pypaimon/read/reader/blob_descriptor_convert_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,68 +15,189 @@
# specific language governing permissions and limitations
# under the License.

from typing import Optional
from typing import Callable, Optional, Set

import pyarrow
from pyarrow import RecordBatch

from pypaimon.common.options.core_options import CoreOptions
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
from pypaimon.table.row.blob import Blob, BlobViewStruct


class BlobDescriptorConvertReader(RecordBatchReader):
def __init__(self, inner: RecordBatchReader, table):
class BlobInlineConvertReader(RecordBatchReader):
"""Resolves BlobView and BlobDescriptor fields in record batches.

Processing is split into two clear stages:
Stage 1 (BlobView resolution): If view fields exist, use a lightweight
prescan reader (only projecting view columns) to collect
BlobViewStructs, bulk-preload their descriptors, then read
full data from the main reader and replace view field values
with the corresponding BlobDescriptor serialized bytes.
Stage 2 (BlobData resolution): Controlled by blob-as-descriptor option.
If false, resolve all BlobDescriptor bytes (from both descriptor
fields and view fields) into real blob data bytes.
If true, return as-is.
"""

def __init__(self, inner: RecordBatchReader, table,
prescan_reader_factory: Optional[Callable[[Set[str]], RecordBatchReader]] = None):
"""
Args:
inner: The main data reader (reads all columns).
table: The table instance.
prescan_reader_factory: Optional factory that creates a lightweight
reader projecting only the specified field names. Used for
prescan to collect BlobViewStructs without reading all columns.
Signature: (field_names: Set[str]) -> RecordBatchReader
"""
self._inner = inner
self._table = table
self._descriptor_fields = CoreOptions.blob_descriptor_fields(table.options)
self._prescan_reader_factory = prescan_reader_factory
self.file_io = inner.file_io
self.blob_field_indices = inner.blob_field_indices
# Preserve original BlobViewStruct bytes when resolve disabled: skip both
# view resolution (Stage 1) and descriptor-to-data resolution (Stage 2).
resolve_enabled = CoreOptions.blob_view_resolve_enabled(
table.options) and self._table.catalog_environment.catalog_loader is not None
self._view_fields = CoreOptions.blob_view_fields(table.options) if resolve_enabled else set()
self._descriptor_fields = CoreOptions.blob_descriptor_fields(table.options)
self._blob_as_descriptor = CoreOptions.blob_as_descriptor(table.options)
self._prescan_done = False
self._blob_view_lookup = None

def read_arrow_batch(self) -> Optional[RecordBatch]:
import pyarrow
# Align with Java: only enter blob view resolution when catalog_loader is available
# If catalog_loader is None, skip both Stage 1 (view resolution) and Stage 2 (descriptor resolution)
# This matches Java's behavior in DataEvolutionTableRead.createReader where blob view reader
# is only created when catalogContext != null
if self._view_fields and not self._prescan_done:
self._prescan_view_structs()

batch = self._inner.read_arrow_batch()
if batch is None:
return None
return self._convert_batch(batch, pyarrow)
# Resolve view fields using the preloaded lookup
if self._view_fields and self._blob_view_lookup is not None:
batch = self._resolve_view_fields(batch, self._blob_view_lookup)
# Resolve BlobDescriptor -> real bytes (if blob-as-descriptor=false)
return self._resolve_descriptor_fields(batch)

# ------------------------------------------------------------------
# Stage 1: BlobView prescan (lightweight, only reads view columns)
# ------------------------------------------------------------------

def _prescan_view_structs(self):
"""Use a lightweight prescan reader (projecting only view columns) to
collect all BlobViewStructs and bulk-preload their descriptors."""
from pypaimon.table.row.blob import BlobViewStruct
from pypaimon.utils.blob_view_lookup import BlobViewLookup

def _convert_batch(self, batch, pyarrow):
from pypaimon.table.row.blob import Blob, BlobDescriptor
all_view_structs = []

result = batch
for field_name in self._descriptor_fields:
if field_name not in result.schema.names:
prescan_reader = self._prescan_reader_factory(self._view_fields)
try:
while True:
batch = prescan_reader.read_arrow_batch()
Copy link
Copy Markdown
Contributor

@JingsongLi JingsongLi Jun 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P2] This prescan exhausts the prescan reader before the first main batch is returned. In Python, limit is enforced by outer TableRead after a batch is returned or sliced, and DataEvolutionSplitRead itself never receives that limit. Java wires topN/limit into the prescan reader in DataEvolutionTableRead.configureBlobViewPrescanRead. As a result, Python with_limit(1) can still prescan and validate every later BlobViewStruct in the same split, and can even fail on a bad reference that the user-visible result would never return. Please pass limit/topN into the prescan scope, or at least add a blob-view + limit test to lock down the intended behavior.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In pypaimon, currently, the limit pushdown in the Read stage of the Append table is not supported. I will support this function in the next pr

if batch is None:
break
for field_name in self._view_fields:
if field_name not in batch.schema.names:
continue
for value in batch.column(field_name).to_pylist():
value = self._normalize_blob_to_bytes(value)
if value is None:
continue
if isinstance(value, bytes) and BlobViewStruct.is_blob_view_struct(value):
all_view_structs.append(BlobViewStruct.deserialize(value))
else:
raise ValueError(
f"Expected BlobViewStruct bytes in view field '{field_name}', "
f"but got non-BlobViewStruct bytes."
)
finally:
prescan_reader.close()

# Bulk-preload BlobViewStruct -> BlobDescriptor mapping
if all_view_structs:
self._blob_view_lookup = BlobViewLookup(self._table)
self._blob_view_lookup.preload(all_view_structs)
self._prescan_done = True

def _resolve_view_fields(self, batch, blob_view_lookup):
"""Replace BlobViewStruct bytes in view fields with the corresponding
BlobDescriptor serialized bytes."""
for field_name in self._view_fields:
if field_name not in batch.schema.names:
continue
values = result.column(field_name).to_pylist()
values = [self._normalize_blob_to_bytes(v) for v in batch.column(field_name).to_pylist()]
converted_values = []
for value in values:
if value is None:
converted_values.append(None)
continue
if hasattr(value, 'as_py'):
value = value.as_py()
if isinstance(value, str):
value = value.encode('utf-8')
if isinstance(value, bytearray):
value = bytes(value)
if not isinstance(value, bytes):
converted_values.append(value)
continue
try:
descriptor = BlobDescriptor.deserialize(value)
if descriptor.serialize() != value:
converted_values.append(value)
continue
uri_reader = self._table.file_io.uri_reader_factory.create(descriptor.uri)
converted_values.append(Blob.from_descriptor(uri_reader, descriptor).to_data())
except Exception:
if not BlobViewStruct.is_blob_view_struct(value):
converted_values.append(value)
continue
view_struct = BlobViewStruct.deserialize(value)
if blob_view_lookup.resolve_to_null(view_struct):
converted_values.append(None)
else:
descriptor = blob_view_lookup.resolve_descriptor(view_struct)
converted_values.append(descriptor.serialize())

column_idx = result.schema.names.index(field_name)
result = result.set_column(
column_idx = batch.schema.names.index(field_name)
batch = batch.set_column(
column_idx,
pyarrow.field(field_name, pyarrow.large_binary(), nullable=True),
pyarrow.array(converted_values, type=pyarrow.large_binary()),
)
return result
return batch

# ------------------------------------------------------------------
# Stage 2: BlobData resolution (unified exit)
# ------------------------------------------------------------------

def _resolve_descriptor_fields(self, batch):
if self._blob_as_descriptor:
return batch

all_inline_blob_fields = self._descriptor_fields | self._view_fields
for field_name in all_inline_blob_fields:
if field_name not in batch.schema.names:
continue
values = [self._normalize_blob_to_bytes(v) for v in batch.column(field_name).to_pylist()]
converted_values = []
for value in values:
blob = Blob.from_bytes(value, self._table.file_io)
converted_values.append(blob.to_data() if blob else None)

column_idx = batch.schema.names.index(field_name)
batch = batch.set_column(
column_idx,
pyarrow.field(field_name, pyarrow.large_binary(), nullable=True),
pyarrow.array(converted_values, type=pyarrow.large_binary()),
)
return batch

# ------------------------------------------------------------------
# Utilities
# ------------------------------------------------------------------

@staticmethod
def _normalize_blob_to_bytes(value):
if value is None:
return None
if hasattr(value, 'as_py'):
value = value.as_py()
if isinstance(value, str):
value = value.encode('utf-8')
if isinstance(value, bytearray):
value = bytes(value)
return value

def close(self):
self._inner.close()
61 changes: 0 additions & 61 deletions paimon-python/pypaimon/read/reader/data_file_batch_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from pypaimon.read.reader.format_blob_reader import FormatBlobReader
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
from pypaimon.schema.data_types import DataField, PyarrowFieldParser
from pypaimon.table.row.blob import Blob
from pypaimon.table.special_fields import SpecialFields


Expand All @@ -40,8 +39,6 @@ def __init__(self, format_reader: RecordBatchReader, index_mapping: List[int], p
first_row_id: int,
row_tracking_enabled: bool,
system_fields: dict,
blob_as_descriptor: bool = False,
blob_descriptor_fields: Optional[set] = None,
file_io: Optional[FileIO] = None,
row_id_offsets: Optional[List[int]] = None):
self.format_reader = format_reader
Expand All @@ -55,19 +52,7 @@ def __init__(self, format_reader: RecordBatchReader, index_mapping: List[int], p
self._row_id_cursor = 0
self.max_sequence_number = max_sequence_number
self.system_fields = system_fields
self.blob_as_descriptor = blob_as_descriptor
self.blob_descriptor_fields = blob_descriptor_fields or set()
self.file_io = file_io
self.blob_field_names = {
field.name
for field in fields
if hasattr(field.type, 'type') and field.type.type == 'BLOB'
}
self.descriptor_blob_fields = {
field_name
for field_name in self.blob_descriptor_fields
if field_name in self.blob_field_names
}

def read_arrow_batch(self, start_idx=None, end_idx=None) -> Optional[RecordBatch]:
if isinstance(self.format_reader, FormatBlobReader):
Expand Down Expand Up @@ -140,8 +125,6 @@ def read_arrow_batch(self, start_idx=None, end_idx=None) -> Optional[RecordBatch
if self.row_tracking_enabled and self.system_fields:
record_batch = self._assign_row_tracking(record_batch)

record_batch = self._convert_descriptor_stored_blob_columns(record_batch)

return record_batch

def _align_batch_to_read_schema(self, names: List[str], arrays: list) -> RecordBatch:
Expand Down Expand Up @@ -170,50 +153,6 @@ def _align_batch_to_read_schema(self, names: List[str], arrays: list) -> RecordB
out_fields.append(target_field)
return pa.RecordBatch.from_arrays(out_arrays, schema=pa.schema(out_fields))

def _convert_descriptor_stored_blob_columns(self, record_batch: RecordBatch) -> RecordBatch:
if isinstance(self.format_reader, FormatBlobReader):
return record_batch
if not self.descriptor_blob_fields:
return record_batch

schema_names = set(record_batch.schema.names)
target_fields = [f for f in self.descriptor_blob_fields if f in schema_names]
if not target_fields:
return record_batch

arrays = list(record_batch.columns)
for field_name in target_fields:
field_idx = record_batch.schema.get_field_index(field_name)
values = record_batch.column(field_idx).to_pylist()

if self.blob_as_descriptor:
converted = [self._normalize_blob_cell(v) for v in values]
else:
converted = [self._blob_cell_to_data(v) for v in values]
arrays[field_idx] = pa.array(converted, type=pa.large_binary())

return pa.RecordBatch.from_arrays(arrays, schema=record_batch.schema)

@staticmethod
def _normalize_blob_cell(value):
if value is None:
return None
if hasattr(value, 'as_py'):
value = value.as_py()
if isinstance(value, str):
value = value.encode('utf-8')
if isinstance(value, bytearray):
value = bytes(value)
return value

def _blob_cell_to_data(self, value):
value = self._normalize_blob_cell(value)
if value is None:
return None
if not isinstance(value, bytes):
return value
return Blob.from_bytes(value, self.file_io).to_data()

def _assign_row_tracking(self, record_batch: RecordBatch) -> RecordBatch:
"""Assign row tracking meta fields (_ROW_ID and _SEQUENCE_NUMBER)."""
arrays = list(record_batch.columns)
Expand Down
Loading
Loading