Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,25 @@ public void showCountTimeSeries() throws SQLException {
}
}

@Test
public void showCountTimeSeriesExcludeInternalDatabaseAndIncludeView() throws SQLException {
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
final long baseVisibleCount = queryCount(statement, "COUNT TIMESERIES root.ln*.**");
statement.execute("CREATE DATABASE root.count_it");
statement.execute(
"CREATE TIMESERIES root.count_it.src.s1 WITH DATATYPE = INT32, ENCODING = PLAIN");
statement.execute(
"CREATE TIMESERIES root.count_it.src.s2 WITH DATATYPE = INT32, ENCODING = PLAIN");
statement.execute("CREATE VIEW root.count_it.dst.v1 AS SELECT s1 FROM root.count_it.src;");

final long localCount = queryCount(statement, "COUNT TIMESERIES root.count_it.**");
assertEquals(3L, localCount);
assertEquals(
baseVisibleCount + localCount, queryCount(statement, "COUNT TIMESERIES root.**"));
}
}

@Test
public void showCountTimeSeriesWithTag() throws SQLException {
try (Connection connection = EnvFactory.getEnv().getConnection();
Expand Down Expand Up @@ -865,4 +884,11 @@ public void showDeadbandInfo() throws SQLException {
}
}
}

private long queryCount(final Statement statement, final String sql) throws SQLException {
try (ResultSet resultSet = statement.executeQuery(sql)) {
Assert.assertTrue(resultSet.next());
return resultSet.getLong(1);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
import org.apache.iotdb.db.queryengine.execution.operator.schema.source.ISchemaSource;
import org.apache.iotdb.db.queryengine.execution.operator.source.SourceOperator;
import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.info.ISchemaInfo;
import org.apache.iotdb.db.schemaengine.schemaregion.read.resp.reader.ISchemaReader;

Expand Down Expand Up @@ -95,6 +96,10 @@ public OperatorContext getOperatorContext() {
return operatorContext;
}

private ISchemaRegion getSchemaRegion() {
return ((SchemaDriverContext) operatorContext.getDriverContext()).getSchemaRegion();
}

@Override
public ListenableFuture<?> isBlocked() {
if (isBlocked == null) {
Expand All @@ -109,6 +114,11 @@ public ListenableFuture<?> isBlocked() {
*/
private ListenableFuture<?> tryGetNext() {
if (schemaReader == null) {
if (schemaSource.shouldSkipSchemaRegion(getSchemaRegion())) {
next = null;
isFinished = true;
return NOT_BLOCKED;
}
schemaReader = createTimeSeriesReader();
}
while (true) {
Expand Down Expand Up @@ -172,15 +182,14 @@ public TsBlock next() throws Exception {
@Override
public boolean hasNext() throws Exception {
isBlocked().get(); // wait for the next TsBlock
if (!schemaReader.isSuccess()) {
if (schemaReader != null && !schemaReader.isSuccess()) {
throw new SchemaExecutionException(schemaReader.getFailure());
}
return next != null;
}

public ISchemaReader<T> createTimeSeriesReader() {
return schemaSource.getSchemaReader(
((SchemaDriverContext) operatorContext.getDriverContext()).getSchemaRegion());
return schemaSource.getSchemaReader(getSchemaRegion());
}

private TsBlock constructTsBlockAndClearMap(Map<PartialPath, Long> countMap) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ public ListenableFuture<?> isBlocked() {
*/
private ListenableFuture<?> tryGetNext() {
ISchemaRegion schemaRegion = getSchemaRegion();
if (schemaSource.shouldSkipSchemaRegion(schemaRegion)) {
next = constructTsBlock(0);
return NOT_BLOCKED;
}
if (schemaSource.hasSchemaStatistic(schemaRegion)) {
long statisticCount = schemaSource.getSchemaStatistic(schemaRegion);
// Check if database path itself is counted as a device (bug fix)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ void transformToTsBlockColumns(

long getSchemaStatistic(final ISchemaRegion schemaRegion);

default boolean shouldSkipSchemaRegion(final ISchemaRegion schemaRegion) {
return false;
}

default boolean checkRegionDatabaseIncluded(final ISchemaRegion schemaRegion) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public static ISchemaSource<ITimeSeriesSchemaInfo> getTimeSeriesSchemaCountSourc
Map<Integer, Template> templateMap,
PathPatternTree scope) {
return new TimeSeriesSchemaSource(
pathPattern, isPrefixMatch, 0, 0, schemaFilter, templateMap, false, scope, null);
pathPattern, isPrefixMatch, 0, 0, schemaFilter, templateMap, false, true, scope, null);
}

// show time series
Expand All @@ -69,6 +69,7 @@ public static ISchemaSource<ITimeSeriesSchemaInfo> getTimeSeriesSchemaScanSource
schemaFilter,
templateMap,
true,
false,
scope,
timeseriesOrdering);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iotdb.commons.schema.column.ColumnHeader;
import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
import org.apache.iotdb.commons.schema.filter.SchemaFilter;
import org.apache.iotdb.commons.schema.table.Audit;
import org.apache.iotdb.commons.schema.template.Template;
import org.apache.iotdb.commons.schema.view.ViewType;
import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering;
Expand Down Expand Up @@ -55,6 +56,7 @@ public class TimeSeriesSchemaSource implements ISchemaSource<ITimeSeriesSchemaIn
private final SchemaFilter schemaFilter;
private final Map<Integer, Template> templateMap;
private final boolean needViewDetail;
private final boolean excludeInternalDatabase;
private final Ordering timeseriesOrdering;

TimeSeriesSchemaSource(
Expand All @@ -65,6 +67,7 @@ public class TimeSeriesSchemaSource implements ISchemaSource<ITimeSeriesSchemaIn
SchemaFilter schemaFilter,
Map<Integer, Template> templateMap,
boolean needViewDetail,
boolean excludeInternalDatabase,
PathPatternTree scope,
Ordering timeseriesOrdering) {
this.pathPattern = pathPattern;
Expand All @@ -74,6 +77,7 @@ public class TimeSeriesSchemaSource implements ISchemaSource<ITimeSeriesSchemaIn
this.schemaFilter = schemaFilter;
this.templateMap = templateMap;
this.needViewDetail = needViewDetail;
this.excludeInternalDatabase = excludeInternalDatabase;
this.scope = scope;
this.timeseriesOrdering = timeseriesOrdering;
}
Expand Down Expand Up @@ -141,6 +145,25 @@ public long getSchemaStatistic(ISchemaRegion schemaRegion) {
return schemaRegion.getSchemaRegionStatistics().getSeriesNumber(true);
}

@Override
public boolean shouldSkipSchemaRegion(final ISchemaRegion schemaRegion) {
if (!excludeInternalDatabase) {
return false;
}

final String database = schemaRegion.getDatabaseFullPath();
if (!SchemaConstant.SYSTEM_DATABASE.equals(database)
&& !SchemaConstant.AUDIT_DATABASE.equals(database)
&& !Audit.TABLE_MODEL_AUDIT_DATABASE.equals(database)) {
return false;
}

final String[] nodes = pathPattern.getNodes();
return nodes.length < 2
|| !SchemaConstant.ROOT.equals(nodes[0])
|| !database.endsWith("." + nodes[1]);
}

public static String mapToString(Map<String, String> map) {
if (map == null || map.isEmpty()) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,82 @@ public void testSchemaCountOperator() throws Exception {
}
}

@Test
public void testSchemaCountOperatorSkipSchemaRegion() throws Exception {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
QueryId queryId = new QueryId("stub_query");
FragmentInstanceId instanceId =
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
FragmentInstanceStateMachine stateMachine =
new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext =
createFragmentInstanceContext(instanceId, stateMachine);
DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0);
PlanNodeId planNodeId = queryId.genPlanNodeId();
ISchemaRegion schemaRegion = Mockito.mock(ISchemaRegion.class);
OperatorContext operatorContext =
driverContext.addOperatorContext(
1, planNodeId, SchemaCountOperator.class.getSimpleName());
operatorContext.setDriverContext(
new SchemaDriverContext(fragmentInstanceContext, schemaRegion, 0));
ISchemaSource<ISchemaInfo> schemaSource = Mockito.mock(ISchemaSource.class);
Mockito.when(schemaSource.shouldSkipSchemaRegion(schemaRegion)).thenReturn(true);

SchemaCountOperator<?> schemaCountOperator =
new SchemaCountOperator<>(
planNodeId, driverContext.getOperatorContexts().get(0), schemaSource);

assertTrue(schemaCountOperator.hasNext());
TsBlock tsBlock = schemaCountOperator.next();
assertEquals(0, tsBlock.getColumn(0).getLong(0));
assertTrue(schemaCountOperator.isFinished());
Mockito.verify(schemaSource, Mockito.never()).getSchemaReader(schemaRegion);
} finally {
instanceNotificationExecutor.shutdown();
}
}

@Test
public void testSchemaCountOperatorUseSchemaStatistic() throws Exception {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
QueryId queryId = new QueryId("stub_query");
FragmentInstanceId instanceId =
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
FragmentInstanceStateMachine stateMachine =
new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext =
createFragmentInstanceContext(instanceId, stateMachine);
DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0);
PlanNodeId planNodeId = queryId.genPlanNodeId();
ISchemaRegion schemaRegion = Mockito.mock(ISchemaRegion.class);
OperatorContext operatorContext =
driverContext.addOperatorContext(
1, planNodeId, SchemaCountOperator.class.getSimpleName());
operatorContext.setDriverContext(
new SchemaDriverContext(fragmentInstanceContext, schemaRegion, 0));
ISchemaSource<ISchemaInfo> schemaSource = Mockito.mock(ISchemaSource.class);
Mockito.when(schemaSource.hasSchemaStatistic(schemaRegion)).thenReturn(true);
Mockito.when(schemaSource.getSchemaStatistic(schemaRegion)).thenReturn(7L);
Mockito.when(schemaSource.checkRegionDatabaseIncluded(schemaRegion)).thenReturn(true);

SchemaCountOperator<?> schemaCountOperator =
new SchemaCountOperator<>(
planNodeId, driverContext.getOperatorContexts().get(0), schemaSource);

assertTrue(schemaCountOperator.hasNext());
TsBlock tsBlock = schemaCountOperator.next();
assertEquals(7, tsBlock.getColumn(0).getLong(0));
Mockito.verify(schemaSource).getSchemaStatistic(schemaRegion);
Mockito.verify(schemaSource, Mockito.never()).getSchemaReader(schemaRegion);
} finally {
instanceNotificationExecutor.shutdown();
}
}

@Test
public void testLevelTimeSeriesCountOperator() {
ExecutorService instanceNotificationExecutor =
Expand Down Expand Up @@ -185,6 +261,43 @@ public void testLevelTimeSeriesCountOperator() {
}
}

@Test
public void testLevelTimeSeriesCountOperatorSkipSchemaRegion() {
ExecutorService instanceNotificationExecutor =
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
try {
QueryId queryId = new QueryId("stub_query");
FragmentInstanceId instanceId =
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
FragmentInstanceStateMachine stateMachine =
new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext =
createFragmentInstanceContext(instanceId, stateMachine);
DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0);
PlanNodeId planNodeId = queryId.genPlanNodeId();
OperatorContext operatorContext =
driverContext.addOperatorContext(
1, planNodeId, CountGroupByLevelScanOperator.class.getSimpleName());
ISchemaRegion schemaRegion = Mockito.mock(ISchemaRegion.class);
operatorContext.setDriverContext(
new SchemaDriverContext(fragmentInstanceContext, schemaRegion, 0));
ISchemaSource<ITimeSeriesSchemaInfo> schemaSource = Mockito.mock(ISchemaSource.class);
Mockito.when(schemaSource.shouldSkipSchemaRegion(schemaRegion)).thenReturn(true);

CountGroupByLevelScanOperator<ITimeSeriesSchemaInfo> timeSeriesCountOperator =
new CountGroupByLevelScanOperator<>(
planNodeId, driverContext.getOperatorContexts().get(0), 1, schemaSource);

assertTrue(collectResult(timeSeriesCountOperator).isEmpty());
Mockito.verify(schemaSource, Mockito.never()).getSchemaReader(schemaRegion);
} catch (Exception e) {
e.printStackTrace();
fail();
} finally {
instanceNotificationExecutor.shutdown();
}
}

private List<TsBlock> collectResult(CountGroupByLevelScanOperator<?> operator) throws Exception {
List<TsBlock> tsBlocks = new ArrayList<>();
while (operator.hasNext()) {
Expand Down
Loading
Loading