diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java b/common/src/main/java/org/apache/comet/parquet/BatchReader.java deleted file mode 100644 index 5a0bc9f6d3..0000000000 --- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java +++ /dev/null @@ -1,534 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.comet.parquet; - -import java.io.Closeable; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.*; - -import scala.Option; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.RootAllocator; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.parquet.HadoopReadOptions; -import org.apache.parquet.ParquetReadOptions; -import org.apache.parquet.Preconditions; -import org.apache.parquet.column.ColumnDescriptor; -import org.apache.parquet.column.page.PageReadStore; -import org.apache.parquet.hadoop.metadata.ParquetMetadata; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.Type; -import org.apache.spark.TaskContext; -import org.apache.spark.TaskContext$; -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.comet.parquet.CometParquetReadSupport; -import org.apache.spark.sql.comet.shims.ShimTaskMetrics; -import org.apache.spark.sql.execution.datasources.PartitionedFile; -import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter; -import org.apache.spark.sql.execution.metric.SQLMetric; -import org.apache.spark.sql.types.*; -import org.apache.spark.sql.vectorized.ColumnarBatch; -import org.apache.spark.util.AccumulatorV2; - -import org.apache.comet.CometConf; -import org.apache.comet.CometSchemaImporter; -import org.apache.comet.IcebergApi; -import org.apache.comet.shims.ShimBatchReader; -import org.apache.comet.shims.ShimFileFormat; -import org.apache.comet.vector.CometVector; - -/** - * A vectorized Parquet reader that reads a Parquet file in a batched fashion. - * - *

Example of how to use this: - * - *

- *   BatchReader reader = new BatchReader(parquetFile, batchSize);
- *   try {
- *     reader.init();
- *     while (reader.readBatch()) {
- *       ColumnarBatch batch = reader.currentBatch();
- *       // consume the batch
- *     }
- *   } finally { // resources associated with the reader should be released
- *     reader.close();
- *   }
- * 
- * - * @deprecated since 0.14.0. This class is kept for Iceberg compatibility only. - */ -@Deprecated -@IcebergApi -public class BatchReader extends RecordReader implements Closeable { - private static final Logger LOG = LoggerFactory.getLogger(FileReader.class); - protected static final BufferAllocator ALLOCATOR = new RootAllocator(); - - private Configuration conf; - private int capacity; - private boolean isCaseSensitive; - private boolean useFieldId; - private boolean ignoreMissingIds; - private StructType partitionSchema; - private InternalRow partitionValues; - private PartitionedFile file; - protected Map metrics; - - private long rowsRead; - protected StructType sparkSchema; - private MessageType requestedSchema; - protected CometVector[] vectors; - protected AbstractColumnReader[] columnReaders; - private CometSchemaImporter importer; - protected ColumnarBatch currentBatch; - private FileReader fileReader; - private boolean[] missingColumns; - protected boolean isInitialized; - private ParquetMetadata footer; - - /** The total number of rows across all row groups of the input split. */ - private long totalRowCount; - - /** - * The total number of rows loaded so far, including all the rows from row groups that we've - * processed and the current row group. - */ - private long totalRowsLoaded; - - /** - * Whether the native scan should always return decimal represented by 128 bits, regardless of its - * precision. Normally, this should be true if native execution is enabled, since Arrow compute - * kernels doesn't support 32 and 64 bit decimals yet. - */ - private boolean useDecimal128; - - /** Whether to use the lazy materialization reader for reading columns. */ - private boolean useLazyMaterialization; - - /** - * Whether to return dates/timestamps that were written with legacy hybrid (Julian + Gregorian) - * calendar as it is. If this is true, Comet will return them as it is, instead of rebasing them - * to the new Proleptic Gregorian calendar. If this is false, Comet will throw exceptions when - * seeing these dates/timestamps. - */ - private boolean useLegacyDateTimestamp; - - /** The TaskContext object for executing this task. */ - private TaskContext taskContext; - - public BatchReader() {} - - // Only for testing - public BatchReader(String file, int capacity) { - this(file, capacity, null, null); - } - - // Only for testing - public BatchReader( - String file, int capacity, StructType partitionSchema, InternalRow partitionValues) { - this(new Configuration(), file, capacity, partitionSchema, partitionValues); - } - - // Only for testing - public BatchReader( - Configuration conf, - String file, - int capacity, - StructType partitionSchema, - InternalRow partitionValues) { - conf.set("spark.sql.parquet.binaryAsString", "false"); - conf.set("spark.sql.parquet.int96AsTimestamp", "false"); - conf.set("spark.sql.caseSensitive", "false"); - conf.set("spark.sql.parquet.inferTimestampNTZ.enabled", "true"); - conf.set("spark.sql.legacy.parquet.nanosAsLong", "false"); - - this.conf = conf; - this.capacity = capacity; - this.isCaseSensitive = false; - this.useFieldId = false; - this.ignoreMissingIds = false; - this.partitionSchema = partitionSchema; - this.partitionValues = partitionValues; - - this.file = ShimBatchReader.newPartitionedFile(partitionValues, file); - this.metrics = new HashMap<>(); - - this.taskContext = TaskContext$.MODULE$.get(); - } - - /** - * @see Comet Issue #2079 - */ - @IcebergApi - public BatchReader(AbstractColumnReader[] columnReaders) { - // Todo: set useDecimal128 and useLazyMaterialization - int numColumns = columnReaders.length; - this.columnReaders = new AbstractColumnReader[numColumns]; - vectors = new CometVector[numColumns]; - currentBatch = new ColumnarBatch(vectors); - // This constructor is used by Iceberg only. The columnReaders are - // initialized in Iceberg, so no need to call the init() - isInitialized = true; - this.taskContext = TaskContext$.MODULE$.get(); - this.metrics = new HashMap<>(); - } - - BatchReader( - Configuration conf, - PartitionedFile inputSplit, - ParquetMetadata footer, - int capacity, - StructType sparkSchema, - boolean isCaseSensitive, - boolean useFieldId, - boolean ignoreMissingIds, - boolean useLegacyDateTimestamp, - StructType partitionSchema, - InternalRow partitionValues, - Map metrics) { - this.conf = conf; - this.capacity = capacity; - this.sparkSchema = sparkSchema; - this.isCaseSensitive = isCaseSensitive; - this.useFieldId = useFieldId; - this.ignoreMissingIds = ignoreMissingIds; - this.useLegacyDateTimestamp = useLegacyDateTimestamp; - this.partitionSchema = partitionSchema; - this.partitionValues = partitionValues; - this.file = inputSplit; - this.footer = footer; - this.metrics = metrics; - this.taskContext = TaskContext$.MODULE$.get(); - } - - /** - * Initialize this reader. The reason we don't do it in the constructor is that we want to close - * any resource hold by this reader when error happens during the initialization. - */ - public void init() throws URISyntaxException, IOException { - useDecimal128 = - conf.getBoolean( - CometConf.COMET_USE_DECIMAL_128().key(), - (Boolean) CometConf.COMET_USE_DECIMAL_128().defaultValue().get()); - useLazyMaterialization = - conf.getBoolean( - CometConf.COMET_USE_LAZY_MATERIALIZATION().key(), - (Boolean) CometConf.COMET_USE_LAZY_MATERIALIZATION().defaultValue().get()); - - long start = file.start(); - long length = file.length(); - String filePath = file.filePath().toString(); - - ParquetReadOptions.Builder builder = HadoopReadOptions.builder(conf, new Path(filePath)); - - if (start >= 0 && length >= 0) { - builder = builder.withRange(start, start + length); - } - ParquetReadOptions readOptions = builder.build(); - - // TODO: enable off-heap buffer when they are ready - ReadOptions cometReadOptions = ReadOptions.builder(conf).build(); - - Path path = new Path(new URI(filePath)); - fileReader = - new FileReader( - CometInputFile.fromPath(path, conf), footer, readOptions, cometReadOptions, metrics); - requestedSchema = fileReader.getFileMetaData().getSchema(); - MessageType fileSchema = requestedSchema; - - if (sparkSchema == null) { - sparkSchema = new ParquetToSparkSchemaConverter(conf).convert(requestedSchema); - } else { - requestedSchema = - CometParquetReadSupport.clipParquetSchema( - requestedSchema, sparkSchema, isCaseSensitive, useFieldId, ignoreMissingIds); - if (requestedSchema.getFieldCount() != sparkSchema.size()) { - throw new IllegalArgumentException( - String.format( - "Spark schema has %d columns while " + "Parquet schema has %d columns", - sparkSchema.size(), requestedSchema.getColumns().size())); - } - } - - totalRowCount = fileReader.getRecordCount(); - List columns = requestedSchema.getColumns(); - int numColumns = columns.size(); - if (partitionSchema != null) numColumns += partitionSchema.size(); - columnReaders = new AbstractColumnReader[numColumns]; - - // Initialize missing columns and use null vectors for them - missingColumns = new boolean[columns.size()]; - List paths = requestedSchema.getPaths(); - // We do not need the column index of the row index; but this method has the - // side effect of throwing an exception if a column with the same name is - // found which we do want (spark unit tests explicitly test for that). - ShimFileFormat.findRowIndexColumnIndexInSchema(sparkSchema); - StructField[] nonPartitionFields = sparkSchema.fields(); - for (int i = 0; i < requestedSchema.getFieldCount(); i++) { - Type t = requestedSchema.getFields().get(i); - Preconditions.checkState( - t.isPrimitive() && !t.isRepetition(Type.Repetition.REPEATED), - "Complex type is not supported"); - String[] colPath = paths.get(i); - if (nonPartitionFields[i].name().equals(ShimFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME())) { - // Values of ROW_INDEX_TEMPORARY_COLUMN_NAME column are always populated with - // generated row indexes, rather than read from the file. - // TODO(SPARK-40059): Allow users to include columns named - // FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME in their schemas. - long[] rowIndices = fileReader.getRowIndices(); - columnReaders[i] = new RowIndexColumnReader(nonPartitionFields[i], capacity, rowIndices); - missingColumns[i] = true; - } else if (fileSchema.containsPath(colPath)) { - ColumnDescriptor fd = fileSchema.getColumnDescription(colPath); - if (!fd.equals(columns.get(i))) { - throw new UnsupportedOperationException("Schema evolution is not supported"); - } - missingColumns[i] = false; - } else { - if (columns.get(i).getMaxDefinitionLevel() == 0) { - throw new IOException( - "Required column '" - + Arrays.toString(colPath) - + "' is missing" - + " in data file " - + filePath); - } - ConstantColumnReader reader = - new ConstantColumnReader(nonPartitionFields[i], capacity, useDecimal128); - columnReaders[i] = reader; - missingColumns[i] = true; - } - } - - // Initialize constant readers for partition columns - if (partitionSchema != null) { - StructField[] partitionFields = partitionSchema.fields(); - for (int i = columns.size(); i < columnReaders.length; i++) { - int fieldIndex = i - columns.size(); - StructField field = partitionFields[fieldIndex]; - ConstantColumnReader reader = - new ConstantColumnReader(field, capacity, partitionValues, fieldIndex, useDecimal128); - columnReaders[i] = reader; - } - } - - vectors = new CometVector[numColumns]; - currentBatch = new ColumnarBatch(vectors); - fileReader.setRequestedSchema(requestedSchema.getColumns()); - - // For test purpose only - // If the last external accumulator is `NumRowGroupsAccumulator`, the row group number to read - // will be updated to the accumulator. So we can check if the row groups are filtered or not - // in test case. - // Note that this tries to get thread local TaskContext object, if this is called at other - // thread, it won't update the accumulator. - if (taskContext != null) { - Option> accu = - ShimTaskMetrics.getTaskAccumulator(taskContext.taskMetrics()); - if (accu.isDefined() && accu.get().getClass().getSimpleName().equals("NumRowGroupsAcc")) { - @SuppressWarnings("unchecked") - AccumulatorV2 intAccum = (AccumulatorV2) accu.get(); - intAccum.add(fileReader.getRowGroups().size()); - } - } - - isInitialized = true; - } - - /** - * @see Comet Issue #2079 - */ - @IcebergApi - public void setSparkSchema(StructType schema) { - this.sparkSchema = schema; - } - - /** - * @see Comet Issue #2079 - */ - @IcebergApi - public AbstractColumnReader[] getColumnReaders() { - return columnReaders; - } - - @Override - public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) - throws IOException, InterruptedException { - // Do nothing. The initialization work is done in 'init' already. - } - - @Override - public boolean nextKeyValue() throws IOException { - return nextBatch(); - } - - @Override - public Void getCurrentKey() { - return null; - } - - @Override - public ColumnarBatch getCurrentValue() { - return currentBatch(); - } - - @Override - public float getProgress() { - return (float) rowsRead / totalRowCount; - } - - /** - * Returns the current columnar batch being read. - * - *

Note that this must be called AFTER {@link BatchReader#nextBatch()}. - */ - public ColumnarBatch currentBatch() { - return currentBatch; - } - - /** - * Loads the next batch of rows. - * - * @return true if there are no more rows to read, false otherwise. - */ - public boolean nextBatch() throws IOException { - Preconditions.checkState(isInitialized, "init() should be called first!"); - - if (rowsRead >= totalRowCount) return false; - boolean hasMore; - - try { - hasMore = loadNextRowGroupIfNecessary(); - } catch (RuntimeException e) { - // Spark will check certain exception e.g. `SchemaColumnConvertNotSupportedException`. - throw e; - } catch (Throwable e) { - throw new IOException(e); - } - - if (!hasMore) return false; - int batchSize = (int) Math.min(capacity, totalRowsLoaded - rowsRead); - - return nextBatch(batchSize); - } - - @IcebergApi - public boolean nextBatch(int batchSize) { - long totalDecodeTime = 0, totalLoadTime = 0; - for (int i = 0; i < columnReaders.length; i++) { - AbstractColumnReader reader = columnReaders[i]; - long startNs = System.nanoTime(); - reader.readBatch(batchSize); - totalDecodeTime += System.nanoTime() - startNs; - startNs = System.nanoTime(); - vectors[i] = reader.currentBatch(); - totalLoadTime += System.nanoTime() - startNs; - } - - SQLMetric decodeMetric = metrics.get("ParquetNativeDecodeTime"); - if (decodeMetric != null) { - decodeMetric.add(totalDecodeTime); - } - SQLMetric loadMetric = metrics.get("ParquetNativeLoadTime"); - if (loadMetric != null) { - loadMetric.add(totalLoadTime); - } - - currentBatch.setNumRows(batchSize); - rowsRead += batchSize; - return true; - } - - @IcebergApi - @Override - public void close() throws IOException { - if (columnReaders != null) { - for (AbstractColumnReader reader : columnReaders) { - if (reader != null) { - reader.close(); - } - } - } - if (fileReader != null) { - fileReader.close(); - fileReader = null; - } - if (importer != null) { - importer.close(); - importer = null; - } - } - - private boolean loadNextRowGroupIfNecessary() throws Throwable { - // More rows can be read from loaded row group. No need to load next one. - if (rowsRead != totalRowsLoaded) return true; - - SQLMetric rowGroupTimeMetric = metrics.get("ParquetLoadRowGroupTime"); - SQLMetric numRowGroupsMetric = metrics.get("ParquetRowGroups"); - long startNs = System.nanoTime(); - - PageReadStore rowGroupReader = fileReader.readNextRowGroup(); - - if (rowGroupTimeMetric != null) { - rowGroupTimeMetric.add(System.nanoTime() - startNs); - } - if (rowGroupReader == null) { - return false; - } - if (numRowGroupsMetric != null) { - numRowGroupsMetric.add(1); - } - - if (importer != null) importer.close(); - importer = new CometSchemaImporter(ALLOCATOR); - - List columns = requestedSchema.getColumns(); - for (int i = 0; i < columns.size(); i++) { - if (missingColumns[i]) continue; - if (columnReaders[i] != null) columnReaders[i].close(); - // TODO: handle tz, datetime & int96 rebase - // TODO: consider passing page reader via ctor - however we need to fix the shading issue - // from Iceberg side. - DataType dataType = sparkSchema.fields()[i].dataType(); - ColumnReader reader = - Utils.getColumnReader( - dataType, - columns.get(i), - importer, - capacity, - useDecimal128, - useLazyMaterialization, - useLegacyDateTimestamp); - reader.setPageReader(rowGroupReader.getPageReader(columns.get(i))); - columnReaders[i] = reader; - } - totalRowsLoaded += rowGroupReader.getRowCount(); - return true; - } -} diff --git a/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java deleted file mode 100644 index 1d4ec84c2c..0000000000 --- a/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.comet.parquet; - -import java.math.BigInteger; - -import org.apache.parquet.column.ColumnDescriptor; -import org.apache.spark.sql.catalyst.InternalRow; -import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns; -import org.apache.spark.sql.types.*; -import org.apache.spark.unsafe.types.UTF8String; - -import org.apache.comet.IcebergApi; - -/** - * A column reader that always return constant vectors. Used for reading partition columns, for - * instance. - */ -@IcebergApi -public class ConstantColumnReader extends MetadataColumnReader { - /** Whether all the values in this constant column are nulls */ - private boolean isNull; - - /** The constant value in the format of Object that are used to initialize this column reader. */ - private Object value; - - ConstantColumnReader(StructField field, int batchSize, boolean useDecimal128) { - this(field.dataType(), TypeUtil.convertToParquet(field), batchSize, useDecimal128); - this.value = - ResolveDefaultColumns.getExistenceDefaultValues(new StructType(new StructField[] {field}))[ - 0]; - init(value); - } - - ConstantColumnReader( - StructField field, int batchSize, InternalRow values, int index, boolean useDecimal128) { - this(field.dataType(), TypeUtil.convertToParquet(field), batchSize, useDecimal128); - init(values, index); - } - - /** - * @see Comet Issue #2079 - */ - @IcebergApi - public ConstantColumnReader( - DataType type, ColumnDescriptor descriptor, Object value, boolean useDecimal128) { - super(type, descriptor, useDecimal128, true); - this.value = value; - } - - // Used by Iceberg - @IcebergApi - public ConstantColumnReader( - DataType type, ParquetColumnSpec spec, Object value, boolean useDecimal128) { - super(type, spec, useDecimal128, true); - this.value = value; - } - - ConstantColumnReader( - DataType type, ColumnDescriptor descriptor, int batchSize, boolean useDecimal128) { - super(type, descriptor, useDecimal128, true); - this.batchSize = batchSize; - initNative(); - } - - @Override - public void setBatchSize(int batchSize) { - super.setBatchSize(batchSize); - init(value); - } - - @Override - public void readBatch(int total) { - super.readBatch(total); - if (isNull) setNumNulls(total); - } - - private void init(InternalRow values, int index) { - Object value = values.get(index, type); - init(value); - } - - private void init(Object value) { - if (value == null) { - Native.setNull(nativeHandle); - isNull = true; - } else if (type == DataTypes.BooleanType) { - Native.setBoolean(nativeHandle, (boolean) value); - } else if (type == DataTypes.ByteType) { - Native.setByte(nativeHandle, (byte) value); - } else if (type == DataTypes.ShortType) { - Native.setShort(nativeHandle, (short) value); - } else if (type == DataTypes.IntegerType) { - Native.setInt(nativeHandle, (int) value); - } else if (type == DataTypes.LongType) { - Native.setLong(nativeHandle, (long) value); - } else if (type == DataTypes.FloatType) { - Native.setFloat(nativeHandle, (float) value); - } else if (type == DataTypes.DoubleType) { - Native.setDouble(nativeHandle, (double) value); - } else if (type == DataTypes.BinaryType) { - Native.setBinary(nativeHandle, (byte[]) value); - } else if (type == DataTypes.StringType) { - Native.setBinary(nativeHandle, ((UTF8String) value).getBytes()); - } else if (type == DataTypes.DateType) { - Native.setInt(nativeHandle, (int) value); - } else if (type == DataTypes.TimestampType || type == TimestampNTZType$.MODULE$) { - Native.setLong(nativeHandle, (long) value); - } else if (type instanceof DecimalType) { - DecimalType dt = (DecimalType) type; - Decimal d = (Decimal) value; - if (!useDecimal128 && dt.precision() <= Decimal.MAX_INT_DIGITS()) { - Native.setInt(nativeHandle, ((int) d.toUnscaledLong())); - } else if (!useDecimal128 && dt.precision() <= Decimal.MAX_LONG_DIGITS()) { - Native.setLong(nativeHandle, d.toUnscaledLong()); - } else { - final BigInteger integer = d.toJavaBigDecimal().unscaledValue(); - byte[] bytes = integer.toByteArray(); - Native.setDecimal(nativeHandle, bytes); - } - } else { - throw new UnsupportedOperationException("Unsupported Spark type: " + type); - } - } -} diff --git a/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java b/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java deleted file mode 100644 index bd66f2deab..0000000000 --- a/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.comet.parquet; - -import java.util.HashMap; - -import org.apache.spark.sql.types.StructType; -import org.apache.spark.sql.vectorized.ColumnarBatch; - -import org.apache.comet.IcebergApi; -import org.apache.comet.vector.CometVector; - -/** This class is a public interface used by Apache Iceberg to read batches using Comet */ -@IcebergApi -public class IcebergCometBatchReader extends BatchReader { - public IcebergCometBatchReader(int numColumns, StructType schema) { - this.columnReaders = new AbstractColumnReader[numColumns]; - this.vectors = new CometVector[numColumns]; - this.currentBatch = new ColumnarBatch(vectors); - this.metrics = new HashMap<>(); - this.sparkSchema = schema; - } - - public void init(AbstractColumnReader[] columnReaders) { - this.columnReaders = columnReaders; - this.isInitialized = true; - } -} diff --git a/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java b/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java deleted file mode 100644 index ace1ab4164..0000000000 --- a/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.comet.parquet; - -import org.apache.arrow.c.ArrowArray; -import org.apache.arrow.c.ArrowSchema; -import org.apache.arrow.c.Data; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.FieldVector; -import org.apache.parquet.column.ColumnDescriptor; -import org.apache.spark.sql.types.DataType; - -import org.apache.comet.IcebergApi; -import org.apache.comet.vector.CometPlainVector; -import org.apache.comet.vector.CometVector; - -/** A metadata column reader that can be extended by {@link RowIndexColumnReader} etc. */ -@IcebergApi -public class MetadataColumnReader extends AbstractColumnReader { - private final BufferAllocator allocator = new RootAllocator(); - - private CometVector vector; - - private ArrowArray array = null; - private ArrowSchema schema = null; - - private boolean isConstant; - - /** - * @see Comet Issue #2079 - */ - @IcebergApi - public MetadataColumnReader( - DataType type, ColumnDescriptor descriptor, boolean useDecimal128, boolean isConstant) { - // TODO: should we handle legacy dates & timestamps for metadata columns? - super(type, descriptor, useDecimal128, false); - - this.isConstant = isConstant; - } - - // Used by Iceberg - @IcebergApi - public MetadataColumnReader( - DataType type, ParquetColumnSpec spec, boolean useDecimal128, boolean isConstant) { - // TODO: should we handle legacy dates & timestamps for metadata columns? - super(type, Utils.buildColumnDescriptor(spec), useDecimal128, false); - - this.isConstant = isConstant; - } - - @Override - public void setBatchSize(int batchSize) { - close(); - super.setBatchSize(batchSize); - } - - @IcebergApi - @Override - public void readBatch(int total) { - if (vector == null) { - array = ArrowArray.allocateNew(allocator); - schema = ArrowSchema.allocateNew(allocator); - - long arrayAddr = array.memoryAddress(); - long schemaAddr = schema.memoryAddress(); - - Native.currentBatch(nativeHandle, arrayAddr, schemaAddr); - FieldVector fieldVector = Data.importVector(allocator, array, schema, null); - vector = new CometPlainVector(fieldVector, useDecimal128, false, isConstant); - } - - vector.setNumValues(total); - } - - void setNumNulls(int total) { - vector.setNumNulls(total); - } - - @IcebergApi - @Override - public CometVector currentBatch() { - return vector; - } - - @Override - public void close() { - if (vector != null) { - vector.close(); - vector = null; - } - super.close(); - } -} diff --git a/common/src/main/java/org/apache/comet/parquet/Native.java b/common/src/main/java/org/apache/comet/parquet/Native.java index babd0d392c..fb9cedc1e2 100644 --- a/common/src/main/java/org/apache/comet/parquet/Native.java +++ b/common/src/main/java/org/apache/comet/parquet/Native.java @@ -200,39 +200,6 @@ public static native void setPageV2( */ public static native void currentBatch(long handle, long arrayAddr, long schemaAddr); - /** Set methods to set a constant value for the reader, so it'll return constant vectors */ - public static native void setNull(long handle); - - public static native void setBoolean(long handle, boolean value); - - public static native void setByte(long handle, byte value); - - public static native void setShort(long handle, short value); - - public static native void setInt(long handle, int value); - - public static native void setLong(long handle, long value); - - public static native void setFloat(long handle, float value); - - public static native void setDouble(long handle, double value); - - public static native void setBinary(long handle, byte[] value); - - /** Set decimal backed by FixedLengthByteArray */ - public static native void setDecimal(long handle, byte[] value); - - /** Set position of row index vector for Iceberg Metadata Column */ - @IcebergApi - public static native void setPosition(long handle, long value, int size); - - /** Set row index vector for Spark row index metadata column and return vector size */ - public static native int setIndices(long handle, long offset, int size, long[] indices); - - /** Set deleted info for Iceberg Metadata Column */ - @IcebergApi - public static native void setIsDeleted(long handle, boolean[] isDeleted); - /** * Closes the native Parquet column reader and releases all resources associated with it. * diff --git a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java index 32edcb2640..9413b9316d 100644 --- a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java +++ b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java @@ -89,7 +89,7 @@ *

Example of how to use this: * *

- *   BatchReader reader = new BatchReader(parquetFile, batchSize);
+ *   NativeBatchReader reader = new NativeBatchReader(parquetFile, batchSize);
  *   try {
  *     reader.init();
  *     while (reader.readBatch()) {
diff --git a/common/src/main/java/org/apache/comet/parquet/RowIndexColumnReader.java b/common/src/main/java/org/apache/comet/parquet/RowIndexColumnReader.java
deleted file mode 100644
index 46e6ee67f5..0000000000
--- a/common/src/main/java/org/apache/comet/parquet/RowIndexColumnReader.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.parquet;
-
-import org.apache.spark.sql.types.*;
-
-/**
- * A column reader that returns the row index vector. Used for reading row index metadata column for
- * Spark 3.4+. The row index can be accessed by {@code _tmp_metadata_row_index} column.
- */
-public class RowIndexColumnReader extends MetadataColumnReader {
-  /** The row indices that are used to initialize this column reader. */
-  private final long[] indices;
-
-  /** The current number of indices to skip reading from {@code indices}. */
-  private long offset;
-
-  public RowIndexColumnReader(StructField field, int batchSize, long[] indices) {
-    super(field.dataType(), TypeUtil.convertToParquet(field), false, false);
-    this.indices = indices;
-    setBatchSize(batchSize);
-  }
-
-  @Override
-  public void readBatch(int total) {
-    Native.resetBatch(nativeHandle);
-    int count = Native.setIndices(nativeHandle, offset, total, indices);
-    offset += count;
-
-    super.readBatch(count);
-  }
-}
diff --git a/common/src/main/java/org/apache/comet/parquet/TypeUtil.java b/common/src/main/java/org/apache/comet/parquet/TypeUtil.java
index cb5f4997dd..87cecdc65d 100644
--- a/common/src/main/java/org/apache/comet/parquet/TypeUtil.java
+++ b/common/src/main/java/org/apache/comet/parquet/TypeUtil.java
@@ -57,9 +57,8 @@ public static ColumnDescriptor convertToParquet(StructField field) {
     DataType type = field.dataType();
 
     Types.PrimitiveBuilder builder = null;
-    // Only partition column can be `NullType`, which also uses `ConstantColumnReader`. Here we
-    // piggy-back onto Parquet boolean type for constant vector of null values, we don't really
-    // care what Parquet type it is.
+    // Only partition column can be `NullType`. Here we piggy-back onto Parquet boolean type
+    // for constant vector of null values, we don't really care what Parquet type it is.
     if (type == DataTypes.BooleanType || type == DataTypes.NullType) {
       builder = Types.primitive(PrimitiveType.PrimitiveTypeName.BOOLEAN, repetition);
     } else if (type == DataTypes.IntegerType || type instanceof YearMonthIntervalType) {
diff --git a/common/src/test/java/org/apache/comet/parquet/TestColumnReader.java b/common/src/test/java/org/apache/comet/parquet/TestColumnReader.java
index 32accae8e5..2052222ecd 100644
--- a/common/src/test/java/org/apache/comet/parquet/TestColumnReader.java
+++ b/common/src/test/java/org/apache/comet/parquet/TestColumnReader.java
@@ -19,13 +19,6 @@
 
 package org.apache.comet.parquet;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.function.BiFunction;
-
 import org.junit.Test;
 
 import org.apache.arrow.memory.BufferAllocator;
@@ -34,145 +27,13 @@
 import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.arrow.vector.VarBinaryVector;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.types.*;
-import org.apache.spark.sql.vectorized.ColumnVector;
 
 import org.apache.comet.vector.CometPlainVector;
 import org.apache.comet.vector.CometVector;
 
-import static org.apache.spark.sql.types.DataTypes.*;
 import static org.junit.Assert.*;
-import static scala.jdk.javaapi.CollectionConverters.*;
 
-@SuppressWarnings("unchecked")
 public class TestColumnReader {
-  private static final int BATCH_SIZE = 1024;
-  private static final List TYPES =
-      Arrays.asList(
-          BooleanType,
-          ByteType,
-          ShortType,
-          IntegerType,
-          LongType,
-          FloatType,
-          DoubleType,
-          BinaryType,
-          DecimalType.apply(5, 2),
-          DecimalType.apply(18, 10),
-          DecimalType.apply(19, 5));
-  private static final List VALUES =
-      Arrays.asList(
-          true,
-          (byte) 42,
-          (short) 100,
-          1000,
-          (long) 10000,
-          (float) 3.14,
-          3.1415926,
-          new byte[] {1, 2, 3, 4, 5, 6, 7, 8},
-          Decimal.apply("123.45"),
-          Decimal.apply("00.0123456789"),
-          Decimal.apply("-001234.56789"));
-  private static final List> GETTERS =
-      Arrays.asList(
-          ColumnVector::getBoolean,
-          ColumnVector::getByte,
-          ColumnVector::getShort,
-          ColumnVector::getInt,
-          ColumnVector::getLong,
-          ColumnVector::getFloat,
-          ColumnVector::getDouble,
-          ColumnVector::getBinary,
-          (v, i) -> v.getDecimal(i, 5, 2),
-          (v, i) -> v.getDecimal(i, 18, 10),
-          (v, i) -> v.getDecimal(i, 19, 5));
-
-  @Test
-  public void testConstantVectors() {
-    for (int i = 0; i < TYPES.size(); i++) {
-      DataType type = TYPES.get(i);
-      StructField field = StructField.apply("f", type, false, null);
-
-      List values = Collections.singletonList(VALUES.get(i));
-      InternalRow row = GenericInternalRow.apply(asScala(values).toSeq());
-      ConstantColumnReader reader = new ConstantColumnReader(field, BATCH_SIZE, row, 0, true);
-      reader.readBatch(BATCH_SIZE);
-      CometVector vector = reader.currentBatch();
-      assertEquals(BATCH_SIZE, vector.numValues());
-      assertEquals(0, vector.numNulls());
-      for (int j = 0; j < BATCH_SIZE; j++) {
-        if (TYPES.get(i) == BinaryType || TYPES.get(i) == StringType) {
-          assertArrayEquals((byte[]) VALUES.get(i), (byte[]) GETTERS.get(i).apply(vector, j));
-        } else {
-          assertEquals(VALUES.get(i), GETTERS.get(i).apply(vector, j));
-        }
-      }
-
-      // Test null values too
-      row.setNullAt(0);
-      reader = new ConstantColumnReader(field, BATCH_SIZE, row, 0, true);
-      reader.readBatch(BATCH_SIZE);
-      vector = reader.currentBatch();
-      assertEquals(BATCH_SIZE, vector.numValues());
-      assertEquals(BATCH_SIZE, vector.numNulls());
-      for (int j = 0; j < BATCH_SIZE; j++) {
-        assertTrue(vector.isNullAt(j));
-      }
-    }
-
-    if (org.apache.spark.package$.MODULE$.SPARK_VERSION_SHORT().compareTo("3.4") >= 0) {
-      Metadata meta = new MetadataBuilder().putString("EXISTS_DEFAULT", "123").build();
-      StructField field = StructField.apply("f", LongType, false, meta);
-      ConstantColumnReader reader = new ConstantColumnReader(field, BATCH_SIZE, true);
-      reader.readBatch(BATCH_SIZE);
-      CometVector vector = reader.currentBatch();
-
-      assertEquals(BATCH_SIZE, vector.numValues());
-      assertEquals(0, vector.numNulls());
-      for (int j = 0; j < BATCH_SIZE; j++) {
-        assertEquals(123, vector.getLong(j));
-      }
-    }
-  }
-
-  @Test
-  public void testRowIndexColumnVectors() {
-    StructField field = StructField.apply("f", LongType, false, null);
-    int bigBatchSize = BATCH_SIZE * 2;
-    int step = 4;
-    int batchSize = bigBatchSize / step;
-    long[] indices = new long[step * 2];
-    List expected = new ArrayList<>();
-
-    long idx = 0, len = 0;
-    for (int i = 0; i < step; i++) {
-      idx = ThreadLocalRandom.current().nextLong(idx + len, Long.MAX_VALUE);
-      indices[i * 2] = idx;
-      len = ThreadLocalRandom.current().nextLong(Long.max(bigBatchSize - expected.size(), 0));
-      indices[i * 2 + 1] = len;
-      for (int j = 0; j < len; j++) {
-        expected.add(idx + j);
-      }
-    }
-
-    RowIndexColumnReader reader = new RowIndexColumnReader(field, BATCH_SIZE, indices);
-    for (int i = 0; i < step; i++) {
-      reader.readBatch(batchSize);
-      CometVector vector = reader.currentBatch();
-      assertEquals(
-          Integer.min(batchSize, Integer.max(expected.size() - i * batchSize, 0)),
-          vector.numValues());
-      assertEquals(0, vector.numNulls());
-      for (int j = 0; j < vector.numValues(); j++) {
-        assertEquals((long) expected.get(i * batchSize + j), vector.getLong(j));
-      }
-    }
-
-    reader.close();
-  }
-
   @Test
   public void testIsFixedLength() {
     BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index f2b0e80ab2..11b352eda1 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -42,7 +42,7 @@ use arrow::ffi::FFI_ArrowArray;
 use jni::JNIEnv;
 use jni::{
     objects::{GlobalRef, JByteBuffer, JClass},
-    sys::{jboolean, jbyte, jdouble, jfloat, jint, jlong, jshort},
+    sys::{jboolean, jint, jlong},
 };
 
 use self::util::jni::TypePromotionInfo;
@@ -65,9 +65,7 @@ use datafusion::execution::SendableRecordBatchStream;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::{SessionConfig, SessionContext};
 use futures::{poll, StreamExt};
-use jni::objects::{
-    JBooleanArray, JByteArray, JLongArray, JMap, JObject, JObjectArray, JString, ReleaseMode,
-};
+use jni::objects::{JByteArray, JLongArray, JMap, JObject, JObjectArray, JString, ReleaseMode};
 use jni::sys::{jintArray, JNI_FALSE};
 use object_store::path::Path;
 use read::ColumnReader;
@@ -275,229 +273,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_setPageV2(
     })
 }
 
-#[no_mangle]
-pub extern "system" fn Java_org_apache_comet_parquet_Native_setNull(
-    env: JNIEnv,
-    _jclass: JClass,
-    handle: jlong,
-) {
-    try_unwrap_or_throw(&env, |_| {
-        let reader = get_reader(handle)?;
-        reader.set_null();
-        Ok(())
-    })
-}
-
-#[no_mangle]
-pub extern "system" fn Java_org_apache_comet_parquet_Native_setBoolean(
-    env: JNIEnv,
-    _jclass: JClass,
-    handle: jlong,
-    value: jboolean,
-) {
-    try_unwrap_or_throw(&env, |_| {
-        let reader = get_reader(handle)?;
-        reader.set_boolean(value != 0);
-        Ok(())
-    })
-}
-
-#[no_mangle]
-pub extern "system" fn Java_org_apache_comet_parquet_Native_setByte(
-    env: JNIEnv,
-    _jclass: JClass,
-    handle: jlong,
-    value: jbyte,
-) {
-    try_unwrap_or_throw(&env, |_| {
-        let reader = get_reader(handle)?;
-        reader.set_fixed::(value);
-        Ok(())
-    })
-}
-
-#[no_mangle]
-pub extern "system" fn Java_org_apache_comet_parquet_Native_setShort(
-    env: JNIEnv,
-    _jclass: JClass,
-    handle: jlong,
-    value: jshort,
-) {
-    try_unwrap_or_throw(&env, |_| {
-        let reader = get_reader(handle)?;
-        reader.set_fixed::(value);
-        Ok(())
-    })
-}
-
-#[no_mangle]
-pub extern "system" fn Java_org_apache_comet_parquet_Native_setInt(
-    env: JNIEnv,
-    _jclass: JClass,
-    handle: jlong,
-    value: jint,
-) {
-    try_unwrap_or_throw(&env, |_| {
-        let reader = get_reader(handle)?;
-        reader.set_fixed::(value);
-        Ok(())
-    })
-}
-
-#[no_mangle]
-pub extern "system" fn Java_org_apache_comet_parquet_Native_setLong(
-    env: JNIEnv,
-    _jclass: JClass,
-    handle: jlong,
-    value: jlong,
-) {
-    try_unwrap_or_throw(&env, |_| {
-        let reader = get_reader(handle)?;
-        reader.set_fixed::(value);
-        Ok(())
-    })
-}
-
-#[no_mangle]
-pub extern "system" fn Java_org_apache_comet_parquet_Native_setFloat(
-    env: JNIEnv,
-    _jclass: JClass,
-    handle: jlong,
-    value: jfloat,
-) {
-    try_unwrap_or_throw(&env, |_| {
-        let reader = get_reader(handle)?;
-        reader.set_fixed::(value);
-        Ok(())
-    })
-}
-
-#[no_mangle]
-pub extern "system" fn Java_org_apache_comet_parquet_Native_setDouble(
-    env: JNIEnv,
-    _jclass: JClass,
-    handle: jlong,
-    value: jdouble,
-) {
-    try_unwrap_or_throw(&env, |_| {
-        let reader = get_reader(handle)?;
-        reader.set_fixed::(value);
-        Ok(())
-    })
-}
-
-/// # Safety
-/// This function is inheritly unsafe since it deals with raw pointers passed from JNI.
-#[no_mangle]
-pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_setBinary(
-    e: JNIEnv,
-    _jclass: JClass,
-    handle: jlong,
-    value: JByteArray,
-) {
-    try_unwrap_or_throw(&e, |env| {
-        let reader = get_reader(handle)?;
-
-        let len = env.get_array_length(&value)?;
-        let mut buffer = MutableBuffer::from_len_zeroed(len as usize);
-        env.get_byte_array_region(&value, 0, from_u8_slice(buffer.as_slice_mut()))?;
-        reader.set_binary(buffer);
-        Ok(())
-    })
-}
-
-/// # Safety
-/// This function is inheritly unsafe since it deals with raw pointers passed from JNI.
-#[no_mangle]
-pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_setDecimal(
-    e: JNIEnv,
-    _jclass: JClass,
-    handle: jlong,
-    value: JByteArray,
-) {
-    try_unwrap_or_throw(&e, |env| {
-        let reader = get_reader(handle)?;
-
-        let len = env.get_array_length(&value)?;
-        let mut buffer = MutableBuffer::from_len_zeroed(len as usize);
-        env.get_byte_array_region(&value, 0, from_u8_slice(buffer.as_slice_mut()))?;
-        reader.set_decimal_flba(buffer);
-        Ok(())
-    })
-}
-
-#[no_mangle]
-pub extern "system" fn Java_org_apache_comet_parquet_Native_setPosition(
-    env: JNIEnv,
-    _jclass: JClass,
-    handle: jlong,
-    value: jlong,
-    size: jint,
-) {
-    try_unwrap_or_throw(&env, |_| {
-        let reader = get_reader(handle)?;
-        reader.set_position(value, size as usize);
-        Ok(())
-    })
-}
-
-/// # Safety
-/// This function is inheritly unsafe since it deals with raw pointers passed from JNI.
-#[no_mangle]
-pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_setIndices(
-    e: JNIEnv,
-    _jclass: JClass,
-    handle: jlong,
-    offset: jlong,
-    batch_size: jint,
-    indices: JLongArray,
-) -> jlong {
-    try_unwrap_or_throw(&e, |mut env| {
-        let reader = get_reader(handle)?;
-        let indices = unsafe { env.get_array_elements(&indices, ReleaseMode::NoCopyBack)? };
-        let len = indices.len();
-        // paris alternately contains start index and length of continuous indices
-        let pairs = unsafe { core::slice::from_raw_parts_mut(indices.as_ptr(), len) };
-        let mut skipped = 0;
-        let mut filled = 0;
-        for i in (0..len).step_by(2) {
-            let index = pairs[i];
-            let count = pairs[i + 1];
-            let skip = std::cmp::min(count, offset - skipped);
-            skipped += skip;
-            if count == skip {
-                continue;
-            } else if batch_size as i64 == filled {
-                break;
-            }
-            let count = std::cmp::min(count - skip, batch_size as i64 - filled);
-            filled += count;
-            reader.set_position(index + skip, count as usize);
-        }
-        Ok(filled)
-    })
-}
-
-/// # Safety
-/// This function is inheritly unsafe since it deals with raw pointers passed from JNI.
-#[no_mangle]
-pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_setIsDeleted(
-    e: JNIEnv,
-    _jclass: JClass,
-    handle: jlong,
-    is_deleted: JBooleanArray,
-) {
-    try_unwrap_or_throw(&e, |env| {
-        let reader = get_reader(handle)?;
-
-        let len = env.get_array_length(&is_deleted)?;
-        let mut buffer = MutableBuffer::from_len_zeroed(len as usize);
-        env.get_boolean_array_region(&is_deleted, 0, buffer.as_slice_mut())?;
-        reader.set_is_deleted(buffer);
-        Ok(())
-    })
-}
-
 #[no_mangle]
 pub extern "system" fn Java_org_apache_comet_parquet_Native_resetBatch(
     env: JNIEnv,
diff --git a/native/core/src/parquet/read/column.rs b/native/core/src/parquet/read/column.rs
index c2b64bba51..cdbb76bbf0 100644
--- a/native/core/src/parquet/read/column.rs
+++ b/native/core/src/parquet/read/column.rs
@@ -19,8 +19,8 @@ use std::{marker::PhantomData, sync::Arc};
 
 use arrow::{
     array::ArrayData,
-    buffer::{Buffer, MutableBuffer},
-    datatypes::{ArrowNativeType, DataType as ArrowDataType, TimeUnit},
+    buffer::Buffer,
+    datatypes::{DataType as ArrowDataType, TimeUnit},
 };
 
 use parquet::{
@@ -28,9 +28,7 @@ use parquet::{
     schema::types::{ColumnDescPtr, ColumnDescriptor},
 };
 
-use crate::parquet::{
-    data_type::*, read::DECIMAL_BYTE_WIDTH, util::jni::TypePromotionInfo, ParquetMutableVector,
-};
+use crate::parquet::{data_type::*, util::jni::TypePromotionInfo, ParquetMutableVector};
 
 use super::{
     levels::LevelDecoder,
@@ -38,7 +36,7 @@ use super::{
     ReadOptions,
 };
 
-use crate::common::{bit, bit::log2};
+use crate::common::bit::log2;
 use crate::execution::operators::ExecutionError;
 
 /// Maximum number of decimal digits an i32 can represent
@@ -575,41 +573,6 @@ impl ColumnReader {
         )
     }
 
-    #[inline]
-    pub fn set_null(&mut self) {
-        make_func_mut!(self, set_null)
-    }
-
-    #[inline]
-    pub fn set_boolean(&mut self, value: bool) {
-        make_func_mut!(self, set_boolean, value)
-    }
-
-    #[inline]
-    pub fn set_fixed(&mut self, value: U) {
-        make_func_mut!(self, set_fixed, value)
-    }
-
-    #[inline]
-    pub fn set_binary(&mut self, value: MutableBuffer) {
-        make_func_mut!(self, set_binary, value)
-    }
-
-    #[inline]
-    pub fn set_decimal_flba(&mut self, value: MutableBuffer) {
-        make_func_mut!(self, set_decimal_flba, value)
-    }
-
-    #[inline]
-    pub fn set_position(&mut self, value: i64, size: usize) {
-        make_func_mut!(self, set_position, value, size)
-    }
-
-    #[inline]
-    pub fn set_is_deleted(&mut self, value: MutableBuffer) {
-        make_func_mut!(self, set_is_deleted, value)
-    }
-
     #[inline]
     pub fn reset_batch(&mut self) {
         make_func_mut!(self, reset_batch)
@@ -647,9 +610,6 @@ pub struct TypedColumnReader {
     capacity: usize,
     /// Number of bits used to represent one value in Parquet.
     bit_width: usize,
-    /// Whether this is a constant column reader (always return constant vector).
-    is_const: bool,
-
     // Options for reading Parquet
     read_options: ReadOptions,
 
@@ -676,7 +636,6 @@ impl TypedColumnReader {
             vector,
             capacity,
             bit_width,
-            is_const: false,
             read_options,
             _phantom: PhantomData,
         }
@@ -857,128 +816,6 @@ impl TypedColumnReader {
         self.value_decoder = Some(value_decoder);
     }
 
-    /// Sets all values in the vector of this column reader to be null.
-    pub fn set_null(&mut self) {
-        self.check_const("set_null");
-        self.vector.put_nulls(self.capacity);
-    }
-
-    /// Sets all values in the vector of this column reader to be `value`.
-    pub fn set_boolean(&mut self, value: bool) {
-        self.check_const("set_boolean");
-        if value {
-            let dst = self.vector.value_buffer.as_slice_mut();
-            bit::set_bits(dst, 0, self.capacity);
-        }
-        self.vector.num_values += self.capacity;
-    }
-
-    /// Sets all values in the vector of this column reader to be `value`.
-    pub fn set_fixed(&mut self, value: U) {
-        self.check_const("set_fixed");
-        let type_size = std::mem::size_of::();
-
-        let mut offset = 0;
-        for _ in 0..self.capacity {
-            bit::memcpy_value(&value, type_size, &mut self.vector.value_buffer[offset..]);
-            offset += type_size;
-        }
-        self.vector.num_values += self.capacity;
-    }
-
-    /// Sets all values in the vector of this column reader to be binary represented by `buffer`.
-    pub fn set_binary(&mut self, buffer: MutableBuffer) {
-        self.check_const("set_binary");
-
-        // TODO: consider using dictionary here
-
-        let len = buffer.len();
-        let total_len = len * self.capacity;
-        let offset_buf = self.vector.value_buffer.as_slice_mut();
-        let child_vector = &mut self.vector.children[0];
-        let value_buf = &mut child_vector.value_buffer;
-
-        value_buf.resize(total_len);
-
-        let mut value_buf_offset = 0;
-        let mut offset_buf_offset = 4;
-        for _ in 0..self.capacity {
-            bit::memcpy(&buffer, &mut value_buf.as_slice_mut()[value_buf_offset..]);
-            value_buf_offset += len;
-
-            bit::memcpy_value(
-                &(value_buf_offset as i32),
-                4,
-                &mut offset_buf[offset_buf_offset..],
-            );
-            offset_buf_offset += 4;
-        }
-        self.vector.num_values += self.capacity;
-    }
-
-    /// Sets all values in the vector of this column reader to be decimal represented by `buffer`.
-    pub fn set_decimal_flba(&mut self, buffer: MutableBuffer) {
-        self.check_const("set_decimal_flba");
-
-        // TODO: consider using dictionary here
-
-        let len = buffer.len();
-        let mut bytes: [u8; DECIMAL_BYTE_WIDTH] = [0; DECIMAL_BYTE_WIDTH];
-
-        for i in 0..len {
-            bytes[len - i - 1] = buffer[i];
-        }
-        if bytes[len - 1] & 0x80 == 0x80 {
-            bytes[len..DECIMAL_BYTE_WIDTH].fill(0xff);
-        }
-
-        let mut offset = 0;
-        for _ in 0..self.capacity {
-            bit::memcpy(&bytes, &mut self.vector.value_buffer[offset..]);
-            offset += DECIMAL_BYTE_WIDTH;
-        }
-        self.vector.num_values += self.capacity;
-    }
-
-    /// Sets position values of this column reader to the vector starting from `value`.
-    pub fn set_position(&mut self, value: i64, size: usize) {
-        let i64_size = std::mem::size_of::();
-
-        let mut offset = self.vector.num_values * i64_size;
-        for i in value..(value + size as i64) {
-            // TODO: is it better to convert self.value_buffer to &mut [i64] and for-loop update?
-            bit::memcpy_value(&i, i64_size, &mut self.vector.value_buffer[offset..]);
-            offset += i64_size;
-        }
-        self.vector.num_values += size;
-    }
-
-    /// Sets the values in the vector of this column reader to be a boolean array represented
-    /// by `buffer`.
-    pub fn set_is_deleted(&mut self, buffer: MutableBuffer) {
-        let len = buffer.len();
-        let dst = self.vector.value_buffer.as_slice_mut();
-        for i in 0..len {
-            if buffer[i] == 1 {
-                bit::set_bit(dst, i);
-            } else if buffer[i] == 0 {
-                bit::unset_bit(dst, i);
-            }
-        }
-        self.vector.num_values += len;
-    }
-
-    /// Check a few pre-conditions for setting constants, as well as setting
-    /// that `is_const` to true for the particular column reader.
-    fn check_const(&mut self, method_name: &str) {
-        assert!(
-            self.value_decoder.is_none(),
-            "{method_name} cannot be called after set_page_v1/set_page_v2!"
-        );
-        assert!(!self.is_const, "can only set constant once!");
-        self.is_const = true;
-    }
-
     fn check_dictionary(&mut self, encoding: &Encoding) {
         // The column has a dictionary while the new page is of PLAIN encoding. In this case, we
         // should eagerly decode all the dictionary indices and convert the underlying vector to a
diff --git a/native/core/src/parquet/read/mod.rs b/native/core/src/parquet/read/mod.rs
index 5a55f21170..f66f9da3da 100644
--- a/native/core/src/parquet/read/mod.rs
+++ b/native/core/src/parquet/read/mod.rs
@@ -27,9 +27,6 @@ use crate::common::bit::{self, BitReader};
 use arrow::buffer::Buffer;
 use bytes::Buf;
 
-/// Number of bytes to store a decimal value in Arrow value vector
-pub(crate) const DECIMAL_BYTE_WIDTH: usize = 16;
-
 #[derive(Clone, Copy)]
 pub struct ReadOptions {
     // Whether to read legacy dates/timestamps as it is. If false, throw exceptions.