Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,22 @@ public class IoTDBWindowFunction3IT {
"insert into demo values (2021-01-01T09:10:00, 'd1', 1)",
"insert into demo values (2021-01-01T09:08:00, 'd2', 2)",
"insert into demo values (2021-01-01T09:15:00, 'd2', 4)",
"create table stock_rank_cases (symbol string tag, amplitude float field, close_hfq float field, ma_3 float field, ma_5 float field, ma_13 float field, ma_64 float field, ma_120 float field)",
"insert into stock_rank_cases values (2026-03-20T00:00:00.000+08:00, '600000', 0.023391813, 131.21382, 129.4694, 127.74202, 136.61397, 146.17342, 124.21007)",
"insert into stock_rank_cases values (2026-03-27T00:00:00.000+08:00, '600000', 0.03696498, 127.89519, 130.02252, 128.07388, 134.97429, 146.16643, 124.58377)",
"insert into stock_rank_cases values (2026-04-03T00:00:00.000+08:00, '600000', 0.045908183, 129.17159, 129.42686, 129.095, 133.40334, 146.23744, 124.978065)",
"insert into stock_rank_cases values (2026-04-10T00:00:00.000+08:00, '600000', 0.03063241, 126.23586, 127.76755, 129.095, 130.89963, 146.24904, 125.36582)",
"insert into stock_rank_cases values (2026-04-17T00:00:00.000+08:00, '600000', 0.03943377, 125.85294, 127.0868, 128.07388, 129.25014, 146.22755, 125.76035)",
"insert into stock_rank_cases values (2026-04-24T00:00:00.000+08:00, '600000', 0.048681542, 120.61971, 124.236176, 125.955055, 127.688995, 146.10107, 126.11027)",
"insert into stock_rank_cases values (2026-05-01T00:00:00.000+08:00, '600000', 0.026455026, 118.32219, 121.59828, 124.04046, 126.47151, 145.88837, 126.43706)",
"insert into stock_rank_cases values (2026-05-08T00:00:00.000+08:00, '600000', 0.025889968, 115.769394, 118.2371, 121.36002, 125.51912, 145.69579, 126.736595)",
"insert into stock_rank_cases values (2026-05-15T00:00:00.000+08:00, '600000', 0.033076074, 115.769394, 116.62032, 119.26673, 124.48818, 145.48964, 127.05406)",
"create table stock_rank_date_cases (symbol string tag, amplitude float field, close_hfq float field, ma_5 float field, ma_64 float field, ma_120 float field)",
"insert into stock_rank_date_cases values (2026-04-17, '600000', 0.03943377, 125.85294, 128.07388, 146.22755, 125.76035)",
"insert into stock_rank_date_cases values (2026-04-24, '600000', 0.048681542, 120.61971, 125.955055, 146.10107, 126.11027)",
"insert into stock_rank_date_cases values (2026-05-01, '600000', 0.026455026, 118.32219, 124.04046, 145.88837, 126.43706)",
"insert into stock_rank_date_cases values (2026-05-08, '600000', 0.025889968, 115.769394, 121.36002, 145.69579, 126.736595)",
"insert into stock_rank_date_cases values (2026-05-15, '600000', 0.033076074, 115.769394, 119.26673, 145.48964, 127.05406)",
"FLUSH",
"CLEAR ATTRIBUTE CACHE",
};
Expand All @@ -68,7 +84,11 @@ protected static void insertData() {

@BeforeClass
public static void setUp() {
EnvFactory.getEnv().getConfig().getCommonConfig().setSortBufferSize(1024 * 1024);
EnvFactory.getEnv()
.getConfig()
.getCommonConfig()
.setSortBufferSize(128 * 1024)
.setMaxTsBlockSizeInByte(4 * 1024);
EnvFactory.getEnv().initClusterEnvironment();
insertData();
}
Expand Down Expand Up @@ -135,6 +155,153 @@ public void testSameWindowFunctionWithDifferentOrdering() {
DATABASE_NAME);
}

@Test
public void testSameWindowFunctionWithDifferentOrderingWithoutPartition() {
String[] expectedHeader =
new String[] {"time", "device", "value", "rank_time_desc", "rank_value", "rank_time_asc"};
String[] retArray =
new String[] {
"2021-01-01T09:05:00.000Z,d1,3.0,6,3,1,",
"2021-01-01T09:07:00.000Z,d1,5.0,5,6,2,",
"2021-01-01T09:08:00.000Z,d2,2.0,4,2,3,",
"2021-01-01T09:09:00.000Z,d1,3.0,3,3,4,",
"2021-01-01T09:10:00.000Z,d1,1.0,2,1,5,",
"2021-01-01T09:15:00.000Z,d2,4.0,1,5,6,",
};
tableResultSetEqualTest(
"SELECT *, rank() OVER (ORDER BY \"time\" DESC) AS rank_time_desc, rank() OVER (ORDER BY value) AS rank_value, rank() OVER (ORDER BY \"time\") AS rank_time_asc FROM demo ORDER BY \"time\"",
expectedHeader,
retArray,
DATABASE_NAME);
}

@Test
public void testSameWindowFunctionWithDifferentFieldOrderingWithoutPartition() {
String[] expectedHeader =
new String[] {"time", "ma_3", "ma_13", "ma_120", "rank_m3", "rank_ma13", "rank_ma120"};
String[] retArray =
new String[] {
"2026-03-19T16:00:00.000Z,129.4694,136.61397,124.21007,2,3,1,",
"2026-03-26T16:00:00.000Z,130.02252,134.97429,124.58377,3,2,2,",
"2026-04-02T16:00:00.000Z,129.42686,133.40334,124.978065,1,1,3,",
};
tableResultSetEqualTest(
"SELECT \"time\", ma_3, ma_13, ma_120, rank() OVER (ORDER BY ma_3) AS rank_m3, rank() OVER (ORDER BY ma_13) AS rank_ma13, rank() OVER (ORDER BY ma_120) AS rank_ma120 FROM stock_rank_cases WHERE \"time\" >= 2026-03-20T00:00:00.000+08:00 AND \"time\" <= 2026-04-03T00:00:00.000+08:00 AND symbol = '600000' ORDER BY \"time\"",
expectedHeader,
retArray,
DATABASE_NAME);
}

@Test
public void testSameWindowFunctionWithDifferentStockOrderingWithoutTimeRank() {
String[] expectedHeader =
new String[] {"time", "amplitude", "close_hfq", "rank_amplitude", "rank_close_hfq"};
String[] retArray =
new String[] {
"2026-04-16T16:00:00.000Z,0.03943377,125.85294,2,1,",
"2026-04-23T16:00:00.000Z,0.048681542,120.61971,1,2,",
"2026-04-30T16:00:00.000Z,0.026455026,118.32219,4,3,",
"2026-05-07T16:00:00.000Z,0.025889968,115.769394,5,4,",
"2026-05-14T16:00:00.000Z,0.033076074,115.769394,3,4,",
};
tableResultSetEqualTest(
"SELECT \"time\", amplitude, close_hfq, rank() OVER (ORDER BY amplitude DESC) AS rank_amplitude, rank() OVER (ORDER BY close_hfq DESC) AS rank_close_hfq FROM stock_rank_cases WHERE \"time\" >= 2026-04-11T00:00:00.000+08:00 AND symbol = '600000' ORDER BY \"time\"",
expectedHeader,
retArray,
DATABASE_NAME);
}

@Test
public void testSameWindowFunctionWithDifferentStockOrderingAfterTimeRank() {
String[] expectedHeader =
new String[] {
"time",
"amplitude",
"close_hfq",
"rank_time_desc",
"rank_amplitude",
"rank_time_asc",
"rank_close_hfq"
};
String[] retArray =
new String[] {
"2026-04-16T16:00:00.000Z,0.03943377,125.85294,5,4,1,5,",
"2026-04-23T16:00:00.000Z,0.048681542,120.61971,4,5,2,4,",
"2026-04-30T16:00:00.000Z,0.026455026,118.32219,3,2,3,3,",
"2026-05-07T16:00:00.000Z,0.025889968,115.769394,2,1,4,1,",
"2026-05-14T16:00:00.000Z,0.033076074,115.769394,1,3,5,1,",
};
tableResultSetEqualTest(
"SELECT \"time\", amplitude, close_hfq, rank() OVER (ORDER BY \"time\" DESC) AS rank_time_desc, rank() OVER (ORDER BY amplitude) AS rank_amplitude, rank() OVER (ORDER BY \"time\") AS rank_time_asc, rank() OVER (ORDER BY close_hfq) AS rank_close_hfq FROM stock_rank_cases WHERE \"time\" >= 2026-04-11T00:00:00.000+08:00 AND symbol = '600000' ORDER BY \"time\"",
expectedHeader,
retArray,
DATABASE_NAME);
}

@Test
public void testSameWindowFunctionWithOriginalStockOrderingAfterTimeRank() {
String[] expectedHeader =
new String[] {
"time",
"amplitude",
"close_hfq",
"ma_5",
"ma_64",
"ma_120",
"rank_time",
"rank_amplitude",
"rank_close_hfq",
"rank_ma5",
"rank_ma64",
"rank_ma120"
};
String[] retArray =
new String[] {
"2026-04-16T16:00:00.000Z,0.03943377,125.85294,128.07388,146.22755,125.76035,1,2,1,5,1,1,",
"2026-04-23T16:00:00.000Z,0.048681542,120.61971,125.955055,146.10107,126.11027,2,1,2,4,2,2,",
"2026-04-30T16:00:00.000Z,0.026455026,118.32219,124.04046,145.88837,126.43706,3,4,3,3,3,3,",
"2026-05-07T16:00:00.000Z,0.025889968,115.769394,121.36002,145.69579,126.736595,4,5,4,2,4,4,",
"2026-05-14T16:00:00.000Z,0.033076074,115.769394,119.26673,145.48964,127.05406,5,3,4,1,5,5,",
};
tableResultSetEqualTest(
"SELECT \"time\", amplitude, close_hfq, ma_5, ma_64, ma_120, rank() OVER (ORDER BY \"time\") AS rank_time, rank() OVER (ORDER BY amplitude DESC) AS rank_amplitude, rank() OVER (ORDER BY close_hfq DESC) AS rank_close_hfq, rank() OVER (ORDER BY ma_5) AS rank_ma5, rank() OVER (ORDER BY ma_64 DESC) AS rank_ma64, rank() OVER (ORDER BY ma_120) AS rank_ma120 FROM stock_rank_cases WHERE \"time\" >= 2026-04-11T00:00:00.000+08:00 AND symbol = '600000' ORDER BY \"time\"",
expectedHeader,
retArray,
DATABASE_NAME);
}

@Test
public void testSameWindowFunctionWithDateOnlyTimeAfterTimeRank() {
String[] expectedHeader =
new String[] {
"time",
"amplitude",
"close_hfq",
"ma_5",
"ma_64",
"ma_120",
"rank_time",
"rank_amplitude",
"rank_close_hfq",
"rank_ma5",
"rank_ma64",
"rank_ma120"
};
String[] retArray =
new String[] {
"2026-04-17T00:00:00.000Z,0.03943377,125.85294,128.07388,146.22755,125.76035,1,2,1,5,1,1,",
"2026-04-24T00:00:00.000Z,0.048681542,120.61971,125.955055,146.10107,126.11027,2,1,2,4,2,2,",
"2026-05-01T00:00:00.000Z,0.026455026,118.32219,124.04046,145.88837,126.43706,3,4,3,3,3,3,",
"2026-05-08T00:00:00.000Z,0.025889968,115.769394,121.36002,145.69579,126.736595,4,5,4,2,4,4,",
"2026-05-15T00:00:00.000Z,0.033076074,115.769394,119.26673,145.48964,127.05406,5,3,4,1,5,5,",
};
tableResultSetEqualTest(
"SELECT \"time\", amplitude, close_hfq, ma_5, ma_64, ma_120, rank() OVER (ORDER BY \"time\") AS rank_time, rank() OVER (ORDER BY amplitude DESC) AS rank_amplitude, rank() OVER (ORDER BY close_hfq DESC) AS rank_close_hfq, rank() OVER (ORDER BY ma_5) AS rank_ma5, rank() OVER (ORDER BY ma_64 DESC) AS rank_ma64, rank() OVER (ORDER BY ma_120) AS rank_ma120 FROM stock_rank_date_cases WHERE \"time\" >= 2026-04-11 AND symbol = '600000' ORDER BY \"time\"",
expectedHeader,
retArray,
DATABASE_NAME);
}

@Test
public void testPushDownFilterIntoWindow() {
String[] expectedHeader = new String[] {"time", "device", "value", "rn"};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@
import org.apache.iotdb.calc.plan.relational.utils.matching.PropertyPattern;
import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.commons.queryengine.plan.relational.planner.Assignments;
import org.apache.iotdb.commons.queryengine.plan.relational.planner.DataOrganizationSpecification;
import org.apache.iotdb.commons.queryengine.plan.relational.planner.OrderingScheme;
import org.apache.iotdb.commons.queryengine.plan.relational.planner.SortOrder;
import org.apache.iotdb.commons.queryengine.plan.relational.planner.Symbol;
import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.GroupNode;
import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.ProjectNode;
import org.apache.iotdb.commons.queryengine.plan.relational.planner.node.WindowNode;
import org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Expression;
Expand All @@ -38,7 +41,9 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -50,6 +55,7 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static org.apache.iotdb.calc.plan.relational.utils.matching.Capture.newCapture;
import static org.apache.iotdb.commons.queryengine.plan.relational.planner.SortOrder.ASC_NULLS_LAST;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.restrictOutputs;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.iterative.rule.Util.transpose;
import static org.apache.iotdb.db.queryengine.plan.relational.planner.node.Patterns.groupNode;
Expand Down Expand Up @@ -224,7 +230,9 @@ public SwapAdjacentWindowsBySpecifications(int numProjects) {
@Override
protected Optional<PlanNode> manipulateAdjacentWindowNodes(
WindowNode parent, WindowNode child, Context context) {
if ((compare(parent, child) < 0) && (!dependsOn(parent, child))) {
if ((compare(parent, child) < 0)
&& (!dependsOn(parent, child))
&& childInputSatisfies(parent, child)) {
PlanNode transposedWindows = transpose(parent, child);
return Optional.of(
restrictOutputs(
Expand All @@ -236,6 +244,55 @@ protected Optional<PlanNode> manipulateAdjacentWindowNodes(
return Optional.empty();
}

private static boolean childInputSatisfies(WindowNode parent, WindowNode child) {
return inputSatisfies(parent.getSpecification(), child.getChild());
}

private static boolean inputSatisfies(
DataOrganizationSpecification specification, PlanNode input) {
if (input instanceof ProjectNode) {
return inputSatisfies(specification, ((ProjectNode) input).getChild());
}
if (!(input instanceof GroupNode)) {
return false;
}
return orderingSatisfies(specification, ((GroupNode) input).getOrderingScheme());
}

private static boolean orderingSatisfies(
DataOrganizationSpecification specification, OrderingScheme orderingScheme) {
List<Symbol> requiredSymbols = new ArrayList<>();
Map<Symbol, SortOrder> requiredOrderings = new HashMap<>();
for (Symbol symbol : specification.getPartitionBy()) {
requiredSymbols.add(symbol);
requiredOrderings.put(symbol, ASC_NULLS_LAST);
}
specification
.getOrderingScheme()
.ifPresent(
scheme -> {
for (Symbol symbol : scheme.getOrderBy()) {
if (!requiredOrderings.containsKey(symbol)) {
requiredSymbols.add(symbol);
requiredOrderings.put(symbol, scheme.getOrdering(symbol));
}
}
});

if (requiredSymbols.size() > orderingScheme.getOrderBy().size()) {
return false;
}
for (int i = 0; i < requiredSymbols.size(); i++) {
Symbol requiredSymbol = requiredSymbols.get(i);
Symbol actualSymbol = orderingScheme.getOrderBy().get(i);
if (!requiredSymbol.equals(actualSymbol)
|| requiredOrderings.get(requiredSymbol) != orderingScheme.getOrdering(actualSymbol)) {
return false;
}
}
return true;
}

private static int compare(WindowNode o1, WindowNode o2) {
int comparison = comparePartitionBy(o1, o2);
if (comparison != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,16 @@ private void checkPrefixMatch(Context context, List<Symbol> childOrder) {

@Override
public PlanNode visitGroup(GroupNode node, Context context) {
// A GroupNode without partition keys is a pure global ordering requirement.
if (node.getPartitionKeyCount() == 0) {
return new SortNode(
node.getPlanNodeId(),
node.getChild().accept(this, new Context(null, 0)),
node.getOrderingScheme(),
false,
false);
}

checkPrefixMatch(
context, node.getOrderingScheme().getOrderBy().subList(0, node.getPartitionKeyCount()));
Context newContext = new Context(node.getOrderingScheme(), node.getPartitionKeyCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,9 +385,8 @@ public void testForecastFunction() {
anyTree(
tableFunctionProcessor(
tableFunctionMatcher,
group(
sort(
ImmutableList.of(sort("time_0", ASCENDING, FIRST)),
0,
topK(
1440,
ImmutableList.of(sort("time_0", DESCENDING, LAST)),
Expand All @@ -398,15 +397,15 @@ public void testForecastFunction() {
/*
* └──OutputNode
* └──TableFunctionProcessor
* └──GroupNode
* └──SortNode
* └──TableScan
*/
assertPlan(
planTester.getFragmentPlan(0),
output(
tableFunctionProcessor(
tableFunctionMatcher,
group(ImmutableList.of(sort("time_0", ASCENDING, FIRST)), 0, tableScan))));
sort(ImmutableList.of(sort("time_0", ASCENDING, FIRST)), tableScan))));
}

@Test
Expand Down Expand Up @@ -445,9 +444,8 @@ public void testForecastFunctionWithNoLowerCase() {
anyTree(
tableFunctionProcessor(
tableFunctionMatcher,
group(
sort(
ImmutableList.of(sort("time_0", ASCENDING, FIRST)),
0,
topK(
1440,
ImmutableList.of(sort("time_0", DESCENDING, LAST)),
Expand All @@ -458,15 +456,15 @@ public void testForecastFunctionWithNoLowerCase() {
/*
* └──OutputNode
* └──TableFunctionProcessor
* └──GroupNode
* └──SortNode
* └──TableScan
*/
assertPlan(
planTester.getFragmentPlan(0),
output(
tableFunctionProcessor(
tableFunctionMatcher,
group(ImmutableList.of(sort("time_0", ASCENDING, FIRST)), 0, tableScan))));
sort(ImmutableList.of(sort("time_0", ASCENDING, FIRST)), tableScan))));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,22 +97,29 @@ public void testSwapWindowFunctions() {
"SELECT sum(s1) OVER (PARTITION BY tag1, s1), min(s1) OVER (PARTITION BY tag1) FROM table1";
LogicalQueryPlan logicalQueryPlan2 = planTester.createPlan(sql2);

// Two window functions have swapped. Since the initial sort by (tag1, s1) satisfies both
// windows, no extra sort is needed between them.
// The initial sort by (tag1, s1) satisfies both windows. The second window can therefore
// reuse the grouping over tag1 without an extra sort.
/*
* └──OutputNode
* └──ProjectNode
* └──WindowNode(PARTITION BY tag1, s1)
* └──WindowNode(PARTITION BY tag1)
* └──SortNode
* └──TableScanNode
* └──WindowNode(PARTITION BY tag1)
* └──GroupNode(PARTITION BY tag1)
* └──WindowNode(PARTITION BY tag1, s1)
* └──SortNode
* └──TableScanNode
*/
assertPlan(
logicalQueryPlan2,
output(
project(
window(
ImmutableList.of("tag1", "s1"), ImmutableList.of(), window(sort(tableScan))))));
ImmutableList.of("tag1"),
ImmutableList.of(),
group(
window(
ImmutableList.of("tag1", "s1"),
ImmutableList.of(),
sort(tableScan)))))));
}

@Test
Expand Down
Loading