diff --git a/common/src/main/java/org/apache/comet/parquet/BatchReader.java b/common/src/main/java/org/apache/comet/parquet/BatchReader.java
deleted file mode 100644
index 5a0bc9f6d3..0000000000
--- a/common/src/main/java/org/apache/comet/parquet/BatchReader.java
+++ /dev/null
@@ -1,534 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.parquet;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.*;
-
-import scala.Option;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.RootAllocator;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.parquet.HadoopReadOptions;
-import org.apache.parquet.ParquetReadOptions;
-import org.apache.parquet.Preconditions;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReadStore;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.Type;
-import org.apache.spark.TaskContext;
-import org.apache.spark.TaskContext$;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.comet.parquet.CometParquetReadSupport;
-import org.apache.spark.sql.comet.shims.ShimTaskMetrics;
-import org.apache.spark.sql.execution.datasources.PartitionedFile;
-import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
-import org.apache.spark.sql.execution.metric.SQLMetric;
-import org.apache.spark.sql.types.*;
-import org.apache.spark.sql.vectorized.ColumnarBatch;
-import org.apache.spark.util.AccumulatorV2;
-
-import org.apache.comet.CometConf;
-import org.apache.comet.CometSchemaImporter;
-import org.apache.comet.IcebergApi;
-import org.apache.comet.shims.ShimBatchReader;
-import org.apache.comet.shims.ShimFileFormat;
-import org.apache.comet.vector.CometVector;
-
-/**
- * A vectorized Parquet reader that reads a Parquet file in a batched fashion.
- *
- * <p>Example of how to use this:
- *
- * <pre>
- * BatchReader reader = new BatchReader(parquetFile, batchSize);
- * try {
- * reader.init();
- * while (reader.readBatch()) {
- * ColumnarBatch batch = reader.currentBatch();
- * // consume the batch
- * }
- * } finally { // resources associated with the reader should be released
- * reader.close();
- * }
- * </pre>
- *
- * @deprecated since 0.14.0. This class is kept for Iceberg compatibility only.
- */
-@Deprecated
-@IcebergApi
-public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Closeable {
- private static final Logger LOG = LoggerFactory.getLogger(FileReader.class);
- protected static final BufferAllocator ALLOCATOR = new RootAllocator();
-
- private Configuration conf;
- private int capacity;
- private boolean isCaseSensitive;
- private boolean useFieldId;
- private boolean ignoreMissingIds;
- private StructType partitionSchema;
- private InternalRow partitionValues;
- private PartitionedFile file;
- protected Map<String, SQLMetric> metrics;
-
- private long rowsRead;
- protected StructType sparkSchema;
- private MessageType requestedSchema;
- protected CometVector[] vectors;
- protected AbstractColumnReader[] columnReaders;
- private CometSchemaImporter importer;
- protected ColumnarBatch currentBatch;
- private FileReader fileReader;
- private boolean[] missingColumns;
- protected boolean isInitialized;
- private ParquetMetadata footer;
-
- /** The total number of rows across all row groups of the input split. */
- private long totalRowCount;
-
- /**
- * The total number of rows loaded so far, including all the rows from row groups that we've
- * processed and the current row group.
- */
- private long totalRowsLoaded;
-
- /**
- * Whether the native scan should always return decimal represented by 128 bits, regardless of its
- * precision. Normally, this should be true if native execution is enabled, since Arrow compute
- * kernels doesn't support 32 and 64 bit decimals yet.
- */
- private boolean useDecimal128;
-
- /** Whether to use the lazy materialization reader for reading columns. */
- private boolean useLazyMaterialization;
-
- /**
- * Whether to return dates/timestamps that were written with legacy hybrid (Julian + Gregorian)
- * calendar as it is. If this is true, Comet will return them as it is, instead of rebasing them
- * to the new Proleptic Gregorian calendar. If this is false, Comet will throw exceptions when
- * seeing these dates/timestamps.
- */
- private boolean useLegacyDateTimestamp;
-
- /** The TaskContext object for executing this task. */
- private TaskContext taskContext;
-
- public BatchReader() {}
-
- // Only for testing
- public BatchReader(String file, int capacity) {
- this(file, capacity, null, null);
- }
-
- // Only for testing
- public BatchReader(
- String file, int capacity, StructType partitionSchema, InternalRow partitionValues) {
- this(new Configuration(), file, capacity, partitionSchema, partitionValues);
- }
-
- // Only for testing
- public BatchReader(
- Configuration conf,
- String file,
- int capacity,
- StructType partitionSchema,
- InternalRow partitionValues) {
- conf.set("spark.sql.parquet.binaryAsString", "false");
- conf.set("spark.sql.parquet.int96AsTimestamp", "false");
- conf.set("spark.sql.caseSensitive", "false");
- conf.set("spark.sql.parquet.inferTimestampNTZ.enabled", "true");
- conf.set("spark.sql.legacy.parquet.nanosAsLong", "false");
-
- this.conf = conf;
- this.capacity = capacity;
- this.isCaseSensitive = false;
- this.useFieldId = false;
- this.ignoreMissingIds = false;
- this.partitionSchema = partitionSchema;
- this.partitionValues = partitionValues;
-
- this.file = ShimBatchReader.newPartitionedFile(partitionValues, file);
- this.metrics = new HashMap<>();
-
- this.taskContext = TaskContext$.MODULE$.get();
- }
-
- /**
- * @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
- */
- @IcebergApi
- public BatchReader(AbstractColumnReader[] columnReaders) {
- // Todo: set useDecimal128 and useLazyMaterialization
- int numColumns = columnReaders.length;
- this.columnReaders = new AbstractColumnReader[numColumns];
- vectors = new CometVector[numColumns];
- currentBatch = new ColumnarBatch(vectors);
- // This constructor is used by Iceberg only. The columnReaders are
- // initialized in Iceberg, so no need to call the init()
- isInitialized = true;
- this.taskContext = TaskContext$.MODULE$.get();
- this.metrics = new HashMap<>();
- }
-
- BatchReader(
- Configuration conf,
- PartitionedFile inputSplit,
- ParquetMetadata footer,
- int capacity,
- StructType sparkSchema,
- boolean isCaseSensitive,
- boolean useFieldId,
- boolean ignoreMissingIds,
- boolean useLegacyDateTimestamp,
- StructType partitionSchema,
- InternalRow partitionValues,
- Map<String, SQLMetric> metrics) {
- this.conf = conf;
- this.capacity = capacity;
- this.sparkSchema = sparkSchema;
- this.isCaseSensitive = isCaseSensitive;
- this.useFieldId = useFieldId;
- this.ignoreMissingIds = ignoreMissingIds;
- this.useLegacyDateTimestamp = useLegacyDateTimestamp;
- this.partitionSchema = partitionSchema;
- this.partitionValues = partitionValues;
- this.file = inputSplit;
- this.footer = footer;
- this.metrics = metrics;
- this.taskContext = TaskContext$.MODULE$.get();
- }
-
- /**
- * Initialize this reader. The reason we don't do it in the constructor is that we want to close
- * any resource hold by this reader when error happens during the initialization.
- */
- public void init() throws URISyntaxException, IOException {
- useDecimal128 =
- conf.getBoolean(
- CometConf.COMET_USE_DECIMAL_128().key(),
- (Boolean) CometConf.COMET_USE_DECIMAL_128().defaultValue().get());
- useLazyMaterialization =
- conf.getBoolean(
- CometConf.COMET_USE_LAZY_MATERIALIZATION().key(),
- (Boolean) CometConf.COMET_USE_LAZY_MATERIALIZATION().defaultValue().get());
-
- long start = file.start();
- long length = file.length();
- String filePath = file.filePath().toString();
-
- ParquetReadOptions.Builder builder = HadoopReadOptions.builder(conf, new Path(filePath));
-
- if (start >= 0 && length >= 0) {
- builder = builder.withRange(start, start + length);
- }
- ParquetReadOptions readOptions = builder.build();
-
- // TODO: enable off-heap buffer when they are ready
- ReadOptions cometReadOptions = ReadOptions.builder(conf).build();
-
- Path path = new Path(new URI(filePath));
- fileReader =
- new FileReader(
- CometInputFile.fromPath(path, conf), footer, readOptions, cometReadOptions, metrics);
- requestedSchema = fileReader.getFileMetaData().getSchema();
- MessageType fileSchema = requestedSchema;
-
- if (sparkSchema == null) {
- sparkSchema = new ParquetToSparkSchemaConverter(conf).convert(requestedSchema);
- } else {
- requestedSchema =
- CometParquetReadSupport.clipParquetSchema(
- requestedSchema, sparkSchema, isCaseSensitive, useFieldId, ignoreMissingIds);
- if (requestedSchema.getFieldCount() != sparkSchema.size()) {
- throw new IllegalArgumentException(
- String.format(
- "Spark schema has %d columns while " + "Parquet schema has %d columns",
- sparkSchema.size(), requestedSchema.getColumns().size()));
- }
- }
-
- totalRowCount = fileReader.getRecordCount();
- List<ColumnDescriptor> columns = requestedSchema.getColumns();
- int numColumns = columns.size();
- if (partitionSchema != null) numColumns += partitionSchema.size();
- columnReaders = new AbstractColumnReader[numColumns];
-
- // Initialize missing columns and use null vectors for them
- missingColumns = new boolean[columns.size()];
- List<String[]> paths = requestedSchema.getPaths();
- // We do not need the column index of the row index; but this method has the
- // side effect of throwing an exception if a column with the same name is
- // found which we do want (spark unit tests explicitly test for that).
- ShimFileFormat.findRowIndexColumnIndexInSchema(sparkSchema);
- StructField[] nonPartitionFields = sparkSchema.fields();
- for (int i = 0; i < requestedSchema.getFieldCount(); i++) {
- Type t = requestedSchema.getFields().get(i);
- Preconditions.checkState(
- t.isPrimitive() && !t.isRepetition(Type.Repetition.REPEATED),
- "Complex type is not supported");
- String[] colPath = paths.get(i);
- if (nonPartitionFields[i].name().equals(ShimFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME())) {
- // Values of ROW_INDEX_TEMPORARY_COLUMN_NAME column are always populated with
- // generated row indexes, rather than read from the file.
- // TODO(SPARK-40059): Allow users to include columns named
- // FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME in their schemas.
- long[] rowIndices = fileReader.getRowIndices();
- columnReaders[i] = new RowIndexColumnReader(nonPartitionFields[i], capacity, rowIndices);
- missingColumns[i] = true;
- } else if (fileSchema.containsPath(colPath)) {
- ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
- if (!fd.equals(columns.get(i))) {
- throw new UnsupportedOperationException("Schema evolution is not supported");
- }
- missingColumns[i] = false;
- } else {
- if (columns.get(i).getMaxDefinitionLevel() == 0) {
- throw new IOException(
- "Required column '"
- + Arrays.toString(colPath)
- + "' is missing"
- + " in data file "
- + filePath);
- }
- ConstantColumnReader reader =
- new ConstantColumnReader(nonPartitionFields[i], capacity, useDecimal128);
- columnReaders[i] = reader;
- missingColumns[i] = true;
- }
- }
-
- // Initialize constant readers for partition columns
- if (partitionSchema != null) {
- StructField[] partitionFields = partitionSchema.fields();
- for (int i = columns.size(); i < columnReaders.length; i++) {
- int fieldIndex = i - columns.size();
- StructField field = partitionFields[fieldIndex];
- ConstantColumnReader reader =
- new ConstantColumnReader(field, capacity, partitionValues, fieldIndex, useDecimal128);
- columnReaders[i] = reader;
- }
- }
-
- vectors = new CometVector[numColumns];
- currentBatch = new ColumnarBatch(vectors);
- fileReader.setRequestedSchema(requestedSchema.getColumns());
-
- // For test purpose only
- // If the last external accumulator is `NumRowGroupsAccumulator`, the row group number to read
- // will be updated to the accumulator. So we can check if the row groups are filtered or not
- // in test case.
- // Note that this tries to get thread local TaskContext object, if this is called at other
- // thread, it won't update the accumulator.
- if (taskContext != null) {
- Option<AccumulatorV2<?, ?>> accu =
- ShimTaskMetrics.getTaskAccumulator(taskContext.taskMetrics());
- if (accu.isDefined() && accu.get().getClass().getSimpleName().equals("NumRowGroupsAcc")) {
- @SuppressWarnings("unchecked")
- AccumulatorV2<Integer, Integer> intAccum = (AccumulatorV2<Integer, Integer>) accu.get();
- intAccum.add(fileReader.getRowGroups().size());
- }
- }
-
- isInitialized = true;
- }
-
- /**
- * @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
- */
- @IcebergApi
- public void setSparkSchema(StructType schema) {
- this.sparkSchema = schema;
- }
-
- /**
- * @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
- */
- @IcebergApi
- public AbstractColumnReader[] getColumnReaders() {
- return columnReaders;
- }
-
- @Override
- public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
- throws IOException, InterruptedException {
- // Do nothing. The initialization work is done in 'init' already.
- }
-
- @Override
- public boolean nextKeyValue() throws IOException {
- return nextBatch();
- }
-
- @Override
- public Void getCurrentKey() {
- return null;
- }
-
- @Override
- public ColumnarBatch getCurrentValue() {
- return currentBatch();
- }
-
- @Override
- public float getProgress() {
- return (float) rowsRead / totalRowCount;
- }
-
- /**
- * Returns the current columnar batch being read.
- *
- * <p>Note that this must be called AFTER {@link BatchReader#nextBatch()}.
- */
- public ColumnarBatch currentBatch() {
- return currentBatch;
- }
-
- /**
- * Loads the next batch of rows.
- *
- * @return true if there are no more rows to read, false otherwise.
- */
- public boolean nextBatch() throws IOException {
- Preconditions.checkState(isInitialized, "init() should be called first!");
-
- if (rowsRead >= totalRowCount) return false;
- boolean hasMore;
-
- try {
- hasMore = loadNextRowGroupIfNecessary();
- } catch (RuntimeException e) {
- // Spark will check certain exception e.g. `SchemaColumnConvertNotSupportedException`.
- throw e;
- } catch (Throwable e) {
- throw new IOException(e);
- }
-
- if (!hasMore) return false;
- int batchSize = (int) Math.min(capacity, totalRowsLoaded - rowsRead);
-
- return nextBatch(batchSize);
- }
-
- @IcebergApi
- public boolean nextBatch(int batchSize) {
- long totalDecodeTime = 0, totalLoadTime = 0;
- for (int i = 0; i < columnReaders.length; i++) {
- AbstractColumnReader reader = columnReaders[i];
- long startNs = System.nanoTime();
- reader.readBatch(batchSize);
- totalDecodeTime += System.nanoTime() - startNs;
- startNs = System.nanoTime();
- vectors[i] = reader.currentBatch();
- totalLoadTime += System.nanoTime() - startNs;
- }
-
- SQLMetric decodeMetric = metrics.get("ParquetNativeDecodeTime");
- if (decodeMetric != null) {
- decodeMetric.add(totalDecodeTime);
- }
- SQLMetric loadMetric = metrics.get("ParquetNativeLoadTime");
- if (loadMetric != null) {
- loadMetric.add(totalLoadTime);
- }
-
- currentBatch.setNumRows(batchSize);
- rowsRead += batchSize;
- return true;
- }
-
- @IcebergApi
- @Override
- public void close() throws IOException {
- if (columnReaders != null) {
- for (AbstractColumnReader reader : columnReaders) {
- if (reader != null) {
- reader.close();
- }
- }
- }
- if (fileReader != null) {
- fileReader.close();
- fileReader = null;
- }
- if (importer != null) {
- importer.close();
- importer = null;
- }
- }
-
- private boolean loadNextRowGroupIfNecessary() throws Throwable {
- // More rows can be read from loaded row group. No need to load next one.
- if (rowsRead != totalRowsLoaded) return true;
-
- SQLMetric rowGroupTimeMetric = metrics.get("ParquetLoadRowGroupTime");
- SQLMetric numRowGroupsMetric = metrics.get("ParquetRowGroups");
- long startNs = System.nanoTime();
-
- PageReadStore rowGroupReader = fileReader.readNextRowGroup();
-
- if (rowGroupTimeMetric != null) {
- rowGroupTimeMetric.add(System.nanoTime() - startNs);
- }
- if (rowGroupReader == null) {
- return false;
- }
- if (numRowGroupsMetric != null) {
- numRowGroupsMetric.add(1);
- }
-
- if (importer != null) importer.close();
- importer = new CometSchemaImporter(ALLOCATOR);
-
- List<ColumnDescriptor> columns = requestedSchema.getColumns();
- for (int i = 0; i < columns.size(); i++) {
- if (missingColumns[i]) continue;
- if (columnReaders[i] != null) columnReaders[i].close();
- // TODO: handle tz, datetime & int96 rebase
- // TODO: consider passing page reader via ctor - however we need to fix the shading issue
- // from Iceberg side.
- DataType dataType = sparkSchema.fields()[i].dataType();
- ColumnReader reader =
- Utils.getColumnReader(
- dataType,
- columns.get(i),
- importer,
- capacity,
- useDecimal128,
- useLazyMaterialization,
- useLegacyDateTimestamp);
- reader.setPageReader(rowGroupReader.getPageReader(columns.get(i)));
- columnReaders[i] = reader;
- }
- totalRowsLoaded += rowGroupReader.getRowCount();
- return true;
- }
-}
diff --git a/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java b/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java
deleted file mode 100644
index 1d4ec84c2c..0000000000
--- a/common/src/main/java/org/apache/comet/parquet/ConstantColumnReader.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.parquet;
-
-import java.math.BigInteger;
-
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns;
-import org.apache.spark.sql.types.*;
-import org.apache.spark.unsafe.types.UTF8String;
-
-import org.apache.comet.IcebergApi;
-
-/**
- * A column reader that always return constant vectors. Used for reading partition columns, for
- * instance.
- */
-@IcebergApi
-public class ConstantColumnReader extends MetadataColumnReader {
- /** Whether all the values in this constant column are nulls */
- private boolean isNull;
-
- /** The constant value in the format of Object that are used to initialize this column reader. */
- private Object value;
-
- ConstantColumnReader(StructField field, int batchSize, boolean useDecimal128) {
- this(field.dataType(), TypeUtil.convertToParquet(field), batchSize, useDecimal128);
- this.value =
- ResolveDefaultColumns.getExistenceDefaultValues(new StructType(new StructField[] {field}))[
- 0];
- init(value);
- }
-
- ConstantColumnReader(
- StructField field, int batchSize, InternalRow values, int index, boolean useDecimal128) {
- this(field.dataType(), TypeUtil.convertToParquet(field), batchSize, useDecimal128);
- init(values, index);
- }
-
- /**
- * @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
- */
- @IcebergApi
- public ConstantColumnReader(
- DataType type, ColumnDescriptor descriptor, Object value, boolean useDecimal128) {
- super(type, descriptor, useDecimal128, true);
- this.value = value;
- }
-
- // Used by Iceberg
- @IcebergApi
- public ConstantColumnReader(
- DataType type, ParquetColumnSpec spec, Object value, boolean useDecimal128) {
- super(type, spec, useDecimal128, true);
- this.value = value;
- }
-
- ConstantColumnReader(
- DataType type, ColumnDescriptor descriptor, int batchSize, boolean useDecimal128) {
- super(type, descriptor, useDecimal128, true);
- this.batchSize = batchSize;
- initNative();
- }
-
- @Override
- public void setBatchSize(int batchSize) {
- super.setBatchSize(batchSize);
- init(value);
- }
-
- @Override
- public void readBatch(int total) {
- super.readBatch(total);
- if (isNull) setNumNulls(total);
- }
-
- private void init(InternalRow values, int index) {
- Object value = values.get(index, type);
- init(value);
- }
-
- private void init(Object value) {
- if (value == null) {
- Native.setNull(nativeHandle);
- isNull = true;
- } else if (type == DataTypes.BooleanType) {
- Native.setBoolean(nativeHandle, (boolean) value);
- } else if (type == DataTypes.ByteType) {
- Native.setByte(nativeHandle, (byte) value);
- } else if (type == DataTypes.ShortType) {
- Native.setShort(nativeHandle, (short) value);
- } else if (type == DataTypes.IntegerType) {
- Native.setInt(nativeHandle, (int) value);
- } else if (type == DataTypes.LongType) {
- Native.setLong(nativeHandle, (long) value);
- } else if (type == DataTypes.FloatType) {
- Native.setFloat(nativeHandle, (float) value);
- } else if (type == DataTypes.DoubleType) {
- Native.setDouble(nativeHandle, (double) value);
- } else if (type == DataTypes.BinaryType) {
- Native.setBinary(nativeHandle, (byte[]) value);
- } else if (type == DataTypes.StringType) {
- Native.setBinary(nativeHandle, ((UTF8String) value).getBytes());
- } else if (type == DataTypes.DateType) {
- Native.setInt(nativeHandle, (int) value);
- } else if (type == DataTypes.TimestampType || type == TimestampNTZType$.MODULE$) {
- Native.setLong(nativeHandle, (long) value);
- } else if (type instanceof DecimalType) {
- DecimalType dt = (DecimalType) type;
- Decimal d = (Decimal) value;
- if (!useDecimal128 && dt.precision() <= Decimal.MAX_INT_DIGITS()) {
- Native.setInt(nativeHandle, ((int) d.toUnscaledLong()));
- } else if (!useDecimal128 && dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
- Native.setLong(nativeHandle, d.toUnscaledLong());
- } else {
- final BigInteger integer = d.toJavaBigDecimal().unscaledValue();
- byte[] bytes = integer.toByteArray();
- Native.setDecimal(nativeHandle, bytes);
- }
- } else {
- throw new UnsupportedOperationException("Unsupported Spark type: " + type);
- }
- }
-}
diff --git a/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java b/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java
deleted file mode 100644
index bd66f2deab..0000000000
--- a/common/src/main/java/org/apache/comet/parquet/IcebergCometBatchReader.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.parquet;
-
-import java.util.HashMap;
-
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.vectorized.ColumnarBatch;
-
-import org.apache.comet.IcebergApi;
-import org.apache.comet.vector.CometVector;
-
-/** This class is a public interface used by Apache Iceberg to read batches using Comet */
-@IcebergApi
-public class IcebergCometBatchReader extends BatchReader {
- public IcebergCometBatchReader(int numColumns, StructType schema) {
- this.columnReaders = new AbstractColumnReader[numColumns];
- this.vectors = new CometVector[numColumns];
- this.currentBatch = new ColumnarBatch(vectors);
- this.metrics = new HashMap<>();
- this.sparkSchema = schema;
- }
-
- public void init(AbstractColumnReader[] columnReaders) {
- this.columnReaders = columnReaders;
- this.isInitialized = true;
- }
-}
diff --git a/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java b/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java
deleted file mode 100644
index ace1ab4164..0000000000
--- a/common/src/main/java/org/apache/comet/parquet/MetadataColumnReader.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.parquet;
-
-import org.apache.arrow.c.ArrowArray;
-import org.apache.arrow.c.ArrowSchema;
-import org.apache.arrow.c.Data;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.FieldVector;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.spark.sql.types.DataType;
-
-import org.apache.comet.IcebergApi;
-import org.apache.comet.vector.CometPlainVector;
-import org.apache.comet.vector.CometVector;
-
-/** A metadata column reader that can be extended by {@link RowIndexColumnReader} etc. */
-@IcebergApi
-public class MetadataColumnReader extends AbstractColumnReader {
- private final BufferAllocator allocator = new RootAllocator();
-
- private CometVector vector;
-
- private ArrowArray array = null;
- private ArrowSchema schema = null;
-
- private boolean isConstant;
-
- /**
- * @see <a href="https://github.com/apache/datafusion-comet/issues/2079">Comet Issue #2079</a>
- */
- @IcebergApi
- public MetadataColumnReader(
- DataType type, ColumnDescriptor descriptor, boolean useDecimal128, boolean isConstant) {
- // TODO: should we handle legacy dates & timestamps for metadata columns?
- super(type, descriptor, useDecimal128, false);
-
- this.isConstant = isConstant;
- }
-
- // Used by Iceberg
- @IcebergApi
- public MetadataColumnReader(
- DataType type, ParquetColumnSpec spec, boolean useDecimal128, boolean isConstant) {
- // TODO: should we handle legacy dates & timestamps for metadata columns?
- super(type, Utils.buildColumnDescriptor(spec), useDecimal128, false);
-
- this.isConstant = isConstant;
- }
-
- @Override
- public void setBatchSize(int batchSize) {
- close();
- super.setBatchSize(batchSize);
- }
-
- @IcebergApi
- @Override
- public void readBatch(int total) {
- if (vector == null) {
- array = ArrowArray.allocateNew(allocator);
- schema = ArrowSchema.allocateNew(allocator);
-
- long arrayAddr = array.memoryAddress();
- long schemaAddr = schema.memoryAddress();
-
- Native.currentBatch(nativeHandle, arrayAddr, schemaAddr);
- FieldVector fieldVector = Data.importVector(allocator, array, schema, null);
- vector = new CometPlainVector(fieldVector, useDecimal128, false, isConstant);
- }
-
- vector.setNumValues(total);
- }
-
- void setNumNulls(int total) {
- vector.setNumNulls(total);
- }
-
- @IcebergApi
- @Override
- public CometVector currentBatch() {
- return vector;
- }
-
- @Override
- public void close() {
- if (vector != null) {
- vector.close();
- vector = null;
- }
- super.close();
- }
-}
diff --git a/common/src/main/java/org/apache/comet/parquet/Native.java b/common/src/main/java/org/apache/comet/parquet/Native.java
index babd0d392c..fb9cedc1e2 100644
--- a/common/src/main/java/org/apache/comet/parquet/Native.java
+++ b/common/src/main/java/org/apache/comet/parquet/Native.java
@@ -200,39 +200,6 @@ public static native void setPageV2(
*/
public static native void currentBatch(long handle, long arrayAddr, long schemaAddr);
- /** Set methods to set a constant value for the reader, so it'll return constant vectors */
- public static native void setNull(long handle);
-
- public static native void setBoolean(long handle, boolean value);
-
- public static native void setByte(long handle, byte value);
-
- public static native void setShort(long handle, short value);
-
- public static native void setInt(long handle, int value);
-
- public static native void setLong(long handle, long value);
-
- public static native void setFloat(long handle, float value);
-
- public static native void setDouble(long handle, double value);
-
- public static native void setBinary(long handle, byte[] value);
-
- /** Set decimal backed by FixedLengthByteArray */
- public static native void setDecimal(long handle, byte[] value);
-
- /** Set position of row index vector for Iceberg Metadata Column */
- @IcebergApi
- public static native void setPosition(long handle, long value, int size);
-
- /** Set row index vector for Spark row index metadata column and return vector size */
- public static native int setIndices(long handle, long offset, int size, long[] indices);
-
- /** Set deleted info for Iceberg Metadata Column */
- @IcebergApi
- public static native void setIsDeleted(long handle, boolean[] isDeleted);
-
/**
* Closes the native Parquet column reader and releases all resources associated with it.
*
diff --git a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
index 32edcb2640..9413b9316d 100644
--- a/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
+++ b/common/src/main/java/org/apache/comet/parquet/NativeBatchReader.java
@@ -89,7 +89,7 @@
 * <p>Example of how to use this:
*
 * <pre>
- * BatchReader reader = new BatchReader(parquetFile, batchSize);
+ * NativeBatchReader reader = new NativeBatchReader(parquetFile, batchSize);
* try {
* reader.init();
* while (reader.readBatch()) {
diff --git a/common/src/main/java/org/apache/comet/parquet/RowIndexColumnReader.java b/common/src/main/java/org/apache/comet/parquet/RowIndexColumnReader.java
deleted file mode 100644
index 46e6ee67f5..0000000000
--- a/common/src/main/java/org/apache/comet/parquet/RowIndexColumnReader.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.comet.parquet;
-
-import org.apache.spark.sql.types.*;
-
-/**
- * A column reader that returns the row index vector. Used for reading row index metadata column for
- * Spark 3.4+. The row index can be accessed by {@code _tmp_metadata_row_index} column.
- */
-public class RowIndexColumnReader extends MetadataColumnReader {
- /** The row indices that are used to initialize this column reader. */
- private final long[] indices;
-
- /** The current number of indices to skip reading from {@code indices}. */
- private long offset;
-
- public RowIndexColumnReader(StructField field, int batchSize, long[] indices) {
- super(field.dataType(), TypeUtil.convertToParquet(field), false, false);
- this.indices = indices;
- setBatchSize(batchSize);
- }
-
- @Override
- public void readBatch(int total) {
- Native.resetBatch(nativeHandle);
- int count = Native.setIndices(nativeHandle, offset, total, indices);
- offset += count;
-
- super.readBatch(count);
- }
-}
diff --git a/common/src/main/java/org/apache/comet/parquet/TypeUtil.java b/common/src/main/java/org/apache/comet/parquet/TypeUtil.java
index cb5f4997dd..87cecdc65d 100644
--- a/common/src/main/java/org/apache/comet/parquet/TypeUtil.java
+++ b/common/src/main/java/org/apache/comet/parquet/TypeUtil.java
@@ -57,9 +57,8 @@ public static ColumnDescriptor convertToParquet(StructField field) {
DataType type = field.dataType();
Types.PrimitiveBuilder builder = null;
- // Only partition column can be `NullType`, which also uses `ConstantColumnReader`. Here we
- // piggy-back onto Parquet boolean type for constant vector of null values, we don't really
- // care what Parquet type it is.
+ // Only partition column can be `NullType`. Here we piggy-back onto Parquet boolean type
+ // for constant vector of null values, we don't really care what Parquet type it is.
if (type == DataTypes.BooleanType || type == DataTypes.NullType) {
builder = Types.primitive(PrimitiveType.PrimitiveTypeName.BOOLEAN, repetition);
} else if (type == DataTypes.IntegerType || type instanceof YearMonthIntervalType) {
diff --git a/common/src/test/java/org/apache/comet/parquet/TestColumnReader.java b/common/src/test/java/org/apache/comet/parquet/TestColumnReader.java
index 32accae8e5..2052222ecd 100644
--- a/common/src/test/java/org/apache/comet/parquet/TestColumnReader.java
+++ b/common/src/test/java/org/apache/comet/parquet/TestColumnReader.java
@@ -19,13 +19,6 @@
package org.apache.comet.parquet;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.function.BiFunction;
-
import org.junit.Test;
import org.apache.arrow.memory.BufferAllocator;
@@ -34,145 +27,13 @@
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VarBinaryVector;
-import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
-import org.apache.spark.sql.types.*;
-import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.comet.vector.CometPlainVector;
import org.apache.comet.vector.CometVector;
-import static org.apache.spark.sql.types.DataTypes.*;
import static org.junit.Assert.*;
-import static scala.jdk.javaapi.CollectionConverters.*;
-@SuppressWarnings("unchecked")
public class TestColumnReader {
- private static final int BATCH_SIZE = 1024;
- private static final List TYPES =
- Arrays.asList(
- BooleanType,
- ByteType,
- ShortType,
- IntegerType,
- LongType,
- FloatType,
- DoubleType,
- BinaryType,
- DecimalType.apply(5, 2),
- DecimalType.apply(18, 10),
- DecimalType.apply(19, 5));
- private static final List