Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
Expand All @@ -48,6 +49,7 @@
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.plugin.inputformat.csv.CSVMessageDecoder;
import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
import org.apache.pinot.server.starter.helix.BaseServerStarter;
Expand Down Expand Up @@ -334,9 +336,16 @@ protected TableConfig createTableConfig(File tableConfigFile)
* Creates a new OFFLINE table config.
*/
protected TableConfig createOfflineTableConfig() {
return createOfflineTableConfig(getTableName());
}

/**
* Creates a new OFFLINE table config.
*/
protected TableConfig createOfflineTableConfig(String tableName) {
// @formatter:off
return new TableConfigBuilder(TableType.OFFLINE)
.setTableName(getTableName())
.setTableName(tableName)
.setTimeColumnName(getTimeColumnName())
.setSortedColumn(getSortedColumn())
.setInvertedIndexColumns(getInvertedIndexColumns())
Expand Down Expand Up @@ -406,26 +415,71 @@ protected TableConfig createRealtimeTableConfig(File sampleAvroFile) {
return getTableConfigBuilder(TableType.REALTIME).build();
}

private List<String> getTimeBoundaryTable(List<String> offlineTables) {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These methods are moved from BaseLogicalTableIntegrationTest to this class so that LogicalTableMultiStageEngineIntegrationTest can also leverage them

String timeBoundaryTable = null;
long maxEndTimeMillis = Long.MIN_VALUE;
try {
for (String tableName : offlineTables) {
String url = _controllerRequestURLBuilder.forSegmentMetadata(tableName, TableType.OFFLINE);
String response = ControllerTest.sendGetRequest(url);
JsonNode jsonNode = JsonUtils.stringToJsonNode(response);
Iterator<String> stringIterator = jsonNode.fieldNames();
while (stringIterator.hasNext()) {
String segmentName = stringIterator.next();
JsonNode segmentJsonNode = jsonNode.get(segmentName);
long endTimeMillis = segmentJsonNode.get("endTimeMillis").asLong();
if (endTimeMillis > maxEndTimeMillis) {
maxEndTimeMillis = endTimeMillis;
timeBoundaryTable = tableName;
}
}
}
} catch (IOException e) {
throw new RuntimeException("Failed to get the time boundary table", e);
}
return timeBoundaryTable != null ? List.of(TableNameBuilder.OFFLINE.tableNameWithType(timeBoundaryTable))
: List.of();
Comment on lines +440 to +441
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getTimeBoundaryTable() iterates over offlineTables and sets timeBoundaryTable = tableName, but then returns TableNameBuilder.OFFLINE.tableNameWithType(timeBoundaryTable). In current call sites (createLogicalTableConfig() and BaseLogicalTableIntegrationTest), offlineTables are already table-name-with-type (e.g. o_1_OFFLINE), so this produces an invalid name like o_1_OFFLINE_OFFLINE and breaks the includedTables in TimeBoundaryConfig.

Suggested change
return timeBoundaryTable != null ? List.of(TableNameBuilder.OFFLINE.tableNameWithType(timeBoundaryTable))
: List.of();
return timeBoundaryTable != null ? List.of(timeBoundaryTable) : List.of();

Copilot uses AI. Check for mistakes.
}

protected LogicalTableConfig createLogicalTableConfig(List<String> offlineTables, List<String> realtimeTables) {
Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>();
for (String physicalTableName : offlineTables) {
physicalTableConfigMap.put(physicalTableName, new PhysicalTableConfig());
}
for (String physicalTableName : realtimeTables) {
physicalTableConfigMap.put(physicalTableName, new PhysicalTableConfig());
}
String offlineTableName = offlineTables.stream().findFirst().orElse(null);
String realtimeTableName = realtimeTables.stream().findFirst().orElse(null);
LogicalTableConfigBuilder builder =
new LogicalTableConfigBuilder().setTableName(getLogicalTableName())
.setBrokerTenant(getBrokerTenant())
.setRefOfflineTableName(offlineTableName)
.setRefRealtimeTableName(realtimeTableName)
.setPhysicalTableConfigMap(physicalTableConfigMap);
if (!offlineTables.isEmpty() && !realtimeTables.isEmpty()) {
builder.setTimeBoundaryConfig(
new TimeBoundaryConfig("min", Map.of("includedTables", getTimeBoundaryTable(offlineTables)))
);
}
return builder.build();
}

/**
* Creates a LogicalTableConfig backed by the OFFLINE and REALTIME physical tables.
*/
protected LogicalTableConfig createLogicalTableConfig() {
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(getTableName());
String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
return createLogicalTableConfig(List.of(offlineTableName), List.of(realtimeTableName));
}

Map<String, PhysicalTableConfig> physicalTableConfigMap = new HashMap<>();
physicalTableConfigMap.put(offlineTableName, new PhysicalTableConfig());
physicalTableConfigMap.put(realtimeTableName, new PhysicalTableConfig());

return new LogicalTableConfigBuilder()
.setTableName(getLogicalTableName())
.setBrokerTenant(getBrokerTenant())
.setRefOfflineTableName(offlineTableName)
.setRefRealtimeTableName(realtimeTableName)
.setPhysicalTableConfigMap(physicalTableConfigMap)
.setTimeBoundaryConfig(
new TimeBoundaryConfig("min", Map.of("includedTables", physicalTableConfigMap.keySet())))
.build();
protected void createLogicalTableAndSchema()
throws IOException {
Schema schema = createSchema(getSchemaFileName());
schema.setSchemaName(getTableName());
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

createLogicalTableAndSchema() sets the schema name to getTableName(), but the schema for a logical table should be created with the logical table name (i.e., getLogicalTableName()). If a subclass uses a logical table name different from the physical table name, the controller will not find the expected schema for the logical table.

Suggested change
schema.setSchemaName(getTableName());
schema.setSchemaName(getLogicalTableName());

Copilot uses AI. Check for mistakes.
addSchema(schema);
createLogicalTable();
}

/**
Expand Down Expand Up @@ -673,6 +727,50 @@ protected List<File> unpackAvroData(File outputDir)
return unpackTarData(getAvroTarFileName(), outputDir);
}

protected void uploadDataToOfflineTables(List<String> tableNames, List<File> avroFiles)
throws Exception {
Map<String, List<File>> offlineTableDataFiles = distributeFilesToTables(tableNames, avroFiles);
for (Map.Entry<String, List<File>> entry : offlineTableDataFiles.entrySet()) {
String tableName = entry.getKey();
List<File> avroFilesForTable = entry.getValue();

File tarDir = new File(_tarDir, tableName);

TestUtils.ensureDirectoriesExistAndEmpty(tarDir);

// Create and upload the schema and table config
Schema schema = createSchema(getSchemaFileName());
schema.setSchemaName(tableName);
addSchema(schema);
TableConfig offlineTableConfig = createOfflineTableConfig(tableName);
addTableConfig(offlineTableConfig);

// Create and upload segments
ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFilesForTable, offlineTableConfig, schema, 0, _segmentDir,
tarDir);
uploadSegments(tableName, tarDir);
}
}

// Distributes the given Avro files to the given table names in a round-robin manner and
// returns the mapping from table name to list of Avro files.
protected Map<String, List<File>> distributeFilesToTables(List<String> tableNames, List<File> avroFiles) {
if (tableNames.isEmpty()) {
return Map.of();
}
Map<String, List<File>> tableNameToFilesMap = new HashMap<>();

// Initialize the map with empty lists for each table name
tableNames.forEach(table -> tableNameToFilesMap.put(table, new ArrayList<>()));

// Round-robin distribution of files to table names
for (int i = 0; i < avroFiles.size(); i++) {
String tableName = tableNames.get(i % tableNames.size());
tableNameToFilesMap.get(tableName).add(avroFiles.get(i));
}
return tableNameToFilesMap;
}

/**
* Unpack the tarred data into the given directory.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,14 @@ protected String getSchemaFileName() {
@BeforeClass
public void setUp()
throws Exception {
initCluster();
List<File> avroFiles = unpackAvroData(_tempDir);
initTables(avroFiles);
initOtherDependencies(avroFiles);
}

protected void initCluster()
throws Exception {
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);

// Start the Pinot cluster
Expand All @@ -119,20 +127,23 @@ public void setUp()
startBroker();
startServer();
setupTenants();
}

protected void initTables(List<File> avroFiles)
throws Exception {
// Create and upload the schema and table config
Schema schema = createSchema();
addSchema(schema);
TableConfig tableConfig = createOfflineTableConfig();
addTableConfig(tableConfig);

// Unpack the Avro files
List<File> avroFiles = unpackAvroData(_tempDir);

// Create and upload segments
ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
uploadSegments(getTableName(), _tarDir);
}

protected void initOtherDependencies(List<File> avroFiles)
throws Exception {
// Set up the H2 connection
setUpH2Connection(avroFiles);

Expand Down Expand Up @@ -1012,7 +1023,7 @@ private void checkSingleColumnSameValueResult(JsonNode result, long expectedRows
}

@DataProvider(name = "polymorphicScalarComparisonFunctionsDataProvider")
Object[][] polymorphicScalarComparisonFunctionsDataProvider() {
protected Object[][] polymorphicScalarComparisonFunctionsDataProvider() {
List<Object[]> inputs = new ArrayList<>();

inputs.add(new Object[]{"STRING", "'test'", "'abc'", "test"});
Expand Down Expand Up @@ -1162,6 +1173,11 @@ public void testLiteralFilterReduce()
@Test
public void testBetween()
throws Exception {
testBetween(true);
}

protected void testBetween(boolean testExplainPlanForNotBetween)
throws Exception {
String sqlQuery = "SELECT COUNT(*) FROM mytable WHERE AirTime BETWEEN 10 AND 50";
JsonNode jsonNode = postQuery(sqlQuery);
assertNoError(jsonNode);
Expand Down Expand Up @@ -1213,18 +1229,20 @@ public void testBetween()
assertNoError(jsonNode);
assertEquals(jsonNode.get("resultTable").get("rows").get(0).get(0).asInt(), 58538);

explainQuery =
"SET " + CommonConstants.Broker.Request.QueryOptionKey.EXPLAIN_ASKING_SERVERS + "=true; EXPLAIN PLAN FOR "
+ sqlQuery;
jsonNode = postQuery(explainQuery);
assertNoError(jsonNode);
plan = jsonNode.get("resultTable").get("rows").get(0).get(1).asText();
// Ensure that the BETWEEN filter predicate was not converted. Also ensure that the NOT filter is added.
Assert.assertTrue(plan.contains("BETWEEN"));
Assert.assertTrue(plan.contains("FilterNot"));
Assert.assertFalse(plan.contains(">="));
Assert.assertFalse(plan.contains("<="));
Assert.assertFalse(plan.contains("Sarg"));
if (testExplainPlanForNotBetween) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get this, why is NOT BETWEEN a special case here? Or is this the only one where the logical plan is being compared with the segment level plan and segment level explain plan for MSE with logical tables is broken?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this is the only case. other explain assertions work fine. I don't yet know the cause as I haven't debugged yet. Will do that soon.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note the result is correct with logical tables. Its only the explain plan which is different for this case.

explainQuery =
"SET " + CommonConstants.Broker.Request.QueryOptionKey.EXPLAIN_ASKING_SERVERS + "=true; EXPLAIN PLAN FOR "
+ sqlQuery;
jsonNode = postQuery(explainQuery);
assertNoError(jsonNode);
plan = jsonNode.get("resultTable").get("rows").get(0).get(1).asText();
// Ensure that the BETWEEN filter predicate was not converted. Also ensure that the NOT filter is added.
Assert.assertTrue(plan.contains("BETWEEN"));
Assert.assertTrue(plan.contains("FilterNot"));
Assert.assertFalse(plan.contains(">="));
Assert.assertFalse(plan.contains("<="));
Assert.assertFalse(plan.contains("Sarg"));
}
}

@Test
Expand Down Expand Up @@ -1405,6 +1423,10 @@ protected String getTableName() {
return _tableName;
}

protected List<String> getOfflineTablesCreated() {
return Collections.singletonList(_tableName);
}

@Test
public void testWithoutDatabaseContext()
throws Exception {
Expand Down Expand Up @@ -2261,7 +2283,9 @@ public void testPipelineBreakerWithoutKeepingStats() {
@AfterClass
public void tearDown()
throws Exception {
dropOfflineTable(DEFAULT_TABLE_NAME);
for (String tableName : getOfflineTablesCreated()) {
dropOfflineTable(tableName);
}
dropOfflineTable(TABLE_NAME_WITH_DATABASE);
dropOfflineTable(DIM_TABLE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,12 @@ public void addTableConfig(TableConfig tableConfig)
sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), tableConfig.toJsonString(), AUTH_HEADER);
}

@Override
protected LogicalTableConfig createLogicalTableConfig() {
String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
return createLogicalTableConfig(List.of(), List.of(realtimeTableName));
}

@Override
protected void createLogicalTable()
throws IOException {
Expand Down
Loading
Loading