From c01b94d8e8b2ccd22a1c649cf75875be883359bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=9F=E5=BC=8B?= Date: Wed, 3 Jun 2026 21:26:23 +0800 Subject: [PATCH] Filter side files in btree global index scans --- .../btree/BTreeGlobalIndexBuilder.java | 13 +++- .../btree/BTreeGlobalIndexBuilderTest.java | 70 +++++++++++++++++++ .../table/BtreeGlobalIndexTableTest.java | 35 ++++++++++ 3 files changed, 116 insertions(+), 2 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java index 9f143d6cf666..ad68d83eb340 100644 --- a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java +++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java @@ -74,8 +74,10 @@ import java.util.stream.IntStream; import static java.util.Collections.singletonList; +import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile; import static org.apache.paimon.globalindex.GlobalIndexBuilderUtils.createIndexWriter; import static org.apache.paimon.globalindex.GlobalIndexBuilderUtils.toIndexFileMetas; +import static org.apache.paimon.types.VectorType.isVectorStoreFile; import static org.apache.paimon.utils.Preconditions.checkArgument; /** Builder to build btree global index. 
*/ @@ -143,7 +145,7 @@ public Optional>> scan() { if (snapshot == null) { return Optional.empty(); } - snapshotReader = snapshotReader.withSnapshot(snapshot); + snapshotReader = withManifestEntryFilter(snapshotReader.withSnapshot(snapshot)); Range dataRange = new Range(0, snapshot.nextRowId() - 1); return Optional.of( @@ -164,7 +166,7 @@ public Optional>> incrementalScan() { if (snapshot == null) { return Optional.empty(); } - snapshotReader = snapshotReader.withSnapshot(snapshot); + snapshotReader = withManifestEntryFilter(snapshotReader.withSnapshot(snapshot)); Preconditions.checkArgument(indexField != null, "indexField must be set before scan."); Range dataRange = new Range(0, snapshot.nextRowId() - 1); @@ -180,6 +182,13 @@ public Optional>> incrementalScan() { snapshotReader.read().dataSplits())); } + /** Applies a manifest entry filter to the given reader and returns the result. The filter keeps only entries whose file name is neither a blob file (per {@code isBlobFile}) nor a vector-store file (per {@code isVectorStoreFile}). Both {@code scan()} and {@code incrementalScan()} route their snapshot reader through this method. */ private SnapshotReader withManifestEntryFilter(SnapshotReader snapshotReader) { + return snapshotReader.withManifestEntryFilter( + entry -> + !isBlobFile(entry.file().fileName()) + && !isVectorStoreFile(entry.file().fileName())); + } + private List indexedRowRanges(Snapshot snapshot) { List ranges = new ArrayList<>(); for (IndexManifestEntry entry : diff --git a/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java b/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java index 1010b8fc79d7..0c55221a50f7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java @@ -22,11 +22,14 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.BlobData; import org.apache.paimon.data.GenericRow; import org.apache.paimon.index.GlobalIndexMeta; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.index.IndexFileMeta; +import 
org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.IndexManifestEntry; +import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.memory.MemorySlice; import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.predicate.Predicate; @@ -296,6 +299,53 @@ public void testIncrementalScanWithPartitionPredicate() throws Exception { "incrementalScan should only return the new rows in partition p0"); } + /** Verifies that {@code scan()} and {@code incrementalScan()} return no data file with the {@code blob} file format, for a table that has a BLOB column and whose unfiltered store scan does expose blob manifest entries. */ @Test + public void testScanFiltersBlobFilesByManifestEntryFilter() throws Exception { + Schema.Builder schemaBuilder = Schema.newBuilder(); + schemaBuilder.column("dt", DataTypes.STRING()); + schemaBuilder.column("f0", DataTypes.INT()); + schemaBuilder.column("f1", DataTypes.BLOB()); + schemaBuilder.option(CoreOptions.ROW_TRACKING_ENABLED.key(), "true"); + schemaBuilder.option(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true"); + schemaBuilder.option(CoreOptions.BLOB_TARGET_FILE_SIZE.key(), "1 b"); /* NOTE(review): presumably a tiny target size so blob data rolls into separate blob files; confirm against the option's documentation */ + schemaBuilder.partitionKeys(Collections.singletonList("dt")); + + catalog.createTable(identifier("BlobTable"), schemaBuilder.build(), false); + FileStoreTable table = getTable(identifier("BlobTable")); + + byte[] blobBytes = new byte[] {1}; + BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder(); + try (BatchTableWrite write = writeBuilder.newWrite()) { + for (int i = 0; i < 10; i++) { + write.write( + GenericRow.of(BinaryString.fromString("p0"), i, new BlobData(blobBytes))); + } + try (BatchTableCommit commit = writeBuilder.newCommit()) { + commit.commit(write.prepareCommit()); + } + } + + /* Sanity check: the unfiltered scan must contain blob entries, otherwise the assertions below would pass vacuously. */ Assertions.assertTrue( + containsBlobFile(table.store().newScan().plan().files()), + "Test table should contain blob manifest entries."); + + BTreeGlobalIndexBuilder builder = new BTreeGlobalIndexBuilder(table).withIndexField("f0"); + assertNoBlobFiles( + builder.scan() + .map(Pair::getRight) + .orElseThrow( + () -> + new IllegalStateException( + "Expected scan result for blob table."))); + assertNoBlobFiles( + 
builder.incrementalScan() + .map(Pair::getRight) + .orElseThrow( + () -> + new IllegalStateException( + "Expected incremental scan result for blob table."))); + } + private Map>> gatherIndexMetas(FileStoreTable table) { IndexFileHandler handler = table.store().newIndexFileHandler(); @@ -319,6 +369,26 @@ private Map>> gatherIndexMetas(FileStore return metasByParts; } + /** Returns {@code true} if any of the given manifest entries refers to a file whose {@code fileFormat()} equals {@code blob}; returns {@code false} otherwise, including for an empty list. */ private boolean containsBlobFile(List<ManifestEntry> entries) { + for (ManifestEntry entry : entries) { + if ("blob".equals(entry.file().fileFormat())) { + return true; + } + } + return false; + } + + /** Fails the test if any data file of any of the given splits reports the {@code blob} file format. */ private void assertNoBlobFiles(List<DataSplit> splits) { + for (DataSplit split : splits) { + for (DataFileMeta file : split.dataFiles()) { + Assertions.assertNotEquals( + "blob", + file.fileFormat(), + "BTree global index scan should not include blob files."); + } + } + } + private void assertFilesNonOverlapping( BinaryRow partition, List> metas) { if (metas.isEmpty()) { diff --git a/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java index 60eacd767bcf..80c82fd62641 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java @@ -19,6 +19,7 @@ package org.apache.paimon.table; import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; import org.apache.paimon.globalindex.DataEvolutionBatchScan; import org.apache.paimon.globalindex.GlobalIndexResult; import org.apache.paimon.globalindex.GlobalIndexScanner; @@ -27,11 +28,15 @@ import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.table.sink.BatchTableCommit; +import org.apache.paimon.table.sink.BatchTableWrite; +import 
org.apache.paimon.table.sink.BatchWriteBuilder; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.ReadBuilder; import org.apache.paimon.table.source.Split; +import org.apache.paimon.types.DataTypes; import org.apache.paimon.utils.Range; import org.apache.paimon.utils.RoaringNavigableMap64; @@ -165,6 +170,36 @@ public void testMultipleBTreeIndices() throws Exception { assertThat(result).containsExactly("a200", "a56789"); } + /** Verifies that, after column {@code f3} is added to a table that already holds data and one further row with a non-null {@code f3} is written, a BTree global index created on {@code f3} answers an is-null predicate with exactly {@code oldRowCount} row ids forming the single range {@code [0, oldRowCount - 1]}. */ @Test + public void testBTreeGlobalIndexOnAddedColumnContainsOldRowsAsNull() throws Exception { + long oldRowCount = 10L; + write(oldRowCount); /* presumably writes oldRowCount rows with the pre-alter schema; confirm against the write(long) helper */ + + catalog.alterTable(identifier(), SchemaChange.addColumn("f3", DataTypes.STRING()), false); + FileStoreTable table = (FileStoreTable) catalog.getTable(identifier()); + BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder(); + try (BatchTableWrite write = writeBuilder.newWrite()) { + write.write( + GenericRow.of( + 100, + BinaryString.fromString("a-new"), + BinaryString.fromString("b-new"), + BinaryString.fromString("not-null"))); + try (BatchTableCommit commit = writeBuilder.newCommit()) { + commit.commit(write.prepareCommit()); + } + } + + createIndex("f3"); + + /* Reload the table before building the predicate and running the index scan. */ table = (FileStoreTable) catalog.getTable(identifier()); + Predicate predicate = new PredicateBuilder(table.rowType()).isNull(3); /* field index 3 is presumably the newly added f3 column; confirm against the base schema */ + RoaringNavigableMap64 rowIds = globalIndexScan(table, predicate); + assertNotNull(rowIds); + assertThat(rowIds.getLongCardinality()).isEqualTo(oldRowCount); + assertThat(rowIds.toRangeList()).containsExactly(new Range(0L, oldRowCount - 1)); + } + private void createIndex(String fieldName) throws Exception { createIndex(fieldName, null); }