diff --git a/docs/content/append-table/blob.md b/docs/content/append-table/blob.md index cfae5d06059f..15341bbc9588 100644 --- a/docs/content/append-table/blob.md +++ b/docs/content/append-table/blob.md @@ -94,7 +94,19 @@ For details about the blob file format structure, see [File Format - BLOB]({{< r No false Boolean - When set to true, the blob field input is treated as a serialized BlobDescriptor. Paimon reads from the descriptor's URI and streams the data into Paimon's blob files in small chunks, avoiding loading the entire blob into memory. This is useful for writing very large blobs that cannot fit in memory. When reading, if set to true, returns the BlobDescriptor bytes; if false, returns actual blob bytes. + Controls read output format for blob fields. When set to true, queries return serialized BlobDescriptor bytes; when false, queries return actual blob bytes. This option is dynamic and can be changed with ALTER TABLE ... SET. + + +
blob.stored-descriptor-fields
+ No + (none) + String + + Comma-separated BLOB field names stored as serialized BlobDescriptor bytes inline in normal data files. + By default, all blob fields store blob bytes in separate .blob files. + If configured, a single table can mix both storage modes: + some BLOB fields stored in .blob files and others stored inline as descriptor references. +
blob.target-file-size
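For reference, a minimal sketch of a table definition that opts a blob field into inline descriptor storage (the table name is illustrative; the companion options follow the examples elsewhere on this page):

```sql
CREATE TABLE ref_table (
    id INT,
    name STRING,
    image BYTES
) WITH (
    'row-tracking.enabled' = 'true',
    'data-evolution.enabled' = 'true',
    'blob-field' = 'image',
    'blob.stored-descriptor-fields' = 'image'
);
```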
@@ -217,31 +229,18 @@ SELECT id, name FROM image_table; SELECT * FROM image_table WHERE id = 1; ``` -### Blob Descriptor Mode +### Blob Read Output Mode (`blob-as-descriptor`) -When you want to store references from external blob data (stored in object storage) without loading the entire blob into memory, you can use the `blob-as-descriptor` option: +The `blob-as-descriptor` option only controls how blob values are returned when reading. ```sql --- Create table in descriptor mode -CREATE TABLE blob_table ( - id INT, - name STRING, - image BYTES -) WITH ( - 'row-tracking.enabled' = 'true', - 'data-evolution.enabled' = 'true', - 'blob-field' = 'image', - 'blob-as-descriptor' = 'true' -); - --- Insert with serialized BlobDescriptor bytes --- The BlobDescriptor contains: version (1 byte) + uri_length (4 bytes) + uri_bytes + offset (8 bytes) + length (8 bytes) --- Paimon will read from the descriptor's URI and stream data into Paimon's blob files in small chunks, avoiding loading the entire blob into memory -INSERT INTO blob_table VALUES (1, 'photo', X''); +-- Return descriptor bytes +ALTER TABLE blob_table SET ('blob-as-descriptor' = 'true'); +SELECT image FROM blob_table; --- Toggle this setting to control read output format: +-- Return actual blob bytes ALTER TABLE blob_table SET ('blob-as-descriptor' = 'false'); -SELECT * FROM blob_table; -- Returns actual blob bytes from Paimon storage +SELECT image FROM blob_table; ``` ## Java API Usage @@ -389,7 +388,7 @@ public class BlobTableExample { } ``` -### Inserting Blob Data +### Constructing Blobs from Different Sources ```java // From byte array (data already in memory) @@ -442,17 +441,13 @@ long offset = descriptor.offset(); // Starting position in the file long length = descriptor.length(); // Length of the blob data ``` -### Blob Descriptor Mode - -The `blob-as-descriptor` option enables **memory-efficient writing** for very large blobs. When enabled, you provide a `BlobDescriptor` pointing to external data, and Paimon streams the data from the external source into Paimon's `.blob` files without loading the entire blob into memory. +### Descriptor-Aware Write Behavior -**How it works:** -1. **Writing**: You provide a serialized `BlobDescriptor` (containing URI, offset, length) as the blob field value -2. **Paimon copies the data**: Paimon reads from the descriptor's URI in small chunks (e.g., 1024 bytes at a time) and writes to Paimon's `.blob` files -3. **Data is stored in Paimon**: The blob data IS copied to Paimon storage, but in a streaming fashion +Paimon's write path is automatically descriptor-aware: -**Key benefit:** -- **Memory efficiency**: For very large blobs (e.g., gigabyte-sized videos), you don't need to load the entire file into memory. Paimon streams the data incrementally. +1. For blob fields stored in `.blob` files, the input value can be either raw blob bytes or a `BlobDescriptor` (see the sketch after this list). +2. For fields configured in `blob.stored-descriptor-fields`, Paimon stores descriptor bytes inline in data files (no `.blob` files for those fields), and the input must be a descriptor. +3. This behavior does not depend on `blob-as-descriptor`.
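As a sketch of point 1, both input forms can be built with the APIs used on this page (`fileIO`, the URI, `imageBytes`, and `imageSize` are illustrative):

```java
// Raw bytes already in memory
Blob fromBytes = new BlobData(imageBytes);

// A descriptor reference to data at an external URI; Paimon reads it via the UriReader
BlobDescriptor descriptor = new BlobDescriptor("s3://my-bucket/images/photo.bin", 0, imageSize);
Blob fromDescriptor = Blob.fromDescriptor(UriReader.fromFile(fileIO), descriptor);
```

The full example below then writes such a descriptor-backed blob end to end.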
```java import org.apache.paimon.catalog.Catalog; @@ -484,21 +479,21 @@ public class BlobDescriptorExample { Catalog catalog = CatalogFactory.createCatalog(catalogContext); catalog.createDatabase("my_db", true); - // Create table with blob-as-descriptor enabled + // Create table: store "video" as descriptor bytes inline Schema schema = Schema.newBuilder() .column("id", DataTypes.INT()) .column("name", DataTypes.STRING()) .column("video", DataTypes.BLOB()) .option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true") .option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true") - .option(CoreOptions.BLOB_AS_DESCRIPTOR.key(), "true") // This is not necessary in java api + .option(CoreOptions.BLOB_STORED_DESCRIPTOR_FIELDS.key(), "video") .build(); Identifier tableId = Identifier.create("my_db", "video_table"); catalog.createTable(tableId, schema, true); Table table = catalog.getTable(tableId); - // Write large blob using descriptor (memory-efficient) + // Write blob using descriptor reference writeLargeBlobWithDescriptor(table); // Read blob data @@ -514,7 +509,7 @@ public class BlobDescriptorExample { // For a very large file (e.g., 2GB video), instead of loading into memory: // byte[] hugeVideo = Files.readAllBytes(...); // This would cause OutOfMemoryError! // - // Use BlobDescriptor to let Paimon stream the data: + // Create a descriptor reference to external blob String externalUri = "s3://my-bucket/videos/large_video.mp4"; long fileSize = 2L * 1024 * 1024 * 1024; // 2GB @@ -524,8 +519,6 @@ public class BlobDescriptorExample { UriReader uriReader = UriReader.fromFile(fileIO); Blob blob = Blob.fromDescriptor(uriReader, descriptor); - // Write the serialized descriptor as blob data - // Paimon will read from the URI and copy data to .blob files in chunks GenericRow row = GenericRow.of( 1, BinaryString.fromString("large_video"), @@ -535,7 +528,7 @@ public class BlobDescriptorExample { commit.commit(write.prepareCommit()); } - System.out.println("Successfully wrote large blob using descriptor mode"); + System.out.println("Successfully wrote large blob using descriptor reference"); } private static void readBlobData(Table table) throws Exception { @@ -548,20 +541,19 @@ public class BlobDescriptorExample { String name = row.getString(1).toString(); Blob blob = row.getBlob(2); - // The blob data is now stored in Paimon's .blob files - // blob.toDescriptor() returns a descriptor pointing to Paimon's internal storage + // Field is configured in blob.stored-descriptor-fields, so descriptor is stored inline BlobDescriptor descriptor = blob.toDescriptor(); System.out.println("Row " + id + ": " + name); - System.out.println(" Paimon blob URI: " + descriptor.uri()); + System.out.println(" Blob URI: " + descriptor.uri()); System.out.println(" Length: " + descriptor.length()); }); } } ``` -**Reading blob data with different modes:** +**Reading blob data with different output modes:** -The `blob-as-descriptor` option also affects how data is returned when reading: +The `blob-as-descriptor` option affects only read output: ```sql -- When blob-as-descriptor = true: Returns BlobDescriptor bytes (reference to Paimon blob file) @@ -573,13 +565,22 @@ ALTER TABLE video_table SET ('blob-as-descriptor' = 'false'); SELECT * FROM video_table; -- Returns actual blob bytes from Paimon storage ``` +### Blob storage mode: DESCRIPTOR ONLY + +If you want downstream tables to **reuse** upstream blob files (no copying and no new .blob files), configure the target blob field(s): + +```sql +'blob.stored-descriptor-fields' = 'image' 
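+-- multiple blob fields may be listed comma-separated, for example (illustrative):
+-- 'blob.stored-descriptor-fields' = 'image,video'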
+``` + +For these configured fields, Paimon stores only serialized BlobDescriptor bytes in normal data files. Reads follow the descriptor URI to fetch the blob bytes, and writes require descriptor input for those fields. + ## Limitations -1. **Single Blob Field**: Currently, only one blob field per table is supported. -2. **Append Table Only**: Blob type is designed for append-only tables. Primary key tables are not supported. -3. **No Predicate Pushdown**: Blob columns cannot be used in filter predicates. -4. **No Statistics**: Statistics collection is not supported for blob columns. -5. **Required Options**: `row-tracking.enabled` and `data-evolution.enabled` must be set to `true`. +1. **Append Table Only**: Blob type is designed for append-only tables. Primary key tables are not supported. +2. **No Predicate Pushdown**: Blob columns cannot be used in filter predicates. +3. **No Statistics**: Statistics collection is not supported for blob columns. +4. **Required Options**: `row-tracking.enabled` and `data-evolution.enabled` must be set to `true`. ## Best Practices @@ -587,19 +588,6 @@ SELECT * FROM video_table; -- Returns actual blob bytes from Paimon storage 2. **Set Appropriate Target File Size**: Configure `blob.target-file-size` based on your blob sizes. Larger values mean fewer files but larger individual files. -3. **Consider Descriptor Mode**: For very large blobs that cannot fit in memory, use `blob-as-descriptor` mode to stream data from external sources into Paimon without loading the entire blob into memory. +3. **Use Descriptor Fields When Reusing External Blob Files**: Configure `blob.stored-descriptor-fields` for fields that should keep descriptor references instead of writing new `.blob` files. 4. **Use Partitioning**: Partition your blob tables by date or other dimensions to improve query performance and data management. - -```sql -CREATE TABLE partitioned_blob_table ( - id INT, - name STRING, - image BYTES, - dt STRING -) PARTITIONED BY (dt) WITH ( - 'row-tracking.enabled' = 'true', - 'data-evolution.enabled' = 'true', - 'blob-field' = 'image' -); -``` diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 21cd3f70554e..1bf883edd3b4 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -68,6 +68,12 @@ Boolean Whether to consider blob file size as a factor when performing scan splitting. + +
blob.stored-descriptor-fields
+ (none) + String + Comma-separated BLOB field names to store as serialized BlobDescriptor bytes inline in data files. +
blob.target-file-size
(none) diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index ee3f8dd7e215..8b3d10270400 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -2146,6 +2146,15 @@ public InlineElement getDescription() { "Specifies column names that should be stored as blob type. " + "This is used when you want to treat a BYTES column as a BLOB."); + @Immutable + public static final ConfigOption BLOB_STORED_DESCRIPTOR_FIELDS = + key("blob.stored-descriptor-fields") + .stringType() + .noDefaultValue() + .withDescription( + "Comma-separated BLOB field names to store as serialized BlobDescriptor " + + "bytes inline in data files."); + public static final ConfigOption BLOB_AS_DESCRIPTOR = key("blob-as-descriptor") .booleanType() @@ -2710,6 +2719,22 @@ public boolean blobSplitByFileSize() { .orElse(!options.get(BLOB_AS_DESCRIPTOR)); } + /** + * Resolve blob fields that should be stored as serialized descriptor bytes in data files. + * + *

If this option is not set, all blob fields are stored in '.blob' files by default. + */ + public Set blobStoredDescriptorFields() { + return options.getOptional(BLOB_STORED_DESCRIPTOR_FIELDS) + .map( + s -> + Arrays.stream(s.split(",")) + .map(String::trim) + .filter(str -> !str.isEmpty()) + .collect(Collectors.toSet())) + .orElse(Collections.emptySet()); + } + public long compactionFileSize(boolean hasPrimaryKey) { // file size to join the compaction, we don't process on middle file size to avoid // compact a same file twice (the compression is not calculate so accurately. the output diff --git a/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java b/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java index 7f3fa010c42a..b5f3ad3cf8e3 100644 --- a/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java +++ b/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Set; /** * Data type of binary large object. @@ -67,13 +68,25 @@ public R accept(DataTypeVisitor visitor) { } public static Pair splitBlob(RowType rowType) { + return splitBlob(rowType, java.util.Collections.emptySet()); + } + + /** + * Split row fields into normal fields and blob-file fields. + * + *

Blob fields contained in {@code blobStoredDescriptorFields} are treated as normal fields + * (stored inline as serialized descriptor bytes), while other blob fields are treated as + * blob-file fields. + */ + public static Pair<RowType, RowType> splitBlob( + RowType rowType, Set<String> blobStoredDescriptorFields) { List<DataField> fields = rowType.getFields(); List<DataField> normalFields = new ArrayList<>(); List<DataField> blobFields = new ArrayList<>(); for (DataField field : fields) { DataTypeRoot type = field.type().getTypeRoot(); - if (type == DataTypeRoot.BLOB) { + if (type == DataTypeRoot.BLOB && !blobStoredDescriptorFields.contains(field.name())) { blobFields.add(field); } else { normalFields.add(field); diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java b/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java index 3e60ea34867d..4e0ca0af6045 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java @@ -45,8 +45,8 @@ public class BlobDescriptor implements Serializable { private static final long serialVersionUID = 1L; - - private static final byte CURRENT_VERSION = 1; + private static final long MAGIC = 0x424C4F4244455343L; // "BLOBDESC" + private static final byte CURRENT_VERSION = 2; private final byte version; private final String uri; @@ -113,11 +113,12 @@ public byte[] serialize() { byte[] uriBytes = uri.getBytes(UTF_8); int uriLength = uriBytes.length; - int totalSize = 1 + 4 + uriLength + 8 + 8; + int totalSize = 1 + 8 + 4 + uriLength + 8 + 8; ByteBuffer buffer = ByteBuffer.allocate(totalSize); buffer.order(ByteOrder.LITTLE_ENDIAN); buffer.put(version); + buffer.putLong(MAGIC); buffer.putInt(uriLength); buffer.put(uriBytes); @@ -130,16 +131,26 @@ public static BlobDescriptor deserialize(byte[] bytes) { ByteBuffer buffer = ByteBuffer.wrap(bytes); buffer.order(ByteOrder.LITTLE_ENDIAN); byte version = buffer.get(); - if (version != CURRENT_VERSION) { + if (version > CURRENT_VERSION) { throw new UnsupportedOperationException( - "Expecting BlobDescriptor version to be " + "Expecting BlobDescriptor version to be less than or equal to " + CURRENT_VERSION + ", but found " + version + "."); } + + if (version > 1) { + long magic = buffer.getLong(); + if (MAGIC != magic) { + throw new IllegalArgumentException( + "Invalid BlobDescriptor: missing magic header. Expected magic: " + MAGIC + ", but found: " + magic); + } + } + int uriLength = buffer.getInt(); byte[] uriBytes = new byte[uriLength]; buffer.get(uriBytes); @@ -149,4 +160,26 @@ public static BlobDescriptor deserialize(byte[] bytes) { long length = buffer.getLong(); return new BlobDescriptor(version, uri, offset, length); } + + public static boolean isBlobDescriptor(byte[] bytes) { + if (bytes.length < 9) { + return false; + } + ByteBuffer buffer = ByteBuffer.wrap(bytes); + buffer.order(ByteOrder.LITTLE_ENDIAN); + + byte version = buffer.get(); + if (version == 1) { + try { + deserialize(bytes); + return true; + } catch (Exception e) { + return false; + } + } else if (version > CURRENT_VERSION) { + return false; + } else { + return MAGIC == buffer.getLong(); + } + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java index d7423608af45..a3b2db470176 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java @@ -20,7 +20,7 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.Blob; -import org.apache.paimon.data.BlobData; +import org.apache.paimon.data.BlobDescriptor; import org.apache.paimon.data.DataSetters; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.InternalArray; @@ -30,6 +30,7 @@ import org.apache.paimon.data.Timestamp; import org.apache.paimon.data.variant.Variant; import org.apache.paimon.types.RowKind; +import org.apache.paimon.utils.UriReader; import java.io.Serializable; @@ -43,16 +44,27 @@ public final class ColumnarRow implements InternalRow, DataSetters, Serializable private RowKind rowKind = RowKind.INSERT; private VectorizedColumnBatch vectorizedColumnBatch; + private UriReader uriReader; private int rowId; public ColumnarRow() {} public ColumnarRow(VectorizedColumnBatch vectorizedColumnBatch) { - this(vectorizedColumnBatch, 0); + this(vectorizedColumnBatch, null, 0); } public ColumnarRow(VectorizedColumnBatch vectorizedColumnBatch, int rowId) { + this(vectorizedColumnBatch, null, rowId); + } + + public ColumnarRow(VectorizedColumnBatch vectorizedColumnBatch, UriReader uriReader) { + this(vectorizedColumnBatch, uriReader, 0); + } + + public ColumnarRow( + VectorizedColumnBatch vectorizedColumnBatch, UriReader uriReader, int rowId) { this.vectorizedColumnBatch = vectorizedColumnBatch; + this.uriReader = uriReader; this.rowId = rowId; } @@ -61,6 +73,10 @@ public void setVectorizedColumnBatch(VectorizedColumnBatch vectorizedColumnBatch) { this.rowId = 0; } + public void setUriReader(UriReader uriReader) { + this.uriReader = uriReader; + } + public VectorizedColumnBatch batch() { return vectorizedColumnBatch; } @@ -151,7 +167,17 @@ public Variant getVariant(int pos) { @Override public Blob getBlob(int pos) { - return new BlobData(getBinary(pos)); + byte[] bytes = getBinary(pos); + if (bytes == null) { + return null; + } + if (uriReader == null) { + throw new IllegalStateException("UriReader is null, cannot read blob data from uri!"); + } + + // Only blob descriptors can be stored in columnar format, so the bytes must be a serialized descriptor.
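+ // Descriptor layout (little-endian): version (1 byte), magic (8 bytes, version >= 2),
+ // uri length (4 bytes), uri bytes, offset (8 bytes), length (8 bytes).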
+ BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes); + return Blob.fromDescriptor(uriReader, blobDescriptor); } @Override @@ -238,7 +264,7 @@ public int hashCode() { public ColumnarRow copy(ColumnVector[] vectors) { VectorizedColumnBatch vectorizedColumnBatchCopy = vectorizedColumnBatch.copy(vectors); - ColumnarRow columnarRow = new ColumnarRow(vectorizedColumnBatchCopy, rowId); + ColumnarRow columnarRow = new ColumnarRow(vectorizedColumnBatchCopy, uriReader, rowId); columnarRow.setRowKind(rowKind); return columnarRow; } diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/UriReaderFactory.java b/paimon-common/src/main/java/org/apache/paimon/utils/UriReaderFactory.java index 92e3b8e6a842..6e21aee2d236 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/UriReaderFactory.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/UriReaderFactory.java @@ -25,13 +25,14 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.io.Serializable; import java.net.URI; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; /** A factory to create and cache {@link UriReader}. */ -public class UriReaderFactory { +public class UriReaderFactory implements Serializable { private final CatalogContext context; private final Map readers; diff --git a/paimon-common/src/test/java/org/apache/paimon/data/BlobDescriptorTest.java b/paimon-common/src/test/java/org/apache/paimon/data/BlobDescriptorTest.java index ef860e8c55a7..eaf9d3dea88b 100644 --- a/paimon-common/src/test/java/org/apache/paimon/data/BlobDescriptorTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/data/BlobDescriptorTest.java @@ -18,6 +18,8 @@ package org.apache.paimon.data; +import org.apache.paimon.utils.IOUtils; + import org.junit.jupiter.api.Test; import java.lang.reflect.Constructor; @@ -38,7 +40,7 @@ public void testEquals() throws Exception { BlobDescriptor descriptor3 = new BlobDescriptor(uri2, 100L, 200L); BlobDescriptor descriptor4 = new BlobDescriptor(uri1, 150L, 200L); BlobDescriptor descriptor5 = new BlobDescriptor(uri1, 100L, 250L); - BlobDescriptor descriptor6 = createDescriptorWithVersion((byte) 2, uri1, 100L, 200L); + BlobDescriptor descriptor6 = createDescriptorWithVersion((byte) 3, uri1, 100L, 200L); assertThat(descriptor1).isEqualTo(descriptor2); assertThat(descriptor1).isNotEqualTo(descriptor3); assertThat(descriptor1).isNotEqualTo(descriptor4); @@ -64,7 +66,7 @@ public void testToString() { BlobDescriptor descriptor = new BlobDescriptor(uri, 100L, 200L); String toString = descriptor.toString(); - assertThat(toString).contains("version=1"); + assertThat(toString).contains("version=2"); assertThat(toString).contains("uri='/test/path'"); assertThat(toString).contains("offset=100"); assertThat(toString).contains("length=200"); @@ -90,10 +92,26 @@ public void testSerializeAndDeserialize() { public void testDeserializeWithUnsupportedVersion() { String uri = "/test/path"; byte[] serialized = new BlobDescriptor(uri, 1, 1).serialize(); - serialized[0] = 2; + serialized[0] = 3; assertThatThrownBy(() -> BlobDescriptor.deserialize(serialized)) .isInstanceOf(UnsupportedOperationException.class) - .hasMessageContaining("Expecting BlobDescriptor version to be 1, but found 2."); + .hasMessageContaining( + "Expecting BlobDescriptor version to be less than or equal to 2, but found 3."); + } + + @Test + public void testBlobVersionCompatible() throws Exception { + byte[] serialized = + IOUtils.readFully( + BlobDescriptorTest.class 
+ .getClassLoader() + .getResourceAsStream("compatible/blob_descriptor_v1"), + true); + + BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(serialized); + assertThat(blobDescriptor.uri()).isEqualTo("/test/path"); + assertThat(blobDescriptor.offset()).isEqualTo(100L); + assertThat(blobDescriptor.length()).isEqualTo(200L); } private BlobDescriptor createDescriptorWithVersion( diff --git a/paimon-common/src/test/resources/compatible/blob_descriptor_v1 b/paimon-common/src/test/resources/compatible/blob_descriptor_v1 new file mode 100644 index 000000000000..9e3412deac81 Binary files /dev/null and b/paimon-common/src/test/resources/compatible/blob_descriptor_v1 differ diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml index a3bf3ccf0ec8..8f0a490b8c15 100644 --- a/paimon-core/pom.xml +++ b/paimon-core/pom.xml @@ -76,6 +76,10 @@ under the License. jdk.tools jdk.tools + + com.google.protobuf + protobuf-java + diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index cd7acb4a3390..abc2f2b4a112 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -41,6 +41,7 @@ import org.apache.paimon.memory.MemorySegmentPool; import org.apache.paimon.options.MemorySize; import org.apache.paimon.reader.RecordReaderIterator; +import org.apache.paimon.types.BlobType; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.BatchRecordWriter; import org.apache.paimon.utils.CommitIncrement; @@ -59,11 +60,10 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.function.Supplier; -import static org.apache.paimon.types.DataTypeRoot.BLOB; - /** * A {@link RecordWriter} implementation that only accepts records which are always insert * operations and don't have any unique keys or sort keys. @@ -95,6 +95,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner { private final FileIndexOptions fileIndexOptions; private final MemorySize maxDiskSize; @Nullable private final BlobConsumer blobConsumer; + private final Set blobStoredDescriptorFields; @Nullable private CompactDeletionFile compactDeletionFile; private SinkWriter sinkWriter; @@ -125,6 +126,7 @@ public AppendOnlyWriter( boolean asyncFileWrite, boolean statsDenseStore, @Nullable BlobConsumer blobConsumer, + Set blobStoredDescriptorFields, boolean dataEvolutionEnabled) { this.fileIO = fileIO; this.schemaId = schemaId; @@ -153,6 +155,7 @@ public AppendOnlyWriter( this.statsCollectorFactories = statsCollectorFactories; this.maxDiskSize = maxDiskSize; this.fileIndexOptions = fileIndexOptions; + this.blobStoredDescriptorFields = blobStoredDescriptorFields; this.sinkWriter = useWriteBuffer @@ -304,7 +307,8 @@ public void toBufferedWriter() throws Exception { } private RollingFileWriter createRollingRowWriter() { - if (writeSchema.getFieldTypes().stream().anyMatch(t -> t.is(BLOB))) { + if (BlobType.splitBlob(writeSchema, blobStoredDescriptorFields).getRight().getFieldCount() + > 0) { return new RollingBlobFileWriter( fileIO, schemaId, @@ -324,7 +328,8 @@ private RollingFileWriter createRollingRowWriter() { // benefit from async write, but cost a lot. 
false, statsDenseStore, - blobConsumer); + blobConsumer, + blobStoredDescriptorFields); } return new RowDataRollingFileWriter( fileIO, diff --git a/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java index b55bdd8ef09a..d19c4cc21914 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java @@ -42,6 +42,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.function.Supplier; import static java.util.Collections.singletonList; @@ -61,8 +62,10 @@ public MultipleBlobFileWriter( boolean asyncFileWrite, boolean statsDenseStore, long targetFileSize, - @Nullable BlobConsumer blobConsumer) { - RowType blobRowType = BlobType.splitBlob(writeSchema).getRight(); + @Nullable BlobConsumer blobConsumer, + Set blobStoredDescriptorFields) { + RowType blobRowType = + BlobType.splitBlob(writeSchema, blobStoredDescriptorFields).getRight(); this.blobWriters = new ArrayList<>(); for (String blobFieldName : blobRowType.getFieldNames()) { BlobFileFormat blobFileFormat = new BlobFileFormat(); diff --git a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java index 80faa01d9238..590b19b23b98 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java @@ -48,6 +48,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Supplier; /** @@ -83,6 +84,7 @@ public class RollingBlobFileWriter implements RollingFileWriter blobWriterFactory; private final long targetFileSize; + private final Set blobStoredDescriptorFields; // State management private final List closedWriters; @@ -109,11 +111,13 @@ public RollingBlobFileWriter( FileSource fileSource, boolean asyncFileWrite, boolean statsDenseStore, - @Nullable BlobConsumer blobConsumer) { + @Nullable BlobConsumer blobConsumer, + Set blobStoredDescriptorFields) { // Initialize basic fields this.targetFileSize = targetFileSize; this.results = new ArrayList<>(); this.closedWriters = new ArrayList<>(); + this.blobStoredDescriptorFields = blobStoredDescriptorFields; // Initialize writer factory for normal data this.writerFactory = @@ -121,7 +125,7 @@ public RollingBlobFileWriter( fileIO, schemaId, fileFormat, - BlobType.splitBlob(writeSchema).getLeft(), + BlobType.splitBlob(writeSchema, blobStoredDescriptorFields).getLeft(), writeSchema, pathFactory, seqNumCounterSupplier, @@ -145,7 +149,8 @@ public RollingBlobFileWriter( asyncFileWrite, statsDenseStore, blobTargetFileSize, - blobConsumer); + blobConsumer, + this.blobStoredDescriptorFields); } /** Creates a factory for normal data writers. 
*/ diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java index bc39a4ae8e76..ae1405e3de62 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java @@ -58,6 +58,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.function.Function; import java.util.function.Supplier; @@ -85,6 +86,7 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite blobStoredDescriptorFields; public BaseAppendFileStoreWrite( FileIO fileIO, @@ -108,6 +110,7 @@ public BaseAppendFileStoreWrite( this.fileFormat = fileFormat(options); this.pathFactory = pathFactory; this.withBlob = rowType.getFieldTypes().stream().anyMatch(t -> t.is(BLOB)); + this.blobStoredDescriptorFields = options.blobStoredDescriptorFields(); this.fileIndexOptions = options.indexColumnsOptions(); } @@ -153,6 +156,7 @@ protected RecordWriter createWriter( options.asyncFileWrite(), options.statsDenseStore(), blobConsumer, + blobStoredDescriptorFields, options.dataEvolutionEnabled()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index f4cb16a018ce..6f7eb60c664b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -34,6 +34,7 @@ import org.apache.paimon.types.BlobType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.types.IntType; import org.apache.paimon.types.LocalZonedTimestampType; import org.apache.paimon.types.MapType; @@ -47,9 +48,11 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; import static org.apache.paimon.CoreOptions.BUCKET_KEY; @@ -156,7 +159,11 @@ public static void validateTableSchema(TableSchema schema) { FileFormat fileFormat = FileFormat.fromIdentifier(options.formatType(), new Options(schema.options())); - fileFormat.validateDataFields(BlobType.splitBlob(new RowType(schema.fields())).getLeft()); + RowType tableRowType = new RowType(schema.fields()); + Set blobStoredDescriptorFields = + validateBlobStoredDescriptorFields(tableRowType, options); + fileFormat.validateDataFields( + BlobType.splitBlob(tableRowType, blobStoredDescriptorFields).getLeft()); // Check column names in schema schema.fieldNames() @@ -614,11 +621,29 @@ private static void validateRowTracking(TableSchema schema, CoreOptions options) normalAndBlobType.getLeft().getFieldCount() > 0, "Table with BLOB type column must have other normal columns."); checkArgument( - !schema.partitionKeys().contains(blobNames.get(0)), + blobNames.stream().noneMatch(schema.partitionKeys()::contains), "The BLOB type column can not be part of partition keys."); } } + private static Set validateBlobStoredDescriptorFields( + RowType rowType, CoreOptions options) { + Set blobFieldNames = + rowType.getFields().stream() + .filter(field -> field.type().getTypeRoot() == DataTypeRoot.BLOB) + 
.map(DataField::name) + .collect(Collectors.toCollection(HashSet::new)); + Set configured = options.blobStoredDescriptorFields(); + for (String field : configured) { + checkArgument( + blobFieldNames.contains(field), + "Field '%s' in '%s' must be a BLOB field in table schema.", + field, + CoreOptions.BLOB_STORED_DESCRIPTOR_FIELDS.key()); + } + return configured; + } + private static void validateIncrementalClustering(TableSchema schema, CoreOptions options) { if (options.clusteringIncrementalEnabled()) { checkArgument( diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java index b3470cfb4522..8f159cb33587 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java @@ -713,6 +713,7 @@ private Pair> createWriter( true, false, null, + options.blobStoredDescriptorFields(), options.dataEvolutionEnabled()); writer.setMemoryPool( new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize())); diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java index 3045f33b1664..a0b3ec757641 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java @@ -47,6 +47,7 @@ import javax.annotation.Nonnull; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; @@ -55,6 +56,7 @@ import java.util.stream.Collectors; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; /** Tests for table with blob. 
*/ public class BlobTableTest extends TableTestBase { @@ -233,6 +235,144 @@ public void testRolling() throws Exception { assertThat(integer.get()).isEqualTo(1025); } + @Test + public void testDescriptorStorageModeReuseUpstreamBlobWithoutCopy() throws Exception { + createDescriptorTable(); + FileStoreTable table = getTableDefault(); + + // prepare an "upstream blob file" (any file) and write blob bytes into it + Path external = new Path(tempPath.resolve("upstream-blob.bin").toString()); + writeFile(table.fileIO(), external, blobBytes); + + BlobDescriptor descriptor = new BlobDescriptor(external.toString(), 0, blobBytes.length); + UriReader uriReader = UriReader.fromFile(table.fileIO()); + Blob blobRef = Blob.fromDescriptor(uriReader, descriptor); + + writeDataDefault( + Collections.singletonList( + GenericRow.of(1, BinaryString.fromString("nice"), blobRef))); + + readDefault( + row -> { + assertThat(row.getBlob(2).toDescriptor()).isEqualTo(descriptor); + assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes); + }); + + long blobFiles = countFilesWithSuffix(table.fileIO(), table.location(), ".blob"); + assertThat(blobFiles).isEqualTo(0); + } + + @Test + public void testDescriptorStorageModeRejectsNonDescriptorInput() throws Exception { + createDescriptorTable(); + + assertThatThrownBy( + () -> + writeDataDefault( + Collections.singletonList( + GenericRow.of( + 1, + BinaryString.fromString("bad"), + new BlobData(blobBytes))))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("blob.stored-descriptor-fields"); + } + + @Test + public void testMixedBlobStorageModeByFields() throws Exception { + createMixedModeTable(); + FileStoreTable table = getTableDefault(); + + byte[] descriptorBytes = randomBytes(); + Path external = new Path(tempPath.resolve("upstream-mixed-blob.bin").toString()); + writeFile(table.fileIO(), external, descriptorBytes); + + BlobDescriptor descriptor = + new BlobDescriptor(external.toString(), 0, descriptorBytes.length); + UriReader uriReader = UriReader.fromFile(table.fileIO()); + Blob blobRef = Blob.fromDescriptor(uriReader, descriptor); + + writeDataDefault( + Collections.singletonList( + GenericRow.of( + 1, + BinaryString.fromString("mixed"), + new BlobData(blobBytes), + blobRef))); + + readDefault( + row -> { + assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes); + assertThat(row.getBlob(3).toDescriptor()).isEqualTo(descriptor); + assertThat(row.getBlob(3).toData()).isEqualTo(descriptorBytes); + }); + + long blobFiles = countFilesWithSuffix(table.fileIO(), table.location(), ".blob"); + assertThat(blobFiles).isEqualTo(1); + } + + @Test + public void testMixedBlobStorageModeRejectsNonDescriptorInput() throws Exception { + createMixedModeTable(); + + assertThatThrownBy( + () -> + writeDataDefault( + Collections.singletonList( + GenericRow.of( + 1, + BinaryString.fromString("bad"), + new BlobData(blobBytes), + new BlobData(randomBytes()))))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("blob.stored-descriptor-fields"); + } + + private void createDescriptorTable() throws Exception { + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.column("f0", DataTypes.INT()); + schemaBuilder.column("f1", DataTypes.STRING()); + schemaBuilder.column("f2", DataTypes.BLOB()); + schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB"); + schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + 
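+ // store f2 inline as serialized descriptor bytes (no separate .blob file for f2)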
schemaBuilder.option(CoreOptions.BLOB_STORED_DESCRIPTOR_FIELDS.key(), "f2"); + catalog.createTable(identifier(), schemaBuilder.build(), true); + } + + private void createMixedModeTable() throws Exception { + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.column("f0", DataTypes.INT()); + schemaBuilder.column("f1", DataTypes.STRING()); + schemaBuilder.column("f2", DataTypes.BLOB()); + schemaBuilder.column("f3", DataTypes.BLOB()); + schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB"); + schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + schemaBuilder.option(CoreOptions.BLOB_STORED_DESCRIPTOR_FIELDS.key(), "f3"); + catalog.createTable(identifier(), schemaBuilder.build(), true); + } + + private static void writeFile(FileIO fileIO, Path path, byte[] bytes) throws IOException { + try (org.apache.paimon.fs.PositionOutputStream out = fileIO.newOutputStream(path, true)) { + out.write(bytes); + } + } + + private static long countFilesWithSuffix(FileIO fileIO, Path root, String suffix) + throws IOException { + long count = 0; + org.apache.paimon.fs.RemoteIterator it = + fileIO.listFilesIterative(root, true); + while (it.hasNext()) { + org.apache.paimon.fs.FileStatus status = it.next(); + if (status.getPath().getName().endsWith(suffix)) { + count++; + } + } + return count; + } + protected Schema schemaDefault() { Schema.Builder schemaBuilder = Schema.newBuilder(); schemaBuilder.column("f0", DataTypes.INT()); diff --git a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java index 0afe95eef066..3ada89c777f6 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java @@ -43,6 +43,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Random; @@ -106,7 +107,8 @@ public void setUp() throws IOException { FileSource.APPEND, false, // asyncFileWrite false, // statsDenseStore - null); + null, + Collections.emptySet()); } @Test @@ -206,7 +208,8 @@ public void testBlobTargetFileSize() throws IOException { FileSource.APPEND, false, // asyncFileWrite false, // statsDenseStore - null); + null, + Collections.emptySet()); // Create large blob data that will exceed the blob target file size byte[] largeBlobData = new byte[3 * 1024 * 1024]; // 3 MB blob data @@ -283,7 +286,8 @@ void testBlobFileNameFormatWithSharedUuid() throws IOException { FileSource.APPEND, false, // asyncFileWrite false, // statsDenseStore - null); + null, + Collections.emptySet()); // Create blob data that will trigger rolling byte[] blobData = new byte[1024 * 1024]; // 1 MB blob data @@ -362,9 +366,10 @@ void testBlobFileNameFormatWithSharedUuidNonDescriptorMode() throws IOException FileSource.APPEND, false, // asyncFileWrite false, // statsDenseStore - null); + null, + Collections.emptySet()); - // Create blob data that will trigger rolling (non-descriptor mode: direct blob data) + // Create blob data that will trigger rolling byte[] blobData = new byte[1024 * 1024]; // 1 MB blob data new Random(789).nextBytes(blobData); @@ -420,8 +425,8 @@ void testBlobFileNameFormatWithSharedUuidNonDescriptorMode() throws IOException } @Test - void testSequenceNumberIncrementInBlobAsDescriptorMode() throws IOException { - // 
Write multiple rows to trigger one-by-one writing in blob-as-descriptor mode + void testSequenceNumberIncrementInBlobWritePath() throws IOException { + // Write multiple rows and verify sequence-number continuity in blob files int numRows = 10; for (int i = 0; i < numRows; i++) { InternalRow row = @@ -487,9 +492,8 @@ void testSequenceNumberIncrementInBlobAsDescriptorMode() throws IOException { } @Test - void testSequenceNumberIncrementInNonDescriptorMode() throws IOException { - // Write multiple rows as a batch to trigger batch writing in non-descriptor mode - // (blob-as-descriptor=false, which is the default) + void testSequenceNumberIncrementInBlobWritePathBatch() throws IOException { + // Write multiple rows as a batch and verify sequence-number continuity in blob files int numRows = 10; for (int i = 0; i < numRows; i++) { InternalRow row = @@ -580,7 +584,8 @@ void testBlobStatsSchemaWithCustomColumnName() throws IOException { FileSource.APPEND, false, // asyncFileWrite false, // statsDenseStore - null); + null, + Collections.emptySet()); // Write data for (int i = 0; i < 3; i++) { diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index 1edf91e1d22d..1f1b0992e968 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -292,6 +292,7 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception true, false, null, + options.blobStoredDescriptorFields(), options.dataEvolutionEnabled()); appendOnlyWriter.setMemoryPool( new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize())); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java index 3201f594fe59..ad2132e8c1eb 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java @@ -47,19 +47,14 @@ public class FlinkRowWrapper implements InternalRow { private final org.apache.flink.table.data.RowData row; - private final boolean blobAsDescriptor; private final UriReaderFactory uriReaderFactory; public FlinkRowWrapper(org.apache.flink.table.data.RowData row) { - this(row, false, null); + this(row, null); } - public FlinkRowWrapper( - org.apache.flink.table.data.RowData row, - boolean blobAsDescriptor, - CatalogContext catalogContext) { + public FlinkRowWrapper(org.apache.flink.table.data.RowData row, CatalogContext catalogContext) { this.row = row; - this.blobAsDescriptor = blobAsDescriptor; this.uriReaderFactory = new UriReaderFactory(catalogContext); } @@ -147,12 +142,14 @@ public Variant getVariant(int pos) { @Override public Blob getBlob(int pos) { - if (blobAsDescriptor) { - BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(row.getBinary(pos)); + byte[] bytes = row.getBinary(pos); + boolean blobDes = BlobDescriptor.isBlobDescriptor(bytes); + if (blobDes) { + BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes); UriReader uriReader = uriReaderFactory.create(blobDescriptor.uri()); return Blob.fromDescriptor(uriReader, blobDescriptor); } else { - return new BlobData(row.getBinary(pos)); + return new BlobData(bytes); } } diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java index a58e8a393da4..7ed5e5d09503 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java @@ -346,13 +346,11 @@ protected boolean buildForPostponeBucketCompaction( partitionSpec, options.get(FlinkConnectorOptions.SCAN_PARALLELISM)); - boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor(); DataStream partitioned = FlinkStreamPartitioner.partition( FlinkSinkBuilder.mapToInternalRow( sourcePair.getLeft(), realTable.rowType(), - blobAsDescriptor, table.catalogEnvironment().catalogContext()), new RowDataChannelComputer(realTable.schema()), null); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java index d1e3d11e6225..ee3b68f609a2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java @@ -157,13 +157,11 @@ protected List> buildCompactOperator( // 2.3 write and then reorganize the committable // set parallelism to null, and it'll forward parallelism when doWrite() RowAppendTableSink sink = new RowAppendTableSink(table, null, null); - boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor(); DataStream written = sink.doWrite( FlinkSinkBuilder.mapToInternalRow( sorted, table.rowType(), - blobAsDescriptor, table.catalogEnvironment().catalogContext()), commitUser, null); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java index 0a4dc174539f..87d12924533a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java @@ -207,15 +207,13 @@ public FlinkSinkBuilder clusteringIfPossible( public DataStreamSink build() { setParallelismIfAdaptiveConflict(); input = trySortInput(input); - boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor(); CatalogContext contextForDescriptor = BlobDescriptorUtils.getCatalogContext( table.catalogEnvironment().catalogContext(), table.coreOptions().toConfiguration()); DataStream input = - mapToInternalRow( - this.input, table.rowType(), blobAsDescriptor, contextForDescriptor); + mapToInternalRow(this.input, table.rowType(), contextForDescriptor); if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) { SingleOutputStreamOperator newInput = input.forward() @@ -247,14 +245,11 @@ public DataStreamSink build() { public static DataStream mapToInternalRow( DataStream input, org.apache.paimon.types.RowType rowType, - boolean blobAsDescriptor, CatalogContext catalogContext) { SingleOutputStreamOperator result = input.map( (MapFunction) - r -> - new FlinkRowWrapper( - r, blobAsDescriptor, catalogContext)) + r -> new FlinkRowWrapper(r, catalogContext)) .returns( 
org.apache.paimon.flink.utils.InternalTypeInfo.fromRowType( rowType)); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java index a06ca9948c44..6e09dce05033 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java @@ -28,6 +28,7 @@ import org.apache.paimon.utils.IteratorResultIterator; import org.apache.paimon.utils.IteratorWithException; import org.apache.paimon.utils.Pool; +import org.apache.paimon.utils.UriReader; import org.apache.avro.AvroRuntimeException; import org.apache.avro.file.DataFileReader; @@ -77,7 +78,9 @@ private AvroReader(FileIO fileIO, Path path, long fileSize) throws IOException { private DataFileReader createReaderFromPath(Path path, long fileSize) throws IOException { - DatumReader datumReader = new AvroRowDatumReader(projectedRowType); + UriReader uriReader = UriReader.fromFile(fileIO); + DatumReader datumReader = + new AvroRowDatumReader(projectedRowType, uriReader); SeekableInput in = new SeekableInputStreamWrapper(fileIO.newInputStream(path), fileSize); try { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumReader.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumReader.java index 23da782471b0..c62777bf800e 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumReader.java @@ -21,6 +21,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.format.avro.FieldReaderFactory.RowReader; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.UriReader; import org.apache.avro.Schema; import org.apache.avro.io.DatumReader; @@ -32,12 +33,18 @@ public class AvroRowDatumReader implements DatumReader { private final RowType projectedRowType; + private final UriReader uriReader; private RowReader reader; private boolean isUnion; public AvroRowDatumReader(RowType projectedRowType) { + this(projectedRowType, null); + } + + public AvroRowDatumReader(RowType projectedRowType, UriReader uriReader) { this.projectedRowType = projectedRowType; + this.uriReader = uriReader; } @Override @@ -48,7 +55,8 @@ public void setSchema(Schema schema) { schema = schema.getTypes().get(1); } this.reader = - new FieldReaderFactory().createRowReader(schema, projectedRowType.getFields()); + new FieldReaderFactory(uriReader) + .createRowReader(schema, projectedRowType.getFields()); } @Override diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java index 5497751fcb93..e586a4ad8617 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java @@ -104,6 +104,7 @@ public static Schema convertToSchema( return nullable ? nullableSchema(str) : str; case BINARY: case VARBINARY: + case BLOB: Schema binary = SchemaBuilder.builder().bytesType(); return nullable ? 
nullableSchema(binary) : binary; case TIMESTAMP_WITHOUT_TIME_ZONE: diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java index d95ef309219b..02d77b445dea 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java @@ -19,6 +19,8 @@ package org.apache.paimon.format.avro; import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Blob; +import org.apache.paimon.data.BlobDescriptor; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericArray; import org.apache.paimon.data.GenericMap; @@ -29,6 +31,7 @@ import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.UriReader; import org.apache.avro.AvroRuntimeException; import org.apache.avro.Schema; @@ -51,6 +54,16 @@ /** Factory to create {@link FieldReader}. */ public class FieldReaderFactory implements AvroSchemaVisitor { + @Nullable private final UriReader uriReader; + + public FieldReaderFactory() { + this(null); + } + + public FieldReaderFactory(@Nullable UriReader uriReader) { + this.uriReader = uriReader; + } + private static final FieldReader STRING_READER = new StringReader(); private static final FieldReader BYTES_READER = new BytesReader(); @@ -73,6 +86,16 @@ public class FieldReaderFactory implements AvroSchemaVisitor { private static final FieldReader TIMESTAMP_MICROS_READER = new TimestampMicrosReader(); + @Override + public FieldReader primitive(Schema primitive, DataType type) { + if (primitive.getType() == Schema.Type.BYTES + && type != null + && type.getTypeRoot() == DataTypeRoot.BLOB) { + return new BlobDescriptorBytesReader(uriReader); + } + return AvroSchemaVisitor.super.primitive(primitive, type); + } + @Override public FieldReader visitUnion(Schema schema, @Nullable DataType type) { return new NullableReader(visit(schema.getTypes().get(1), type)); @@ -230,6 +253,31 @@ public void skip(Decoder decoder) throws IOException { } } + private static class BlobDescriptorBytesReader implements FieldReader { + + private final UriReader uriReader; + + private BlobDescriptorBytesReader(UriReader uriReader) { + if (uriReader == null) { + throw new IllegalArgumentException( + "UriReader must not be null for BlobDescriptorBytesReader."); + } + this.uriReader = uriReader; + } + + @Override + public Object read(Decoder decoder, Object reuse) throws IOException { + byte[] bytes = decoder.readBytes(null).array(); + BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes); + return Blob.fromDescriptor(uriReader, blobDescriptor); + } + + @Override + public void skip(Decoder decoder) throws IOException { + decoder.skipBytes(); + } + } + private static class BooleanReader implements FieldReader { @Override diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java index 68d0ac7ad135..521601706b69 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java @@ -19,6 +19,8 @@ package org.apache.paimon.format.avro; import org.apache.paimon.CoreOptions; +import org.apache.paimon.data.Blob; +import 
org.apache.paimon.data.BlobDescriptor; import org.apache.paimon.data.DataGetters; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericRow; @@ -27,6 +29,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeRoot; import org.apache.paimon.types.RowType; import org.apache.avro.AvroRuntimeException; @@ -70,6 +73,35 @@ public class FieldWriterFactory implements AvroSchemaVisitor { private static final FieldWriter DOUBLE_WRITER = (container, i, encoder) -> encoder.writeDouble(container.getDouble(i)); + private static final FieldWriter BLOB_DESCRIPTOR_BYTES_WRITER = + (container, i, encoder) -> { + Blob blob = container.getBlob(i); + if (blob == null) { + // Nullable handling is done by NullableWriter for UNION schemas. + // For required bytes, writing null is a bug. + throw new IllegalArgumentException("Null blob is not allowed."); + } + try { + BlobDescriptor descriptor = blob.toDescriptor(); + encoder.writeBytes(descriptor.serialize()); + } catch (Throwable t) { + throw new IllegalArgumentException( + "blob.stored-descriptor-fields requires blob field value to be a " + + "serialized BlobDescriptor (magic 'BLOBDESC').", + t); + } + }; + + @Override + public FieldWriter primitive(Schema primitive, DataType type) { + if (primitive.getType() == Schema.Type.BYTES + && type != null + && type.getTypeRoot() == DataTypeRoot.BLOB) { + return BLOB_DESCRIPTOR_BYTES_WRITER; + } + return AvroSchemaVisitor.super.primitive(primitive, type); + } + @Override public FieldWriter visitUnion(Schema schema, DataType type) { return new NullableWriter(visit(schema.getTypes().get(1), type)); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index a94eaf2356d8..93c29f67b54e 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -38,6 +38,7 @@ import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Pool; import org.apache.paimon.utils.RoaringBitmap32; +import org.apache.paimon.utils.UriReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; @@ -101,7 +102,9 @@ public OrcVectorizedReader createReader(FormatReaderFactory.Context context) context instanceof OrcFormatReaderContext ? ((OrcFormatReaderContext) context).poolSize() : 1; - Pool poolOfBatches = createPoolOfBatches(context.filePath(), poolSize); + UriReader uriReader = UriReader.fromFile(context.fileIO()); + Pool poolOfBatches = + createPoolOfBatches(context.filePath(), poolSize, uriReader); RecordReader orcReader = createRecordReader( @@ -123,7 +126,10 @@ public OrcVectorizedReader createReader(FormatReaderFactory.Context context) * conversion from the ORC representation to the result format. 
*/ public OrcReaderBatch createReaderBatch( - Path filePath, VectorizedRowBatch orcBatch, Pool.Recycler<OrcReaderBatch> recycler) { + Path filePath, + VectorizedRowBatch orcBatch, + Pool.Recycler<OrcReaderBatch> recycler, + UriReader uriReader) { List<String> tableFieldNames = tableType.getFieldNames(); List<DataType> tableFieldTypes = tableType.getFieldTypes(); @@ -139,17 +145,20 @@ public OrcReaderBatch createReaderBatch( type, legacyTimestampLtzType); } - return new OrcReaderBatch(filePath, orcBatch, new VectorizedColumnBatch(vectors), recycler); + return new OrcReaderBatch( + filePath, orcBatch, new VectorizedColumnBatch(vectors), recycler, uriReader); } // ------------------------------------------------------------------------ - private Pool<OrcReaderBatch> createPoolOfBatches(Path filePath, int numBatches) { + private Pool<OrcReaderBatch> createPoolOfBatches( + Path filePath, int numBatches, UriReader uriReader) { final Pool<OrcReaderBatch> pool = new Pool<>(numBatches); for (int i = 0; i < numBatches; i++) { final VectorizedRowBatch orcBatch = createBatchWrapper(schema, batchSize / numBatches); - final OrcReaderBatch batch = createReaderBatch(filePath, orcBatch, pool.recycler()); + final OrcReaderBatch batch = + createReaderBatch(filePath, orcBatch, pool.recycler(), uriReader); pool.add(batch); } @@ -170,13 +179,14 @@ protected OrcReaderBatch( final Path filePath, final VectorizedRowBatch orcVectorizedRowBatch, final VectorizedColumnBatch paimonColumnBatch, - final Pool.Recycler<OrcReaderBatch> recycler) { + final Pool.Recycler<OrcReaderBatch> recycler, + final UriReader uriReader) { this.orcVectorizedRowBatch = checkNotNull(orcVectorizedRowBatch); this.recycler = checkNotNull(recycler); this.paimonColumnBatch = paimonColumnBatch; this.result = new VectorizedRowIterator( - filePath, new ColumnarRow(paimonColumnBatch), this::recycle); + filePath, new ColumnarRow(paimonColumnBatch, uriReader), this::recycle); } /** diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcTypeUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcTypeUtil.java index f7d3d626d44f..4b80827d1bb3 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcTypeUtil.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcTypeUtil.java @@ -67,6 +67,9 @@ static TypeDescription convertToOrcType(DataType type, int fieldId, int depth) { case BOOLEAN: return TypeDescription.createBoolean() .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); + case BLOB: + return TypeDescription.createBinary() + .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId)); case VARBINARY: if (type.equals(DataTypes.BYTES())) { return TypeDescription.createBinary() diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java index 762037deb215..9022735e2cac 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java @@ -18,6 +18,8 @@ package org.apache.paimon.format.orc.writer; +import org.apache.paimon.data.Blob; +import org.apache.paimon.data.BlobDescriptor; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; @@ -245,7 +247,21 @@ public FieldWriter visit(VariantType variantType) { @Override public FieldWriter visit(BlobType blobType) { - throw new UnsupportedOperationException("Unsupported type: " + blobType); + return (rowId, column, getters,
columnId) -> { + BytesColumnVector vector = (BytesColumnVector) column; + Blob blob = getters.getBlob(columnId); + try { + BlobDescriptor descriptor = blob.toDescriptor(); + byte[] bytes = descriptor.serialize(); + vector.setVal(rowId, bytes, 0, bytes.length); + return bytes.length; + } catch (Throwable t) { + throw new IllegalArgumentException( + "blob.stored-descriptor-fields requires blob field value to be a " + + "serialized BlobDescriptor (magic 'BLOBDESC').", + t); + } + }; } @Override diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java index 29f22e1de66b..eabdc2ac0da2 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java @@ -36,6 +36,7 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.utils.UriReader; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.filter2.compat.FilterCompat; @@ -118,8 +119,15 @@ public FileRecordReader<InternalRow> createReader(FormatReaderFactory.Context co MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(requestedSchema); List<ParquetField> fields = buildFieldsList(readFields, columnIO, requestedSchema); + UriReader uriReader = UriReader.fromFile(context.fileIO()); return new VectorizedParquetRecordReader( - context.filePath(), reader, fileSchema, fields, writableVectors, batchSize); + context.filePath(), + reader, + fileSchema, + fields, + writableVectors, + batchSize, + uriReader); } /** Clips `parquetSchema` according to `fieldNames`. */ diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java index f5d93bec42bd..37a69fe9aebd 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java @@ -90,6 +90,7 @@ public static Type convertToParquetType(String name, DataType type, int fieldId, .withId(fieldId); case BINARY: case VARBINARY: + case BLOB: return Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition) .named(name) .withId(fieldId); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ColumnarBatch.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ColumnarBatch.java index 30a16aba822d..891f59c5e539 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ColumnarBatch.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ColumnarBatch.java @@ -28,6 +28,7 @@ import org.apache.paimon.data.columnar.VectorizedRowIterator; import org.apache.paimon.fs.Path; import org.apache.paimon.utils.LongIterator; +import org.apache.paimon.utils.UriReader; import java.util.Arrays; @@ -38,7 +39,7 @@ public class ColumnarBatch { protected final VectorizedColumnBatch vectorizedColumnBatch; protected final ColumnarRowIterator vectorizedRowIterator; - public ColumnarBatch(Path filePath, ColumnVector[] columns) { + public ColumnarBatch(Path filePath, ColumnVector[] columns, UriReader uriReader) { this.columns = columns; this.vectorizedColumnBatch = new VectorizedColumnBatch(columns); boolean containsNestedColumn = @@ -48,7
+49,7 @@ public ColumnarBatch(Path filePath, ColumnVector[] columns) { vector instanceof MapColumnVector || vector instanceof RowColumnVector || vector instanceof ArrayColumnVector); - ColumnarRow row = new ColumnarRow(vectorizedColumnBatch); + ColumnarRow row = new ColumnarRow(vectorizedColumnBatch, uriReader); this.vectorizedRowIterator = containsNestedColumn ? new ColumnarRowIterator(filePath, row, null) diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java index e7d3ad04abeb..a7890953adf6 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java @@ -100,6 +100,7 @@ public static WritableColumnVector createWritableColumnVector( case CHAR: case VARCHAR: case VARBINARY: + case BLOB: return new HeapBytesVector(batchSize); case BINARY: return new HeapBytesVector(batchSize); @@ -176,6 +177,9 @@ public static ColumnVector createReadableColumnVector( case TIMESTAMP_WITHOUT_TIME_ZONE: case TIMESTAMP_WITH_LOCAL_TIME_ZONE: return new ParquetTimestampVector(writableVector); + case BLOB: + // Physical representation is bytes; higher-level Row#getBlob() handles descriptor. + return writableVector; case ARRAY: return new CastedArrayColumnVector( (HeapArrayVector) writableVector, diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java index 880af82af181..12b4849a2520 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java @@ -219,7 +219,15 @@ public UpdaterFactory visit(VariantType variantType) { @Override public UpdaterFactory visit(BlobType blobType) { - throw new RuntimeException("Blob type is not supported"); + // Physical representation is bytes (same as VARBINARY); higher-level Row#getBlob() + // interprets serialized BlobDescriptor when needed. 
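+ // A BLOB column is therefore decoded at the page level exactly like VARBINARY:
+ // FIXED_LEN_BYTE_ARRAY pages get a fixed-length updater, everything else the plain binary updater.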
+ return c -> { + if (c.getPrimitiveType().getPrimitiveTypeName() + == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) { + return new FixedLenByteArrayUpdater(c.getPrimitiveType().getTypeLength()); + } + return new BinaryUpdater(); + }; } @Override diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedParquetRecordReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedParquetRecordReader.java index d8fd82056e67..c9b8544edc1c 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedParquetRecordReader.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedParquetRecordReader.java @@ -25,6 +25,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.reader.FileRecordIterator; import org.apache.paimon.reader.FileRecordReader; +import org.apache.paimon.utils.UriReader; import org.apache.parquet.VersionParser; import org.apache.parquet.column.ColumnDescriptor; @@ -75,6 +76,7 @@ public class VectorizedParquetRecordReader implements FileRecordReader<InternalRow> { + private final UriReader uriReader; private final List<ParquetField> fields; private final RowIndexGenerator rowIndexGenerator; @@ -88,7 +90,8 @@ public VectorizedParquetRecordReader( MessageType fileSchema, List<ParquetField> fields, WritableColumnVector[] vectors, - int batchSize) + int batchSize, + UriReader uriReader) throws IOException { this.filePath = filePath; this.reader = reader; @@ -96,6 +99,7 @@ public VectorizedParquetRecordReader( this.fields = fields; this.totalRowCount = reader.getFilteredRecordCount(); this.batchSize = batchSize; + this.uriReader = uriReader; this.rowIndexGenerator = new RowIndexGenerator(); // fetch writer version from file metadata @@ -120,7 +124,8 @@ private void initBatch(WritableColumnVector[] vectors) { fields.stream() .map(ParquetField::getType) .collect(Collectors.toList()), - vectors)); + vectors), + uriReader); columnVectors = new ParquetColumnVector[fields.size()]; for (int i = 0; i < columnVectors.length; i++) { columnVectors[i] = diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java index 3629bb852b94..464516aed092 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java @@ -20,6 +20,8 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Blob; +import org.apache.paimon.data.BlobDescriptor; import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; @@ -105,6 +107,8 @@ private FieldWriter createWriter(DataType t, Type type) { case BINARY: case VARBINARY: return new BinaryWriter(); + case BLOB: + return new BlobDescriptorWriter(); case DECIMAL: DecimalType decimalType = (DecimalType) t; return createDecimalWriter(decimalType.getPrecision(), decimalType.getScale()); @@ -313,6 +317,33 @@ private void writeBinary(byte[] value) { } } + /** Writes BLOB as serialized {@link BlobDescriptor} bytes for descriptor-stored fields.
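+ * <p>The cell payload is the serialized descriptor itself (version byte, magic "BLOBDESC", uri, offset, length), never the raw blob bytes; inputs that do not deserialize as descriptors are rejected.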
*/ + private class BlobDescriptorWriter implements FieldWriter { + + @Override + public void write(InternalRow row, int ordinal) { + writeBlob(row.getBlob(ordinal)); + } + + @Override + public void write(InternalArray arrayData, int ordinal) { + // Currently we don't support BLOB inside arrays/maps. + throw new UnsupportedOperationException("BLOB in array is not supported."); + } + + private void writeBlob(Blob blob) { + try { + BlobDescriptor descriptor = blob.toDescriptor(); + recordConsumer.addBinary(Binary.fromReusedByteArray(descriptor.serialize())); + } catch (Throwable t) { + throw new IllegalArgumentException( + "blob.stored-descriptor-fields requires blob field value to be a " + + "serialized BlobDescriptor (magic 'BLOBDESC').", + t); + } + } + } + private class IntWriter implements FieldWriter { @Override diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py index c47f82c50370..6a61fc6e1a0f 100644 --- a/paimon-python/pypaimon/common/file_io.py +++ b/paimon-python/pypaimon/common/file_io.py @@ -218,7 +218,12 @@ def write_lance(self, path: str, data, **kwargs): raise NotImplementedError("write_lance must be implemented by FileIO subclasses") def write_blob(self, path: str, data, blob_as_descriptor: bool, **kwargs): - """Write Blob format file. Must be implemented by subclasses.""" + """ + Write Blob format file. + + NOTE: blob_as_descriptor is kept only for compatibility with existing callers. + Blob write behavior is adaptive and does not depend on this flag. + """ raise NotImplementedError("write_blob must be implemented by FileIO subclasses") def close(self): diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py index c83a75c65585..aa082f540412 100644 --- a/paimon-python/pypaimon/common/options/core_options.py +++ b/paimon-python/pypaimon/common/options/core_options.py @@ -177,7 +177,17 @@ class CoreOptions: ConfigOptions.key("blob-as-descriptor") .boolean_type() .default_value(False) - .with_description("Whether to use blob as descriptor.") + .with_description("Whether to return blob values as serialized BlobDescriptor bytes when reading.") + ) + + BLOB_STORED_DESCRIPTOR_FIELDS: ConfigOption[str] = ( + ConfigOptions.key("blob.stored-descriptor-fields") + .string_type() + .no_default_value() + .with_description( + "Comma-separated BLOB field names that should be stored as serialized BlobDescriptor bytes " + "inline in normal data files." 
+ ) ) TARGET_FILE_SIZE: ConfigOption[MemorySize] = ( @@ -484,6 +494,16 @@ def metadata_stats_enabled(self, default=None): def blob_as_descriptor(self, default=None): return self.options.get(CoreOptions.BLOB_AS_DESCRIPTOR, default) + def blob_stored_descriptor_fields(self, default=None): + value = self.options.get(CoreOptions.BLOB_STORED_DESCRIPTOR_FIELDS, default) + if value is None: + return set() + if isinstance(value, str): + return {field.strip() for field in value.split(",") if field.strip()} + if isinstance(value, (list, set, tuple)): + return {str(field).strip() for field in value if str(field).strip()} + return set() + def target_file_size(self, has_primary_key, default=None): return self.options.get(CoreOptions.TARGET_FILE_SIZE, MemorySize.of_mebi_bytes( diff --git a/paimon-python/pypaimon/filesystem/local_file_io.py b/paimon-python/pypaimon/filesystem/local_file_io.py index 91fc1c906980..0f0686375ac6 100644 --- a/paimon-python/pypaimon/filesystem/local_file_io.py +++ b/paimon-python/pypaimon/filesystem/local_file_io.py @@ -396,6 +396,8 @@ def write_lance(self, path: str, data: pyarrow.Table, **kwargs): def write_blob(self, path: str, data: pyarrow.Table, blob_as_descriptor: bool, **kwargs): try: + # Kept for API compatibility. Write behavior is adaptive and does not depend on this flag. + _ = blob_as_descriptor if data.num_columns != 1: raise RuntimeError(f"Blob format only supports a single column, got {data.num_columns} columns") @@ -424,18 +426,24 @@ def write_blob(self, path: str, data: pyarrow.Table, blob_as_descriptor: bool, * for i in range(num_rows): col_data = records_dict[field_name][i] if hasattr(fields[0].type, 'type') and fields[0].type.type == "BLOB": - if blob_as_descriptor: - blob_descriptor = BlobDescriptor.deserialize(col_data) - uri_reader = self.uri_reader_factory.create(blob_descriptor.uri) - blob_data = Blob.from_descriptor(uri_reader, blob_descriptor) - elif isinstance(col_data, bytes): - blob_data = BlobData(col_data) + if hasattr(col_data, 'as_py'): + col_data = col_data.as_py() + if isinstance(col_data, str): + col_data = col_data.encode('utf-8') + if isinstance(col_data, bytearray): + col_data = bytes(col_data) + + if isinstance(col_data, bytes): + if BlobDescriptor.is_blob_descriptor(col_data): + descriptor = BlobDescriptor.deserialize(col_data) + uri_reader = self.uri_reader_factory.create(descriptor.uri) + blob_data = Blob.from_descriptor(uri_reader, descriptor) + else: + blob_data = BlobData(col_data) else: - if hasattr(col_data, 'as_py'): - col_data = col_data.as_py() - if isinstance(col_data, str): - col_data = col_data.encode('utf-8') - blob_data = BlobData(col_data) + raise RuntimeError( + "Blob field value must be bytes/blob or serialized BlobDescriptor bytes." + ) row_values = [blob_data] else: row_values = [col_data] diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py index cb689baeaf1e..5bed00ea3e74 100644 --- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py +++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py @@ -447,6 +447,8 @@ def write_lance(self, path: str, data: pyarrow.Table, **kwargs): def write_blob(self, path: str, data: pyarrow.Table, blob_as_descriptor: bool, **kwargs): try: + # Kept for API compatibility. Write behavior is adaptive and does not depend on this flag. 
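+ # Each blob cell is inspected on its own below: payloads recognized by
+ # BlobDescriptor.is_blob_descriptor() are streamed from their uri via a UriReader,
+ # while plain bytes are wrapped as BlobData.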
+ _ = blob_as_descriptor if data.num_columns != 1: raise RuntimeError(f"Blob format only supports a single column, got {data.num_columns} columns") column = data.column(0) @@ -466,18 +468,24 @@ def write_blob(self, path: str, data: pyarrow.Table, blob_as_descriptor: bool, * for i in range(num_rows): col_data = records_dict[field_name][i] if hasattr(fields[0].type, 'type') and fields[0].type.type == "BLOB": - if blob_as_descriptor: - blob_descriptor = BlobDescriptor.deserialize(col_data) - uri_reader = self.uri_reader_factory.create(blob_descriptor.uri) - blob_data = Blob.from_descriptor(uri_reader, blob_descriptor) - elif isinstance(col_data, bytes): - blob_data = BlobData(col_data) + if hasattr(col_data, 'as_py'): + col_data = col_data.as_py() + if isinstance(col_data, str): + col_data = col_data.encode('utf-8') + if isinstance(col_data, bytearray): + col_data = bytes(col_data) + + if isinstance(col_data, bytes): + if BlobDescriptor.is_blob_descriptor(col_data): + descriptor = BlobDescriptor.deserialize(col_data) + uri_reader = self.uri_reader_factory.create(descriptor.uri) + blob_data = Blob.from_descriptor(uri_reader, descriptor) + else: + blob_data = BlobData(col_data) else: - if hasattr(col_data, 'as_py'): - col_data = col_data.as_py() - if isinstance(col_data, str): - col_data = col_data.encode('utf-8') - blob_data = BlobData(col_data) + raise RuntimeError( + "Blob field value must be bytes/blob or serialized BlobDescriptor bytes." + ) row_values = [blob_data] else: row_values = [col_data] diff --git a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py index c9e51785a056..8ab9229e2ac6 100644 --- a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py +++ b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py @@ -21,10 +21,12 @@ import pyarrow as pa from pyarrow import RecordBatch +from pypaimon.common.file_io import FileIO from pypaimon.read.partition_info import PartitionInfo from pypaimon.read.reader.format_blob_reader import FormatBlobReader from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader from pypaimon.schema.data_types import DataField, PyarrowFieldParser +from pypaimon.table.row.blob import Blob, BlobDescriptor from pypaimon.table.special_fields import SpecialFields @@ -38,7 +40,10 @@ def __init__(self, format_reader: RecordBatchReader, index_mapping: List[int], p max_sequence_number: int, first_row_id: int, row_tracking_enabled: bool, - system_fields: dict): + system_fields: dict, + blob_as_descriptor: bool = False, + blob_stored_descriptor_fields: Optional[set] = None, + file_io: Optional[FileIO] = None): self.format_reader = format_reader self.index_mapping = index_mapping self.partition_info = partition_info @@ -48,6 +53,19 @@ def __init__(self, format_reader: RecordBatchReader, index_mapping: List[int], p self.first_row_id = first_row_id self.max_sequence_number = max_sequence_number self.system_fields = system_fields + self.blob_as_descriptor = blob_as_descriptor + self.blob_stored_descriptor_fields = blob_stored_descriptor_fields or set() + self.file_io = file_io + self.blob_field_names = { + field.name + for field in fields + if hasattr(field.type, 'type') and field.type.type == 'BLOB' + } + self.descriptor_blob_fields = { + field_name + for field_name in self.blob_stored_descriptor_fields + if field_name in self.blob_field_names + } def read_arrow_batch(self, start_idx=None, end_idx=None) -> Optional[RecordBatch]: if isinstance(self.format_reader, 
FormatBlobReader): @@ -117,8 +135,73 @@ def read_arrow_batch(self, start_idx=None, end_idx=None) -> Optional[RecordBatch if self.row_tracking_enabled and self.system_fields: record_batch = self._assign_row_tracking(record_batch) + record_batch = self._convert_descriptor_stored_blob_columns(record_batch) + return record_batch + def _convert_descriptor_stored_blob_columns(self, record_batch: RecordBatch) -> RecordBatch: + if isinstance(self.format_reader, FormatBlobReader): + return record_batch + if not self.descriptor_blob_fields: + return record_batch + + schema_names = set(record_batch.schema.names) + target_fields = [f for f in self.descriptor_blob_fields if f in schema_names] + if not target_fields: + return record_batch + + arrays = list(record_batch.columns) + for field_name in target_fields: + field_idx = record_batch.schema.get_field_index(field_name) + values = record_batch.column(field_idx).to_pylist() + + if self.blob_as_descriptor: + converted = [self._normalize_blob_cell(v) for v in values] + else: + converted = [self._blob_cell_to_data(v) for v in values] + arrays[field_idx] = pa.array(converted, type=pa.large_binary()) + + return pa.RecordBatch.from_arrays(arrays, schema=record_batch.schema) + + @staticmethod + def _normalize_blob_cell(value): + if value is None: + return None + if hasattr(value, 'as_py'): + value = value.as_py() + if isinstance(value, str): + value = value.encode('utf-8') + if isinstance(value, bytearray): + value = bytes(value) + return value + + def _blob_cell_to_data(self, value): + value = self._normalize_blob_cell(value) + if value is None: + return None + + if not isinstance(value, bytes): + return value + + descriptor = self._deserialize_descriptor_or_none(value) + if descriptor is None: + return value + + try: + uri_reader = self.file_io.uri_reader_factory.create(descriptor.uri) + blob = Blob.from_descriptor(uri_reader, descriptor) + return blob.to_data() + except Exception as e: + raise RuntimeError( + "Failed to read blob bytes from descriptor URI while converting blob value." 
+ ) from e + + @staticmethod + def _deserialize_descriptor_or_none(raw: bytes): + if not BlobDescriptor.is_blob_descriptor(raw): + return None + return BlobDescriptor.deserialize(raw) + def _assign_row_tracking(self, record_batch: RecordBatch) -> RecordBatch: """Assign row tracking meta fields (_ROW_ID and _SEQUENCE_NUMBER).""" arrays = list(record_batch.columns) diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index 9b218aa0b4ae..350ca509ef1f 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -133,6 +133,9 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool, else: raise ValueError(f"Unexpected file format: {file_format}") + blob_as_descriptor = CoreOptions.blob_as_descriptor(self.table.options) + blob_stored_descriptor_fields = CoreOptions.blob_stored_descriptor_fields(self.table.options) + index_mapping = self.create_index_mapping() partition_info = self._create_partition_info() system_fields = SpecialFields.find_system_fields(self.read_fields) @@ -150,7 +153,10 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool, file.max_sequence_number, file.first_row_id, row_tracking_enabled, - system_fields) + system_fields, + blob_as_descriptor=blob_as_descriptor, + blob_stored_descriptor_fields=blob_stored_descriptor_fields, + file_io=self.table.file_io) else: return DataFileBatchReader( format_reader, @@ -161,7 +167,10 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool, file.max_sequence_number, file.first_row_id, row_tracking_enabled, - system_fields) + system_fields, + blob_as_descriptor=blob_as_descriptor, + blob_stored_descriptor_fields=blob_stored_descriptor_fields, + file_io=self.table.file_io) def _get_fields_and_predicate(self, schema_id: int, read_fields): key = (schema_id, tuple(read_fields)) diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py index e0a442f30582..78c229e9be4d 100644 --- a/paimon-python/pypaimon/read/table_read.py +++ b/paimon-python/pypaimon/read/table_read.py @@ -20,6 +20,7 @@ import pandas import pyarrow +from pypaimon.common.options.core_options import CoreOptions from pypaimon.common.predicate import Predicate from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader from pypaimon.read.split import Split @@ -87,7 +88,56 @@ def to_arrow(self, splits: List[Split]) -> Optional[pyarrow.Table]: if not table_list: return pyarrow.Table.from_arrays([pyarrow.array([], type=field.type) for field in schema], schema=schema) else: - return pyarrow.Table.from_batches(table_list) + table = pyarrow.Table.from_batches(table_list) + return self._convert_descriptor_stored_fields_for_read(table) + + def _convert_descriptor_stored_fields_for_read(self, table: pyarrow.Table) -> pyarrow.Table: + if CoreOptions.blob_as_descriptor(self.table.options): + return table + + descriptor_fields = CoreOptions.blob_stored_descriptor_fields(self.table.options) + if not descriptor_fields: + return table + + from pypaimon.table.row.blob import Blob, BlobDescriptor + + result = table + for field_name in descriptor_fields: + if field_name not in result.column_names: + continue + values = result.column(field_name).to_pylist() + converted_values = [] + for value in values: + if value is None: + converted_values.append(None) + continue + if hasattr(value, 'as_py'): + value = value.as_py() + if isinstance(value, str): + value = value.encode('utf-8') + if isinstance(value, 
bytearray): + value = bytes(value) + if not isinstance(value, bytes): + converted_values.append(value) + continue + + try: + descriptor = BlobDescriptor.deserialize(value) + if descriptor.serialize() != value: + converted_values.append(value) + continue + uri_reader = self.table.file_io.uri_reader_factory.create(descriptor.uri) + converted_values.append(Blob.from_descriptor(uri_reader, descriptor).to_data()) + except Exception: + converted_values.append(value) + + column_idx = result.column_names.index(field_name) + result = result.set_column( + column_idx, + pyarrow.field(field_name, pyarrow.large_binary(), nullable=True), + pyarrow.array(converted_values, type=pyarrow.large_binary()), + ) + return result def _arrow_batch_generator(self, splits: List[Split], schema: pyarrow.Schema) -> Iterator[pyarrow.RecordBatch]: chunk_size = 65536 diff --git a/paimon-python/pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py b/paimon-python/pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py index 2770fad136a9..8d48642d597d 100644 --- a/paimon-python/pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py +++ b/paimon-python/pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py @@ -16,7 +16,7 @@ # limitations under the License. ################################################################################ """ -Sample demonstrating how to use blob-as-descriptor mode with REST catalog. +Sample demonstrating descriptor-stored blob fields with REST catalog. """ from pypaimon import CatalogFactory import pyarrow as pa @@ -50,9 +50,9 @@ def write_table_with_blob(catalog, video_file_path: str, external_oss_options: d 'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true', 'blob-field': 'video', - 'blob-as-descriptor': 'true' + 'blob.stored-descriptor-fields': 'video' }, - comment='Table with blob column using blob-as-descriptor mode') + comment='Table with blob column using descriptor-stored field') table_identifier = f'{database_name}.{table_name}' catalog.create_table( diff --git a/paimon-python/pypaimon/sample/rest_catalog_ray_data_sample.py b/paimon-python/pypaimon/sample/rest_catalog_ray_data_sample.py index fafe31b2f393..280322aabb73 100644 --- a/paimon-python/pypaimon/sample/rest_catalog_ray_data_sample.py +++ b/paimon-python/pypaimon/sample/rest_catalog_ray_data_sample.py @@ -218,7 +218,7 @@ def double_value(row): # print("Step 7: Blob data handling (Optional)") # print("="*60) # print("Paimon supports storing blob data (images, audio, etc.)") - # print("For blob data examples, see: pypaimon/sample/oss_blob_as_descriptor.py") + # print("For blob data examples, see: pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py") print("\n" + "="*60) print("Summary") @@ -232,7 +232,7 @@ def double_value(row): print(" - Scalable data processing with Ray operations") print(" - Integration with Ray ecosystem (Ray Train, Ray Serve, etc.)") print("\nFor blob data (images, audio) examples, see:") - print(" - pypaimon/sample/oss_blob_as_descriptor.py") + print(" - pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py") finally: server.shutdown() diff --git a/paimon-python/pypaimon/table/row/blob.py b/paimon-python/pypaimon/table/row/blob.py index a9a2e2fd490b..4faebd608dc0 100644 --- a/paimon-python/pypaimon/table/row/blob.py +++ b/paimon-python/pypaimon/table/row/blob.py @@ -17,6 +17,7 @@ ################################################################################ import io +import struct from abc import ABC, abstractmethod from typing import Optional, Union from urllib.parse 
import urlparse @@ -25,7 +26,8 @@ class BlobDescriptor: - CURRENT_VERSION = 1 + CURRENT_VERSION = 2 + MAGIC = 0x424C4F4244455343 # "BLOBDESC" def __init__(self, uri: str, offset: int, length: int, version: int = CURRENT_VERSION): self._version = version @@ -50,25 +52,20 @@ def version(self) -> int: return self._version def serialize(self) -> bytes: - import struct - uri_bytes = self._uri.encode('utf-8') uri_length = len(uri_bytes) - - # Pack using little endian format data = struct.pack('<B', self._version) + if self._version > 1: + data += struct.pack('<Q', self.MAGIC) data += struct.pack('<I', uri_length) data += uri_bytes data += struct.pack('<q', self._offset) data += struct.pack('<q', self._length) return data @classmethod def deserialize(cls, data: bytes) -> 'BlobDescriptor': - import struct - - if len(data) < 5: # minimum size: version(1) + uri_length(4) + if len(data) < 5: raise ValueError("Invalid BlobDescriptor data: too short") offset = 0 @@ -77,11 +74,26 @@ def deserialize(cls, data: bytes) -> 'BlobDescriptor': version = struct.unpack('<B', data[offset:offset + 1])[0] offset += 1 - if version != cls.CURRENT_VERSION: - raise ValueError(f"Unsupported BlobDescriptor version: {version}") + if version > cls.CURRENT_VERSION: + raise ValueError( + f"Expecting BlobDescriptor version to be less than or equal to " + f"{cls.CURRENT_VERSION}, but found {version}." + ) + + if version > 1: + if offset + 8 > len(data): + raise ValueError("Invalid BlobDescriptor data: too short") + magic = struct.unpack('<Q', data[offset:offset + 8])[0] + offset += 8 + if magic != cls.MAGIC: + raise ValueError("Invalid BlobDescriptor data: missing magic header") + + if offset + 4 > len(data): + raise ValueError("Invalid BlobDescriptor data: too short") uri_length = struct.unpack('<I', data[offset:offset + 4])[0] offset += 4 uri = data[offset:offset + uri_length].decode('utf-8') offset += uri_length blob_offset = struct.unpack('<q', data[offset:offset + 8])[0] offset += 8 blob_length = struct.unpack('<q', data[offset:offset + 8])[0] @@ ... @@ def deserialize(cls, data: bytes) -> 'BlobDescriptor': return cls(uri, blob_offset, blob_length, version) + @classmethod + def is_blob_descriptor(cls, data: bytes) -> bool: + if not isinstance(data, (bytes, bytearray)): + return False + raw = bytes(data) + if len(raw) < 9: + return False + + version = raw[0] + if version < 1: + return False + if version > cls.CURRENT_VERSION: + return False + if version == 1: + try: + cls.deserialize(raw) + return True + except Exception: + return False + + try: + magic = struct.unpack('<Q', raw[1:9])[0] + return magic == cls.MAGIC + except Exception: + return False +
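+ # Worked example (editor's illustration): with the v2 layout above,
+ # BlobDescriptor("test", 0, 100).serialize() yields version(1B) + magic(8B, "BLOBDESC")
+ # + uri_len(4B LE) + b"test" + offset(8B LE) + length(8B LE) = 33 bytes,
+ # so raw[0] == 2, is_blob_descriptor(raw) is True and deserialize(raw) round-trips;
+ # a v1 payload (no magic) is still accepted for compatibility.
+
 def __eq__(self, other) ->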
+ result = table.new_read_builder().new_read().to_arrow(table.new_read_builder().new_scan().plan().splits()) + self.assertEqual(result.num_rows, 3) def test_data_blob_writer_write_operations(self): """Test DataBlobWriter write operations with real data.""" @@ -1074,7 +1082,94 @@ def test_blob_write_read_end_to_end_with_descriptor(self): print(" - Created external blob file and descriptor") print(" - Wrote and read blob descriptor successfully") print(" - Verified blob data can be read from descriptor") - print(" - Tested blob-as-descriptor=true mode") + print(" - Tested blob-as-descriptor=true read output mode") + + def test_blob_stored_descriptor_fields_mixed_mode(self): + import random + from pypaimon import Schema + from pypaimon.table.row.blob import BlobDescriptor + + pa_schema = pa.schema([ + ('id', pa.int32()), + ('pic1', pa.large_binary()), + ('pic2', pa.large_binary()), + ]) + + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'blob.stored-descriptor-fields': 'pic2' + } + ) + self.catalog.create_table('test_db.blob_mixed_mode_test', schema, False) + table = self.catalog.get_table('test_db.blob_mixed_mode_test') + + random.seed(7) + pic1_data = bytes(bytearray([random.randint(0, 255) for _ in range(1024)])) + + external_blob_path = os.path.join(self.temp_dir, 'mixed_external_blob') + pic2_data = b'pic2_descriptor_payload' + with open(external_blob_path, 'wb') as f: + f.write(pic2_data) + descriptor = BlobDescriptor(external_blob_path, 0, len(pic2_data)) + + test_data = pa.Table.from_pydict({ + 'id': [1], + 'pic1': [pic1_data], + 'pic2': [descriptor.serialize()] + }, schema=pa_schema) + + write_builder = table.new_batch_write_builder() + writer = write_builder.new_write() + writer.write_arrow(test_data) + commit_messages = writer.prepare_commit() + write_builder.new_commit().commit(commit_messages) + writer.close() + + all_files = [f for msg in commit_messages for f in msg.new_files] + blob_files = [f for f in all_files if f.file_name.endswith('.blob')] + self.assertGreaterEqual(len(blob_files), 1) + self.assertTrue(all(f.write_cols == ['pic1'] for f in blob_files)) + + result = table.new_read_builder().new_read().to_arrow(table.new_read_builder().new_scan().plan().splits()) + self.assertEqual(result.num_rows, 1) + self.assertEqual(result.column('pic1').to_pylist()[0], pic1_data) + self.assertEqual(result.column('pic2').to_pylist()[0], pic2_data) + + def test_blob_stored_descriptor_fields_rejects_non_descriptor_input(self): + from pypaimon import Schema + + pa_schema = pa.schema([ + ('id', pa.int32()), + ('pic1', pa.large_binary()), + ('pic2', pa.large_binary()), + ]) + + schema = Schema.from_pyarrow_schema( + pa_schema, + options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'blob.stored-descriptor-fields': 'pic2' + } + ) + self.catalog.create_table('test_db.blob_mixed_mode_reject_test', schema, False) + table = self.catalog.get_table('test_db.blob_mixed_mode_reject_test') + + write_builder = table.new_batch_write_builder() + writer = write_builder.new_write() + + bad_data = pa.Table.from_pydict({ + 'id': [1], + 'pic1': [b'good'], + 'pic2': [b'not-a-descriptor'] + }, schema=pa_schema) + + with self.assertRaises(ValueError) as context: + writer.write_arrow(bad_data) + self.assertIn("blob.stored-descriptor-fields", str(context.exception)) def test_blob_write_read_large_data_end_to_end(self): """Test end-to-end blob functionality with large blob data (1MB per blob).""" @@ 
-1745,7 +1840,7 @@ def test_blob_as_descriptor_sequence_number_increment(self): from pypaimon import Schema from pypaimon.table.row.blob import BlobDescriptor - # Create schema with blob column (blob-as-descriptor=true) + # Create schema with blob column (descriptor input is auto-detected in write path) pa_schema = pa.schema([ ('id', pa.int32()), ('name', pa.string()), @@ -1770,7 +1865,7 @@ def test_blob_as_descriptor_sequence_number_increment(self): f.write(data) descriptors.append(BlobDescriptor(path, 0, len(data))) - # Write data row by row (this triggers the one-by-one writing in blob-as-descriptor mode) + # Write data row by row and verify blob sequence-number continuity write_builder = table.new_batch_write_builder() writer = write_builder.new_write() @@ -1832,7 +1927,7 @@ def test_blob_as_descriptor_sequence_number_increment(self): def test_blob_non_descriptor_sequence_number_increment(self): from pypaimon import Schema - # Create schema with blob column (blob-as-descriptor=false, normal mode) + # Create schema with blob column (default read output is blob bytes) pa_schema = pa.schema([ ('id', pa.int32()), ('name', pa.string()), @@ -1841,7 +1936,7 @@ def test_blob_non_descriptor_sequence_number_increment(self): schema = Schema.from_pyarrow_schema(pa_schema, options={ 'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true', - 'blob-as-descriptor': 'false' # Normal mode, not descriptor mode + 'blob-as-descriptor': 'false' # Read output as blob bytes }) self.catalog.create_table('test_db.blob_sequence_non_desc_test', schema, False) @@ -1855,7 +1950,7 @@ def test_blob_non_descriptor_sequence_number_increment(self): 'blob_data': [f'blob data {i}'.encode('utf-8') for i in range(num_rows)] }, schema=pa_schema) - # Write data as a batch (this triggers batch writing in non-descriptor mode) + # Write data as a batch and verify blob sequence-number continuity write_builder = table.new_batch_write_builder() writer = write_builder.new_write() writer.write_arrow(test_data) @@ -1901,7 +1996,7 @@ def test_blob_non_descriptor_sequence_number_increment(self): f"File: {blob_file.file_name}, min_seq: {min_seq}, max_seq: {max_seq}" ) - print("✅ Non-descriptor mode sequence number increment test passed") + print("✅ Blob sequence number increment test passed (batch write)") def test_blob_stats_schema_with_custom_column_name(self): from pypaimon import Schema @@ -1969,7 +2064,7 @@ def test_blob_file_name_format_with_shared_uuid_non_descriptor_mode(self): import re from pypaimon import Schema - # Create schema with blob column (blob-as-descriptor=false) + # Create schema with blob column (blob bytes read output) pa_schema = pa.schema([ ('id', pa.int32()), ('name', pa.string()), @@ -1978,7 +2073,7 @@ def test_blob_file_name_format_with_shared_uuid_non_descriptor_mode(self): schema = Schema.from_pyarrow_schema(pa_schema, options={ 'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true', - 'blob-as-descriptor': 'false', # Non-descriptor mode + 'blob-as-descriptor': 'false', # Read output as blob bytes 'target-file-size': '1MB' # Small target size to trigger multiple rollings }) @@ -2069,10 +2164,10 @@ def test_blob_file_name_format_with_shared_uuid_non_descriptor_mode(self): self.assertEqual(result.column('id').to_pylist(), list(range(1, num_blobs + 1))) def test_blob_non_descriptor_target_file_size_rolling(self): - """Test that blob.target-file-size is respected in non-descriptor mode.""" + """Test that blob.target-file-size is respected in blob write path.""" from pypaimon import Schema - # 
Create schema with blob column (non-descriptor mode) + # Create schema with blob column (blob write path) pa_schema = pa.schema([ ('id', pa.int32()), ('blob_data', pa.large_binary()), diff --git a/paimon-python/pypaimon/tests/blob_test.py b/paimon-python/pypaimon/tests/blob_test.py index 91808bc01413..2f95bff04613 100644 --- a/paimon-python/pypaimon/tests/blob_test.py +++ b/paimon-python/pypaimon/tests/blob_test.py @@ -421,15 +421,21 @@ def test_blob_descriptor_deserialization_invalid_data(self): BlobDescriptor.deserialize(b"sho") # Only 3 bytes, need at least 5 self.assertIn("too short", str(context.exception)) - # Test with invalid version (version 0) - # Create valid data but with wrong version + # Test with unsupported version (> current version) valid_descriptor = BlobDescriptor("test://uri", 0, 100) valid_data = bytearray(valid_descriptor.serialize()) - valid_data[0] = 0 # Set invalid version (0) + valid_data[0] = 3 # Set unsupported version with self.assertRaises(ValueError) as context: BlobDescriptor.deserialize(bytes(valid_data)) - self.assertIn("Unsupported BlobDescriptor version", str(context.exception)) + self.assertIn("less than or equal to 2, but found 3", str(context.exception)) + + # Test with invalid magic for version 2 descriptor + invalid_magic_data = bytearray(valid_descriptor.serialize()) + invalid_magic_data[1:9] = b"\x00" * 8 + with self.assertRaises(ValueError) as context: + BlobDescriptor.deserialize(bytes(invalid_magic_data)) + self.assertIn("missing magic header", str(context.exception)) # Test with incomplete data (missing URI bytes) incomplete_data = b'\x01\x00\x00\x00\x10' # Version 1, URI length 16, but no URI bytes @@ -486,6 +492,15 @@ def test_blob_descriptor_version_handling(self): deserialized = BlobDescriptor.deserialize(serialized) self.assertEqual(deserialized.version, 2) + # v1 payloads should remain deserializable for compatibility + descriptor_v1 = BlobDescriptor("test://uri", 0, 100, version=1) + serialized_v1 = descriptor_v1.serialize() + deserialized_v1 = BlobDescriptor.deserialize(serialized_v1) + self.assertEqual(deserialized_v1.version, 1) + self.assertEqual(deserialized_v1.uri, descriptor_v1.uri) + self.assertEqual(deserialized_v1.offset, descriptor_v1.offset) + self.assertEqual(deserialized_v1.length, descriptor_v1.length) + def test_blob_descriptor_edge_cases(self): """Test BlobDescriptor with edge cases.""" # Test with empty URI @@ -532,15 +547,31 @@ def test_blob_descriptor_serialization_format(self): # Check that serialized data starts with version byte self.assertEqual(serialized[0], BlobDescriptor.CURRENT_VERSION) - # Check minimum length (version + uri_length + uri + offset + length) - # 1 + 4 + len("test") + 8 + 8 = 25 bytes - self.assertEqual(len(serialized), 25) + # Check minimum length (version + magic + uri_length + uri + offset + length) + # 1 + 8 + 4 + len("test") + 8 + 8 = 33 bytes + self.assertEqual(len(serialized), 33) # Verify round-trip consistency deserialized = BlobDescriptor.deserialize(serialized) re_serialized = deserialized.serialize() self.assertEqual(serialized, re_serialized) + def test_blob_descriptor_detection(self): + import struct + + descriptor_v2 = BlobDescriptor("test://uri", 1, 2) + descriptor_v1 = BlobDescriptor("test://uri", 1, 2, version=1) + random_bytes = b"not-a-descriptor" + fake_v1_prefix = b"\x01not-a-descriptor" + v2_magic_only = bytes([2]) + struct.pack('<Q', BlobDescriptor.MAGIC) + + self.assertTrue(BlobDescriptor.is_blob_descriptor(descriptor_v2.serialize())) + self.assertTrue(BlobDescriptor.is_blob_descriptor(descriptor_v1.serialize())) + self.assertTrue(BlobDescriptor.is_blob_descriptor(v2_magic_only)) + self.assertFalse(BlobDescriptor.is_blob_descriptor(random_bytes)) + self.assertFalse(BlobDescriptor.is_blob_descriptor(fake_v1_prefix)) - if blob_start >= start: + # Handle blob files specially. Each blob field tracks row ids independently.
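+ # (Illustration) a data file covering rows [100, 200) followed by blob entries for
+ # fields pic1 and pic2: the first entry of each field starts at first_row_id 100,
+ # rather than one shared cursor advancing across both fields.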
+ if current_data_start >= start: raise RuntimeError( - f"This is a bug, blobStart {blob_start} should be less than start {start} " + f"This is a bug, blobStart {current_data_start} should be less than start {start} " f"when assigning a blob entry file." ) row_count = entry.file.row_count - row_id_assigned.append(entry.assign_first_row_id(blob_start)) - blob_start += row_count + blob_field_key = tuple(entry.file.write_cols or []) + field_blob_start = blob_start_by_field.get(blob_field_key, current_data_start) + row_id_assigned.append(entry.assign_first_row_id(field_blob_start)) + blob_start_by_field[blob_field_key] = field_blob_start + row_count else: # Handle regular files row_count = entry.file.row_count row_id_assigned.append(entry.assign_first_row_id(start)) - blob_start = start + current_data_start = start + blob_start_by_field.clear() start += row_count else: # For compact files or files that already have first_row_id, don't assign diff --git a/paimon-python/pypaimon/write/writer/blob_file_writer.py b/paimon-python/pypaimon/write/writer/blob_file_writer.py index 2b3ba1a1af1f..b038fcf2ab30 100644 --- a/paimon-python/pypaimon/write/writer/blob_file_writer.py +++ b/paimon-python/pypaimon/write/writer/blob_file_writer.py @@ -31,10 +31,9 @@ class BlobFileWriter: Writes rows one by one and tracks file size. """ - def __init__(self, file_io, file_path: Path, blob_as_descriptor: bool): + def __init__(self, file_io, file_path: Path): self.file_io = file_io self.file_path = file_path - self.blob_as_descriptor = blob_as_descriptor self.output_stream = file_io.new_output_stream(file_path) self.writer = BlobFormatWriter(self.output_stream) self.row_count = 0 @@ -50,30 +49,7 @@ def write_row(self, row_data: pa.Table): field_name = row_data.schema[0].name col_data = records_dict[field_name][0] - # Convert to Blob - if self.blob_as_descriptor: - # In blob-as-descriptor mode, we need to read external file data - # for rolling size calculation (based on external file size) - if isinstance(col_data, bytes): - blob_descriptor = BlobDescriptor.deserialize(col_data) - else: - # Handle PyArrow types - if hasattr(col_data, 'as_py'): - col_data = col_data.as_py() - if isinstance(col_data, str): - col_data = col_data.encode('utf-8') - blob_descriptor = BlobDescriptor.deserialize(col_data) - # Read external file data for rolling size calculation - uri_reader = self.file_io.uri_reader_factory.create(blob_descriptor.uri) - blob_data = Blob.from_descriptor(uri_reader, blob_descriptor) - elif isinstance(col_data, bytes): - blob_data = BlobData(col_data) - else: - if hasattr(col_data, 'as_py'): - col_data = col_data.as_py() - if isinstance(col_data, str): - col_data = col_data.encode('utf-8') - blob_data = BlobData(col_data) + blob_data = self._to_blob(col_data) # Create GenericRow fields = [DataField(0, field_name, PyarrowFieldParser.to_paimon_type(row_data.schema[0].type, False))] @@ -83,6 +59,36 @@ def write_row(self, row_data: pa.Table): self.writer.add_element(row) self.row_count += 1 + def _to_blob(self, col_data) -> Blob: + if hasattr(col_data, 'as_py'): + col_data = col_data.as_py() + if isinstance(col_data, str): + col_data = col_data.encode('utf-8') + if isinstance(col_data, bytearray): + col_data = bytes(col_data) + + if isinstance(col_data, Blob): + return col_data + + if isinstance(col_data, bytes): + if BlobDescriptor.is_blob_descriptor(col_data): + descriptor = BlobDescriptor.deserialize(col_data) + uri_reader = self.file_io.uri_reader_factory.create(descriptor.uri) + return 
Blob.from_descriptor(uri_reader, descriptor) + else: + return BlobData(col_data) + + raise ValueError( + "Blob field value must be bytes/blob or serialized BlobDescriptor bytes, " + f"got {type(col_data)}." + ) + + @staticmethod + def _deserialize_descriptor_or_none(raw: bytes): + if not BlobDescriptor.is_blob_descriptor(raw): + return None + return BlobDescriptor.deserialize(raw) + def reach_target_size(self, suggested_check: bool, target_size: int) -> bool: return self.writer.reach_target_size(suggested_check, target_size) diff --git a/paimon-python/pypaimon/write/writer/blob_writer.py b/paimon-python/pypaimon/write/writer/blob_writer.py index 527045d1e0ab..d6994ca75c7e 100644 --- a/paimon-python/pypaimon/write/writer/blob_writer.py +++ b/paimon-python/pypaimon/write/writer/blob_writer.py @@ -60,30 +60,17 @@ def _check_and_roll_if_needed(self): if self.pending_data is None: return - if self.blob_as_descriptor: - # blob-as-descriptor=true: Write row by row and check actual file size - for i in range(self.pending_data.num_rows): - row_data = self.pending_data.slice(i, 1) - self._write_row_to_file(row_data) - self.record_count += 1 - - if self.rolling_file(False): - self.close_current_writer() - - # All data has been written - self.pending_data = None - else: - # blob-as-descriptor=false: Use blob_target_file_size instead of target_file_size - current_size = self.pending_data.nbytes - if current_size > self.blob_target_file_size: - split_row = self._find_optimal_split_point(self.pending_data, self.blob_target_file_size) - if split_row > 0: - data_to_write = self.pending_data.slice(0, split_row) - remaining_data = self.pending_data.slice(split_row) - - self._write_data_to_file(data_to_write) - self.pending_data = remaining_data - self._check_and_roll_if_needed() + # Always write blob rows one-by-one so rolling uses actual blob bytes size rather than + # in-memory serialized descriptor size. + for i in range(self.pending_data.num_rows): + row_data = self.pending_data.slice(i, 1) + self._write_row_to_file(row_data) + self.record_count += 1 + + if self.rolling_file(False): + self.close_current_writer() + + self.pending_data = None def _write_row_to_file(self, row_data: pa.Table): """Write a single row to the current blob file. Opens a new file if needed.""" @@ -103,7 +90,7 @@ def open_current_writer(self): self.file_count += 1 # Increment counter for next file file_path = self._generate_file_path(file_name) self.current_file_path = file_path - self.current_writer = BlobFileWriter(self.file_io, file_path, self.blob_as_descriptor) + self.current_writer = BlobFileWriter(self.file_io, file_path) def rolling_file(self, force_check: bool = False) -> bool: if self.current_writer is None: @@ -132,8 +119,8 @@ def close_current_writer(self): def _write_data_to_file(self, data): """ - Override for blob format in normal mode (blob-as-descriptor=false). - Only difference from parent: use shared UUID + counter for file naming. + Keep a fallback path for direct blob table writes while preserving the shared uuid+counter + naming behavior. 
""" if data.num_rows == 0: return @@ -146,7 +133,6 @@ def _write_data_to_file(self, data): self.file_count += 1 file_path = self._generate_file_path(file_name) - # Write blob file (parent class already supports blob format) self.file_io.write_blob(file_path, data, self.blob_as_descriptor) file_size = self.file_io.get_file_size(file_path) @@ -219,20 +205,20 @@ def _add_file_metadata(self, file_name: str, file_path: str, data_or_row_count, def prepare_commit(self): """Prepare commit, ensuring all data is written.""" - # Close current file if open (blob-as-descriptor=true mode) + # Close current file if open. if self.current_writer is not None: self.close_current_writer() - # Call parent to handle pending_data (blob-as-descriptor=false mode) + # Call parent to handle pending_data fallback. return super().prepare_commit() def close(self): """Close current writer if open.""" - # Close current file if open (blob-as-descriptor=true mode) + # Close current file if open. if self.current_writer is not None: self.close_current_writer() - # Call parent to handle pending_data (blob-as-descriptor=false mode) + # Call parent to handle pending_data fallback. super().close() def abort(self): diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py b/paimon-python/pypaimon/write/writer/data_blob_writer.py index 800e21e5a64d..f97bcf99b3e2 100644 --- a/paimon-python/pypaimon/write/writer/data_blob_writer.py +++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py @@ -18,7 +18,7 @@ import logging import uuid -from typing import List, Optional, Tuple +from typing import Dict, List, Optional, Tuple import pyarrow as pa @@ -38,8 +38,9 @@ class DataBlobWriter(DataWriter): A rolling file writer that handles both normal data and blob data. This writer creates separate files for normal columns and blob columns, managing their lifecycle independently. - For example, given a table schema with normal columns (id INT, name STRING) and a blob column - (data BLOB), this writer will create separate files for (id, name) and (data). + For example, given a table schema with normal columns (id INT, name STRING) and blob columns + (pic1 BLOB, pic2 BLOB), this writer will create separate files for normal columns and each + blob-file column. Key features: - Blob data can roll independently when normal data doesn't need rolling @@ -79,13 +80,31 @@ class DataBlobWriter(DataWriter): def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, options: CoreOptions = None): super().__init__(table, partition, bucket, max_seq_number, options) - # Determine blob column from table schema - self.blob_column_name = self._get_blob_columns_from_schema() + # Determine blob columns from table schema + self.blob_column_names = self._get_blob_columns_from_schema() + self.blob_stored_descriptor_fields = CoreOptions.blob_stored_descriptor_fields(self.options) + + unknown_descriptor_fields = self.blob_stored_descriptor_fields.difference( + set(self.blob_column_names) + ) + if unknown_descriptor_fields: + raise ValueError( + "Fields in 'blob.stored-descriptor-fields' must be blob fields in schema. " + f"Unknown fields: {sorted(unknown_descriptor_fields)}" + ) + + # Blob fields that should still be written to `.blob` files. 
+ self.blob_file_column_names = [ + col for col in self.blob_column_names if col not in self.blob_stored_descriptor_fields + ] - # Split schema into normal and blob columns all_column_names = self.table.field_names - self.normal_column_names = [col for col in all_column_names if col != self.blob_column_name] - self.normal_columns = [field for field in self.table.table_schema.fields if field.name != self.blob_column_name] + self.normal_column_names = [ + col for col in all_column_names if col not in self.blob_file_column_names + ] + self.normal_columns = [ + field for field in self.table.table_schema.fields if field.name in self.normal_column_names + ] self.write_cols = self.normal_column_names # State management for blob writer @@ -95,33 +114,37 @@ def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, op # Track pending data for normal data only self.pending_normal_data: Optional[pa.Table] = None - # Initialize blob writer with blob column name + # Initialize blob writers for each blob-file column. from pypaimon.write.writer.blob_writer import BlobWriter - self.blob_writer = BlobWriter( - table=self.table, - partition=self.partition, - bucket=self.bucket, - max_seq_number=max_seq_number, - blob_column=self.blob_column_name, - options=options - ) + self.blob_writers: Dict[str, BlobWriter] = {} + for blob_column in self.blob_file_column_names: + self.blob_writers[blob_column] = BlobWriter( + table=self.table, + partition=self.partition, + bucket=self.bucket, + max_seq_number=max_seq_number, + blob_column=blob_column, + options=options + ) - logger.info(f"Initialized DataBlobWriter with blob column: {self.blob_column_name}") + logger.info( + "Initialized DataBlobWriter with blob columns: %s, blob file columns: %s, descriptor " + "stored columns: %s", + self.blob_column_names, + self.blob_file_column_names, + sorted(self.blob_stored_descriptor_fields), + ) - def _get_blob_columns_from_schema(self) -> str: + def _get_blob_columns_from_schema(self) -> List[str]: blob_columns = [] for field in self.table.table_schema.fields: type_str = str(field.type).lower() if 'blob' in type_str: blob_columns.append(field.name) - # Validate blob column count (matching Java constraint) if len(blob_columns) == 0: raise ValueError("No blob field found in table schema.") - elif len(blob_columns) > 1: - raise ValueError("Limit exactly one blob field in one paimon table yet.") - - return blob_columns[0] # Return single blob column name + return blob_columns def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch: normal_data, _ = self._split_data(data) @@ -133,7 +156,8 @@ def _merge_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table: def write(self, data: pa.RecordBatch): try: # Split data into normal and blob parts - normal_data, blob_data = self._split_data(data) + normal_data, blob_data_map = self._split_data(data) + self._validate_descriptor_stored_fields_input(data) # Process and accumulate normal data processed_normal = self._process_normal_data(normal_data) @@ -142,10 +166,10 @@ def write(self, data: pa.RecordBatch): else: self.pending_normal_data = self._merge_normal_data(self.pending_normal_data, processed_normal) - # Write blob data directly to blob writer (handles its own rolling) - if blob_data is not None and blob_data.num_rows > 0: - # Write blob data directly to blob writer - self.blob_writer.write(blob_data) + # Write blob-file columns to dedicated blob writers. 
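+ # Each per-column BlobWriter rolls its .blob files independently against
+ # blob.target-file-size, so a large blob in one column does not force the others to roll.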
+            for blob_column, blob_data in blob_data_map.items():
+                if blob_data is not None and blob_data.num_rows > 0:
+                    self.blob_writers[blob_column].write(blob_data)
 
             self.record_count += data.num_rows
 
@@ -181,21 +205,51 @@ def close(self):
 
     def abort(self):
         """Abort all writers and clean up resources."""
-        self.blob_writer.abort()
+        for blob_writer in self.blob_writers.values():
+            blob_writer.abort()
         self.pending_normal_data = None
         self.committed_files.clear()
 
-    def _split_data(self, data: pa.RecordBatch) -> Tuple[pa.RecordBatch, pa.RecordBatch]:
+    def _split_data(self, data: pa.RecordBatch) -> Tuple[pa.RecordBatch, Dict[str, pa.RecordBatch]]:
         """Split data into normal and blob parts based on column names."""
-        # Use the pre-computed column names
-        normal_columns = self.normal_column_names
-        blob_columns = [self.blob_column_name]  # Single blob column
-
-        # Create projected batches
-        normal_data = data.select(normal_columns) if normal_columns else None
-        blob_data = data.select(blob_columns) if blob_columns else None
+        normal_data = data.select(self.normal_column_names) if self.normal_column_names else None
+        blob_data_map = {
+            blob_column: data.select([blob_column]) for blob_column in self.blob_file_column_names
+        }
+        return normal_data, blob_data_map
+
+    def _validate_descriptor_stored_fields_input(self, data: pa.RecordBatch):
+        if not self.blob_stored_descriptor_fields:
+            return
 
-        return normal_data, blob_data
+        from pypaimon.table.row.blob import BlobDescriptor
+
+        for field_name in self.blob_stored_descriptor_fields:
+            if field_name not in data.schema.names:
+                continue
+            values = data.column(data.schema.get_field_index(field_name)).to_pylist()
+            for value in values:
+                if value is None:
+                    continue
+                if hasattr(value, 'as_py'):
+                    value = value.as_py()
+                if isinstance(value, str):
+                    value = value.encode('utf-8')
+                if not isinstance(value, (bytes, bytearray)):
+                    raise ValueError(
+                        "blob.stored-descriptor-fields requires blob field value to be a serialized "
+                        "BlobDescriptor."
+                    )
+                try:
+                    descriptor_bytes = bytes(value)
+                    descriptor = BlobDescriptor.deserialize(descriptor_bytes)
+                except Exception as e:
+                    raise ValueError(
+                        "blob.stored-descriptor-fields requires blob field value to be a serialized "
+                        "BlobDescriptor."
+                    ) from e
+                if descriptor.serialize() != descriptor_bytes:
+                    raise ValueError(
+                        "blob.stored-descriptor-fields requires a serialized BlobDescriptor; "
+                        "the descriptor payload contains trailing bytes."
+                    )
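The round-trip check above is the whole contract for descriptor-stored fields. A minimal sketch of what a caller would feed such a field, assuming a `BlobDescriptor(uri, offset, length)` constructor in pypaimon that mirrors the Java constructor used in `BlobTestBase` further down (the actual Python signature may differ):

```python
import pyarrow as pa

from pypaimon.table.row.blob import BlobDescriptor

# Hypothetical external object and sizes; BlobDescriptor(uri, offset, length)
# is an assumed constructor mirroring the Java side.
descriptor = BlobDescriptor("oss://bucket/images/cat.jpg", 0, 1024)
payload = descriptor.serialize()

# The invariant _validate_descriptor_stored_fields_input enforces:
# deserialize followed by re-serialize must reproduce the payload exactly.
assert BlobDescriptor.deserialize(payload).serialize() == payload

# 'picture' is assumed to be listed in 'blob.stored-descriptor-fields', so
# DataBlobWriter keeps these bytes inline in the normal data file.
batch = pa.RecordBatch.from_pydict({"id": [1], "picture": [payload]})
# writer.write(batch)
```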
 
     @staticmethod
     def _process_normal_data(data: pa.RecordBatch) -> pa.Table:
@@ -228,11 +282,11 @@ def _close_current_writers(self):
         # Close normal writer and get metadata
         normal_meta = self._write_normal_data_to_file(self.pending_normal_data)
 
-        # Fetch blob metadata from blob writer
-        blob_metas = self.blob_writer.prepare_commit()
-
-        # Validate consistency between normal and blob files (Java behavior)
-        self._validate_consistency(normal_meta, blob_metas)
+        blob_metas = []
+        for blob_column in self.blob_file_column_names:
+            writer_metas = self.blob_writers[blob_column].prepare_commit()
+            self._validate_consistency(normal_meta, writer_metas, blob_column)
+            blob_metas.extend(writer_metas)
 
         # Add normal file metadata first
         self.committed_files.append(normal_meta)
@@ -301,7 +355,8 @@ def _create_data_file_meta(self, file_name: str, file_path: str, data: pa.Table,
             file_path=file_path,
             write_cols=self.write_cols)
 
-    def _validate_consistency(self, normal_meta: DataFileMeta, blob_metas: List[DataFileMeta]):
+    def _validate_consistency(
+            self, normal_meta: DataFileMeta, blob_metas: List[DataFileMeta], blob_column: str):
         if normal_meta is None:
             return
 
@@ -312,5 +367,6 @@ def _validate_consistency(self, normal_meta: DataFileMeta, blob_metas: List[Data
             raise RuntimeError(
                 f"This is a bug: The row count of main file and blob files does not match. "
                 f"Main file: {normal_meta.file_name} (row count: {normal_row_count}), "
+                f"blob field: {blob_column}, "
                 f"blob files: {[meta.file_name for meta in blob_metas]} (total row count: {blob_row_count})"
             )
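Before the Spark-side changes, a worked example of the column partitioning that `__init__` above performs, for a hypothetical schema `(id INT, name STRING, pic1 BLOB, pic2 BLOB)` with `'blob.stored-descriptor-fields' = 'pic2'`:

```python
# Mirrors the list comprehensions in DataBlobWriter.__init__; names are illustrative.
all_column_names = ["id", "name", "pic1", "pic2"]
blob_column_names = ["pic1", "pic2"]
blob_stored_descriptor_fields = {"pic2"}

blob_file_column_names = [
    col for col in blob_column_names if col not in blob_stored_descriptor_fields
]
normal_column_names = [
    col for col in all_column_names if col not in blob_file_column_names
]

assert blob_file_column_names == ["pic1"]  # gets a dedicated BlobWriter and .blob files
assert normal_column_names == ["id", "name", "pic2"]  # pic2 descriptor bytes stay inline
```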
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
index 3ad3666e9760..d2e70b9ed3b4 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
@@ -61,28 +61,25 @@ public class SparkInternalRowWrapper implements InternalRow, Serializable {
 
     private final StructType tableSchema;
     private final int length;
-    private final boolean blobAsDescriptor;
     @Nullable private final UriReaderFactory uriReaderFactory;
     @Nullable private final int[] fieldIndexMap;
     private transient org.apache.spark.sql.catalyst.InternalRow internalRow;
 
     public SparkInternalRowWrapper(StructType tableSchema, int length) {
-        this(tableSchema, length, null, false, null);
+        this(tableSchema, length, null, null);
     }
 
     public SparkInternalRowWrapper(
             StructType tableSchema,
             int length,
             StructType dataSchema,
-            boolean blobAsDescriptor,
             CatalogContext catalogContext) {
         this.tableSchema = tableSchema;
         this.length = length;
         this.fieldIndexMap = dataSchema != null ? buildFieldIndexMap(tableSchema, dataSchema) : null;
-        this.blobAsDescriptor = blobAsDescriptor;
-        this.uriReaderFactory =
-                blobAsDescriptor ? new UriReaderFactory(catalogContext) : null;
+        this.uriReaderFactory = new UriReaderFactory(catalogContext);
     }
 
     public SparkInternalRowWrapper replace(org.apache.spark.sql.catalyst.InternalRow internalRow) {
@@ -243,12 +240,14 @@ public Variant getVariant(int pos) {
 
     @Override
     public Blob getBlob(int pos) {
-        if (blobAsDescriptor) {
-            BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(internalRow.getBinary(pos));
+        byte[] bytes = internalRow.getBinary(pos);
+        boolean blobDes = BlobDescriptor.isBlobDescriptor(bytes);
+        if (blobDes) {
+            BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes);
             UriReader uriReader = uriReaderFactory.create(blobDescriptor.uri());
             return Blob.fromDescriptor(uriReader, blobDescriptor);
         } else {
-            return new BlobData(internalRow.getBinary(pos));
+            return new BlobData(bytes);
         }
     }
 
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
index cc947c7ea04b..36b5624ff52f 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
@@ -62,24 +62,17 @@ public class SparkRow implements InternalRow, Serializable {
 
     private final RowType type;
     private final Row row;
     private final RowKind rowKind;
-    private final boolean blobAsDescriptor;
     private final UriReaderFactory uriReaderFactory;
 
     public SparkRow(RowType type, Row row) {
-        this(type, row, RowKind.INSERT, false, null);
+        this(type, row, RowKind.INSERT, null);
     }
 
-    public SparkRow(
-            RowType type,
-            Row row,
-            RowKind rowkind,
-            boolean blobAsDescriptor,
-            CatalogContext catalogContext) {
+    public SparkRow(RowType type, Row row, RowKind rowkind, CatalogContext catalogContext) {
         this.type = type;
         this.row = row;
         this.rowKind = rowkind;
-        this.blobAsDescriptor = blobAsDescriptor;
-        this.uriReaderFactory =
-                blobAsDescriptor ? new UriReaderFactory(catalogContext) : null;
+        this.uriReaderFactory = new UriReaderFactory(catalogContext);
     }
 
     @Override
@@ -168,12 +161,14 @@ public Variant getVariant(int i) {
 
     @Override
     public Blob getBlob(int i) {
-        if (blobAsDescriptor) {
-            BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(row.getAs(i));
+        byte[] bytes = row.getAs(i);
+        boolean blobDes = BlobDescriptor.isBlobDescriptor(bytes);
+        if (blobDes) {
+            BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes);
             UriReader uriReader = uriReaderFactory.create(blobDescriptor.uri());
             return Blob.fromDescriptor(uriReader, blobDescriptor);
         } else {
-            return new BlobData(row.getAs(i));
+            return new BlobData(bytes);
         }
     }
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
index 077d92cc292b..a5d9278523ab 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
@@ -74,7 +74,6 @@ case class DataEvolutionPaimonWriter(paimonTable: FileStoreTable, dataSplits: Se
       table.newBatchWriteBuilder(),
       writeType,
       firstRowIdToPartitionMap,
-      coreOptions.blobAsDescriptor(),
      table.catalogEnvironment().catalogContext())
     try {
       iter.foreach(row => write.write(row))
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index de5804cc72f2..ebdbf038fff2 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -138,7 +138,6 @@ case class PaimonSparkWriter(
         writeRowTracking,
         fullCompactionDeltaCommits,
         batchId,
-        coreOptions.blobAsDescriptor(),
         table.catalogEnvironment().catalogContext(),
         postponePartitionBucketComputer
       )
@@ -455,7 +454,6 @@ case class PaimonSparkWriter(
       val toPaimonRow = SparkRowUtils.toPaimonRow(
         rowType,
         rowKindColIdx,
-        table.coreOptions().blobAsDescriptor(),
         table.catalogEnvironment().catalogContext())
 
       bootstrapIterator.asScala
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkPostponeCompactProcedure.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkPostponeCompactProcedure.scala
index 572e1cf96c22..5c25b70e6754 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkPostponeCompactProcedure.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkPostponeCompactProcedure.scala
@@ -104,7 +104,6 @@ case class SparkPostponeCompactProcedure(
         writeRowTracking = coreOptions.dataEvolutionEnabled(),
         Option.apply(coreOptions.fullCompactionDeltaCommits()),
         None,
-        coreOptions.blobAsDescriptor(),
         table.catalogEnvironment().catalogContext(),
         Some(postponePartitionBucketComputer)
       )
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkRowUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkRowUtils.scala
index 86b070083fe9..eee55454546c 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkRowUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkRowUtils.scala
@@ -30,7 +30,6 @@ object SparkRowUtils {
   def toPaimonRow(
       writeType: RowType,
       rowkindColIdx: Int,
-      blobAsDescriptor: Boolean,
       catalogContext: CatalogContext): Row => SparkRow = {
     if (rowkindColIdx != -1) { row =>
@@ -38,9 +37,8 @@ object SparkRowUtils {
           writeType,
           row,
           RowKind.fromByteValue(row.getByte(rowkindColIdx)),
-          blobAsDescriptor,
           catalogContext)
-    } else { row => new SparkRow(writeType, row, RowKind.INSERT, blobAsDescriptor, catalogContext) }
+    } else { row => new SparkRow(writeType, row, RowKind.INSERT, catalogContext) }
   }
 
   def getFieldIndex(schema: StructType, colName: String): Int = {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala
index 0bc9bc0a7684..c0e5190d5dfa 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala
@@ -40,7 +40,6 @@ case class DataEvolutionTableDataWrite(
     writeBuilder: BatchWriteBuilder,
     writeType: RowType,
     firstRowIdToPartitionMap: mutable.HashMap[Long, (BinaryRow, Long)],
-    blobAsDescriptor: Boolean,
     catalogContext: CatalogContext)
   extends InnerTableV1DataWrite {
 
@@ -51,7 +50,7 @@ case class DataEvolutionTableDataWrite(
   private val commitMessages = ListBuffer[CommitMessageImpl]()
 
   private val toPaimonRow = {
-    SparkRowUtils.toPaimonRow(writeType, -1, blobAsDescriptor, catalogContext)
+    SparkRowUtils.toPaimonRow(writeType, -1, catalogContext)
   }
 
   def write(row: Row): Unit = {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonDataWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonDataWrite.scala
index f0861c27b5cb..02b9338b4bc9 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonDataWrite.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonDataWrite.scala
@@ -37,7 +37,6 @@ case class PaimonDataWrite(
     writeRowTracking: Boolean = false,
     fullCompactionDeltaCommits: Option[Int],
     batchId: Option[Long],
-    blobAsDescriptor: Boolean,
     catalogContext: CatalogContext,
     postponePartitionBucketComputer: Option[BinaryRow => Integer])
   extends abstractInnerTableDataWrite[Row]
@@ -58,7 +57,7 @@ case class PaimonDataWrite(
   }
 
   private val toPaimonRow = {
-    SparkRowUtils.toPaimonRow(writeType, rowKindColIdx, blobAsDescriptor, catalogContext)
+    SparkRowUtils.toPaimonRow(writeType, rowKindColIdx, catalogContext)
   }
 
   def write(row: Row): Unit = {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
index fbd166a18312..aa2dfcdf8f56 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
@@ -45,7 +45,6 @@ case class PaimonV2DataWriter(
 
   val fullCompactionDeltaCommits: Option[Int] =
     Option.apply(coreOptions.fullCompactionDeltaCommits())
-  val blobAsDescriptor: Boolean = coreOptions.blobAsDescriptor()
 
   val write: TableWriteImpl[InternalRow] = {
     writeBuilder
@@ -57,12 +56,8 @@
 
   private val rowConverter: InternalRow => SparkInternalRowWrapper = {
     val numFields = writeSchema.fields.length
-    val reusableWrapper = new SparkInternalRowWrapper(
-      writeSchema,
-      numFields,
-      dataSchema,
-      blobAsDescriptor,
-      catalogContext)
+    val reusableWrapper =
+      new SparkInternalRowWrapper(writeSchema, numFields, dataSchema, catalogContext)
     record => reusableWrapper.replace(record)
   }
 
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
index 08c874cd3bee..1c108a9abf31 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
@@ -91,13 +91,15 @@ class BlobTestBase extends PaimonSparkTestBase {
     val blobDescriptor = new BlobDescriptor(uri, 0, blobData.length)
 
     sql(
-      "CREATE TABLE t (id INT, data STRING, picture BINARY) TBLPROPERTIES ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-field'='picture', 'blob-as-descriptor'='true')")
+      "CREATE TABLE t (id INT, data STRING, picture BINARY) TBLPROPERTIES ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-field'='picture')")
     sql(
       "INSERT INTO t VALUES (1, 'paimon', X'" + bytesToHex(blobDescriptor.serialize()) + "')," +
         "(5, 'paimon', X'" + bytesToHex(blobDescriptor.serialize()) + "')," +
         "(2, 'paimon', X'" + bytesToHex(blobDescriptor.serialize()) + "')," +
         "(3, 'paimon', X'" + bytesToHex(blobDescriptor.serialize()) + "')," +
         "(4, 'paimon', X'" + bytesToHex(blobDescriptor.serialize()) + "')")
+
+    sql("ALTER TABLE t SET TBLPROPERTIES ('blob-as-descriptor'='true')")
 
     val newDescriptorBytes =
       sql("SELECT picture FROM t WHERE id = 1").collect()(0).get(0).asInstanceOf[Array[Byte]]
     val newBlobDescriptor = BlobDescriptor.deserialize(newDescriptorBytes)
@@ -132,10 +134,11 @@ class BlobTestBase extends PaimonSparkTestBase {
     sql(
       "CREATE TABLE IF NOT EXISTS t (\n" + "id STRING,\n" + "name STRING,\n" + "file_size STRING,\n" + "crc64 STRING,\n" + "modified_time STRING,\n" + "content BINARY\n" + ") \n" + "PARTITIONED BY (ds STRING, batch STRING) \n" +
-        "TBLPROPERTIES ('comment' = 'blob table','partition.expiration-time' = '365 d','row-tracking.enabled' = 'true','data-evolution.enabled' = 'true','blob-field' = 'content','blob-as-descriptor' = 'true')")
+        "TBLPROPERTIES ('comment' = 'blob table','partition.expiration-time' = '365 d','row-tracking.enabled' = 'true','data-evolution.enabled' = 'true','blob-field' = 'content')")
     sql(
       "INSERT OVERWRITE TABLE t\nPARTITION(ds= '1017',batch = 'test') VALUES \n('1','paimon','1024','12345678','20241017',X'" + bytesToHex(
         blobDescriptor.serialize()) + "')")
+    sql("ALTER TABLE t SET TBLPROPERTIES ('blob-as-descriptor'='true')")
 
     val newDescriptorBytes =
       sql("SELECT content FROM t WHERE id = '1'").collect()(0).get(0).asInstanceOf[Array[Byte]]
     val newBlobDescriptor = BlobDescriptor.deserialize(newDescriptorBytes)
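The `getBlob` changes above replace the option check with payload sniffing via `BlobDescriptor.isBlobDescriptor`. A rough Python analogue of that dispatch, for intuition only (`isBlobDescriptor` is Java-only; the deserialize-and-compare probe below is an assumption standing in for it):

```python
from pypaimon.table.row.blob import BlobDescriptor

def resolve_blob_input(value: bytes) -> str:
    """Classify a blob field value the way the descriptor-aware write path does."""
    try:
        descriptor = BlobDescriptor.deserialize(value)
        if descriptor.serialize() == value:
            return "descriptor"  # stream the data from the descriptor's URI
    except Exception:
        pass
    return "blob-bytes"  # treat as in-memory blob bytes
```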