Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -2254,6 +2254,14 @@ public InlineElement getDescription() {
"Specifies column names that should be stored as blob type. "
+ "This is used when you want to treat a BYTES column as a BLOB.");

public static final ConfigOption<String> BLOB_REF_FIELD =
key("blob-ref-field")
.stringType()
.noDefaultValue()
.withDescription(
"Specifies column names that should be stored as blob reference type. "
+ "This is used when you want to treat a BYTES column as a BLOB_REF.");

@Immutable
public static final ConfigOption<String> BLOB_DESCRIPTOR_FIELD =
key("blob-descriptor-field")
Expand Down Expand Up @@ -2935,7 +2943,13 @@ public Set<String> blobExternalStorageField() {
* subset of descriptor fields and therefore are also updatable.
*/
public Set<String> updatableBlobFields() {
return blobDescriptorField();
Set<String> fields = new HashSet<>(blobDescriptorField());
fields.addAll(blobRefField());
return fields;
}

public Set<String> blobRefField() {
return parseCommaSeparatedSet(BLOB_REF_FIELD);
}

/**
Expand Down Expand Up @@ -3274,6 +3288,15 @@ public static List<String> blobField(Map<String, String> options) {
return Arrays.stream(string.split(",")).map(String::trim).collect(Collectors.toList());
}

public static List<String> blobRefField(Map<String, String> options) {
String string = options.get(BLOB_REF_FIELD.key());
if (string == null) {
return Collections.emptyList();
}

return Arrays.stream(string.split(",")).map(String::trim).collect(Collectors.toList());
}

public boolean sequenceFieldSortOrderIsAscending() {
return options.get(SEQUENCE_FIELD_SORT_ORDER) == SortOrder.ASCENDING;
}
Expand Down
65 changes: 65 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/types/BlobRefType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.types;

import org.apache.paimon.annotation.Public;

/**
* Data type of blob reference.
*
* <p>{@link BlobRefType} stores reference bytes inline in data files instead of writing payloads to
* Paimon-managed {@code .blob} files.
*
* @since 1.5.0
*/
@Public
public final class BlobRefType extends DataType {

private static final long serialVersionUID = 1L;

private static final String FORMAT = "BLOB_REF";

public BlobRefType(boolean isNullable) {
super(isNullable, DataTypeRoot.BLOB_REF);
}

public BlobRefType() {
this(true);
}

@Override
public int defaultSize() {
return BlobType.DEFAULT_SIZE;
}

@Override
public DataType copy(boolean isNullable) {
return new BlobRefType(isNullable);
}

@Override
public String asSQLString() {
return withNullability(FORMAT);
}

@Override
public <R> R accept(DataTypeVisitor<R> visitor) {
return visitor.visit(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ public R visit(BlobType blobType) {
return defaultMethod(blobType);
}

@Override
public R visit(BlobRefType blobRefType) {
return defaultMethod(blobRefType);
}

@Override
public R visit(ArrayType arrayType) {
return defaultMethod(arrayType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ private enum Keyword {
LEGACY,
VARIANT,
BLOB,
BLOB_REF,
NOT
}

Expand Down Expand Up @@ -549,6 +550,8 @@ private DataType parseTypeByKeyword() {
return new VariantType();
case BLOB:
return new BlobType();
case BLOB_REF:
return new BlobRefType();
case VECTOR:
return parseVectorType();
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ public enum DataTypeRoot {

BLOB(DataTypeFamily.PREDEFINED),

BLOB_REF(DataTypeFamily.PREDEFINED),

ARRAY(DataTypeFamily.CONSTRUCTED, DataTypeFamily.COLLECTION),

VECTOR(DataTypeFamily.CONSTRUCTED, DataTypeFamily.COLLECTION),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public interface DataTypeVisitor<R> {

R visit(BlobType blobType);

R visit(BlobRefType blobRefType);

R visit(ArrayType arrayType);

R visit(VectorType vectorType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ public static BlobType BLOB() {
return new BlobType();
}

public static BlobRefType BLOB_REF() {
return new BlobRefType();
}

public static OptionalInt getPrecision(DataType dataType) {
return dataType.accept(PRECISION_EXTRACTOR);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.BinaryType;
import org.apache.paimon.types.BlobRefType;
import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.BooleanType;
import org.apache.paimon.types.CharType;
Expand Down Expand Up @@ -163,6 +164,11 @@ public FieldType visit(BlobType blobType) {
throw new UnsupportedOperationException();
}

@Override
public FieldType visit(BlobRefType blobRefType) {
throw new UnsupportedOperationException();
}

private TimeUnit getTimeUnit(int precision) {
if (precision == 0) {
return TimeUnit.SECOND;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.BinaryType;
import org.apache.paimon.types.BlobRefType;
import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.BooleanType;
import org.apache.paimon.types.CharType;
Expand Down Expand Up @@ -447,6 +448,11 @@ public Arrow2PaimonVectorConverter visit(BlobType blobType) {
throw new UnsupportedOperationException();
}

@Override
public Arrow2PaimonVectorConverter visit(BlobRefType blobRefType) {
throw new UnsupportedOperationException();
}

@Override
public Arrow2PaimonVectorConverter visit(ArrayType arrayType) {
final Arrow2PaimonVectorConverter arrowVectorConvertor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.BinaryType;
import org.apache.paimon.types.BlobRefType;
import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.BooleanType;
import org.apache.paimon.types.CharType;
Expand Down Expand Up @@ -156,6 +157,11 @@ public ArrowFieldWriterFactory visit(BlobType blobType) {
throw new UnsupportedOperationException("Doesn't support BlobType.");
}

@Override
public ArrowFieldWriterFactory visit(BlobRefType blobRefType) {
throw new UnsupportedOperationException("Doesn't support BlobRefType.");
}

@Override
public ArrowFieldWriterFactory visit(ArrayType arrayType) {
ArrowFieldWriterFactory elementWriterFactory = arrayType.getElementType().accept(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ static void write(
case BLOB:
writer.writeBlob(pos, (Blob) o);
break;
case BLOB_REF:
byte[] refBytes = BlobUtils.serializeBlobReference((Blob) o);
writer.writeBinary(pos, refBytes, 0, refBytes.length);
break;
default:
throw new UnsupportedOperationException("Not support type: " + type);
}
Expand Down Expand Up @@ -241,6 +245,11 @@ static ValueSetter createValueSetter(DataType elementType, Serializer<?> seriali
return (writer, pos, value) -> writer.writeVariant(pos, (Variant) value);
case BLOB:
return (writer, pos, value) -> writer.writeBlob(pos, (Blob) value);
case BLOB_REF:
return (writer, pos, value) -> {
byte[] bytes = BlobUtils.serializeBlobReference((Blob) value);
writer.writeBinary(pos, bytes, 0, bytes.length);
};
default:
String msg =
String.format(
Expand Down
4 changes: 4 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/data/Blob.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ static Blob fromDescriptor(UriReader reader, BlobDescriptor descriptor) {
return new BlobRef(reader, descriptor);
}

static Blob fromReference(BlobReferenceResolver resolver, BlobReference reference) {
return new BlobReferenceBlob(resolver, reference);
}

static Blob fromInputStream(Supplier<SeekableInputStream> supplier) {
return new BlobStream(supplier);
}
Expand Down
Loading
Loading