From a36b4f70788c4a912ab5df942d3bdeca71eebc55 Mon Sep 17 00:00:00 2001 From: 0lai0 Date: Tue, 19 May 2026 17:16:34 +0800 Subject: [PATCH 1/3] feat: enhance CometPlainVector to support variable width vectors --- .../apache/comet/vector/CometPlainVector.java | 19 +++-- .../comet/vector/TestCometPlainVector.java | 75 +++++++++++++++++++ 2 files changed, 87 insertions(+), 7 deletions(-) create mode 100644 spark/src/test/java/org/apache/comet/vector/TestCometPlainVector.java diff --git a/spark/src/main/java/org/apache/comet/vector/CometPlainVector.java b/spark/src/main/java/org/apache/comet/vector/CometPlainVector.java index 2a30be1b1c..2c57740553 100644 --- a/spark/src/main/java/org/apache/comet/vector/CometPlainVector.java +++ b/spark/src/main/java/org/apache/comet/vector/CometPlainVector.java @@ -33,6 +33,7 @@ /** A column vector whose elements are plainly decoded. */ public class CometPlainVector extends CometDecodedVector { private final long valueBufferAddress; + private final long offsetBufferAddress; private final boolean isBaseFixedWidthVector; private byte booleanByteCache; @@ -59,6 +60,12 @@ public CometPlainVector( } isBaseFixedWidthVector = valueVector instanceof BaseFixedWidthVector; + if (vector instanceof BaseVariableWidthVector) { + this.offsetBufferAddress = + ((BaseVariableWidthVector) vector).getOffsetBuffer().memoryAddress(); + } else { + this.offsetBufferAddress = -1; + } this.isReused = isReused; } @@ -124,13 +131,11 @@ public double getDouble(int rowId) { @Override public UTF8String getUTF8String(int rowId) { if (isNullAt(rowId)) return null; - if (!isBaseFixedWidthVector) { - BaseVariableWidthVector varWidthVector = (BaseVariableWidthVector) valueVector; - long offsetBufferAddress = varWidthVector.getOffsetBuffer().memoryAddress(); + if (offsetBufferAddress != -1) { int offset = Platform.getInt(null, offsetBufferAddress + rowId * 4L); int length = Platform.getInt(null, offsetBufferAddress + (rowId + 1L) * 4L) - offset; return UTF8String.fromAddress(null, valueBufferAddress + offset, length); - } else { + } else if (isBaseFixedWidthVector) { BaseFixedWidthVector fixedWidthVector = (BaseFixedWidthVector) valueVector; int length = fixedWidthVector.getTypeWidth(); int offset = rowId * length; @@ -143,6 +148,8 @@ public UTF8String getUTF8String(int rowId) { } else { return UTF8String.fromString(convertToUuid(result).toString()); } + } else { + throw new RuntimeException("Unsupported UTF8 vector type: " + valueVector.getName()); } } @@ -151,9 +158,7 @@ public byte[] getBinary(int rowId) { if (isNullAt(rowId)) return null; int offset; int length; - if (valueVector instanceof BaseVariableWidthVector) { - BaseVariableWidthVector varWidthVector = (BaseVariableWidthVector) valueVector; - long offsetBufferAddress = varWidthVector.getOffsetBuffer().memoryAddress(); + if (offsetBufferAddress != -1) { offset = Platform.getInt(null, offsetBufferAddress + rowId * 4L); length = Platform.getInt(null, offsetBufferAddress + (rowId + 1L) * 4L) - offset; } else if (valueVector instanceof BaseFixedWidthVector) { diff --git a/spark/src/test/java/org/apache/comet/vector/TestCometPlainVector.java b/spark/src/test/java/org/apache/comet/vector/TestCometPlainVector.java new file mode 100644 index 0000000000..e0bbb74c33 --- /dev/null +++ b/spark/src/test/java/org/apache/comet/vector/TestCometPlainVector.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet.vector; + +import org.junit.Test; + +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class TestCometPlainVector { + + @Test + public void testGetUTF8StringWithVariableWidthVector() { + try (RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE)) { + VarCharVector vector = new VarCharVector("strings", allocator); + vector.allocateNew(); + vector.setSafe(0, bytes("alpha")); + vector.setSafe(1, bytes("")); + vector.setSafe(2, bytes("spark")); + vector.setValueCount(4); // row 3 is null (validity bit not set) + + try (CometPlainVector cv = new CometPlainVector(vector, false)) { + assertEquals("alpha", cv.getUTF8String(0).toString()); + assertEquals("", cv.getUTF8String(1).toString()); + assertEquals("spark", cv.getUTF8String(2).toString()); + assertNull(cv.getUTF8String(3)); + } + } + } + + @Test + public void testGetBinaryWithVariableWidthVector() { + try (RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE)) { + VarBinaryVector vector = new VarBinaryVector("bytes", allocator); + vector.allocateNew(); + vector.setSafe(0, new byte[] {1, 2, 3}, 0, 3); + vector.setSafe(1, new byte[0], 0, 0); + vector.setSafe(2, new byte[] {4, 5}, 0, 2); + vector.setValueCount(4); // row 3 is null (validity bit not set) + + try (CometPlainVector cv = new CometPlainVector(vector, false)) { + assertArrayEquals(new byte[] {1, 2, 3}, cv.getBinary(0)); + assertArrayEquals(new byte[0], cv.getBinary(1)); + assertArrayEquals(new byte[] {4, 5}, cv.getBinary(2)); + assertNull(cv.getBinary(3)); + } + } + } + + private static byte[] bytes(String s) { + return s.getBytes(); + } +} From b957bf2c98ae5ec4dac1b558e8b35d70d5c38d01 Mon Sep 17 00:00:00 2001 From: ChenChen Lai <72776271+0lai0@users.noreply.github.com> Date: Thu, 21 May 2026 13:36:04 +0800 Subject: [PATCH 2/3] Update spark/src/test/java/org/apache/comet/vector/TestCometPlainVector.java Co-authored-by: Andy Grove --- .../test/java/org/apache/comet/vector/TestCometPlainVector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/java/org/apache/comet/vector/TestCometPlainVector.java b/spark/src/test/java/org/apache/comet/vector/TestCometPlainVector.java index e0bbb74c33..4d5aa23a61 100644 --- a/spark/src/test/java/org/apache/comet/vector/TestCometPlainVector.java +++ b/spark/src/test/java/org/apache/comet/vector/TestCometPlainVector.java @@ -70,6 +70,6 @@ public void testGetBinaryWithVariableWidthVector() { } private static byte[] bytes(String s) { - return s.getBytes(); + return s.getBytes(StandardCharsets.UTF_8); } } From 0a2c84f75306fdaf9ae58aef24f8c4fa6d261253 Mon Sep 17 00:00:00 2001 From: 0lai0 Date: Thu, 21 May 2026 16:47:20 +0800 Subject: [PATCH 3/3] add import --- .../test/java/org/apache/comet/vector/TestCometPlainVector.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spark/src/test/java/org/apache/comet/vector/TestCometPlainVector.java b/spark/src/test/java/org/apache/comet/vector/TestCometPlainVector.java index 4d5aa23a61..79faa04c55 100644 --- a/spark/src/test/java/org/apache/comet/vector/TestCometPlainVector.java +++ b/spark/src/test/java/org/apache/comet/vector/TestCometPlainVector.java @@ -19,6 +19,8 @@ package org.apache.comet.vector; +import java.nio.charset.StandardCharsets; + import org.junit.Test; import org.apache.arrow.memory.RootAllocator;