Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,13 @@ default int getExceptionsSize() {
*/
boolean isMaxRowsInJoinReached();


/**
 * Returns whether the limit for max rows in window has been reached.
 *
 * @return {@code true} if the max-rows-in-window limit has been reached, {@code false} otherwise
 */
boolean isMaxRowsInWindowReached();


/**
* Returns the total time used for query execution in milliseconds.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,12 +238,14 @@ public boolean isMaxRowsInJoinReached() {
return false;
}


/**
 * Always returns {@code false}: this implementation never reports the max-rows-in-window limit as reached.
 * NOTE(review): the {@code @JsonIgnore} annotation presumably keeps this property out of the serialized
 * response - confirm against the serializer configuration.
 */
@JsonIgnore
@Override
public boolean isMaxRowsInWindowReached() {
return false;
}


@Override
public long getTimeUsedMs() {
return _timeUsedMs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
*/
@JsonPropertyOrder({
"resultTable", "numRowsResultSet", "partialResult", "exceptions", "numGroupsLimitReached",
"numGroupsWarningLimitReached", "maxRowsInJoinReached", "maxRowsInWindowReached", "timeUsedMs", "stageStats",
"numGroupsWarningLimitReached", "numGroups", "maxRowsInJoinReached", "maxRowsInJoin",
"maxRowsInWindowReached", "maxRowsInWindow", "timeUsedMs", "stageStats",
"maxRowsInOperator", "requestId", "clientRequestId", "brokerId", "numDocsScanned", "totalDocs",
"numEntriesScannedInFilter", "numEntriesScannedPostFilter", "numServersQueried", "numServersResponded",
"numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried",
Expand All @@ -59,7 +60,9 @@ public class BrokerResponseNativeV2 implements BrokerResponse {
private ResultTable _resultTable;
private int _numRowsResultSet;
private boolean _maxRowsInJoinReached;
private long _maxRowsInJoin;
private boolean _maxRowsInWindowReached;
private long _maxRowsInWindow;
private long _timeUsedMs;
/**
* Statistics for each stage of the query execution.
Expand Down Expand Up @@ -143,6 +146,14 @@ public void mergeNumGroupsLimitReached(boolean numGroupsLimitReached) {
_brokerStats.merge(StatKey.NUM_GROUPS_LIMIT_REACHED, numGroupsLimitReached);
}

/**
 * Returns the value currently stored under {@code StatKey.NUM_GROUPS} in the broker stats.
 */
public long getNumGroups() {
return _brokerStats.getLong(StatKey.NUM_GROUPS);
}

/**
 * Merges the given number of groups into the broker stats under {@code StatKey.NUM_GROUPS}.
 * NOTE(review): {@code StatKey.NUM_GROUPS} overrides {@code merge} to return the larger of its two arguments,
 * so the stored value is presumably the largest count merged so far - confirm that the stat map delegates
 * to the key's {@code merge}.
 *
 * @param numGroups number of groups to merge in
 */
public void mergeNumGroups(long numGroups) {
_brokerStats.merge(StatKey.NUM_GROUPS, numGroups);
}

@Override
public boolean isNumGroupsWarningLimitReached() {
return _brokerStats.getBoolean(StatKey.NUM_GROUPS_WARNING_LIMIT_REACHED);
Expand All @@ -161,6 +172,14 @@ public void mergeMaxRowsInJoinReached(boolean maxRowsInJoinReached) {
_maxRowsInJoinReached |= maxRowsInJoinReached;
}

/**
 * Returns the max-rows-in-join value accumulated through {@link #mergeMaxRowsInJoin(long)}, which keeps the
 * largest value merged so far.
 */
public long getMaxRowsInJoin() {
return _maxRowsInJoin;
}

/**
 * Folds another max-rows-in-join observation into this response, retaining the largest value seen so far.
 *
 * @param maxRowsInJoin row count to merge in
 */
public void mergeMaxRowsInJoin(long maxRowsInJoin) {
  // Only a strictly larger observation replaces the stored value.
  if (maxRowsInJoin > _maxRowsInJoin) {
    _maxRowsInJoin = maxRowsInJoin;
  }
}

@Override
public boolean isMaxRowsInWindowReached() {
return _maxRowsInWindowReached;
Expand All @@ -170,6 +189,14 @@ public void mergeMaxRowsInWindowReached(boolean maxRowsInWindowReached) {
_maxRowsInWindowReached |= maxRowsInWindowReached;
}

/**
 * Returns the max-rows-in-window value accumulated through {@link #mergeMaxRowsInWindow(long)}, which keeps
 * the largest value merged so far.
 */
public long getMaxRowsInWindow() {
return _maxRowsInWindow;
}

/**
 * Folds another max-rows-in-window observation into this response, retaining the largest value seen so far.
 *
 * @param maxRowsInWindow row count to merge in
 */
public void mergeMaxRowsInWindow(long maxRowsInWindow) {
  // Only a strictly larger observation replaces the stored value.
  if (maxRowsInWindow > _maxRowsInWindow) {
    _maxRowsInWindow = maxRowsInWindow;
  }
}

/**
* Returns the stage statistics.
*/
Expand Down Expand Up @@ -453,7 +480,13 @@ public long merge(long value1, long value2) {
NUM_SEGMENTS_PRUNED_BY_VALUE(StatMap.Type.INT),
GROUPS_TRIMMED(StatMap.Type.BOOLEAN),
NUM_GROUPS_LIMIT_REACHED(StatMap.Type.BOOLEAN),
NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN);
NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN),
NUM_GROUPS(StatMap.Type.LONG) {
/** Merges two values by keeping the larger one. */
@Override
public long merge(long value1, long value2) {
  return value1 >= value2 ? value1 : value2;
}
};

private final StatMap.Type _type;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ private MseBlock produceAggregatedBlock() {
rows = _groupByExecutor.getResult(_groupTrimSize);
}

// Record stat before we check for limit so we can propagate to query response
_statMap.merge(StatKey.NUM_GROUPS, _groupByExecutor.getNumGroups());

if (rows.isEmpty()) {
return _eosBlock;
} else {
Expand Down Expand Up @@ -472,6 +475,12 @@ public boolean includeDefaultInJson() {
GROUPS_TRIMMED(StatMap.Type.BOOLEAN),
NUM_GROUPS_LIMIT_REACHED(StatMap.Type.BOOLEAN),
NUM_GROUPS_WARNING_LIMIT_REACHED(StatMap.Type.BOOLEAN),
NUM_GROUPS(StatMap.Type.LONG) {
/** Merges two values by keeping the larger one. */
@Override
public long merge(long value1, long value2) {
  return value1 >= value2 ? value1 : value2;
}
},
/**
* Allocated memory in bytes for this operator or its children in the same stage.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ protected void buildRightTable() {
// Row based overflow check.
if (rows.size() + numRows > _maxRowsInJoin) {
if (_joinOverflowMode == JoinOverFlowMode.THROW) {
// Record stat before we throw so it propagates to query response
_statMap.merge(StatKey.MAX_ROWS_IN_JOIN, numRows + rows.size());
throwForJoinRowLimitExceeded(
"Cannot build in memory hash table for join operator, reached number of rows limit: " + _maxRowsInJoin);
} else {
Expand All @@ -236,6 +238,7 @@ protected void buildRightTable() {
} else {
_isRightTableBuilt = true;
finishBuildingRightTable();
_statMap.merge(StatKey.MAX_ROWS_IN_JOIN, numRows);
}

_statMap.merge(StatKey.TIME_BUILDING_HASH_TABLE_MS, System.currentTimeMillis() - startTime);
Expand Down Expand Up @@ -343,13 +346,15 @@ public StatMap<StatKey> copyStatMaps() {
protected boolean isMaxRowsLimitReached(int numJoinedRows) {
if (numJoinedRows == _maxRowsInJoin) {
if (_joinOverflowMode == JoinOverFlowMode.THROW) {
_statMap.merge(StatKey.MAX_ROWS_IN_JOIN, numJoinedRows);
throwForJoinRowLimitExceeded(
"Cannot process join, reached number of rows limit: " + _maxRowsInJoin);
} else {
// Skip over remaining blocks until we reach the end of stream since we already breached the rows limit.
logger().info("Terminating join operator early as the maximum number of rows limit was reached: {}",
_maxRowsInJoin);
earlyTerminateLeftInput();
_statMap.merge(StatKey.MAX_ROWS_IN_JOIN, numJoinedRows);
_statMap.merge(StatKey.MAX_ROWS_IN_JOIN_REACHED, true);
return true;
}
Expand Down Expand Up @@ -390,6 +395,16 @@ public boolean includeDefaultInJson() {
* How long (CPU time) has been spent on building the hash table.
*/
TIME_BUILDING_HASH_TABLE_MS(StatMap.Type.LONG),
/**
* The max number of rows seen in the join. Recorded during right table build (normal and overflow paths)
* and at the joined-output limit check in {@link #isMaxRowsLimitReached}.
*/
MAX_ROWS_IN_JOIN(StatMap.Type.LONG) {
/** Merges two values by keeping the larger one. */
@Override
public long merge(long value1, long value2) {
  return value1 >= value2 ? value1 : value2;
}
},
/**
* Allocated memory in bytes for this operator or its children in the same stage.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ public void mergeInto(BrokerResponseNativeV2 response, StatMap<?> map) {
response.mergeNumGroupsLimitReached(stats.getBoolean(AggregateOperator.StatKey.NUM_GROUPS_LIMIT_REACHED));
response.mergeNumGroupsWarningLimitReached(
stats.getBoolean(AggregateOperator.StatKey.NUM_GROUPS_WARNING_LIMIT_REACHED));
response.mergeNumGroups(stats.getLong(AggregateOperator.StatKey.NUM_GROUPS));
response.mergeMaxRowsInOperator(stats.getLong(AggregateOperator.StatKey.EMITTED_ROWS));
}

Expand All @@ -275,6 +276,7 @@ public void mergeInto(BrokerResponseNativeV2 response, StatMap<?> map) {
StatMap<HashJoinOperator.StatKey> stats = (StatMap<HashJoinOperator.StatKey>) map;
response.mergeMaxRowsInOperator(stats.getLong(HashJoinOperator.StatKey.EMITTED_ROWS));
response.mergeMaxRowsInJoinReached(stats.getBoolean(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN_REACHED));
response.mergeMaxRowsInJoin(stats.getLong(HashJoinOperator.StatKey.MAX_ROWS_IN_JOIN));
}

@Override
Expand Down Expand Up @@ -411,6 +413,7 @@ public void mergeInto(BrokerResponseNativeV2 response, StatMap<?> map) {
response.mergeMaxRowsInOperator(stats.getLong(WindowAggregateOperator.StatKey.EMITTED_ROWS));
response.mergeMaxRowsInWindowReached(
stats.getBoolean(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW_REACHED));
response.mergeMaxRowsInWindow(stats.getLong(WindowAggregateOperator.StatKey.MAX_ROWS_IN_WINDOW));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,13 +217,18 @@ private MseBlock computeBlocks() {
int containerSize = container.size();
if (_numRows + containerSize > _maxRowsInWindowCache) {
if (_windowOverflowMode == WindowOverFlowMode.THROW) {
// Record stat before we throw so it propagates to query response
_statMap.merge(StatKey.MAX_ROWS_IN_WINDOW, _numRows + containerSize);
throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException(
"Cannot build in memory window cache for WINDOW operator, reach number of rows limit: "
+ _maxRowsInWindowCache);
} else {
// Just fill up the buffer.
int remainingRows = _maxRowsInWindowCache - _numRows;
container = container.subList(0, remainingRows);
// Update container size here since MAX_ROWS_IN_WINDOW is recorded after the loop exits
// via _numRows once EOS is received from the early-terminated input.
containerSize = remainingRows;
_statMap.merge(StatKey.MAX_ROWS_IN_WINDOW_REACHED, true);
// setting the inputOperator to be early terminated and awaits EOS block next.
_input.earlyTerminate();
Expand All @@ -239,6 +244,7 @@ private MseBlock computeBlocks() {
checkTerminationAndSampleUsage();
block = _input.nextBlock();
}
_statMap.merge(StatKey.MAX_ROWS_IN_WINDOW, _numRows);
MseBlock.Eos eosBlock = (MseBlock.Eos) block;
_eosBlock = eosBlock;
// Early termination if the block is an error block
Expand Down Expand Up @@ -301,6 +307,12 @@ public boolean includeDefaultInJson() {
}
},
MAX_ROWS_IN_WINDOW_REACHED(StatMap.Type.BOOLEAN),
MAX_ROWS_IN_WINDOW(StatMap.Type.LONG) {
/** Merges two values by keeping the larger one. */
@Override
public long merge(long value1, long value2) {
  return value1 >= value2 ? value1 : value2;
}
},
/**
* Allocated memory in bytes for this operator or its children in the same stage.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.openMocks;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;


Expand Down Expand Up @@ -145,6 +146,7 @@ public void testAggregateMultipleInputBlocks() {
when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 1.0}, new Object[]{2, 2.0}))
.thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 3.0}))
.thenReturn(SuccessMseBlock.INSTANCE);
when(_input.calculateStats()).thenReturn(MultiStageQueryStats.emptyStats(0));
DataSchema resultSchema = new DataSchema(new String[]{"group", "sum"}, new ColumnDataType[]{INT, DOUBLE});
AggregateOperator operator = getOperator(resultSchema, aggCalls, filterArgs, groupKeys);

Expand All @@ -156,6 +158,10 @@ public void testAggregateMultipleInputBlocks() {
assertEquals(resultRows.get(0), new Object[]{2, 6.0},
"Expected two columns (group by key, agg value), agg value is final result");
assertTrue(operator.nextBlock().isSuccess(), "Second block is EOS (done processing)");
MultiStageQueryStats stats = operator.calculateStats();
StatMap<AggregateOperator.StatKey> statMap = OperatorTestUtil.getStatMap(AggregateOperator.StatKey.class, stats);
assertEquals(statMap.getLong(AggregateOperator.StatKey.NUM_GROUPS), 1,
"Num groups should equal the number of distinct group keys");
}

@Test
Expand Down Expand Up @@ -312,6 +318,8 @@ public void shouldHandleGroupLimitExceed() {
"num groups limit should be reached");
assertTrue(statMap.getBoolean(AggregateOperator.StatKey.NUM_GROUPS_WARNING_LIMIT_REACHED),
"num groups warning limit should be reached");
assertEquals(statMap.getLong(AggregateOperator.StatKey.NUM_GROUPS), 1,
"Num groups should equal the limit since only one group was accepted");
}

@Test
Expand Down Expand Up @@ -354,6 +362,37 @@ private AggregateOperator getAggregateOperator(OpChainExecutionContext context,
collations, limit));
}

/**
 * Checks that {@code NUM_GROUPS} is recorded when the number of groups stays below the hinted
 * {@code NUM_GROUPS_LIMIT}, and that {@code NUM_GROUPS_LIMIT_REACHED} stays unset in that case.
 */
@Test
public void shouldRecordNumGroupsBelowLimit() {
  // Given: both input rows share group key 2, so there is one distinct group while the hinted limit is 2.
  DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, DOUBLE});
  DataSchema resultSchema = new DataSchema(new String[]{"group", "sum"}, new ColumnDataType[]{INT, DOUBLE});
  PlanNode.NodeHint nodeHint = new PlanNode.NodeHint(Map.of(PinotHintOptions.AGGREGATE_HINT_OPTIONS,
      Map.of(PinotHintOptions.AggregateOptions.NUM_GROUPS_LIMIT, "2")));
  List<Integer> groupKeys = List.of(0);
  List<Integer> filterArgs = List.of(-1);
  List<RexExpression.FunctionCall> aggCalls = List.of(getSum(new RexExpression.InputRef(1)));

  // The input must be assigned before the operator is created.
  _input = new BlockListMultiStageOperator.Builder(inSchema)
      .addRow(2, 1.0)
      .addRow(2, 2.0)
      .buildWithEos();
  AggregateOperator operator = getOperator(resultSchema, aggCalls, filterArgs, groupKeys, nodeHint, Map.of());

  // When: the first block carries the aggregated rows.
  MseBlock.Data dataBlock = (MseBlock.Data) operator.nextBlock();
  List<Object[]> resultRows = dataBlock.asRowHeap().getRows();

  // Then: one aggregated row, followed by EOS, with the stats reflecting a single group.
  assertEquals(resultRows.size(), 1);
  assertTrue(operator.nextBlock().isEos());
  StatMap<AggregateOperator.StatKey> statMap =
      OperatorTestUtil.getStatMap(AggregateOperator.StatKey.class, operator.calculateStats());
  assertFalse(statMap.getBoolean(AggregateOperator.StatKey.NUM_GROUPS_LIMIT_REACHED),
      "Num groups limit should not be reached when groups are below limit");
  assertEquals(statMap.getLong(AggregateOperator.StatKey.NUM_GROUPS), 1,
      "Num groups should equal 1");
}

/**
 * Builds a {@code SUM} function call over the given argument, declared with the {@code INT} column data type.
 *
 * @param arg the single operand of the SUM call
 * @return the function call expression
 */
private static RexExpression.FunctionCall getSum(RexExpression arg) {
  String functionName = SqlKind.SUM.name();
  List<RexExpression> operands = List.of(arg);
  return new RexExpression.FunctionCall(ColumnDataType.INT, functionName, operands);
}
Expand Down
Loading
Loading