Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,18 @@
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.core.startree.StarTreeUtils;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorCustomConfigs;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.segment.spi.index.startree.StarTreeV2Metadata;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
Expand Down Expand Up @@ -91,7 +94,7 @@ public File purgeSegment()
return null;
}

initSegmentGeneratorConfig(segmentName);
initSegmentGeneratorConfig(segmentName, segmentMetadata);

// Keep index creation time the same as original segment because both segments use the same raw data.
// This way, for REFRESH case, when new segment gets pushed to controller, we can use index creation time to
Expand Down Expand Up @@ -120,10 +123,23 @@ public File purgeSegment()
}

@VisibleForTesting
void initSegmentGeneratorConfig(String segmentName) {
void initSegmentGeneratorConfig(String segmentName, SegmentMetadataImpl segmentMetadata) {
_segmentGeneratorConfig = new SegmentGeneratorConfig(_tableConfig, _schema);
_segmentGeneratorConfig.setOutDir(_workingDir.getPath());

// When enableDynamicStarTreeCreation is false, preserve the original segment's star-tree state exactly,
// consistent with SegmentPreProcessor which skips all star-tree additions, deletions, and modifications.
// When enableDynamicStarTreeCreation is true, use the table config to allow star-tree changes.
if (!_tableConfig.getIndexingConfig().isEnableDynamicStarTreeCreation()) {
List<StarTreeV2Metadata> starTreeV2MetadataList = segmentMetadata.getStarTreeV2MetadataList();
if (starTreeV2MetadataList == null) {
_segmentGeneratorConfig.setStarTreeIndexConfigs(null);
} else {
_segmentGeneratorConfig.setStarTreeIndexConfigs(StarTreeUtils.toStarTreeIndexConfigs(starTreeV2MetadataList));
}
_segmentGeneratorConfig.setEnableDefaultStarTree(false);
}

if (_segmentGeneratorCustomConfigs != null && StringUtils.isNotEmpty(
_segmentGeneratorCustomConfigs.getSegmentName())) {
_segmentGeneratorConfig.setSegmentName(_segmentGeneratorCustomConfigs.getSegmentName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
import org.apache.pinot.segment.spi.index.startree.AggregationSpec;
import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
import org.apache.pinot.segment.spi.index.startree.StarTreeV2Metadata;
import org.apache.pinot.spi.config.table.StarTreeAggregationConfig;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
import org.apache.pinot.spi.utils.DataSizeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -463,4 +466,38 @@ public static BaseProjectOperator<?> createStarTreeBasedProjectOperator(IndexSeg
}
return null;
}

/**
* Converts star-tree metadata from an existing segment back to {@link StarTreeIndexConfig} so the rebuilt segment
* preserves the original star-tree configuration exactly.
*/
public static List<StarTreeIndexConfig> toStarTreeIndexConfigs(List<StarTreeV2Metadata> metadataList) {
List<StarTreeIndexConfig> configs = new ArrayList<>(metadataList.size());
for (StarTreeV2Metadata metadata : metadataList) {
List<String> functionColumnPairs = new ArrayList<>();
List<StarTreeAggregationConfig> aggregationConfigs = new ArrayList<>();
for (Map.Entry<AggregationFunctionColumnPair, AggregationSpec> entry
: metadata.getAggregationSpecs().entrySet()) {
AggregationFunctionColumnPair pair = entry.getKey();
AggregationSpec spec = entry.getValue();
functionColumnPairs.add(pair.toColumnName());
aggregationConfigs.add(new StarTreeAggregationConfig(
pair.getColumn(),
pair.getFunctionType().getName(),
spec.getFunctionParameters().isEmpty() ? null : spec.getFunctionParameters(),
spec.getCompressionCodec(),
spec.isDeriveNumDocsPerChunk(),
spec.getIndexVersion(),
DataSizeUtils.fromBytes(spec.getTargetMaxChunkSizeBytes()),
spec.getTargetDocsPerChunk()));
}
configs.add(new StarTreeIndexConfig(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's better to add error handling here

metadata.getDimensionsSplitOrder(),
new ArrayList<>(metadata.getSkipStarNodeCreationForDimensions()),
functionColumnPairs.isEmpty() ? null : functionColumnPairs,
aggregationConfigs.isEmpty() ? null : aggregationConfigs,
metadata.getMaxLeafRecords()));
}
return configs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,30 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.configuration2.PropertiesConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.core.startree.StarTreeUtils;
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorCustomConfigs;
import org.apache.pinot.segment.spi.index.StandardIndexes;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
import org.apache.pinot.segment.spi.index.startree.AggregationSpec;
import org.apache.pinot.segment.spi.index.startree.StarTreeV2Metadata;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.config.table.FieldConfig.CompressionCodec;
import org.apache.pinot.spi.config.table.StarTreeAggregationConfig;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
Expand All @@ -48,29 +59,39 @@
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;


public class SegmentPurgerTest {
private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "SegmentPurgerTest");
private static final File ORIGINAL_SEGMENT_DIR = new File(TEMP_DIR, "originalSegment");
private static final File STAR_TREE_SEGMENT_DIR = new File(TEMP_DIR, "starTreeSegment");
private static final File PURGED_SEGMENT_DIR = new File(TEMP_DIR, "purgedSegment");
private static final Random RANDOM = new Random();

private static final int NUM_ROWS = 10000;
private static final String TABLE_NAME = "testTable";
private static final String SEGMENT_NAME = "testSegment";
private static final String STAR_TREE_SEGMENT_NAME = "starTreeSegment";
private static final String D1 = "d1";
private static final String D2 = "d2";
private static final StarTreeIndexConfig STAR_TREE_CONFIG_A =
new StarTreeIndexConfig(List.of(D1, D2), null, List.of("SUM__d2"), null, 10000);
private static final StarTreeIndexConfig STAR_TREE_CONFIG_B =
new StarTreeIndexConfig(List.of(D2, D1), null, List.of("SUM__d1"), null, 5000);

private TableConfig _tableConfig;
private Schema _schema;
private File _originalIndexDir;
private File _starTreeIndexDir;
private int _expectedNumRecordsPurged;
private int _expectedNumRecordsModified;

Expand Down Expand Up @@ -109,6 +130,18 @@ public void setUp()
driver.init(config, genericRowRecordReader, InstanceType.MINION);
driver.build();
_originalIndexDir = new File(ORIGINAL_SEGMENT_DIR, SEGMENT_NAME);

// Create a segment WITH star tree index
TableConfig starTreeTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
.setStarTreeIndexConfigs(Collections.singletonList(STAR_TREE_CONFIG_A)).build();
SegmentGeneratorConfig starTreeConfig = new SegmentGeneratorConfig(starTreeTableConfig, _schema);
starTreeConfig.setOutDir(STAR_TREE_SEGMENT_DIR.getPath());
starTreeConfig.setSegmentName(STAR_TREE_SEGMENT_NAME);
genericRowRecordReader = new GenericRowRecordReader(rows);
SegmentIndexCreationDriverImpl starTreeDriver = new SegmentIndexCreationDriverImpl();
starTreeDriver.init(starTreeConfig, genericRowRecordReader);
starTreeDriver.build();
_starTreeIndexDir = new File(STAR_TREE_SEGMENT_DIR, STAR_TREE_SEGMENT_NAME);
}

@Test
Expand Down Expand Up @@ -177,12 +210,14 @@ public void testPurgeSegment()
}

@Test
public void testSegmentPurgerWithCustomSegmentGeneratorConfig() {
public void testSegmentPurgerWithCustomSegmentGeneratorConfig()
throws Exception {
SegmentPurger.RecordPurger recordPurger = row -> row.getValue(D1).equals(0);
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_originalIndexDir);

SegmentPurger segmentPurger =
new SegmentPurger(_originalIndexDir, PURGED_SEGMENT_DIR, _tableConfig, _schema, recordPurger, null, null);
segmentPurger.initSegmentGeneratorConfig("currentSegmentName");
segmentPurger.initSegmentGeneratorConfig("currentSegmentName", segmentMetadata);
assertEquals(segmentPurger.getSegmentGeneratorConfig().getSegmentName(), "currentSegmentName");

String newSegmentName = "myTable_segment_001";
Expand All @@ -193,10 +228,127 @@ public void testSegmentPurgerWithCustomSegmentGeneratorConfig() {
SegmentPurger segmentPurger2 =
new SegmentPurger(_originalIndexDir, PURGED_SEGMENT_DIR, _tableConfig, _schema, recordPurger, null,
segmentGeneratorCustomConfigs);
segmentPurger2.initSegmentGeneratorConfig("currentSegmentName");
segmentPurger2.initSegmentGeneratorConfig("currentSegmentName", segmentMetadata);
assertEquals(segmentPurger2.getSegmentGeneratorConfig().getSegmentName(), newSegmentName);
}

// Case columns: {segmentHasStarTree, tableStarTreeConfigs, enableDynamic, expectedNull, expectedMaxLeaf}.
// The last column is asserted only when expectedNull is false.
@DataProvider(name = "starTreePreservationCases")
public Object[][] starTreePreservationCases() {
  Object[][] cases = {
      // No star-tree in segment, table has config A, dynamic disabled -> preserve the no-star-tree state
      {false, List.of(STAR_TREE_CONFIG_A), false, true, -1},
      // No star-tree in segment, table has config A, dynamic enabled -> build from the table config
      {false, List.of(STAR_TREE_CONFIG_A), true, false, 10000},
      // Segment built with A, table now has B, dynamic disabled -> keep the original A
      {true, List.of(STAR_TREE_CONFIG_B), false, false, 10000},
      // Segment built with A, table now has B, dynamic enabled -> switch to B
      {true, List.of(STAR_TREE_CONFIG_B), true, false, 5000},
      // Segment built with A, table dropped the config, dynamic disabled -> keep the original A
      {true, null, false, false, 10000},
      // Segment built with A, table dropped the config, dynamic enabled -> drop the star-tree
      {true, null, true, true, -1},
  };
  return cases;
}

@Test(dataProvider = "starTreePreservationCases")
public void testStarTreePreservation(boolean segmentHasStarTree, List<StarTreeIndexConfig> tableStarTreeConfigs,
    boolean enableDynamic, boolean expectedNull, int expectedMaxLeaf)
    throws Exception {
  // Pick the pre-built segment matching the scenario.
  File indexDir;
  String segmentName;
  if (segmentHasStarTree) {
    indexDir = _starTreeIndexDir;
    segmentName = STAR_TREE_SEGMENT_NAME;
  } else {
    indexDir = _originalIndexDir;
    segmentName = SEGMENT_NAME;
  }

  // Build a table config that reflects the (possibly updated) star-tree settings.
  TableConfigBuilder tableConfigBuilder = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME);
  if (tableStarTreeConfigs != null) {
    tableConfigBuilder.setStarTreeIndexConfigs(tableStarTreeConfigs);
  }
  TableConfig tableConfig = tableConfigBuilder.build();
  if (enableDynamic) {
    tableConfig.getIndexingConfig().setEnableDynamicStarTreeCreation(true);
  }

  SegmentPurger.RecordPurger purgeZeros = row -> row.getValue(D1).equals(0);
  SegmentPurger purger =
      new SegmentPurger(indexDir, PURGED_SEGMENT_DIR, tableConfig, _schema, purgeZeros, null, null);
  purger.initSegmentGeneratorConfig(segmentName, new SegmentMetadataImpl(indexDir));

  List<StarTreeIndexConfig> starTreeConfigs = purger.getSegmentGeneratorConfig().getStarTreeIndexConfigs();
  if (expectedNull) {
    // No star-tree should be (re)built, and the default star-tree must stay disabled.
    assertNull(starTreeConfigs);
    assertFalse(purger.getSegmentGeneratorConfig().isEnableDefaultStarTree());
  } else {
    // Exactly one star-tree config is expected; maxLeafRecords identifies which config won.
    assertNotNull(starTreeConfigs);
    assertEquals(starTreeConfigs.size(), 1);
    assertEquals(starTreeConfigs.get(0).getMaxLeafRecords(), expectedMaxLeaf);
  }
}

@Test
public void testToStarTreeIndexConfigs() {
  // First tree: two default-spec aggregations plus a skip-star-node dimension.
  TreeMap<AggregationFunctionColumnPair, AggregationSpec> firstSpecs = new TreeMap<>();
  firstSpecs.put(new AggregationFunctionColumnPair(AggregationFunctionType.SUM, "d2"), AggregationSpec.DEFAULT);
  firstSpecs.put(new AggregationFunctionColumnPair(AggregationFunctionType.COUNT, "*"), AggregationSpec.DEFAULT);
  StarTreeV2Metadata firstMetadata = createStarTreeMetadata(List.of("d1", "d2"), firstSpecs, 10000, Set.of("d2"));

  // Second tree: one aggregation, reversed dimension order, nothing skipped.
  TreeMap<AggregationFunctionColumnPair, AggregationSpec> secondSpecs = new TreeMap<>();
  secondSpecs.put(new AggregationFunctionColumnPair(AggregationFunctionType.SUM, "d1"), AggregationSpec.DEFAULT);
  StarTreeV2Metadata secondMetadata = createStarTreeMetadata(List.of("d2", "d1"), secondSpecs, 5000, Set.of());

  List<StarTreeIndexConfig> configs = StarTreeUtils.toStarTreeIndexConfigs(List.of(firstMetadata, secondMetadata));
  assertEquals(configs.size(), 2);

  // First tree round-trips dimensions, skip list, leaf threshold and both aggregations.
  StarTreeIndexConfig firstConfig = configs.get(0);
  assertEquals(firstConfig.getDimensionsSplitOrder(), List.of("d1", "d2"));
  assertNotNull(firstConfig.getSkipStarNodeCreationForDimensions());
  assertEquals(firstConfig.getSkipStarNodeCreationForDimensions(), List.of("d2"));
  assertEquals(firstConfig.getMaxLeafRecords(), 10000);
  assertNotNull(firstConfig.getFunctionColumnPairs());
  assertTrue(firstConfig.getFunctionColumnPairs().contains("count__*"));
  assertTrue(firstConfig.getFunctionColumnPairs().contains("sum__d2"));
  assertEquals(firstConfig.getAggregationConfigs().size(), 2);

  // Second tree round-trips its reversed order; an empty skip set becomes null.
  StarTreeIndexConfig secondConfig = configs.get(1);
  assertEquals(secondConfig.getDimensionsSplitOrder(), List.of("d2", "d1"));
  assertNull(secondConfig.getSkipStarNodeCreationForDimensions());
  assertEquals(secondConfig.getMaxLeafRecords(), 5000);
}

@Test
public void testToStarTreeIndexConfigsWithCustomSpec() {
  // Single aggregation with a fully customized (non-default) spec.
  AggregationSpec customSpec =
      new AggregationSpec(CompressionCodec.LZ4, true, 4, 4096, 1000, Map.of("param1", "value1"));
  TreeMap<AggregationFunctionColumnPair, AggregationSpec> aggregationSpecs = new TreeMap<>();
  aggregationSpecs.put(new AggregationFunctionColumnPair(AggregationFunctionType.SUM, "d2"), customSpec);
  StarTreeV2Metadata metadata = createStarTreeMetadata(List.of("d1"), aggregationSpecs, 5000, Set.of());

  List<StarTreeIndexConfig> configs = StarTreeUtils.toStarTreeIndexConfigs(List.of(metadata));
  assertEquals(configs.size(), 1);
  assertEquals(configs.get(0).getMaxLeafRecords(), 5000);

  // Every customized field of the spec must survive the round-trip into the aggregation config.
  StarTreeAggregationConfig aggregationConfig = configs.get(0).getAggregationConfigs().get(0);
  assertEquals(aggregationConfig.getColumnName(), "d2");
  assertEquals(aggregationConfig.getAggregationFunction(), "sum");
  assertEquals(aggregationConfig.getCompressionCodec(), CompressionCodec.LZ4);
  assertTrue(aggregationConfig.getDeriveNumDocsPerChunk());
  assertEquals((int) aggregationConfig.getIndexVersion(), 4);
  assertEquals((int) aggregationConfig.getTargetDocsPerChunk(), 1000);
  assertEquals(aggregationConfig.getFunctionParameters().get("param1"), "value1");
}

/**
 * Builds an in-memory {@link StarTreeV2Metadata} by writing the given star-tree properties into a fresh
 * {@link PropertiesConfiguration} and reading them back, avoiding the need to build a real segment.
 */
private static StarTreeV2Metadata createStarTreeMetadata(List<String> dimensions,
    TreeMap<AggregationFunctionColumnPair, AggregationSpec> aggregationSpecs, int maxLeafRecords,
    Set<String> skipStarNodeCreationForDimensions) {
  PropertiesConfiguration metadataProperties = new PropertiesConfiguration();
  // 100 is the recorded total doc count; these tests never depend on its value.
  StarTreeV2Metadata.writeMetadata(metadataProperties, 100, dimensions, aggregationSpecs, maxLeafRecords,
      skipStarNodeCreationForDimensions);
  return new StarTreeV2Metadata(metadataProperties);
}

@AfterClass
public void tearDown()
throws Exception {
Expand Down
Loading