BLOB_AS_DESCRIPTOR =
key("blob-as-descriptor")
.booleanType()
@@ -2710,6 +2719,22 @@ public boolean blobSplitByFileSize() {
.orElse(!options.get(BLOB_AS_DESCRIPTOR));
}
+ /**
+ * Resolve blob fields that should be stored as serialized descriptor bytes in data files.
+ *
+ * <p>If this option is not set, all blob fields are stored in '.blob' files by default.
+ */
+ public Set<String> blobStoredDescriptorFields() {
+ return options.getOptional(BLOB_STORED_DESCRIPTOR_FIELDS)
+ .map(
+ s ->
+ Arrays.stream(s.split(","))
+ .map(String::trim)
+ .filter(str -> !str.isEmpty())
+ .collect(Collectors.toSet()))
+ .orElse(Collections.emptySet());
+ }
+
public long compactionFileSize(boolean hasPrimaryKey) {
// file size to join the compaction, we don't process on middle file size to avoid
// compact a same file twice (the compression is not calculate so accurately. the output
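For context, the new option's comma-separated value resolves exactly as the method above does: split on ',', trim each entry, drop empties, and collect into a set. A minimal, self-contained sketch of that resolution (the helper class is illustrative, not part of the patch; the option key 'blob.stored-descriptor-fields' is taken from the error messages asserted in the tests below):

    import java.util.Arrays;
    import java.util.Set;
    import java.util.stream.Collectors;

    class BlobStoredDescriptorFieldsSketch {
        // Mirrors blobStoredDescriptorFields(): trim entries, drop empties, collect to a set.
        static Set<String> resolve(String optionValue) {
            return Arrays.stream(optionValue.split(","))
                    .map(String::trim)
                    .filter(s -> !s.isEmpty())
                    .collect(Collectors.toSet());
        }

        public static void main(String[] args) {
            // "f2, f3," resolves to {f2, f3} (set order not guaranteed);
            // an unset option resolves to the empty set.
            System.out.println(resolve("f2, f3,"));
        }
    }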
diff --git a/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java b/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
index 7f3fa010c42a..b5f3ad3cf8e3 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java
@@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
/**
* Data type of binary large object.
@@ -67,13 +68,25 @@ public R accept(DataTypeVisitor visitor) {
}
public static Pair<RowType, RowType> splitBlob(RowType rowType) {
+ return splitBlob(rowType, java.util.Collections.emptySet());
+ }
+
+ /**
+ * Split row fields into normal fields and blob-file fields.
+ *
+ * <p>Blob fields contained in {@code blobStoredDescriptorFields} are treated as normal fields
+ * (stored inline as serialized descriptor bytes), while other blob fields are treated as
+ * blob-file fields.
+ */
+ public static Pair<RowType, RowType> splitBlob(
+ RowType rowType, Set<String> blobStoredDescriptorFields) {
List<DataField> fields = rowType.getFields();
List<DataField> normalFields = new ArrayList<>();
List<DataField> blobFields = new ArrayList<>();
for (DataField field : fields) {
DataTypeRoot type = field.type().getTypeRoot();
- if (type == DataTypeRoot.BLOB) {
+ if (type == DataTypeRoot.BLOB && !blobStoredDescriptorFields.contains(field.name())) {
blobFields.add(field);
} else {
normalFields.add(field);
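A worked example of the new overload (field names and types are illustrative; this assumes RowType.of(types, names) and Pair under org.apache.paimon.utils, as used elsewhere in the codebase):

    import org.apache.paimon.types.BlobType;
    import org.apache.paimon.types.DataType;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowType;
    import org.apache.paimon.utils.Pair;

    import java.util.Collections;

    class SplitBlobSketch {
        public static void main(String[] args) {
            RowType rowType =
                    RowType.of(
                            new DataType[] {DataTypes.INT(), DataTypes.BLOB(), DataTypes.BLOB()},
                            new String[] {"id", "img", "doc"});

            // "doc" is descriptor-stored, so it stays with the normal fields;
            // "img" remains a blob-file field and is written to '.blob' files.
            Pair<RowType, RowType> split =
                    BlobType.splitBlob(rowType, Collections.singleton("doc"));
            System.out.println(split.getLeft().getFieldNames());  // [id, doc]
            System.out.println(split.getRight().getFieldNames()); // [img]
        }
    }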
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java b/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java
index 3e60ea34867d..4e0ca0af6045 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/BlobDescriptor.java
@@ -45,8 +45,8 @@
public class BlobDescriptor implements Serializable {
private static final long serialVersionUID = 1L;
-
- private static final byte CURRENT_VERSION = 1;
+ private static final long MAGIC = 0x424C4F4244455343L; // "BLOBDESC"
+ private static final byte CURRENT_VERSION = 2;
private final byte version;
private final String uri;
@@ -113,11 +113,12 @@ public byte[] serialize() {
byte[] uriBytes = uri.getBytes(UTF_8);
int uriLength = uriBytes.length;
- int totalSize = 1 + 4 + uriLength + 8 + 8;
+ int totalSize = 1 + 8 + 4 + uriLength + 8 + 8;
ByteBuffer buffer = ByteBuffer.allocate(totalSize);
buffer.order(ByteOrder.LITTLE_ENDIAN);
buffer.put(version);
+ buffer.putLong(MAGIC);
buffer.putInt(uriLength);
buffer.put(uriBytes);
@@ -130,16 +131,26 @@ public byte[] serialize() {
public static BlobDescriptor deserialize(byte[] bytes) {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
buffer.order(ByteOrder.LITTLE_ENDIAN);
-
byte version = buffer.get();
- if (version != CURRENT_VERSION) {
+ if (version > CURRENT_VERSION) {
throw new UnsupportedOperationException(
- "Expecting BlobDescriptor version to be "
+ "Expecting BlobDescriptor version to be less than or equal to "
+ CURRENT_VERSION
+ ", but found "
+ version
+ ".");
}
+
+ if (version > 1) {
+ long magic = buffer.getLong();
+ if (MAGIC != magic) {
+ throw new IllegalArgumentException(
+ "Invalid BlobDescriptor: missing magic header. Expected magic: "
+ + MAGIC
+ + ", but found: "
+ + magic);
+ }
+ }
+
int uriLength = buffer.getInt();
byte[] uriBytes = new byte[uriLength];
buffer.get(uriBytes);
@@ -149,4 +160,26 @@ public static BlobDescriptor deserialize(byte[] bytes) {
long length = buffer.getLong();
return new BlobDescriptor(version, uri, offset, length);
}
+
+ public static boolean isBlobDescriptor(byte[] bytes) {
+ if (bytes.length < 9) {
+ return false;
+ }
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ buffer.order(ByteOrder.LITTLE_ENDIAN);
+
+ byte version = buffer.get();
+ if (version == 1) {
+ // Version 1 has no magic header, so fall back to a full deserialization check.
+ try {
+ deserialize(bytes);
+ return true;
+ } catch (Exception e) {
+ return false;
+ }
+ } else if (version > CURRENT_VERSION) {
+ return false;
+ } else {
+ return MAGIC == buffer.getLong();
+ }
+ }
}
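For orientation, the v2 wire layout written by serialize() is: version (1 byte) | magic 0x424C4F4244455343, i.e. "BLOBDESC" (8 bytes) | URI length (4 bytes) | URI bytes (UTF-8) | offset (8 bytes) | length (8 bytes), all little-endian; v1 is the same layout without the magic, which is why deserialize() only reads the magic when version > 1. A quick round-trip sketch using only the API shown above (values illustrative):

    import org.apache.paimon.data.BlobDescriptor;

    class BlobDescriptorLayoutSketch {
        public static void main(String[] args) {
            BlobDescriptor descriptor = new BlobDescriptor("/test/path", 100L, 200L);
            byte[] bytes = descriptor.serialize();

            // "/test/path" is 10 UTF-8 bytes, so: 1 + 8 + 4 + 10 + 8 + 8 = 39 bytes.
            System.out.println(bytes.length);                                         // 39
            System.out.println(BlobDescriptor.deserialize(bytes).equals(descriptor)); // true
        }
    }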
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java
index d7423608af45..a3b2db470176 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRow.java
@@ -20,7 +20,7 @@
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Blob;
-import org.apache.paimon.data.BlobData;
+import org.apache.paimon.data.BlobDescriptor;
import org.apache.paimon.data.DataSetters;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.InternalArray;
@@ -30,6 +30,7 @@
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;
import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.UriReader;
import java.io.Serializable;
@@ -43,16 +44,27 @@ public final class ColumnarRow implements InternalRow, DataSetters, Serializable
private RowKind rowKind = RowKind.INSERT;
private VectorizedColumnBatch vectorizedColumnBatch;
+ private UriReader uriReader;
private int rowId;
public ColumnarRow() {}
public ColumnarRow(VectorizedColumnBatch vectorizedColumnBatch) {
- this(vectorizedColumnBatch, 0);
+ this(vectorizedColumnBatch, null, 0);
}
public ColumnarRow(VectorizedColumnBatch vectorizedColumnBatch, int rowId) {
+ this(vectorizedColumnBatch, null, rowId);
+ }
+
+ public ColumnarRow(VectorizedColumnBatch vectorizedColumnBatch, UriReader uriReader) {
+ this(vectorizedColumnBatch, uriReader, 0);
+ }
+
+ public ColumnarRow(
+ VectorizedColumnBatch vectorizedColumnBatch, UriReader uriReader, int rowId) {
this.vectorizedColumnBatch = vectorizedColumnBatch;
+ this.uriReader = uriReader;
this.rowId = rowId;
}
@@ -61,6 +73,10 @@ public void setVectorizedColumnBatch(VectorizedColumnBatch vectorizedColumnBatch
this.rowId = 0;
}
+ public void setUriReader(UriReader uriReader) {
+ this.uriReader = uriReader;
+ }
+
public VectorizedColumnBatch batch() {
return vectorizedColumnBatch;
}
@@ -151,7 +167,17 @@ public Variant getVariant(int pos) {
@Override
public Blob getBlob(int pos) {
- return new BlobData(getBinary(pos));
+ byte[] bytes = getBinary(pos);
+ if (bytes == null) {
+ return null;
+ }
+ if (uriReader == null) {
+ throw new IllegalStateException("UriReader is null, cannot read blob data from uri!");
+ }
+
+ // Only blob descriptor could be able to stored in columnar format.
+ BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes);
+ return Blob.fromDescriptor(uriReader, blobDescriptor);
}
@Override
@@ -238,7 +264,7 @@ public int hashCode() {
public ColumnarRow copy(ColumnVector[] vectors) {
VectorizedColumnBatch vectorizedColumnBatchCopy = vectorizedColumnBatch.copy(vectors);
- ColumnarRow columnarRow = new ColumnarRow(vectorizedColumnBatchCopy, rowId);
+ ColumnarRow columnarRow = new ColumnarRow(vectorizedColumnBatchCopy, uriReader, rowId);
columnarRow.setRowKind(rowKind);
return columnarRow;
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/UriReaderFactory.java b/paimon-common/src/main/java/org/apache/paimon/utils/UriReaderFactory.java
index 92e3b8e6a842..6e21aee2d236 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/UriReaderFactory.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/UriReaderFactory.java
@@ -25,13 +25,14 @@
import javax.annotation.Nullable;
import java.io.IOException;
+import java.io.Serializable;
import java.net.URI;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
/** A factory to create and cache {@link UriReader}. */
-public class UriReaderFactory {
+public class UriReaderFactory implements Serializable {
private final CatalogContext context;
private final Map readers;
diff --git a/paimon-common/src/test/java/org/apache/paimon/data/BlobDescriptorTest.java b/paimon-common/src/test/java/org/apache/paimon/data/BlobDescriptorTest.java
index ef860e8c55a7..eaf9d3dea88b 100644
--- a/paimon-common/src/test/java/org/apache/paimon/data/BlobDescriptorTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/data/BlobDescriptorTest.java
@@ -18,6 +18,8 @@
package org.apache.paimon.data;
+import org.apache.paimon.utils.IOUtils;
+
import org.junit.jupiter.api.Test;
import java.lang.reflect.Constructor;
@@ -38,7 +40,7 @@ public void testEquals() throws Exception {
BlobDescriptor descriptor3 = new BlobDescriptor(uri2, 100L, 200L);
BlobDescriptor descriptor4 = new BlobDescriptor(uri1, 150L, 200L);
BlobDescriptor descriptor5 = new BlobDescriptor(uri1, 100L, 250L);
- BlobDescriptor descriptor6 = createDescriptorWithVersion((byte) 2, uri1, 100L, 200L);
+ BlobDescriptor descriptor6 = createDescriptorWithVersion((byte) 3, uri1, 100L, 200L);
assertThat(descriptor1).isEqualTo(descriptor2);
assertThat(descriptor1).isNotEqualTo(descriptor3);
assertThat(descriptor1).isNotEqualTo(descriptor4);
@@ -64,7 +66,7 @@ public void testToString() {
BlobDescriptor descriptor = new BlobDescriptor(uri, 100L, 200L);
String toString = descriptor.toString();
- assertThat(toString).contains("version=1");
+ assertThat(toString).contains("version=2");
assertThat(toString).contains("uri='/test/path'");
assertThat(toString).contains("offset=100");
assertThat(toString).contains("length=200");
@@ -90,10 +92,26 @@ public void testSerializeAndDeserialize() {
public void testDeserializeWithUnsupportedVersion() {
String uri = "/test/path";
byte[] serialized = new BlobDescriptor(uri, 1, 1).serialize();
- serialized[0] = 2;
+ serialized[0] = 3;
assertThatThrownBy(() -> BlobDescriptor.deserialize(serialized))
.isInstanceOf(UnsupportedOperationException.class)
- .hasMessageContaining("Expecting BlobDescriptor version to be 1, but found 2.");
+ .hasMessageContaining(
+ "Expecting BlobDescriptor version to be less than or equal to 2, but found 3.");
+ }
+
+ @Test
+ public void testBlobVersionCompatible() throws Exception {
+ byte[] serialized =
+ IOUtils.readFully(
+ BlobDescriptorTest.class
+ .getClassLoader()
+ .getResourceAsStream("compatible/blob_descriptor_v1"),
+ true);
+
+ BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(serialized);
+ assertThat(blobDescriptor.uri()).isEqualTo("/test/path");
+ assertThat(blobDescriptor.offset()).isEqualTo(100L);
+ assertThat(blobDescriptor.length()).isEqualTo(200L);
}
private BlobDescriptor createDescriptorWithVersion(
diff --git a/paimon-common/src/test/resources/compatible/blob_descriptor_v1 b/paimon-common/src/test/resources/compatible/blob_descriptor_v1
new file mode 100644
index 000000000000..9e3412deac81
Binary files /dev/null and b/paimon-common/src/test/resources/compatible/blob_descriptor_v1 differ
diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml
index a3bf3ccf0ec8..8f0a490b8c15 100644
--- a/paimon-core/pom.xml
+++ b/paimon-core/pom.xml
@@ -76,6 +76,10 @@ under the License.
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index cd7acb4a3390..abc2f2b4a112 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -41,6 +41,7 @@
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BatchRecordWriter;
import org.apache.paimon.utils.CommitIncrement;
@@ -59,11 +60,10 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
-import static org.apache.paimon.types.DataTypeRoot.BLOB;
-
/**
* A {@link RecordWriter} implementation that only accepts records which are always insert
* operations and don't have any unique keys or sort keys.
@@ -95,6 +95,7 @@ public class AppendOnlyWriter implements BatchRecordWriter, MemoryOwner {
private final FileIndexOptions fileIndexOptions;
private final MemorySize maxDiskSize;
@Nullable private final BlobConsumer blobConsumer;
+ private final Set<String> blobStoredDescriptorFields;
@Nullable private CompactDeletionFile compactDeletionFile;
private SinkWriter sinkWriter;
@@ -125,6 +126,7 @@ public AppendOnlyWriter(
boolean asyncFileWrite,
boolean statsDenseStore,
@Nullable BlobConsumer blobConsumer,
+ Set<String> blobStoredDescriptorFields,
boolean dataEvolutionEnabled) {
this.fileIO = fileIO;
this.schemaId = schemaId;
@@ -153,6 +155,7 @@ public AppendOnlyWriter(
this.statsCollectorFactories = statsCollectorFactories;
this.maxDiskSize = maxDiskSize;
this.fileIndexOptions = fileIndexOptions;
+ this.blobStoredDescriptorFields = blobStoredDescriptorFields;
this.sinkWriter =
useWriteBuffer
@@ -304,7 +307,8 @@ public void toBufferedWriter() throws Exception {
}
private RollingFileWriter createRollingRowWriter() {
- if (writeSchema.getFieldTypes().stream().anyMatch(t -> t.is(BLOB))) {
+ if (BlobType.splitBlob(writeSchema, blobStoredDescriptorFields).getRight().getFieldCount()
+ > 0) {
return new RollingBlobFileWriter(
fileIO,
schemaId,
@@ -324,7 +328,8 @@ private RollingFileWriter createRollingRowWriter() {
// benefit from async write, but cost a lot.
false,
statsDenseStore,
- blobConsumer);
+ blobConsumer,
+ blobStoredDescriptorFields);
}
return new RowDataRollingFileWriter(
fileIO,
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
index b55bdd8ef09a..d19c4cc21914 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/MultipleBlobFileWriter.java
@@ -42,6 +42,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import java.util.function.Supplier;
import static java.util.Collections.singletonList;
@@ -61,8 +62,10 @@ public MultipleBlobFileWriter(
boolean asyncFileWrite,
boolean statsDenseStore,
long targetFileSize,
- @Nullable BlobConsumer blobConsumer) {
- RowType blobRowType = BlobType.splitBlob(writeSchema).getRight();
+ @Nullable BlobConsumer blobConsumer,
+ Set<String> blobStoredDescriptorFields) {
+ RowType blobRowType =
+ BlobType.splitBlob(writeSchema, blobStoredDescriptorFields).getRight();
this.blobWriters = new ArrayList<>();
for (String blobFieldName : blobRowType.getFieldNames()) {
BlobFileFormat blobFileFormat = new BlobFileFormat();
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
index 80faa01d9238..590b19b23b98 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
@@ -48,6 +48,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.function.Supplier;
/**
@@ -83,6 +84,7 @@ public class RollingBlobFileWriter implements RollingFileWriter blobWriterFactory;
private final long targetFileSize;
+ private final Set<String> blobStoredDescriptorFields;
// State management
private final List closedWriters;
@@ -109,11 +111,13 @@ public RollingBlobFileWriter(
FileSource fileSource,
boolean asyncFileWrite,
boolean statsDenseStore,
- @Nullable BlobConsumer blobConsumer) {
+ @Nullable BlobConsumer blobConsumer,
+ Set<String> blobStoredDescriptorFields) {
// Initialize basic fields
this.targetFileSize = targetFileSize;
this.results = new ArrayList<>();
this.closedWriters = new ArrayList<>();
+ this.blobStoredDescriptorFields = blobStoredDescriptorFields;
// Initialize writer factory for normal data
this.writerFactory =
@@ -121,7 +125,7 @@ public RollingBlobFileWriter(
fileIO,
schemaId,
fileFormat,
- BlobType.splitBlob(writeSchema).getLeft(),
+ BlobType.splitBlob(writeSchema, blobStoredDescriptorFields).getLeft(),
writeSchema,
pathFactory,
seqNumCounterSupplier,
@@ -145,7 +149,8 @@ public RollingBlobFileWriter(
asyncFileWrite,
statsDenseStore,
blobTargetFileSize,
- blobConsumer);
+ blobConsumer,
+ this.blobStoredDescriptorFields);
}
/** Creates a factory for normal data writers. */
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
index bc39a4ae8e76..ae1405e3de62 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
@@ -58,6 +58,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -85,6 +86,7 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<InternalRow> {
+ private final Set<String> blobStoredDescriptorFields;
public BaseAppendFileStoreWrite(
FileIO fileIO,
@@ -108,6 +110,7 @@ public BaseAppendFileStoreWrite(
this.fileFormat = fileFormat(options);
this.pathFactory = pathFactory;
this.withBlob = rowType.getFieldTypes().stream().anyMatch(t -> t.is(BLOB));
+ this.blobStoredDescriptorFields = options.blobStoredDescriptorFields();
this.fileIndexOptions = options.indexColumnsOptions();
}
@@ -153,6 +156,7 @@ protected RecordWriter createWriter(
options.asyncFileWrite(),
options.statsDenseStore(),
blobConsumer,
+ blobStoredDescriptorFields,
options.dataEvolutionEnabled());
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index f4cb16a018ce..6f7eb60c664b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -34,6 +34,7 @@
import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.MapType;
@@ -47,9 +48,11 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
@@ -156,7 +159,11 @@ public static void validateTableSchema(TableSchema schema) {
FileFormat fileFormat =
FileFormat.fromIdentifier(options.formatType(), new Options(schema.options()));
- fileFormat.validateDataFields(BlobType.splitBlob(new RowType(schema.fields())).getLeft());
+ RowType tableRowType = new RowType(schema.fields());
+ Set<String> blobStoredDescriptorFields =
+ validateBlobStoredDescriptorFields(tableRowType, options);
+ fileFormat.validateDataFields(
+ BlobType.splitBlob(tableRowType, blobStoredDescriptorFields).getLeft());
// Check column names in schema
schema.fieldNames()
@@ -614,11 +621,29 @@ private static void validateRowTracking(TableSchema schema, CoreOptions options)
normalAndBlobType.getLeft().getFieldCount() > 0,
"Table with BLOB type column must have other normal columns.");
checkArgument(
- !schema.partitionKeys().contains(blobNames.get(0)),
+ blobNames.stream().noneMatch(schema.partitionKeys()::contains),
"The BLOB type column can not be part of partition keys.");
}
}
+ private static Set<String> validateBlobStoredDescriptorFields(
+ RowType rowType, CoreOptions options) {
+ Set<String> blobFieldNames =
+ rowType.getFields().stream()
+ .filter(field -> field.type().getTypeRoot() == DataTypeRoot.BLOB)
+ .map(DataField::name)
+ .collect(Collectors.toCollection(HashSet::new));
+ Set<String> configured = options.blobStoredDescriptorFields();
+ for (String field : configured) {
+ checkArgument(
+ blobFieldNames.contains(field),
+ "Field '%s' in '%s' must be a BLOB field in table schema.",
+ field,
+ CoreOptions.BLOB_STORED_DESCRIPTOR_FIELDS.key());
+ }
+ return configured;
+ }
+
private static void validateIncrementalClustering(TableSchema schema, CoreOptions options) {
if (options.clusteringIncrementalEnabled()) {
checkArgument(
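In table-definition terms, the new check means only BLOB columns may be listed in the option; a sketch using the same Schema builder API as the tests below (column names are illustrative):

    import org.apache.paimon.schema.Schema;
    import org.apache.paimon.types.DataTypes;

    class StoredDescriptorFieldsValidationSketch {
        public static void main(String[] args) throws Exception {
            Schema.Builder builder = Schema.newBuilder();
            builder.column("f0", DataTypes.INT());
            builder.column("f1", DataTypes.STRING());
            builder.column("f2", DataTypes.BLOB());

            // Accepted at validation time: "f2" is a BLOB column.
            builder.option("blob.stored-descriptor-fields", "f2");

            // Listing "f1" instead would fail validateTableSchema with:
            // "Field 'f1' in 'blob.stored-descriptor-fields' must be a BLOB field in table schema."
            System.out.println(builder.build());
        }
    }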
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index b3470cfb4522..8f159cb33587 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -713,6 +713,7 @@ private Pair> createWriter(
true,
false,
null,
+ options.blobStoredDescriptorFields(),
options.dataEvolutionEnabled());
writer.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
index 3045f33b1664..a0b3ec757641 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/BlobTableTest.java
@@ -47,6 +47,7 @@
import javax.annotation.Nonnull;
import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@@ -55,6 +56,7 @@
import java.util.stream.Collectors;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
/** Tests for table with blob. */
public class BlobTableTest extends TableTestBase {
@@ -233,6 +235,144 @@ public void testRolling() throws Exception {
assertThat(integer.get()).isEqualTo(1025);
}
+ @Test
+ public void testDescriptorStorageModeReuseUpstreamBlobWithoutCopy() throws Exception {
+ createDescriptorTable();
+ FileStoreTable table = getTableDefault();
+
+ // prepare an "upstream blob file" (any file) and write blob bytes into it
+ Path external = new Path(tempPath.resolve("upstream-blob.bin").toString());
+ writeFile(table.fileIO(), external, blobBytes);
+
+ BlobDescriptor descriptor = new BlobDescriptor(external.toString(), 0, blobBytes.length);
+ UriReader uriReader = UriReader.fromFile(table.fileIO());
+ Blob blobRef = Blob.fromDescriptor(uriReader, descriptor);
+
+ writeDataDefault(
+ Collections.singletonList(
+ GenericRow.of(1, BinaryString.fromString("nice"), blobRef)));
+
+ readDefault(
+ row -> {
+ assertThat(row.getBlob(2).toDescriptor()).isEqualTo(descriptor);
+ assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes);
+ });
+
+ long blobFiles = countFilesWithSuffix(table.fileIO(), table.location(), ".blob");
+ assertThat(blobFiles).isEqualTo(0);
+ }
+
+ @Test
+ public void testDescriptorStorageModeRejectsNonDescriptorInput() throws Exception {
+ createDescriptorTable();
+
+ assertThatThrownBy(
+ () ->
+ writeDataDefault(
+ Collections.singletonList(
+ GenericRow.of(
+ 1,
+ BinaryString.fromString("bad"),
+ new BlobData(blobBytes)))))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("blob.stored-descriptor-fields");
+ }
+
+ @Test
+ public void testMixedBlobStorageModeByFields() throws Exception {
+ createMixedModeTable();
+ FileStoreTable table = getTableDefault();
+
+ byte[] descriptorBytes = randomBytes();
+ Path external = new Path(tempPath.resolve("upstream-mixed-blob.bin").toString());
+ writeFile(table.fileIO(), external, descriptorBytes);
+
+ BlobDescriptor descriptor =
+ new BlobDescriptor(external.toString(), 0, descriptorBytes.length);
+ UriReader uriReader = UriReader.fromFile(table.fileIO());
+ Blob blobRef = Blob.fromDescriptor(uriReader, descriptor);
+
+ writeDataDefault(
+ Collections.singletonList(
+ GenericRow.of(
+ 1,
+ BinaryString.fromString("mixed"),
+ new BlobData(blobBytes),
+ blobRef)));
+
+ readDefault(
+ row -> {
+ assertThat(row.getBlob(2).toData()).isEqualTo(blobBytes);
+ assertThat(row.getBlob(3).toDescriptor()).isEqualTo(descriptor);
+ assertThat(row.getBlob(3).toData()).isEqualTo(descriptorBytes);
+ });
+
+ long blobFiles = countFilesWithSuffix(table.fileIO(), table.location(), ".blob");
+ assertThat(blobFiles).isEqualTo(1);
+ }
+
+ @Test
+ public void testMixedBlobStorageModeRejectsNonDescriptorInput() throws Exception {
+ createMixedModeTable();
+
+ assertThatThrownBy(
+ () ->
+ writeDataDefault(
+ Collections.singletonList(
+ GenericRow.of(
+ 1,
+ BinaryString.fromString("bad"),
+ new BlobData(blobBytes),
+ new BlobData(randomBytes())))))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("blob.stored-descriptor-fields");
+ }
+
+ private void createDescriptorTable() throws Exception {
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.column("f0", DataTypes.INT());
+ schemaBuilder.column("f1", DataTypes.STRING());
+ schemaBuilder.column("f2", DataTypes.BLOB());
+ schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
+ schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.BLOB_STORED_DESCRIPTOR_FIELDS.key(), "f2");
+ catalog.createTable(identifier(), schemaBuilder.build(), true);
+ }
+
+ private void createMixedModeTable() throws Exception {
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.column("f0", DataTypes.INT());
+ schemaBuilder.column("f1", DataTypes.STRING());
+ schemaBuilder.column("f2", DataTypes.BLOB());
+ schemaBuilder.column("f3", DataTypes.BLOB());
+ schemaBuilder.option(CoreOptions.TARGET_FILE_SIZE.key(), "25 MB");
+ schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ schemaBuilder.option(CoreOptions.BLOB_STORED_DESCRIPTOR_FIELDS.key(), "f3");
+ catalog.createTable(identifier(), schemaBuilder.build(), true);
+ }
+
+ private static void writeFile(FileIO fileIO, Path path, byte[] bytes) throws IOException {
+ try (org.apache.paimon.fs.PositionOutputStream out = fileIO.newOutputStream(path, true)) {
+ out.write(bytes);
+ }
+ }
+
+ private static long countFilesWithSuffix(FileIO fileIO, Path root, String suffix)
+ throws IOException {
+ long count = 0;
+ org.apache.paimon.fs.RemoteIterator<org.apache.paimon.fs.FileStatus> it =
+ fileIO.listFilesIterative(root, true);
+ while (it.hasNext()) {
+ org.apache.paimon.fs.FileStatus status = it.next();
+ if (status.getPath().getName().endsWith(suffix)) {
+ count++;
+ }
+ }
+ return count;
+ }
+
protected Schema schemaDefault() {
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("f0", DataTypes.INT());
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
index 0afe95eef066..3ada89c777f6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/RollingBlobFileWriterTest.java
@@ -43,6 +43,7 @@
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Random;
@@ -106,7 +107,8 @@ public void setUp() throws IOException {
FileSource.APPEND,
false, // asyncFileWrite
false, // statsDenseStore
- null);
+ null,
+ Collections.emptySet());
}
@Test
@@ -206,7 +208,8 @@ public void testBlobTargetFileSize() throws IOException {
FileSource.APPEND,
false, // asyncFileWrite
false, // statsDenseStore
- null);
+ null,
+ Collections.emptySet());
// Create large blob data that will exceed the blob target file size
byte[] largeBlobData = new byte[3 * 1024 * 1024]; // 3 MB blob data
@@ -283,7 +286,8 @@ void testBlobFileNameFormatWithSharedUuid() throws IOException {
FileSource.APPEND,
false, // asyncFileWrite
false, // statsDenseStore
- null);
+ null,
+ Collections.emptySet());
// Create blob data that will trigger rolling
byte[] blobData = new byte[1024 * 1024]; // 1 MB blob data
@@ -362,9 +366,10 @@ void testBlobFileNameFormatWithSharedUuidNonDescriptorMode() throws IOException
FileSource.APPEND,
false, // asyncFileWrite
false, // statsDenseStore
- null);
+ null,
+ Collections.emptySet());
- // Create blob data that will trigger rolling (non-descriptor mode: direct blob data)
+ // Create blob data that will trigger rolling
byte[] blobData = new byte[1024 * 1024]; // 1 MB blob data
new Random(789).nextBytes(blobData);
@@ -420,8 +425,8 @@ void testBlobFileNameFormatWithSharedUuidNonDescriptorMode() throws IOException
}
@Test
- void testSequenceNumberIncrementInBlobAsDescriptorMode() throws IOException {
- // Write multiple rows to trigger one-by-one writing in blob-as-descriptor mode
+ void testSequenceNumberIncrementInBlobWritePath() throws IOException {
+ // Write multiple rows and verify sequence-number continuity in blob files
int numRows = 10;
for (int i = 0; i < numRows; i++) {
InternalRow row =
@@ -487,9 +492,8 @@ void testSequenceNumberIncrementInBlobAsDescriptorMode() throws IOException {
}
@Test
- void testSequenceNumberIncrementInNonDescriptorMode() throws IOException {
- // Write multiple rows as a batch to trigger batch writing in non-descriptor mode
- // (blob-as-descriptor=false, which is the default)
+ void testSequenceNumberIncrementInBlobWritePathBatch() throws IOException {
+ // Write multiple rows as a batch and verify sequence-number continuity in blob files
int numRows = 10;
for (int i = 0; i < numRows; i++) {
InternalRow row =
@@ -580,7 +584,8 @@ void testBlobStatsSchemaWithCustomColumnName() throws IOException {
FileSource.APPEND,
false, // asyncFileWrite
false, // statsDenseStore
- null);
+ null,
+ Collections.emptySet());
// Write data
for (int i = 0; i < 3; i++) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index 1edf91e1d22d..1f1b0992e968 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -292,6 +292,7 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception
true,
false,
null,
+ options.blobStoredDescriptorFields(),
options.dataEvolutionEnabled());
appendOnlyWriter.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(), options.pageSize()));
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
index 3201f594fe59..ad2132e8c1eb 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkRowWrapper.java
@@ -47,19 +47,14 @@
public class FlinkRowWrapper implements InternalRow {
private final org.apache.flink.table.data.RowData row;
- private final boolean blobAsDescriptor;
private final UriReaderFactory uriReaderFactory;
public FlinkRowWrapper(org.apache.flink.table.data.RowData row) {
- this(row, false, null);
+ this(row, null);
}
- public FlinkRowWrapper(
- org.apache.flink.table.data.RowData row,
- boolean blobAsDescriptor,
- CatalogContext catalogContext) {
+ public FlinkRowWrapper(org.apache.flink.table.data.RowData row, CatalogContext catalogContext) {
this.row = row;
- this.blobAsDescriptor = blobAsDescriptor;
this.uriReaderFactory = new UriReaderFactory(catalogContext);
}
@@ -147,12 +142,14 @@ public Variant getVariant(int pos) {
@Override
public Blob getBlob(int pos) {
- if (blobAsDescriptor) {
- BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(row.getBinary(pos));
+ byte[] bytes = row.getBinary(pos);
+ boolean blobDes = BlobDescriptor.isBlobDescriptor(bytes);
+ if (blobDes) {
+ BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes);
UriReader uriReader = uriReaderFactory.create(blobDescriptor.uri());
return Blob.fromDescriptor(uriReader, blobDescriptor);
} else {
- return new BlobData(row.getBinary(pos));
+ return new BlobData(bytes);
}
}
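The dispatch above relies on isBlobDescriptor() to tell serialized descriptors from raw blob payloads; a small self-contained sketch of both branches (values illustrative):

    import org.apache.paimon.data.BlobDescriptor;

    class BlobBytesDispatchSketch {
        public static void main(String[] args) {
            byte[] descriptorBytes =
                    new BlobDescriptor("/bucket/file.bin", 0L, 42L).serialize();
            byte[] rawPayload = new byte[] {1, 2, 3};

            // Serialized descriptors carry the version/magic header and take the
            // Blob.fromDescriptor branch; anything else falls through to BlobData.
            System.out.println(BlobDescriptor.isBlobDescriptor(descriptorBytes)); // true
            System.out.println(BlobDescriptor.isBlobDescriptor(rawPayload));      // false
        }
    }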
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index a58e8a393da4..7ed5e5d09503 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -346,13 +346,11 @@ protected boolean buildForPostponeBucketCompaction(
partitionSpec,
options.get(FlinkConnectorOptions.SCAN_PARALLELISM));
- boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
DataStream partitioned =
FlinkStreamPartitioner.partition(
FlinkSinkBuilder.mapToInternalRow(
sourcePair.getLeft(),
realTable.rowType(),
- blobAsDescriptor,
table.catalogEnvironment().catalogContext()),
new RowDataChannelComputer(realTable.schema()),
null);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java
index d1e3d11e6225..ee3b68f609a2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/IncrementalClusterCompact.java
@@ -157,13 +157,11 @@ protected List> buildCompactOperator(
// 2.3 write and then reorganize the committable
// set parallelism to null, and it'll forward parallelism when doWrite()
RowAppendTableSink sink = new RowAppendTableSink(table, null, null);
- boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
DataStream written =
sink.doWrite(
FlinkSinkBuilder.mapToInternalRow(
sorted,
table.rowType(),
- blobAsDescriptor,
table.catalogEnvironment().catalogContext()),
commitUser,
null);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index 0a4dc174539f..87d12924533a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -207,15 +207,13 @@ public FlinkSinkBuilder clusteringIfPossible(
public DataStreamSink> build() {
setParallelismIfAdaptiveConflict();
input = trySortInput(input);
- boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
CatalogContext contextForDescriptor =
BlobDescriptorUtils.getCatalogContext(
table.catalogEnvironment().catalogContext(),
table.coreOptions().toConfiguration());
DataStream<InternalRow> input =
- mapToInternalRow(
- this.input, table.rowType(), blobAsDescriptor, contextForDescriptor);
+ mapToInternalRow(this.input, table.rowType(), contextForDescriptor);
if (table.coreOptions().localMergeEnabled() && table.schema().primaryKeys().size() > 0) {
SingleOutputStreamOperator newInput =
input.forward()
@@ -247,14 +245,11 @@ public DataStreamSink> build() {
public static DataStream<InternalRow> mapToInternalRow(
DataStream<RowData> input,
org.apache.paimon.types.RowType rowType,
- boolean blobAsDescriptor,
CatalogContext catalogContext) {
SingleOutputStreamOperator<InternalRow> result =
input.map(
(MapFunction<RowData, InternalRow>)
- r ->
- new FlinkRowWrapper(
- r, blobAsDescriptor, catalogContext))
+ r -> new FlinkRowWrapper(r, catalogContext))
.returns(
org.apache.paimon.flink.utils.InternalTypeInfo.fromRowType(
rowType));
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
index a06ca9948c44..6e09dce05033 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroBulkFormat.java
@@ -28,6 +28,7 @@
import org.apache.paimon.utils.IteratorResultIterator;
import org.apache.paimon.utils.IteratorWithException;
import org.apache.paimon.utils.Pool;
+import org.apache.paimon.utils.UriReader;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.file.DataFileReader;
@@ -77,7 +78,9 @@ private AvroReader(FileIO fileIO, Path path, long fileSize) throws IOException {
private DataFileReader<InternalRow> createReaderFromPath(Path path, long fileSize)
throws IOException {
- DatumReader<InternalRow> datumReader = new AvroRowDatumReader(projectedRowType);
+ UriReader uriReader = UriReader.fromFile(fileIO);
+ DatumReader<InternalRow> datumReader =
+ new AvroRowDatumReader(projectedRowType, uriReader);
SeekableInput in =
new SeekableInputStreamWrapper(fileIO.newInputStream(path), fileSize);
try {
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumReader.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumReader.java
index 23da782471b0..c62777bf800e 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumReader.java
@@ -21,6 +21,7 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.avro.FieldReaderFactory.RowReader;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.UriReader;
import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
@@ -32,12 +33,18 @@
public class AvroRowDatumReader implements DatumReader<InternalRow> {
private final RowType projectedRowType;
+ private final UriReader uriReader;
private RowReader reader;
private boolean isUnion;
public AvroRowDatumReader(RowType projectedRowType) {
+ this(projectedRowType, null);
+ }
+
+ public AvroRowDatumReader(RowType projectedRowType, UriReader uriReader) {
this.projectedRowType = projectedRowType;
+ this.uriReader = uriReader;
}
@Override
@@ -48,7 +55,8 @@ public void setSchema(Schema schema) {
schema = schema.getTypes().get(1);
}
this.reader =
- new FieldReaderFactory().createRowReader(schema, projectedRowType.getFields());
+ new FieldReaderFactory(uriReader)
+ .createRowReader(schema, projectedRowType.getFields());
}
@Override
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java
index 5497751fcb93..e586a4ad8617 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroSchemaConverter.java
@@ -104,6 +104,7 @@ public static Schema convertToSchema(
return nullable ? nullableSchema(str) : str;
case BINARY:
case VARBINARY:
+ case BLOB:
Schema binary = SchemaBuilder.builder().bytesType();
return nullable ? nullableSchema(binary) : binary;
case TIMESTAMP_WITHOUT_TIME_ZONE:
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java
index d95ef309219b..02d77b445dea 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldReaderFactory.java
@@ -19,6 +19,8 @@
package org.apache.paimon.format.avro;
import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.BlobDescriptor;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericMap;
@@ -29,6 +31,7 @@
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.UriReader;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
@@ -51,6 +54,16 @@
/** Factory to create {@link FieldReader}. */
public class FieldReaderFactory implements AvroSchemaVisitor {
+ @Nullable private final UriReader uriReader;
+
+ public FieldReaderFactory() {
+ this(null);
+ }
+
+ public FieldReaderFactory(@Nullable UriReader uriReader) {
+ this.uriReader = uriReader;
+ }
+
private static final FieldReader STRING_READER = new StringReader();
private static final FieldReader BYTES_READER = new BytesReader();
@@ -73,6 +86,16 @@ public class FieldReaderFactory implements AvroSchemaVisitor {
private static final FieldReader TIMESTAMP_MICROS_READER = new TimestampMicrosReader();
+ @Override
+ public FieldReader primitive(Schema primitive, DataType type) {
+ if (primitive.getType() == Schema.Type.BYTES
+ && type != null
+ && type.getTypeRoot() == DataTypeRoot.BLOB) {
+ return new BlobDescriptorBytesReader(uriReader);
+ }
+ return AvroSchemaVisitor.super.primitive(primitive, type);
+ }
+
@Override
public FieldReader visitUnion(Schema schema, @Nullable DataType type) {
return new NullableReader(visit(schema.getTypes().get(1), type));
@@ -230,6 +253,31 @@ public void skip(Decoder decoder) throws IOException {
}
}
+ private static class BlobDescriptorBytesReader implements FieldReader {
+
+ private final UriReader uriReader;
+
+ private BlobDescriptorBytesReader(UriReader uriReader) {
+ if (uriReader == null) {
+ throw new IllegalArgumentException(
+ "UriReader must not be null for BlobDescriptorBytesReader.");
+ }
+ this.uriReader = uriReader;
+ }
+
+ @Override
+ public Object read(Decoder decoder, Object reuse) throws IOException {
+ byte[] bytes = decoder.readBytes(null).array();
+ BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes);
+ return Blob.fromDescriptor(uriReader, blobDescriptor);
+ }
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {
+ decoder.skipBytes();
+ }
+ }
+
private static class BooleanReader implements FieldReader {
@Override
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java
index 68d0ac7ad135..521601706b69 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/FieldWriterFactory.java
@@ -19,6 +19,8 @@
package org.apache.paimon.format.avro;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.BlobDescriptor;
import org.apache.paimon.data.DataGetters;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericRow;
@@ -27,6 +29,7 @@
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.RowType;
import org.apache.avro.AvroRuntimeException;
@@ -70,6 +73,35 @@ public class FieldWriterFactory implements AvroSchemaVisitor {
private static final FieldWriter DOUBLE_WRITER =
(container, i, encoder) -> encoder.writeDouble(container.getDouble(i));
+ private static final FieldWriter BLOB_DESCRIPTOR_BYTES_WRITER =
+ (container, i, encoder) -> {
+ Blob blob = container.getBlob(i);
+ if (blob == null) {
+ // Nullable handling is done by NullableWriter for UNION schemas.
+ // For required bytes, writing null is a bug.
+ throw new IllegalArgumentException("Null blob is not allowed.");
+ }
+ try {
+ BlobDescriptor descriptor = blob.toDescriptor();
+ encoder.writeBytes(descriptor.serialize());
+ } catch (Throwable t) {
+ throw new IllegalArgumentException(
+ "blob.stored-descriptor-fields requires blob field value to be a "
+ + "serialized BlobDescriptor (magic 'BLOBDESC').",
+ t);
+ }
+ };
+
+ @Override
+ public FieldWriter primitive(Schema primitive, DataType type) {
+ if (primitive.getType() == Schema.Type.BYTES
+ && type != null
+ && type.getTypeRoot() == DataTypeRoot.BLOB) {
+ return BLOB_DESCRIPTOR_BYTES_WRITER;
+ }
+ return AvroSchemaVisitor.super.primitive(primitive, type);
+ }
+
@Override
public FieldWriter visitUnion(Schema schema, DataType type) {
return new NullableWriter(visit(schema.getTypes().get(1), type));
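In terms of input, the descriptor writers (this Avro writer and the ORC/Parquet writers below) only accept Blob values that can produce a descriptor. A sketch of the accepted and rejected shapes, assuming LocalFileIO under org.apache.paimon.fs.local with a no-arg constructor (the file path is illustrative):

    import org.apache.paimon.data.Blob;
    import org.apache.paimon.data.BlobData;
    import org.apache.paimon.data.BlobDescriptor;
    import org.apache.paimon.fs.local.LocalFileIO;
    import org.apache.paimon.utils.UriReader;

    class DescriptorWriterInputSketch {
        public static void main(String[] args) throws Exception {
            // Accepted: toDescriptor() succeeds, so only the serialized descriptor is written.
            BlobDescriptor descriptor = new BlobDescriptor("/upstream/blob.bin", 0L, 1024L);
            Blob accepted = Blob.fromDescriptor(UriReader.fromFile(new LocalFileIO()), descriptor);

            // Rejected: a raw-bytes Blob has no descriptor, so the writer throws the
            // "blob.stored-descriptor-fields requires ..." IllegalArgumentException above.
            Blob rejected = new BlobData(new byte[] {1, 2, 3});

            System.out.println(accepted.toDescriptor().equals(descriptor)); // true
        }
    }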
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
index a94eaf2356d8..93c29f67b54e 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
@@ -38,6 +38,7 @@
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Pool;
import org.apache.paimon.utils.RoaringBitmap32;
+import org.apache.paimon.utils.UriReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
@@ -101,7 +102,9 @@ public OrcVectorizedReader createReader(FormatReaderFactory.Context context)
context instanceof OrcFormatReaderContext
? ((OrcFormatReaderContext) context).poolSize()
: 1;
- Pool<OrcReaderBatch> poolOfBatches = createPoolOfBatches(context.filePath(), poolSize);
+ UriReader uriReader = UriReader.fromFile(context.fileIO());
+ Pool<OrcReaderBatch> poolOfBatches =
+ createPoolOfBatches(context.filePath(), poolSize, uriReader);
RecordReader orcReader =
createRecordReader(
@@ -123,7 +126,10 @@ public OrcVectorizedReader createReader(FormatReaderFactory.Context context)
* conversion from the ORC representation to the result format.
*/
public OrcReaderBatch createReaderBatch(
- Path filePath, VectorizedRowBatch orcBatch, Pool.Recycler<OrcReaderBatch> recycler) {
+ Path filePath,
+ VectorizedRowBatch orcBatch,
+ Pool.Recycler<OrcReaderBatch> recycler,
+ UriReader uriReader) {
List<String> tableFieldNames = tableType.getFieldNames();
List<DataType> tableFieldTypes = tableType.getFieldTypes();
@@ -139,17 +145,20 @@ public OrcReaderBatch createReaderBatch(
type,
legacyTimestampLtzType);
}
- return new OrcReaderBatch(filePath, orcBatch, new VectorizedColumnBatch(vectors), recycler);
+ return new OrcReaderBatch(
+ filePath, orcBatch, new VectorizedColumnBatch(vectors), recycler, uriReader);
}
// ------------------------------------------------------------------------
- private Pool<OrcReaderBatch> createPoolOfBatches(Path filePath, int numBatches) {
+ private Pool<OrcReaderBatch> createPoolOfBatches(
+ Path filePath, int numBatches, UriReader uriReader) {
final Pool<OrcReaderBatch> pool = new Pool<>(numBatches);
for (int i = 0; i < numBatches; i++) {
final VectorizedRowBatch orcBatch = createBatchWrapper(schema, batchSize / numBatches);
- final OrcReaderBatch batch = createReaderBatch(filePath, orcBatch, pool.recycler());
+ final OrcReaderBatch batch =
+ createReaderBatch(filePath, orcBatch, pool.recycler(), uriReader);
pool.add(batch);
}
@@ -170,13 +179,14 @@ protected OrcReaderBatch(
final Path filePath,
final VectorizedRowBatch orcVectorizedRowBatch,
final VectorizedColumnBatch paimonColumnBatch,
- final Pool.Recycler<OrcReaderBatch> recycler) {
+ final Pool.Recycler<OrcReaderBatch> recycler,
+ final UriReader uriReader) {
this.orcVectorizedRowBatch = checkNotNull(orcVectorizedRowBatch);
this.recycler = checkNotNull(recycler);
this.paimonColumnBatch = paimonColumnBatch;
this.result =
new VectorizedRowIterator(
- filePath, new ColumnarRow(paimonColumnBatch), this::recycle);
+ filePath, new ColumnarRow(paimonColumnBatch, uriReader), this::recycle);
}
/**
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcTypeUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcTypeUtil.java
index f7d3d626d44f..4b80827d1bb3 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcTypeUtil.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcTypeUtil.java
@@ -67,6 +67,9 @@ static TypeDescription convertToOrcType(DataType type, int fieldId, int depth) {
case BOOLEAN:
return TypeDescription.createBoolean()
.setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
+ case BLOB:
+ return TypeDescription.createBinary()
+ .setAttribute(PAIMON_ORC_FIELD_ID_KEY, String.valueOf(fieldId));
case VARBINARY:
if (type.equals(DataTypes.BYTES())) {
return TypeDescription.createBinary()
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java
index 762037deb215..9022735e2cac 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/writer/FieldWriterFactory.java
@@ -18,6 +18,8 @@
package org.apache.paimon.format.orc.writer;
+import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.BlobDescriptor;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
@@ -245,7 +247,21 @@ public FieldWriter visit(VariantType variantType) {
@Override
public FieldWriter visit(BlobType blobType) {
- throw new UnsupportedOperationException("Unsupported type: " + blobType);
+ return (rowId, column, getters, columnId) -> {
+ BytesColumnVector vector = (BytesColumnVector) column;
+ Blob blob = getters.getBlob(columnId);
+ try {
+ BlobDescriptor descriptor = blob.toDescriptor();
+ byte[] bytes = descriptor.serialize();
+ vector.setVal(rowId, bytes, 0, bytes.length);
+ return bytes.length;
+ } catch (Throwable t) {
+ throw new IllegalArgumentException(
+ "blob.stored-descriptor-fields requires blob field value to be a "
+ + "serialized BlobDescriptor (magic 'BLOBDESC').",
+ t);
+ }
+ };
}
@Override
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 29f22e1de66b..eabdc2ac0da2 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -36,6 +36,7 @@
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.UriReader;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.filter2.compat.FilterCompat;
@@ -118,8 +119,15 @@ public FileRecordReader createReader(FormatReaderFactory.Context co
MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(requestedSchema);
List<ParquetField> fields = buildFieldsList(readFields, columnIO, requestedSchema);
+ UriReader uriReader = UriReader.fromFile(context.fileIO());
return new VectorizedParquetRecordReader(
- context.filePath(), reader, fileSchema, fields, writableVectors, batchSize);
+ context.filePath(),
+ reader,
+ fileSchema,
+ fields,
+ writableVectors,
+ batchSize,
+ uriReader);
}
/** Clips `parquetSchema` according to `fieldNames`. */
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
index f5d93bec42bd..37a69fe9aebd 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
@@ -90,6 +90,7 @@ public static Type convertToParquetType(String name, DataType type, int fieldId,
.withId(fieldId);
case BINARY:
case VARBINARY:
+ case BLOB:
return Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition)
.named(name)
.withId(fieldId);
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ColumnarBatch.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ColumnarBatch.java
index 30a16aba822d..891f59c5e539 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ColumnarBatch.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ColumnarBatch.java
@@ -28,6 +28,7 @@
import org.apache.paimon.data.columnar.VectorizedRowIterator;
import org.apache.paimon.fs.Path;
import org.apache.paimon.utils.LongIterator;
+import org.apache.paimon.utils.UriReader;
import java.util.Arrays;
@@ -38,7 +39,7 @@ public class ColumnarBatch {
protected final VectorizedColumnBatch vectorizedColumnBatch;
protected final ColumnarRowIterator vectorizedRowIterator;
- public ColumnarBatch(Path filePath, ColumnVector[] columns) {
+ public ColumnarBatch(Path filePath, ColumnVector[] columns, UriReader uriReader) {
this.columns = columns;
this.vectorizedColumnBatch = new VectorizedColumnBatch(columns);
boolean containsNestedColumn =
@@ -48,7 +49,7 @@ public ColumnarBatch(Path filePath, ColumnVector[] columns) {
vector instanceof MapColumnVector
|| vector instanceof RowColumnVector
|| vector instanceof ArrayColumnVector);
- ColumnarRow row = new ColumnarRow(vectorizedColumnBatch);
+ ColumnarRow row = new ColumnarRow(vectorizedColumnBatch, uriReader);
this.vectorizedRowIterator =
containsNestedColumn
? new ColumnarRowIterator(filePath, row, null)
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java
index e7d3ad04abeb..a7890953adf6 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetReaderUtil.java
@@ -100,6 +100,7 @@ public static WritableColumnVector createWritableColumnVector(
case CHAR:
case VARCHAR:
case VARBINARY:
+ case BLOB:
return new HeapBytesVector(batchSize);
case BINARY:
return new HeapBytesVector(batchSize);
@@ -176,6 +177,9 @@ public static ColumnVector createReadableColumnVector(
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return new ParquetTimestampVector(writableVector);
+ case BLOB:
+ // Physical representation is bytes; higher-level Row#getBlob() handles descriptor.
+ return writableVector;
case ARRAY:
return new CastedArrayColumnVector(
(HeapArrayVector) writableVector,
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java
index 880af82af181..12b4849a2520 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java
@@ -219,7 +219,15 @@ public UpdaterFactory visit(VariantType variantType) {
@Override
public UpdaterFactory visit(BlobType blobType) {
- throw new RuntimeException("Blob type is not supported");
+ // Physical representation is bytes (same as VARBINARY); higher-level Row#getBlob()
+ // interprets serialized BlobDescriptor when needed.
+ return c -> {
+ if (c.getPrimitiveType().getPrimitiveTypeName()
+ == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
+ return new FixedLenByteArrayUpdater(c.getPrimitiveType().getTypeLength());
+ }
+ return new BinaryUpdater();
+ };
}
@Override
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedParquetRecordReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedParquetRecordReader.java
index d8fd82056e67..c9b8544edc1c 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedParquetRecordReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedParquetRecordReader.java
@@ -25,6 +25,7 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.FileRecordIterator;
import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.utils.UriReader;
import org.apache.parquet.VersionParser;
import org.apache.parquet.column.ColumnDescriptor;
@@ -75,6 +76,7 @@ public class VectorizedParquetRecordReader implements FileRecordReader
+ private final UriReader uriReader;
private final List fields;
private final RowIndexGenerator rowIndexGenerator;
@@ -88,7 +90,8 @@ public VectorizedParquetRecordReader(
MessageType fileSchema,
List fields,
WritableColumnVector[] vectors,
- int batchSize)
+ int batchSize,
+ UriReader uriReader)
throws IOException {
this.filePath = filePath;
this.reader = reader;
@@ -96,6 +99,7 @@ public VectorizedParquetRecordReader(
this.fields = fields;
this.totalRowCount = reader.getFilteredRecordCount();
this.batchSize = batchSize;
+ this.uriReader = uriReader;
this.rowIndexGenerator = new RowIndexGenerator();
// fetch writer version from file metadata
@@ -120,7 +124,8 @@ private void initBatch(WritableColumnVector[] vectors) {
fields.stream()
.map(ParquetField::getType)
.collect(Collectors.toList()),
- vectors));
+ vectors),
+ uriReader);
columnVectors = new ParquetColumnVector[fields.size()];
for (int i = 0; i < columnVectors.length; i++) {
columnVectors[i] =
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java
index 3629bb852b94..464516aed092 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetRowDataWriter.java
@@ -20,6 +20,8 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.Blob;
+import org.apache.paimon.data.BlobDescriptor;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
@@ -105,6 +107,8 @@ private FieldWriter createWriter(DataType t, Type type) {
case BINARY:
case VARBINARY:
return new BinaryWriter();
+ case BLOB:
+ return new BlobDescriptorWriter();
case DECIMAL:
DecimalType decimalType = (DecimalType) t;
return createDecimalWriter(decimalType.getPrecision(), decimalType.getScale());
@@ -313,6 +317,33 @@ private void writeBinary(byte[] value) {
}
}
+ /** Writes BLOB as serialized {@link BlobDescriptor} bytes for descriptor-stored fields. */
+ private class BlobDescriptorWriter implements FieldWriter {
+
+ @Override
+ public void write(InternalRow row, int ordinal) {
+ writeBlob(row.getBlob(ordinal));
+ }
+
+ @Override
+ public void write(InternalArray arrayData, int ordinal) {
+ // Currently we don't support BLOB inside arrays/maps.
+ throw new UnsupportedOperationException("BLOB in array is not supported.");
+ }
+
+ private void writeBlob(Blob blob) {
+ try {
+ BlobDescriptor descriptor = blob.toDescriptor();
+ recordConsumer.addBinary(Binary.fromReusedByteArray(descriptor.serialize()));
+ } catch (Throwable t) {
+ throw new IllegalArgumentException(
+ "blob.stored-descriptor-fields requires blob field value to be a "
+ + "serialized BlobDescriptor (magic 'BLOBDESC').",
+ t);
+ }
+ }
+ }
+
private class IntWriter implements FieldWriter {
@Override
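For descriptor-stored fields, the writer above expects the incoming cell to already be a serialized BlobDescriptor. A minimal Python sketch of producing such a cell with the pypaimon classes touched later in this diff (the path and length are illustrative):

from pypaimon.table.row.blob import BlobDescriptor

external_uri = '/tmp/videos/clip_0001.bin'   # illustrative external blob location
descriptor = BlobDescriptor(external_uri, 0, 1024)
cell_value = descriptor.serialize()          # bytes stored inline in the BLOB column
assert BlobDescriptor.is_blob_descriptor(cell_value)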
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index c47f82c50370..6a61fc6e1a0f 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -218,7 +218,12 @@ def write_lance(self, path: str, data, **kwargs):
raise NotImplementedError("write_lance must be implemented by FileIO subclasses")
def write_blob(self, path: str, data, blob_as_descriptor: bool, **kwargs):
- """Write Blob format file. Must be implemented by subclasses."""
+ """
+ Write Blob format file.
+
+ NOTE: blob_as_descriptor is kept only for compatibility with existing callers.
+ Blob write behavior is adaptive and does not depend on this flag.
+ """
raise NotImplementedError("write_blob must be implemented by FileIO subclasses")
def close(self):
diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py
index c83a75c65585..aa082f540412 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -177,7 +177,17 @@ class CoreOptions:
ConfigOptions.key("blob-as-descriptor")
.boolean_type()
.default_value(False)
- .with_description("Whether to use blob as descriptor.")
+ .with_description("Whether to return blob values as serialized BlobDescriptor bytes when reading.")
+ )
+
+ BLOB_STORED_DESCRIPTOR_FIELDS: ConfigOption[str] = (
+ ConfigOptions.key("blob.stored-descriptor-fields")
+ .string_type()
+ .no_default_value()
+ .with_description(
+ "Comma-separated BLOB field names that should be stored as serialized BlobDescriptor bytes "
+ "inline in normal data files."
+ )
)
TARGET_FILE_SIZE: ConfigOption[MemorySize] = (
@@ -484,6 +494,16 @@ def metadata_stats_enabled(self, default=None):
def blob_as_descriptor(self, default=None):
return self.options.get(CoreOptions.BLOB_AS_DESCRIPTOR, default)
+ def blob_stored_descriptor_fields(self, default=None):
+ value = self.options.get(CoreOptions.BLOB_STORED_DESCRIPTOR_FIELDS, default)
+ if value is None:
+ return set()
+ if isinstance(value, str):
+ return {field.strip() for field in value.split(",") if field.strip()}
+ if isinstance(value, (list, set, tuple)):
+ return {str(field).strip() for field in value if str(field).strip()}
+ return set()
+
def target_file_size(self, has_primary_key, default=None):
return self.options.get(CoreOptions.TARGET_FILE_SIZE,
MemorySize.of_mebi_bytes(
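As a quick illustration of how the new option value resolves (a sketch; the field names are made up), the comma-separated string becomes a set of trimmed, non-empty field names:

raw = 'pic2, thumbnail'                       # value of 'blob.stored-descriptor-fields'
fields = {name.strip() for name in raw.split(',') if name.strip()}
assert fields == {'pic2', 'thumbnail'}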
diff --git a/paimon-python/pypaimon/filesystem/local_file_io.py b/paimon-python/pypaimon/filesystem/local_file_io.py
index 91fc1c906980..0f0686375ac6 100644
--- a/paimon-python/pypaimon/filesystem/local_file_io.py
+++ b/paimon-python/pypaimon/filesystem/local_file_io.py
@@ -396,6 +396,8 @@ def write_lance(self, path: str, data: pyarrow.Table, **kwargs):
def write_blob(self, path: str, data: pyarrow.Table, blob_as_descriptor: bool, **kwargs):
try:
+ # Kept for API compatibility. Write behavior is adaptive and does not depend on this flag.
+ _ = blob_as_descriptor
if data.num_columns != 1:
raise RuntimeError(f"Blob format only supports a single column, got {data.num_columns} columns")
@@ -424,18 +426,24 @@ def write_blob(self, path: str, data: pyarrow.Table, blob_as_descriptor: bool, *
for i in range(num_rows):
col_data = records_dict[field_name][i]
if hasattr(fields[0].type, 'type') and fields[0].type.type == "BLOB":
- if blob_as_descriptor:
- blob_descriptor = BlobDescriptor.deserialize(col_data)
- uri_reader = self.uri_reader_factory.create(blob_descriptor.uri)
- blob_data = Blob.from_descriptor(uri_reader, blob_descriptor)
- elif isinstance(col_data, bytes):
- blob_data = BlobData(col_data)
+ if hasattr(col_data, 'as_py'):
+ col_data = col_data.as_py()
+ if isinstance(col_data, str):
+ col_data = col_data.encode('utf-8')
+ if isinstance(col_data, bytearray):
+ col_data = bytes(col_data)
+
+ if isinstance(col_data, bytes):
+ if BlobDescriptor.is_blob_descriptor(col_data):
+ descriptor = BlobDescriptor.deserialize(col_data)
+ uri_reader = self.uri_reader_factory.create(descriptor.uri)
+ blob_data = Blob.from_descriptor(uri_reader, descriptor)
+ else:
+ blob_data = BlobData(col_data)
else:
- if hasattr(col_data, 'as_py'):
- col_data = col_data.as_py()
- if isinstance(col_data, str):
- col_data = col_data.encode('utf-8')
- blob_data = BlobData(col_data)
+ raise RuntimeError(
+ "Blob field value must be bytes/blob or serialized BlobDescriptor bytes."
+ )
row_values = [blob_data]
else:
row_values = [col_data]
diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
index cb689baeaf1e..5bed00ea3e74 100644
--- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
+++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py
@@ -447,6 +447,8 @@ def write_lance(self, path: str, data: pyarrow.Table, **kwargs):
def write_blob(self, path: str, data: pyarrow.Table, blob_as_descriptor: bool, **kwargs):
try:
+ # Kept for API compatibility. Write behavior is adaptive and does not depend on this flag.
+ _ = blob_as_descriptor
if data.num_columns != 1:
raise RuntimeError(f"Blob format only supports a single column, got {data.num_columns} columns")
column = data.column(0)
@@ -466,18 +468,24 @@ def write_blob(self, path: str, data: pyarrow.Table, blob_as_descriptor: bool, *
for i in range(num_rows):
col_data = records_dict[field_name][i]
if hasattr(fields[0].type, 'type') and fields[0].type.type == "BLOB":
- if blob_as_descriptor:
- blob_descriptor = BlobDescriptor.deserialize(col_data)
- uri_reader = self.uri_reader_factory.create(blob_descriptor.uri)
- blob_data = Blob.from_descriptor(uri_reader, blob_descriptor)
- elif isinstance(col_data, bytes):
- blob_data = BlobData(col_data)
+ if hasattr(col_data, 'as_py'):
+ col_data = col_data.as_py()
+ if isinstance(col_data, str):
+ col_data = col_data.encode('utf-8')
+ if isinstance(col_data, bytearray):
+ col_data = bytes(col_data)
+
+ if isinstance(col_data, bytes):
+ if BlobDescriptor.is_blob_descriptor(col_data):
+ descriptor = BlobDescriptor.deserialize(col_data)
+ uri_reader = self.uri_reader_factory.create(descriptor.uri)
+ blob_data = Blob.from_descriptor(uri_reader, descriptor)
+ else:
+ blob_data = BlobData(col_data)
else:
- if hasattr(col_data, 'as_py'):
- col_data = col_data.as_py()
- if isinstance(col_data, str):
- col_data = col_data.encode('utf-8')
- blob_data = BlobData(col_data)
+ raise RuntimeError(
+ "Blob field value must be bytes/blob or serialized BlobDescriptor bytes."
+ )
row_values = [blob_data]
else:
row_values = [col_data]
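The adaptive branch above boils down to one decision per blob cell; a simplified sketch (import paths and the uri_reader_factory argument are assumed from the surrounding FileIO):

from pypaimon.table.row.blob import Blob, BlobData, BlobDescriptor

def to_blob(cell: bytes, uri_reader_factory):
    # Serialized descriptors are resolved through their URI; plain bytes are wrapped directly.
    if BlobDescriptor.is_blob_descriptor(cell):
        descriptor = BlobDescriptor.deserialize(cell)
        return Blob.from_descriptor(uri_reader_factory.create(descriptor.uri), descriptor)
    return BlobData(cell)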
diff --git a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
index c9e51785a056..8ab9229e2ac6 100644
--- a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
@@ -21,10 +21,12 @@
import pyarrow as pa
from pyarrow import RecordBatch
+from pypaimon.common.file_io import FileIO
from pypaimon.read.partition_info import PartitionInfo
from pypaimon.read.reader.format_blob_reader import FormatBlobReader
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
from pypaimon.schema.data_types import DataField, PyarrowFieldParser
+from pypaimon.table.row.blob import Blob, BlobDescriptor
from pypaimon.table.special_fields import SpecialFields
@@ -38,7 +40,10 @@ def __init__(self, format_reader: RecordBatchReader, index_mapping: List[int], p
max_sequence_number: int,
first_row_id: int,
row_tracking_enabled: bool,
- system_fields: dict):
+ system_fields: dict,
+ blob_as_descriptor: bool = False,
+ blob_stored_descriptor_fields: Optional[set] = None,
+ file_io: Optional[FileIO] = None):
self.format_reader = format_reader
self.index_mapping = index_mapping
self.partition_info = partition_info
@@ -48,6 +53,19 @@ def __init__(self, format_reader: RecordBatchReader, index_mapping: List[int], p
self.first_row_id = first_row_id
self.max_sequence_number = max_sequence_number
self.system_fields = system_fields
+ self.blob_as_descriptor = blob_as_descriptor
+ self.blob_stored_descriptor_fields = blob_stored_descriptor_fields or set()
+ self.file_io = file_io
+ self.blob_field_names = {
+ field.name
+ for field in fields
+ if hasattr(field.type, 'type') and field.type.type == 'BLOB'
+ }
+ self.descriptor_blob_fields = {
+ field_name
+ for field_name in self.blob_stored_descriptor_fields
+ if field_name in self.blob_field_names
+ }
def read_arrow_batch(self, start_idx=None, end_idx=None) -> Optional[RecordBatch]:
if isinstance(self.format_reader, FormatBlobReader):
@@ -117,8 +135,73 @@ def read_arrow_batch(self, start_idx=None, end_idx=None) -> Optional[RecordBatch
if self.row_tracking_enabled and self.system_fields:
record_batch = self._assign_row_tracking(record_batch)
+ record_batch = self._convert_descriptor_stored_blob_columns(record_batch)
+
return record_batch
+ def _convert_descriptor_stored_blob_columns(self, record_batch: RecordBatch) -> RecordBatch:
+ if isinstance(self.format_reader, FormatBlobReader):
+ return record_batch
+ if not self.descriptor_blob_fields:
+ return record_batch
+
+ schema_names = set(record_batch.schema.names)
+ target_fields = [f for f in self.descriptor_blob_fields if f in schema_names]
+ if not target_fields:
+ return record_batch
+
+ arrays = list(record_batch.columns)
+ for field_name in target_fields:
+ field_idx = record_batch.schema.get_field_index(field_name)
+ values = record_batch.column(field_idx).to_pylist()
+
+ if self.blob_as_descriptor:
+ converted = [self._normalize_blob_cell(v) for v in values]
+ else:
+ converted = [self._blob_cell_to_data(v) for v in values]
+ arrays[field_idx] = pa.array(converted, type=pa.large_binary())
+
+ return pa.RecordBatch.from_arrays(arrays, schema=record_batch.schema)
+
+ @staticmethod
+ def _normalize_blob_cell(value):
+ if value is None:
+ return None
+ if hasattr(value, 'as_py'):
+ value = value.as_py()
+ if isinstance(value, str):
+ value = value.encode('utf-8')
+ if isinstance(value, bytearray):
+ value = bytes(value)
+ return value
+
+ def _blob_cell_to_data(self, value):
+ value = self._normalize_blob_cell(value)
+ if value is None:
+ return None
+
+ if not isinstance(value, bytes):
+ return value
+
+ descriptor = self._deserialize_descriptor_or_none(value)
+ if descriptor is None:
+ return value
+
+ try:
+ uri_reader = self.file_io.uri_reader_factory.create(descriptor.uri)
+ blob = Blob.from_descriptor(uri_reader, descriptor)
+ return blob.to_data()
+ except Exception as e:
+ raise RuntimeError(
+ "Failed to read blob bytes from descriptor URI while converting blob value."
+ ) from e
+
+ @staticmethod
+ def _deserialize_descriptor_or_none(raw: bytes):
+ if not BlobDescriptor.is_blob_descriptor(raw):
+ return None
+ return BlobDescriptor.deserialize(raw)
+
def _assign_row_tracking(self, record_batch: RecordBatch) -> RecordBatch:
"""Assign row tracking meta fields (_ROW_ID and _SEQUENCE_NUMBER)."""
arrays = list(record_batch.columns)
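On the read side, the conversion applied to descriptor-stored columns reduces to a per-cell rule; a condensed sketch of the logic above (names follow the reader, error handling elided):

from pypaimon.table.row.blob import Blob, BlobDescriptor

def convert_cell(raw, uri_reader_factory, blob_as_descriptor: bool):
    # blob-as-descriptor=true keeps the serialized descriptor bytes as-is;
    # otherwise the payload is fetched from the descriptor URI.
    if raw is None or blob_as_descriptor or not BlobDescriptor.is_blob_descriptor(raw):
        return raw
    descriptor = BlobDescriptor.deserialize(raw)
    reader = uri_reader_factory.create(descriptor.uri)
    return Blob.from_descriptor(reader, descriptor).to_data()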
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 9b218aa0b4ae..350ca509ef1f 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -133,6 +133,9 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool,
else:
raise ValueError(f"Unexpected file format: {file_format}")
+ blob_as_descriptor = CoreOptions.blob_as_descriptor(self.table.options)
+ blob_stored_descriptor_fields = CoreOptions.blob_stored_descriptor_fields(self.table.options)
+
index_mapping = self.create_index_mapping()
partition_info = self._create_partition_info()
system_fields = SpecialFields.find_system_fields(self.read_fields)
@@ -150,7 +153,10 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool,
file.max_sequence_number,
file.first_row_id,
row_tracking_enabled,
- system_fields)
+ system_fields,
+ blob_as_descriptor=blob_as_descriptor,
+ blob_stored_descriptor_fields=blob_stored_descriptor_fields,
+ file_io=self.table.file_io)
else:
return DataFileBatchReader(
format_reader,
@@ -161,7 +167,10 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool,
file.max_sequence_number,
file.first_row_id,
row_tracking_enabled,
- system_fields)
+ system_fields,
+ blob_as_descriptor=blob_as_descriptor,
+ blob_stored_descriptor_fields=blob_stored_descriptor_fields,
+ file_io=self.table.file_io)
def _get_fields_and_predicate(self, schema_id: int, read_fields):
key = (schema_id, tuple(read_fields))
diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py
index e0a442f30582..78c229e9be4d 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -20,6 +20,7 @@
import pandas
import pyarrow
+from pypaimon.common.options.core_options import CoreOptions
from pypaimon.common.predicate import Predicate
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
from pypaimon.read.split import Split
@@ -87,7 +88,56 @@ def to_arrow(self, splits: List[Split]) -> Optional[pyarrow.Table]:
if not table_list:
return pyarrow.Table.from_arrays([pyarrow.array([], type=field.type) for field in schema], schema=schema)
else:
- return pyarrow.Table.from_batches(table_list)
+ table = pyarrow.Table.from_batches(table_list)
+ return self._convert_descriptor_stored_fields_for_read(table)
+
+ def _convert_descriptor_stored_fields_for_read(self, table: pyarrow.Table) -> pyarrow.Table:
+ if CoreOptions.blob_as_descriptor(self.table.options):
+ return table
+
+ descriptor_fields = CoreOptions.blob_stored_descriptor_fields(self.table.options)
+ if not descriptor_fields:
+ return table
+
+ from pypaimon.table.row.blob import Blob, BlobDescriptor
+
+ result = table
+ for field_name in descriptor_fields:
+ if field_name not in result.column_names:
+ continue
+ values = result.column(field_name).to_pylist()
+ converted_values = []
+ for value in values:
+ if value is None:
+ converted_values.append(None)
+ continue
+ if hasattr(value, 'as_py'):
+ value = value.as_py()
+ if isinstance(value, str):
+ value = value.encode('utf-8')
+ if isinstance(value, bytearray):
+ value = bytes(value)
+ if not isinstance(value, bytes):
+ converted_values.append(value)
+ continue
+
+ try:
+ descriptor = BlobDescriptor.deserialize(value)
+ if descriptor.serialize() != value:
+ converted_values.append(value)
+ continue
+ uri_reader = self.table.file_io.uri_reader_factory.create(descriptor.uri)
+ converted_values.append(Blob.from_descriptor(uri_reader, descriptor).to_data())
+ except Exception:
+ converted_values.append(value)
+
+ column_idx = result.column_names.index(field_name)
+ result = result.set_column(
+ column_idx,
+ pyarrow.field(field_name, pyarrow.large_binary(), nullable=True),
+ pyarrow.array(converted_values, type=pyarrow.large_binary()),
+ )
+ return result
def _arrow_batch_generator(self, splits: List[Split], schema: pyarrow.Schema) -> Iterator[pyarrow.RecordBatch]:
chunk_size = 65536
diff --git a/paimon-python/pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py b/paimon-python/pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py
index 2770fad136a9..8d48642d597d 100644
--- a/paimon-python/pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py
+++ b/paimon-python/pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
"""
-Sample demonstrating how to use blob-as-descriptor mode with REST catalog.
+Sample demonstrating descriptor-stored blob fields with REST catalog.
"""
from pypaimon import CatalogFactory
import pyarrow as pa
@@ -50,9 +50,9 @@ def write_table_with_blob(catalog, video_file_path: str, external_oss_options: d
'row-tracking.enabled': 'true',
'data-evolution.enabled': 'true',
'blob-field': 'video',
- 'blob-as-descriptor': 'true'
+ 'blob.stored-descriptor-fields': 'video'
},
- comment='Table with blob column using blob-as-descriptor mode')
+ comment='Table with blob column using descriptor-stored field')
table_identifier = f'{database_name}.{table_name}'
catalog.create_table(
diff --git a/paimon-python/pypaimon/sample/rest_catalog_ray_data_sample.py b/paimon-python/pypaimon/sample/rest_catalog_ray_data_sample.py
index fafe31b2f393..280322aabb73 100644
--- a/paimon-python/pypaimon/sample/rest_catalog_ray_data_sample.py
+++ b/paimon-python/pypaimon/sample/rest_catalog_ray_data_sample.py
@@ -218,7 +218,7 @@ def double_value(row):
# print("Step 7: Blob data handling (Optional)")
# print("="*60)
# print("Paimon supports storing blob data (images, audio, etc.)")
- # print("For blob data examples, see: pypaimon/sample/oss_blob_as_descriptor.py")
+ # print("For blob data examples, see: pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py")
print("\n" + "="*60)
print("Summary")
@@ -232,7 +232,7 @@ def double_value(row):
print(" - Scalable data processing with Ray operations")
print(" - Integration with Ray ecosystem (Ray Train, Ray Serve, etc.)")
print("\nFor blob data (images, audio) examples, see:")
- print(" - pypaimon/sample/oss_blob_as_descriptor.py")
+ print(" - pypaimon/sample/rest_catalog_blob_as_descriptor_sample.py")
finally:
server.shutdown()
diff --git a/paimon-python/pypaimon/table/row/blob.py b/paimon-python/pypaimon/table/row/blob.py
index a9a2e2fd490b..4faebd608dc0 100644
--- a/paimon-python/pypaimon/table/row/blob.py
+++ b/paimon-python/pypaimon/table/row/blob.py
@@ -17,6 +17,7 @@
################################################################################
import io
+import struct
from abc import ABC, abstractmethod
from typing import Optional, Union
from urllib.parse import urlparse
@@ -25,7 +26,8 @@
class BlobDescriptor:
- CURRENT_VERSION = 1
+ CURRENT_VERSION = 2
+ MAGIC = 0x424C4F4244455343 # "BLOBDESC"
def __init__(self, uri: str, offset: int, length: int, version: int = CURRENT_VERSION):
self._version = version
@@ -50,25 +52,20 @@ def version(self) -> int:
return self._version
def serialize(self) -> bytes:
- import struct
-
uri_bytes = self._uri.encode('utf-8')
uri_length = len(uri_bytes)
-
- # Pack using little endian format
data = struct.pack('<B', self._version)
+ if self._version > 1:
+ data += struct.pack('<Q', self.MAGIC)
data += struct.pack('<I', uri_length)
data += uri_bytes
data += struct.pack('<q', self._offset)
data += struct.pack('<q', self._length)
return data
@classmethod
def deserialize(cls, data: bytes) -> 'BlobDescriptor':
- import struct
-
- if len(data) < 5: # minimum size: version(1) + uri_length(4)
+ if len(data) < 5:
raise ValueError("Invalid BlobDescriptor data: too short")
offset = 0
@@ -77,11 +74,26 @@ def deserialize(cls, data: bytes) -> 'BlobDescriptor':
version = struct.unpack('<B', data[offset:offset + 1])[0]
offset += 1
+ if version > cls.CURRENT_VERSION:
+ raise ValueError(
+ f"Expecting BlobDescriptor version to be less than or equal to "
+ f"{cls.CURRENT_VERSION}, but found {version}."
+ )
+
+ if version > 1:
+ if offset + 8 > len(data):
+ raise ValueError("Invalid BlobDescriptor data: too short")
+ magic = struct.unpack('<Q', data[offset:offset + 8])[0]
+ offset += 8
+ if magic != cls.MAGIC:
+ raise ValueError("Invalid BlobDescriptor data: missing magic header")
+ if offset + 4 > len(data):
+ raise ValueError("Invalid BlobDescriptor data: too short")
+ uri_length = struct.unpack('<I', data[offset:offset + 4])[0]
offset += 4
uri = data[offset:offset + uri_length].decode('utf-8')
offset += uri_length
blob_offset = struct.unpack('<q', data[offset:offset + 8])[0]
offset += 8
blob_length = struct.unpack('<q', data[offset:offset + 8])[0]
return cls(uri, blob_offset, blob_length, version)
+ @classmethod
+ def is_blob_descriptor(cls, data: bytes) -> bool:
+ if not isinstance(data, (bytes, bytearray)):
+ return False
+ raw = bytes(data)
+ if len(raw) < 9:
+ return False
+
+ version = raw[0]
+ if version < 1:
+ return False
+ if version > cls.CURRENT_VERSION:
+ return False
+ if version == 1:
+ try:
+ cls.deserialize(raw)
+ return True
+ except Exception:
+ return False
+
+ try:
+ magic = struct.unpack('<Q', raw[1:9])[0]
+ return magic == cls.MAGIC
+ except Exception:
+ return False
def __eq__(self, other) -> bool:
"""Check equality with another BlobDescriptor."""
if not isinstance(other, BlobDescriptor):
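For reference, the version-2 wire layout is version (1 byte) | magic (8 bytes) | uri length (4 bytes) | uri bytes | offset (8 bytes) | length (8 bytes), all little-endian; a worked check against that layout (matching the 33-byte expectation in the tests below):

import struct
from pypaimon.table.row.blob import BlobDescriptor

raw = BlobDescriptor('test', 0, 100).serialize()
assert len(raw) == 1 + 8 + 4 + len('test') + 8 + 8 == 33
assert raw[0] == BlobDescriptor.CURRENT_VERSION
assert struct.unpack('<Q', raw[1:9])[0] == BlobDescriptor.MAGIC   # 0x424C4F4244455343 ("BLOBDESC")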
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index b1236d764b10..f4328a8e2667 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -197,7 +197,7 @@ def test_data_blob_writer_no_blob_column(self):
writer.close()
def test_data_blob_writer_multiple_blob_columns(self):
- """Test that DataBlobWriter raises error when multiple blob columns are found."""
+ """Test that DataBlobWriter supports multiple blob columns."""
from pypaimon import Schema
# Test schema with multiple blob columns
@@ -228,10 +228,18 @@ def test_data_blob_writer_multiple_blob_columns(self):
'blob2': [b'blob2_1', b'blob2_2', b'blob2_3']
}, schema=pa_schema)
- # This should raise an error when DataBlobWriter is created internally
- with self.assertRaises(ValueError) as context:
- writer.write_arrow(test_data)
- self.assertIn("Limit exactly one blob field in one paimon table yet", str(context.exception))
+ writer.write_arrow(test_data)
+ commit_messages = writer.prepare_commit()
+ write_builder.new_commit().commit(commit_messages)
+ writer.close()
+
+ all_files = [f for msg in commit_messages for f in msg.new_files]
+ blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+ self.assertGreaterEqual(len(blob_files), 2, "Each blob column should produce blob files")
+
+ # Verify row counts can be read back correctly.
+ result = table.new_read_builder().new_read().to_arrow(table.new_read_builder().new_scan().plan().splits())
+ self.assertEqual(result.num_rows, 3)
def test_data_blob_writer_write_operations(self):
"""Test DataBlobWriter write operations with real data."""
@@ -1074,7 +1082,94 @@ def test_blob_write_read_end_to_end_with_descriptor(self):
print(" - Created external blob file and descriptor")
print(" - Wrote and read blob descriptor successfully")
print(" - Verified blob data can be read from descriptor")
- print(" - Tested blob-as-descriptor=true mode")
+ print(" - Tested blob-as-descriptor=true read output mode")
+
+ def test_blob_stored_descriptor_fields_mixed_mode(self):
+ import random
+ from pypaimon import Schema
+ from pypaimon.table.row.blob import BlobDescriptor
+
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('pic1', pa.large_binary()),
+ ('pic2', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob.stored-descriptor-fields': 'pic2'
+ }
+ )
+ self.catalog.create_table('test_db.blob_mixed_mode_test', schema, False)
+ table = self.catalog.get_table('test_db.blob_mixed_mode_test')
+
+ random.seed(7)
+ pic1_data = bytes(bytearray([random.randint(0, 255) for _ in range(1024)]))
+
+ external_blob_path = os.path.join(self.temp_dir, 'mixed_external_blob')
+ pic2_data = b'pic2_descriptor_payload'
+ with open(external_blob_path, 'wb') as f:
+ f.write(pic2_data)
+ descriptor = BlobDescriptor(external_blob_path, 0, len(pic2_data))
+
+ test_data = pa.Table.from_pydict({
+ 'id': [1],
+ 'pic1': [pic1_data],
+ 'pic2': [descriptor.serialize()]
+ }, schema=pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+ writer.write_arrow(test_data)
+ commit_messages = writer.prepare_commit()
+ write_builder.new_commit().commit(commit_messages)
+ writer.close()
+
+ all_files = [f for msg in commit_messages for f in msg.new_files]
+ blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
+ self.assertGreaterEqual(len(blob_files), 1)
+ self.assertTrue(all(f.write_cols == ['pic1'] for f in blob_files))
+
+ result = table.new_read_builder().new_read().to_arrow(table.new_read_builder().new_scan().plan().splits())
+ self.assertEqual(result.num_rows, 1)
+ self.assertEqual(result.column('pic1').to_pylist()[0], pic1_data)
+ self.assertEqual(result.column('pic2').to_pylist()[0], pic2_data)
+
+ def test_blob_stored_descriptor_fields_rejects_non_descriptor_input(self):
+ from pypaimon import Schema
+
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('pic1', pa.large_binary()),
+ ('pic2', pa.large_binary()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ options={
+ 'row-tracking.enabled': 'true',
+ 'data-evolution.enabled': 'true',
+ 'blob.stored-descriptor-fields': 'pic2'
+ }
+ )
+ self.catalog.create_table('test_db.blob_mixed_mode_reject_test', schema, False)
+ table = self.catalog.get_table('test_db.blob_mixed_mode_reject_test')
+
+ write_builder = table.new_batch_write_builder()
+ writer = write_builder.new_write()
+
+ bad_data = pa.Table.from_pydict({
+ 'id': [1],
+ 'pic1': [b'good'],
+ 'pic2': [b'not-a-descriptor']
+ }, schema=pa_schema)
+
+ with self.assertRaises(ValueError) as context:
+ writer.write_arrow(bad_data)
+ self.assertIn("blob.stored-descriptor-fields", str(context.exception))
def test_blob_write_read_large_data_end_to_end(self):
"""Test end-to-end blob functionality with large blob data (1MB per blob)."""
@@ -1745,7 +1840,7 @@ def test_blob_as_descriptor_sequence_number_increment(self):
from pypaimon import Schema
from pypaimon.table.row.blob import BlobDescriptor
- # Create schema with blob column (blob-as-descriptor=true)
+ # Create schema with blob column (descriptor input is auto-detected in write path)
pa_schema = pa.schema([
('id', pa.int32()),
('name', pa.string()),
@@ -1770,7 +1865,7 @@ def test_blob_as_descriptor_sequence_number_increment(self):
f.write(data)
descriptors.append(BlobDescriptor(path, 0, len(data)))
- # Write data row by row (this triggers the one-by-one writing in blob-as-descriptor mode)
+ # Write data row by row and verify blob sequence-number continuity
write_builder = table.new_batch_write_builder()
writer = write_builder.new_write()
@@ -1832,7 +1927,7 @@ def test_blob_as_descriptor_sequence_number_increment(self):
def test_blob_non_descriptor_sequence_number_increment(self):
from pypaimon import Schema
- # Create schema with blob column (blob-as-descriptor=false, normal mode)
+ # Create schema with blob column (default read output is blob bytes)
pa_schema = pa.schema([
('id', pa.int32()),
('name', pa.string()),
@@ -1841,7 +1936,7 @@ def test_blob_non_descriptor_sequence_number_increment(self):
schema = Schema.from_pyarrow_schema(pa_schema, options={
'row-tracking.enabled': 'true',
'data-evolution.enabled': 'true',
- 'blob-as-descriptor': 'false' # Normal mode, not descriptor mode
+ 'blob-as-descriptor': 'false' # Read output as blob bytes
})
self.catalog.create_table('test_db.blob_sequence_non_desc_test', schema, False)
@@ -1855,7 +1950,7 @@ def test_blob_non_descriptor_sequence_number_increment(self):
'blob_data': [f'blob data {i}'.encode('utf-8') for i in range(num_rows)]
}, schema=pa_schema)
- # Write data as a batch (this triggers batch writing in non-descriptor mode)
+ # Write data as a batch and verify blob sequence-number continuity
write_builder = table.new_batch_write_builder()
writer = write_builder.new_write()
writer.write_arrow(test_data)
@@ -1901,7 +1996,7 @@ def test_blob_non_descriptor_sequence_number_increment(self):
f"File: {blob_file.file_name}, min_seq: {min_seq}, max_seq: {max_seq}"
)
- print("✅ Non-descriptor mode sequence number increment test passed")
+ print("✅ Blob sequence number increment test passed (batch write)")
def test_blob_stats_schema_with_custom_column_name(self):
from pypaimon import Schema
@@ -1969,7 +2064,7 @@ def test_blob_file_name_format_with_shared_uuid_non_descriptor_mode(self):
import re
from pypaimon import Schema
- # Create schema with blob column (blob-as-descriptor=false)
+ # Create schema with blob column (blob bytes read output)
pa_schema = pa.schema([
('id', pa.int32()),
('name', pa.string()),
@@ -1978,7 +2073,7 @@ def test_blob_file_name_format_with_shared_uuid_non_descriptor_mode(self):
schema = Schema.from_pyarrow_schema(pa_schema, options={
'row-tracking.enabled': 'true',
'data-evolution.enabled': 'true',
- 'blob-as-descriptor': 'false', # Non-descriptor mode
+ 'blob-as-descriptor': 'false', # Read output as blob bytes
'target-file-size': '1MB' # Small target size to trigger multiple rollings
})
@@ -2069,10 +2164,10 @@ def test_blob_file_name_format_with_shared_uuid_non_descriptor_mode(self):
self.assertEqual(result.column('id').to_pylist(), list(range(1, num_blobs + 1)))
def test_blob_non_descriptor_target_file_size_rolling(self):
- """Test that blob.target-file-size is respected in non-descriptor mode."""
+ """Test that blob.target-file-size is respected in blob write path."""
from pypaimon import Schema
- # Create schema with blob column (non-descriptor mode)
+ # Create schema with blob column (blob write path)
pa_schema = pa.schema([
('id', pa.int32()),
('blob_data', pa.large_binary()),
diff --git a/paimon-python/pypaimon/tests/blob_test.py b/paimon-python/pypaimon/tests/blob_test.py
index 91808bc01413..2f95bff04613 100644
--- a/paimon-python/pypaimon/tests/blob_test.py
+++ b/paimon-python/pypaimon/tests/blob_test.py
@@ -421,15 +421,21 @@ def test_blob_descriptor_deserialization_invalid_data(self):
BlobDescriptor.deserialize(b"sho") # Only 3 bytes, need at least 5
self.assertIn("too short", str(context.exception))
- # Test with invalid version (version 0)
- # Create valid data but with wrong version
+ # Test with unsupported version (> current version)
valid_descriptor = BlobDescriptor("test://uri", 0, 100)
valid_data = bytearray(valid_descriptor.serialize())
- valid_data[0] = 0 # Set invalid version (0)
+ valid_data[0] = 3 # Set unsupported version
with self.assertRaises(ValueError) as context:
BlobDescriptor.deserialize(bytes(valid_data))
- self.assertIn("Unsupported BlobDescriptor version", str(context.exception))
+ self.assertIn("less than or equal to 2, but found 3", str(context.exception))
+
+ # Test with invalid magic for version 2 descriptor
+ invalid_magic_data = bytearray(valid_descriptor.serialize())
+ invalid_magic_data[1:9] = b"\x00" * 8
+ with self.assertRaises(ValueError) as context:
+ BlobDescriptor.deserialize(bytes(invalid_magic_data))
+ self.assertIn("missing magic header", str(context.exception))
# Test with incomplete data (missing URI bytes)
incomplete_data = b'\x01\x00\x00\x00\x10' # Version 1, URI length 16, but no URI bytes
@@ -486,6 +492,15 @@ def test_blob_descriptor_version_handling(self):
deserialized = BlobDescriptor.deserialize(serialized)
self.assertEqual(deserialized.version, 2)
+ # v1 payloads should remain deserializable for compatibility
+ descriptor_v1 = BlobDescriptor("test://uri", 0, 100, version=1)
+ serialized_v1 = descriptor_v1.serialize()
+ deserialized_v1 = BlobDescriptor.deserialize(serialized_v1)
+ self.assertEqual(deserialized_v1.version, 1)
+ self.assertEqual(deserialized_v1.uri, descriptor_v1.uri)
+ self.assertEqual(deserialized_v1.offset, descriptor_v1.offset)
+ self.assertEqual(deserialized_v1.length, descriptor_v1.length)
+
def test_blob_descriptor_edge_cases(self):
"""Test BlobDescriptor with edge cases."""
# Test with empty URI
@@ -532,15 +547,31 @@ def test_blob_descriptor_serialization_format(self):
# Check that serialized data starts with version byte
self.assertEqual(serialized[0], BlobDescriptor.CURRENT_VERSION)
- # Check minimum length (version + uri_length + uri + offset + length)
- # 1 + 4 + len("test") + 8 + 8 = 25 bytes
- self.assertEqual(len(serialized), 25)
+ # Check minimum length (version + magic + uri_length + uri + offset + length)
+ # 1 + 8 + 4 + len("test") + 8 + 8 = 33 bytes
+ self.assertEqual(len(serialized), 33)
# Verify round-trip consistency
deserialized = BlobDescriptor.deserialize(serialized)
re_serialized = deserialized.serialize()
self.assertEqual(serialized, re_serialized)
+ def test_blob_descriptor_detection(self):
+ import struct
+
+ descriptor_v2 = BlobDescriptor("test://uri", 1, 2)
+ descriptor_v1 = BlobDescriptor("test://uri", 1, 2, version=1)
+ random_bytes = b"not-a-descriptor"
+ fake_v1_prefix = b"\x01not-a-descriptor"
+ v2_magic_only = bytes([2]) + struct.pack('<Q', BlobDescriptor.MAGIC)
+ # Handle blob files specially. Each blob field tracks row ids independently.
+ if current_data_start >= start:
raise RuntimeError(
- f"This is a bug, blobStart {blob_start} should be less than start {start} "
+ f"This is a bug, blobStart {current_data_start} should be less than start {start} "
f"when assigning a blob entry file."
)
row_count = entry.file.row_count
- row_id_assigned.append(entry.assign_first_row_id(blob_start))
- blob_start += row_count
+ blob_field_key = tuple(entry.file.write_cols or [])
+ field_blob_start = blob_start_by_field.get(blob_field_key, current_data_start)
+ row_id_assigned.append(entry.assign_first_row_id(field_blob_start))
+ blob_start_by_field[blob_field_key] = field_blob_start + row_count
else:
# Handle regular files
row_count = entry.file.row_count
row_id_assigned.append(entry.assign_first_row_id(start))
- blob_start = start
+ current_data_start = start
+ blob_start_by_field.clear()
start += row_count
else:
# For compact files or files that already have first_row_id, don't assign
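A small worked example of the per-field assignment above (illustrative numbers): after a data file with 3 rows gets first_row_id 100, a pic1 blob file and a pic2 blob file with 3 rows each both start from 100 independently:

blob_start_by_field = {}
current_data_start = 100                                  # first_row_id of the preceding data file

for write_cols, row_count in [(('pic1',), 3), (('pic2',), 3)]:
    start = blob_start_by_field.get(write_cols, current_data_start)
    print(write_cols, '->', start)                        # ('pic1',) -> 100, ('pic2',) -> 100
    blob_start_by_field[write_cols] = start + row_count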
diff --git a/paimon-python/pypaimon/write/writer/blob_file_writer.py b/paimon-python/pypaimon/write/writer/blob_file_writer.py
index 2b3ba1a1af1f..b038fcf2ab30 100644
--- a/paimon-python/pypaimon/write/writer/blob_file_writer.py
+++ b/paimon-python/pypaimon/write/writer/blob_file_writer.py
@@ -31,10 +31,9 @@ class BlobFileWriter:
Writes rows one by one and tracks file size.
"""
- def __init__(self, file_io, file_path: Path, blob_as_descriptor: bool):
+ def __init__(self, file_io, file_path: Path):
self.file_io = file_io
self.file_path = file_path
- self.blob_as_descriptor = blob_as_descriptor
self.output_stream = file_io.new_output_stream(file_path)
self.writer = BlobFormatWriter(self.output_stream)
self.row_count = 0
@@ -50,30 +49,7 @@ def write_row(self, row_data: pa.Table):
field_name = row_data.schema[0].name
col_data = records_dict[field_name][0]
- # Convert to Blob
- if self.blob_as_descriptor:
- # In blob-as-descriptor mode, we need to read external file data
- # for rolling size calculation (based on external file size)
- if isinstance(col_data, bytes):
- blob_descriptor = BlobDescriptor.deserialize(col_data)
- else:
- # Handle PyArrow types
- if hasattr(col_data, 'as_py'):
- col_data = col_data.as_py()
- if isinstance(col_data, str):
- col_data = col_data.encode('utf-8')
- blob_descriptor = BlobDescriptor.deserialize(col_data)
- # Read external file data for rolling size calculation
- uri_reader = self.file_io.uri_reader_factory.create(blob_descriptor.uri)
- blob_data = Blob.from_descriptor(uri_reader, blob_descriptor)
- elif isinstance(col_data, bytes):
- blob_data = BlobData(col_data)
- else:
- if hasattr(col_data, 'as_py'):
- col_data = col_data.as_py()
- if isinstance(col_data, str):
- col_data = col_data.encode('utf-8')
- blob_data = BlobData(col_data)
+ blob_data = self._to_blob(col_data)
# Create GenericRow
fields = [DataField(0, field_name, PyarrowFieldParser.to_paimon_type(row_data.schema[0].type, False))]
@@ -83,6 +59,36 @@ def write_row(self, row_data: pa.Table):
self.writer.add_element(row)
self.row_count += 1
+ def _to_blob(self, col_data) -> Blob:
+ if hasattr(col_data, 'as_py'):
+ col_data = col_data.as_py()
+ if isinstance(col_data, str):
+ col_data = col_data.encode('utf-8')
+ if isinstance(col_data, bytearray):
+ col_data = bytes(col_data)
+
+ if isinstance(col_data, Blob):
+ return col_data
+
+ if isinstance(col_data, bytes):
+ if BlobDescriptor.is_blob_descriptor(col_data):
+ descriptor = BlobDescriptor.deserialize(col_data)
+ uri_reader = self.file_io.uri_reader_factory.create(descriptor.uri)
+ return Blob.from_descriptor(uri_reader, descriptor)
+ else:
+ return BlobData(col_data)
+
+ raise ValueError(
+ "Blob field value must be bytes/blob or serialized BlobDescriptor bytes, "
+ f"got {type(col_data)}."
+ )
+
+ @staticmethod
+ def _deserialize_descriptor_or_none(raw: bytes):
+ if not BlobDescriptor.is_blob_descriptor(raw):
+ return None
+ return BlobDescriptor.deserialize(raw)
+
def reach_target_size(self, suggested_check: bool, target_size: int) -> bool:
return self.writer.reach_target_size(suggested_check, target_size)
diff --git a/paimon-python/pypaimon/write/writer/blob_writer.py b/paimon-python/pypaimon/write/writer/blob_writer.py
index 527045d1e0ab..d6994ca75c7e 100644
--- a/paimon-python/pypaimon/write/writer/blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/blob_writer.py
@@ -60,30 +60,17 @@ def _check_and_roll_if_needed(self):
if self.pending_data is None:
return
- if self.blob_as_descriptor:
- # blob-as-descriptor=true: Write row by row and check actual file size
- for i in range(self.pending_data.num_rows):
- row_data = self.pending_data.slice(i, 1)
- self._write_row_to_file(row_data)
- self.record_count += 1
-
- if self.rolling_file(False):
- self.close_current_writer()
-
- # All data has been written
- self.pending_data = None
- else:
- # blob-as-descriptor=false: Use blob_target_file_size instead of target_file_size
- current_size = self.pending_data.nbytes
- if current_size > self.blob_target_file_size:
- split_row = self._find_optimal_split_point(self.pending_data, self.blob_target_file_size)
- if split_row > 0:
- data_to_write = self.pending_data.slice(0, split_row)
- remaining_data = self.pending_data.slice(split_row)
-
- self._write_data_to_file(data_to_write)
- self.pending_data = remaining_data
- self._check_and_roll_if_needed()
+ # Always write blob rows one by one so rolling is based on the actual blob bytes written
+ # rather than the in-memory size of serialized descriptors.
+ for i in range(self.pending_data.num_rows):
+ row_data = self.pending_data.slice(i, 1)
+ self._write_row_to_file(row_data)
+ self.record_count += 1
+
+ if self.rolling_file(False):
+ self.close_current_writer()
+
+ self.pending_data = None
def _write_row_to_file(self, row_data: pa.Table):
"""Write a single row to the current blob file. Opens a new file if needed."""
@@ -103,7 +90,7 @@ def open_current_writer(self):
self.file_count += 1 # Increment counter for next file
file_path = self._generate_file_path(file_name)
self.current_file_path = file_path
- self.current_writer = BlobFileWriter(self.file_io, file_path, self.blob_as_descriptor)
+ self.current_writer = BlobFileWriter(self.file_io, file_path)
def rolling_file(self, force_check: bool = False) -> bool:
if self.current_writer is None:
@@ -132,8 +119,8 @@ def close_current_writer(self):
def _write_data_to_file(self, data):
"""
- Override for blob format in normal mode (blob-as-descriptor=false).
- Only difference from parent: use shared UUID + counter for file naming.
+ Keep a fallback path for direct blob table writes while preserving the shared uuid+counter
+ naming behavior.
"""
if data.num_rows == 0:
return
@@ -146,7 +133,6 @@ def _write_data_to_file(self, data):
self.file_count += 1
file_path = self._generate_file_path(file_name)
- # Write blob file (parent class already supports blob format)
self.file_io.write_blob(file_path, data, self.blob_as_descriptor)
file_size = self.file_io.get_file_size(file_path)
@@ -219,20 +205,20 @@ def _add_file_metadata(self, file_name: str, file_path: str, data_or_row_count,
def prepare_commit(self):
"""Prepare commit, ensuring all data is written."""
- # Close current file if open (blob-as-descriptor=true mode)
+ # Close current file if open.
if self.current_writer is not None:
self.close_current_writer()
- # Call parent to handle pending_data (blob-as-descriptor=false mode)
+ # Call parent to handle pending_data fallback.
return super().prepare_commit()
def close(self):
"""Close current writer if open."""
- # Close current file if open (blob-as-descriptor=true mode)
+ # Close current file if open.
if self.current_writer is not None:
self.close_current_writer()
- # Call parent to handle pending_data (blob-as-descriptor=false mode)
+ # Call parent to handle pending_data fallback.
super().close()
def abort(self):
diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py b/paimon-python/pypaimon/write/writer/data_blob_writer.py
index 800e21e5a64d..f97bcf99b3e2 100644
--- a/paimon-python/pypaimon/write/writer/data_blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py
@@ -18,7 +18,7 @@
import logging
import uuid
-from typing import List, Optional, Tuple
+from typing import Dict, List, Optional, Tuple
import pyarrow as pa
@@ -38,8 +38,9 @@ class DataBlobWriter(DataWriter):
A rolling file writer that handles both normal data and blob data. This writer creates separate
files for normal columns and blob columns, managing their lifecycle independently.
- For example, given a table schema with normal columns (id INT, name STRING) and a blob column
- (data BLOB), this writer will create separate files for (id, name) and (data).
+ For example, given a table schema with normal columns (id INT, name STRING) and blob columns
+ (pic1 BLOB, pic2 BLOB), this writer will create separate files for normal columns and each
+ blob-file column.
Key features:
- Blob data can roll independently when normal data doesn't need rolling
@@ -79,13 +80,31 @@ class DataBlobWriter(DataWriter):
def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, options: CoreOptions = None):
super().__init__(table, partition, bucket, max_seq_number, options)
- # Determine blob column from table schema
- self.blob_column_name = self._get_blob_columns_from_schema()
+ # Determine blob columns from table schema
+ self.blob_column_names = self._get_blob_columns_from_schema()
+ self.blob_stored_descriptor_fields = CoreOptions.blob_stored_descriptor_fields(self.options)
+
+ unknown_descriptor_fields = self.blob_stored_descriptor_fields.difference(
+ set(self.blob_column_names)
+ )
+ if unknown_descriptor_fields:
+ raise ValueError(
+ "Fields in 'blob.stored-descriptor-fields' must be blob fields in schema. "
+ f"Unknown fields: {sorted(unknown_descriptor_fields)}"
+ )
+
+ # Blob fields that should still be written to `.blob` files.
+ self.blob_file_column_names = [
+ col for col in self.blob_column_names if col not in self.blob_stored_descriptor_fields
+ ]
- # Split schema into normal and blob columns
all_column_names = self.table.field_names
- self.normal_column_names = [col for col in all_column_names if col != self.blob_column_name]
- self.normal_columns = [field for field in self.table.table_schema.fields if field.name != self.blob_column_name]
+ self.normal_column_names = [
+ col for col in all_column_names if col not in self.blob_file_column_names
+ ]
+ self.normal_columns = [
+ field for field in self.table.table_schema.fields if field.name in self.normal_column_names
+ ]
self.write_cols = self.normal_column_names
# State management for blob writer
@@ -95,33 +114,37 @@ def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int, op
# Track pending data for normal data only
self.pending_normal_data: Optional[pa.Table] = None
- # Initialize blob writer with blob column name
+ # Initialize blob writers for each blob-file column.
from pypaimon.write.writer.blob_writer import BlobWriter
- self.blob_writer = BlobWriter(
- table=self.table,
- partition=self.partition,
- bucket=self.bucket,
- max_seq_number=max_seq_number,
- blob_column=self.blob_column_name,
- options=options
- )
+ self.blob_writers: Dict[str, BlobWriter] = {}
+ for blob_column in self.blob_file_column_names:
+ self.blob_writers[blob_column] = BlobWriter(
+ table=self.table,
+ partition=self.partition,
+ bucket=self.bucket,
+ max_seq_number=max_seq_number,
+ blob_column=blob_column,
+ options=options
+ )
- logger.info(f"Initialized DataBlobWriter with blob column: {self.blob_column_name}")
+ logger.info(
+ "Initialized DataBlobWriter with blob columns: %s, blob file columns: %s, descriptor "
+ "stored columns: %s",
+ self.blob_column_names,
+ self.blob_file_column_names,
+ sorted(self.blob_stored_descriptor_fields),
+ )
- def _get_blob_columns_from_schema(self) -> str:
+ def _get_blob_columns_from_schema(self) -> List[str]:
blob_columns = []
for field in self.table.table_schema.fields:
type_str = str(field.type).lower()
if 'blob' in type_str:
blob_columns.append(field.name)
- # Validate blob column count (matching Java constraint)
if len(blob_columns) == 0:
raise ValueError("No blob field found in table schema.")
- elif len(blob_columns) > 1:
- raise ValueError("Limit exactly one blob field in one paimon table yet.")
-
- return blob_columns[0] # Return single blob column name
+ return blob_columns
def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
normal_data, _ = self._split_data(data)
@@ -133,7 +156,8 @@ def _merge_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table:
def write(self, data: pa.RecordBatch):
try:
# Split data into normal and blob parts
- normal_data, blob_data = self._split_data(data)
+ normal_data, blob_data_map = self._split_data(data)
+ self._validate_descriptor_stored_fields_input(data)
# Process and accumulate normal data
processed_normal = self._process_normal_data(normal_data)
@@ -142,10 +166,10 @@ def write(self, data: pa.RecordBatch):
else:
self.pending_normal_data = self._merge_normal_data(self.pending_normal_data, processed_normal)
- # Write blob data directly to blob writer (handles its own rolling)
- if blob_data is not None and blob_data.num_rows > 0:
- # Write blob data directly to blob writer
- self.blob_writer.write(blob_data)
+ # Write blob-file columns to dedicated blob writers.
+ for blob_column, blob_data in blob_data_map.items():
+ if blob_data is not None and blob_data.num_rows > 0:
+ self.blob_writers[blob_column].write(blob_data)
self.record_count += data.num_rows
@@ -181,21 +205,51 @@ def close(self):
def abort(self):
"""Abort all writers and clean up resources."""
- self.blob_writer.abort()
+ for blob_writer in self.blob_writers.values():
+ blob_writer.abort()
self.pending_normal_data = None
self.committed_files.clear()
- def _split_data(self, data: pa.RecordBatch) -> Tuple[pa.RecordBatch, pa.RecordBatch]:
+ def _split_data(self, data: pa.RecordBatch) -> Tuple[pa.RecordBatch, Dict[str, pa.RecordBatch]]:
"""Split data into normal and blob parts based on column names."""
- # Use the pre-computed column names
- normal_columns = self.normal_column_names
- blob_columns = [self.blob_column_name] # Single blob column
-
- # Create projected batches
- normal_data = data.select(normal_columns) if normal_columns else None
- blob_data = data.select(blob_columns) if blob_columns else None
+ normal_data = data.select(self.normal_column_names) if self.normal_column_names else None
+ blob_data_map = {
+ blob_column: data.select([blob_column]) for blob_column in self.blob_file_column_names
+ }
+ return normal_data, blob_data_map
+
+ def _validate_descriptor_stored_fields_input(self, data: pa.RecordBatch):
+ if not self.blob_stored_descriptor_fields:
+ return
- return normal_data, blob_data
+ from pypaimon.table.row.blob import BlobDescriptor
+
+ for field_name in self.blob_stored_descriptor_fields:
+ if field_name not in data.schema.names:
+ continue
+ values = data.column(data.schema.get_field_index(field_name)).to_pylist()
+ for value in values:
+ if value is None:
+ continue
+ if hasattr(value, 'as_py'):
+ value = value.as_py()
+ if isinstance(value, str):
+ value = value.encode('utf-8')
+ if not isinstance(value, (bytes, bytearray)):
+ raise ValueError(
+ "blob.stored-descriptor-fields requires blob field value to be a serialized "
+ "BlobDescriptor."
+ )
+ try:
+ descriptor_bytes = bytes(value)
+ descriptor = BlobDescriptor.deserialize(descriptor_bytes)
+ if descriptor.serialize() != descriptor_bytes:
+ raise ValueError("Descriptor payload contains trailing bytes.")
+ except Exception as e:
+ raise ValueError(
+ "blob.stored-descriptor-fields requires blob field value to be a serialized "
+ "BlobDescriptor."
+ ) from e
@staticmethod
def _process_normal_data(data: pa.RecordBatch) -> pa.Table:
@@ -228,11 +282,11 @@ def _close_current_writers(self):
# Close normal writer and get metadata
normal_meta = self._write_normal_data_to_file(self.pending_normal_data)
- # Fetch blob metadata from blob writer
- blob_metas = self.blob_writer.prepare_commit()
-
- # Validate consistency between normal and blob files (Java behavior)
- self._validate_consistency(normal_meta, blob_metas)
+ blob_metas = []
+ for blob_column in self.blob_file_column_names:
+ writer_metas = self.blob_writers[blob_column].prepare_commit()
+ self._validate_consistency(normal_meta, writer_metas, blob_column)
+ blob_metas.extend(writer_metas)
# Add normal file metadata first
self.committed_files.append(normal_meta)
@@ -301,7 +355,8 @@ def _create_data_file_meta(self, file_name: str, file_path: str, data: pa.Table,
file_path=file_path,
write_cols=self.write_cols)
- def _validate_consistency(self, normal_meta: DataFileMeta, blob_metas: List[DataFileMeta]):
+ def _validate_consistency(
+ self, normal_meta: DataFileMeta, blob_metas: List[DataFileMeta], blob_column: str):
if normal_meta is None:
return
@@ -312,5 +367,6 @@ def _validate_consistency(self, normal_meta: DataFileMeta, blob_metas: List[Data
raise RuntimeError(
f"This is a bug: The row count of main file and blob files does not match. "
f"Main file: {normal_meta.file_name} (row count: {normal_row_count}), "
+ f"blob field: {blob_column}, "
f"blob files: {[meta.file_name for meta in blob_metas]} (total row count: {blob_row_count})"
)
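End to end, mixed mode mirrors the pypaimon test added earlier in this diff: declare which blob field carries descriptor bytes, then feed descriptor bytes for that field and raw bytes for the rest (a condensed sketch; catalog setup and paths are illustrative):

import pyarrow as pa
from pypaimon import Schema
from pypaimon.table.row.blob import BlobDescriptor

pa_schema = pa.schema([('id', pa.int32()),
                       ('pic1', pa.large_binary()),
                       ('pic2', pa.large_binary())])
schema = Schema.from_pyarrow_schema(pa_schema, options={
    'row-tracking.enabled': 'true',
    'data-evolution.enabled': 'true',
    'blob.stored-descriptor-fields': 'pic2',   # pic2 cells must be serialized BlobDescriptor bytes
})

descriptor = BlobDescriptor('/tmp/external_blob', 0, 23)   # illustrative external file
data = pa.Table.from_pydict({
    'id': [1],
    'pic1': [b'raw blob payload'],             # written to a .blob file
    'pic2': [descriptor.serialize()],          # stored inline as descriptor bytes
}, schema=pa_schema)
# table = catalog.get_table('db.tbl'); writer = table.new_batch_write_builder().new_write()
# writer.write_arrow(data); commit_messages = writer.prepare_commit(); ...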
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
index 3ad3666e9760..d2e70b9ed3b4 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java
@@ -61,28 +61,25 @@ public class SparkInternalRowWrapper implements InternalRow, Serializable {
private final StructType tableSchema;
private final int length;
- private final boolean blobAsDescriptor;
@Nullable private final UriReaderFactory uriReaderFactory;
@Nullable private final int[] fieldIndexMap;
private transient org.apache.spark.sql.catalyst.InternalRow internalRow;
public SparkInternalRowWrapper(StructType tableSchema, int length) {
- this(tableSchema, length, null, false, null);
+ this(tableSchema, length, null, null);
}
public SparkInternalRowWrapper(
StructType tableSchema,
int length,
StructType dataSchema,
- boolean blobAsDescriptor,
CatalogContext catalogContext) {
this.tableSchema = tableSchema;
this.length = length;
this.fieldIndexMap =
dataSchema != null ? buildFieldIndexMap(tableSchema, dataSchema) : null;
- this.blobAsDescriptor = blobAsDescriptor;
- this.uriReaderFactory = blobAsDescriptor ? new UriReaderFactory(catalogContext) : null;
+ this.uriReaderFactory = new UriReaderFactory(catalogContext);
}
public SparkInternalRowWrapper replace(org.apache.spark.sql.catalyst.InternalRow internalRow) {
@@ -243,12 +240,14 @@ public Variant getVariant(int pos) {
@Override
public Blob getBlob(int pos) {
- if (blobAsDescriptor) {
- BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(internalRow.getBinary(pos));
+ byte[] bytes = internalRow.getBinary(pos);
+ boolean blobDes = BlobDescriptor.isBlobDescriptor(bytes);
+ if (blobDes) {
+ BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes);
UriReader uriReader = uriReaderFactory.create(blobDescriptor.uri());
return Blob.fromDescriptor(uriReader, blobDescriptor);
} else {
- return new BlobData(internalRow.getBinary(pos));
+ return new BlobData(bytes);
}
}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
index cc947c7ea04b..36b5624ff52f 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java
@@ -62,24 +62,17 @@ public class SparkRow implements InternalRow, Serializable {
private final RowType type;
private final Row row;
private final RowKind rowKind;
- private final boolean blobAsDescriptor;
private final UriReaderFactory uriReaderFactory;
public SparkRow(RowType type, Row row) {
- this(type, row, RowKind.INSERT, false, null);
+ this(type, row, RowKind.INSERT, null);
}
- public SparkRow(
- RowType type,
- Row row,
- RowKind rowkind,
- boolean blobAsDescriptor,
- CatalogContext catalogContext) {
+ public SparkRow(RowType type, Row row, RowKind rowkind, CatalogContext catalogContext) {
this.type = type;
this.row = row;
this.rowKind = rowkind;
- this.blobAsDescriptor = blobAsDescriptor;
- this.uriReaderFactory = blobAsDescriptor ? new UriReaderFactory(catalogContext) : null;
+ this.uriReaderFactory = new UriReaderFactory(catalogContext);
}
@Override
@@ -168,12 +161,14 @@ public Variant getVariant(int i) {
@Override
public Blob getBlob(int i) {
- if (blobAsDescriptor) {
- BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(row.getAs(i));
+ byte[] bytes = row.getAs(i);
+ boolean blobDes = BlobDescriptor.isBlobDescriptor(bytes);
+ if (blobDes) {
+ BlobDescriptor blobDescriptor = BlobDescriptor.deserialize(bytes);
UriReader uriReader = uriReaderFactory.create(blobDescriptor.uri());
return Blob.fromDescriptor(uriReader, blobDescriptor);
} else {
- return new BlobData(row.getAs(i));
+ return new BlobData(bytes);
}
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
index 077d92cc292b..a5d9278523ab 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DataEvolutionPaimonWriter.scala
@@ -74,7 +74,6 @@ case class DataEvolutionPaimonWriter(paimonTable: FileStoreTable, dataSplits: Se
table.newBatchWriteBuilder(),
writeType,
firstRowIdToPartitionMap,
- coreOptions.blobAsDescriptor(),
table.catalogEnvironment().catalogContext())
try {
iter.foreach(row => write.write(row))
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index de5804cc72f2..ebdbf038fff2 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -138,7 +138,6 @@ case class PaimonSparkWriter(
writeRowTracking,
fullCompactionDeltaCommits,
batchId,
- coreOptions.blobAsDescriptor(),
table.catalogEnvironment().catalogContext(),
postponePartitionBucketComputer
)
@@ -455,7 +454,6 @@ case class PaimonSparkWriter(
val toPaimonRow = SparkRowUtils.toPaimonRow(
rowType,
rowKindColIdx,
- table.coreOptions().blobAsDescriptor(),
table.catalogEnvironment().catalogContext())
bootstrapIterator.asScala
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkPostponeCompactProcedure.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkPostponeCompactProcedure.scala
index 572e1cf96c22..5c25b70e6754 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkPostponeCompactProcedure.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkPostponeCompactProcedure.scala
@@ -104,7 +104,6 @@ case class SparkPostponeCompactProcedure(
writeRowTracking = coreOptions.dataEvolutionEnabled(),
Option.apply(coreOptions.fullCompactionDeltaCommits()),
None,
- coreOptions.blobAsDescriptor(),
table.catalogEnvironment().catalogContext(),
Some(postponePartitionBucketComputer)
)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkRowUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkRowUtils.scala
index 86b070083fe9..eee55454546c 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkRowUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SparkRowUtils.scala
@@ -30,7 +30,6 @@ object SparkRowUtils {
def toPaimonRow(
writeType: RowType,
rowkindColIdx: Int,
- blobAsDescriptor: Boolean,
catalogContext: CatalogContext): Row => SparkRow = {
if (rowkindColIdx != -1) {
row =>
@@ -38,9 +37,8 @@ object SparkRowUtils {
writeType,
row,
RowKind.fromByteValue(row.getByte(rowkindColIdx)),
- blobAsDescriptor,
catalogContext)
- } else { row => new SparkRow(writeType, row, RowKind.INSERT, blobAsDescriptor, catalogContext) }
+ } else { row => new SparkRow(writeType, row, RowKind.INSERT, catalogContext) }
}
def getFieldIndex(schema: StructType, colName: String): Int = {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala
index 0bc9bc0a7684..c0e5190d5dfa 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/DataEvolutionTableDataWrite.scala
@@ -40,7 +40,6 @@ case class DataEvolutionTableDataWrite(
writeBuilder: BatchWriteBuilder,
writeType: RowType,
firstRowIdToPartitionMap: mutable.HashMap[Long, (BinaryRow, Long)],
- blobAsDescriptor: Boolean,
catalogContext: CatalogContext)
extends InnerTableV1DataWrite {
@@ -51,7 +50,7 @@ case class DataEvolutionTableDataWrite(
private val commitMessages = ListBuffer[CommitMessageImpl]()
private val toPaimonRow = {
- SparkRowUtils.toPaimonRow(writeType, -1, blobAsDescriptor, catalogContext)
+ SparkRowUtils.toPaimonRow(writeType, -1, catalogContext)
}
def write(row: Row): Unit = {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonDataWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonDataWrite.scala
index f0861c27b5cb..02b9338b4bc9 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonDataWrite.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonDataWrite.scala
@@ -37,7 +37,6 @@ case class PaimonDataWrite(
writeRowTracking: Boolean = false,
fullCompactionDeltaCommits: Option[Int],
batchId: Option[Long],
- blobAsDescriptor: Boolean,
catalogContext: CatalogContext,
postponePartitionBucketComputer: Option[BinaryRow => Integer])
extends AbstractInnerTableDataWrite[Row]
@@ -58,7 +57,7 @@ case class PaimonDataWrite(
}
private val toPaimonRow = {
- SparkRowUtils.toPaimonRow(writeType, rowKindColIdx, blobAsDescriptor, catalogContext)
+ SparkRowUtils.toPaimonRow(writeType, rowKindColIdx, catalogContext)
}
def write(row: Row): Unit = {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
index fbd166a18312..aa2dfcdf8f56 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
@@ -45,7 +45,6 @@ case class PaimonV2DataWriter(
val fullCompactionDeltaCommits: Option[Int] =
Option.apply(coreOptions.fullCompactionDeltaCommits())
- val blobAsDescriptor: Boolean = coreOptions.blobAsDescriptor()
val write: TableWriteImpl[InternalRow] = {
writeBuilder
@@ -57,12 +56,8 @@ case class PaimonV2DataWriter(
private val rowConverter: InternalRow => SparkInternalRowWrapper = {
val numFields = writeSchema.fields.length
- val reusableWrapper = new SparkInternalRowWrapper(
- writeSchema,
- numFields,
- dataSchema,
- blobAsDescriptor,
- catalogContext)
+ val reusableWrapper =
+ new SparkInternalRowWrapper(writeSchema, numFields, dataSchema, catalogContext)
record => reusableWrapper.replace(record)
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
index 08c874cd3bee..1c108a9abf31 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/BlobTestBase.scala
@@ -91,13 +91,15 @@ class BlobTestBase extends PaimonSparkTestBase {
val blobDescriptor = new BlobDescriptor(uri, 0, blobData.length)
sql(
- "CREATE TABLE t (id INT, data STRING, picture BINARY) TBLPROPERTIES ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-field'='picture', 'blob-as-descriptor'='true')")
+ "CREATE TABLE t (id INT, data STRING, picture BINARY) TBLPROPERTIES ('row-tracking.enabled'='true', 'data-evolution.enabled'='true', 'blob-field'='picture')")
sql(
"INSERT INTO t VALUES (1, 'paimon', X'" + bytesToHex(blobDescriptor.serialize()) + "'),"
+ "(5, 'paimon', X'" + bytesToHex(blobDescriptor.serialize()) + "'),"
+ "(2, 'paimon', X'" + bytesToHex(blobDescriptor.serialize()) + "'),"
+ "(3, 'paimon', X'" + bytesToHex(blobDescriptor.serialize()) + "'),"
+ "(4, 'paimon', X'" + bytesToHex(blobDescriptor.serialize()) + "')")
+
+ sql("ALTER TABLE t SET TBLPROPERTIES ('blob-as-descriptor'='true')")
val newDescriptorBytes =
sql("SELECT picture FROM t WHERE id = 1").collect()(0).get(0).asInstanceOf[Array[Byte]]
val newBlobDescriptor = BlobDescriptor.deserialize(newDescriptorBytes)
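
The test now creates the table without 'blob-as-descriptor', inserts pre-serialized descriptor bytes into the BINARY column, and only then enables 'blob-as-descriptor' via ALTER TABLE before reading the descriptors back. A hedged sketch of preparing such a payload, mirroring the BlobDescriptor(uri, 0, blobData.length) and serialize() calls used in the test; the hex helper below is illustrative only (the test uses its own bytesToHex):

    import org.apache.paimon.data.BlobDescriptor;

    final class DescriptorPayload {

        private DescriptorPayload() {}

        /** Hex-encode a serialized descriptor for use in an X'<hex>' SQL literal. */
        static String toHexLiteral(String uri, byte[] blobData) {
            BlobDescriptor descriptor = new BlobDescriptor(uri, 0, blobData.length);
            byte[] serialized = descriptor.serialize();
            StringBuilder hex = new StringBuilder(serialized.length * 2);
            for (byte b : serialized) {
                hex.append(String.format("%02X", b));
            }
            // e.g. INSERT INTO t VALUES (1, 'paimon', X'<result>')
            return hex.toString();
        }
    }
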
@@ -132,10 +134,11 @@ class BlobTestBase extends PaimonSparkTestBase {
sql(
"CREATE TABLE IF NOT EXISTS t (\n" + "id STRING,\n" + "name STRING,\n" + "file_size STRING,\n" + "crc64 STRING,\n" + "modified_time STRING,\n" + "content BINARY\n" + ") \n" +
"PARTITIONED BY (ds STRING, batch STRING) \n" +
- "TBLPROPERTIES ('comment' = 'blob table','partition.expiration-time' = '365 d','row-tracking.enabled' = 'true','data-evolution.enabled' = 'true','blob-field' = 'content','blob-as-descriptor' = 'true')")
+ "TBLPROPERTIES ('comment' = 'blob table','partition.expiration-time' = '365 d','row-tracking.enabled' = 'true','data-evolution.enabled' = 'true','blob-field' = 'content')")
sql(
"INSERT OVERWRITE TABLE t\nPARTITION(ds= '1017',batch = 'test') VALUES \n('1','paimon','1024','12345678','20241017',X'" + bytesToHex(
blobDescriptor.serialize()) + "')")
+ sql("ALTER TABLE t SET TBLPROPERTIES ('blob-as-descriptor'='true')")
val newDescriptorBytes =
sql("SELECT content FROM t WHERE id = '1'").collect()(0).get(0).asInstanceOf[Array[Byte]]
val newBlobDescriptor = BlobDescriptor.deserialize(newDescriptorBytes)