From 3c761e651bdf63917574244f16206d01331f55d1 Mon Sep 17 00:00:00 2001 From: morningman Date: Fri, 27 Feb 2026 23:09:56 +0800 Subject: [PATCH 1/3] [opt](maxcompute) opt performance for query with limit --- .../maxcompute/source/MaxComputeScanNode.java | 126 +++++- .../org/apache/doris/qe/SessionVariable.java | 13 + .../source/MaxComputeScanNodeTest.java | 397 ++++++++++++++++++ .../test_mc_limit_split_optimization.groovy | 257 ++++++++++++ 4 files changed, 789 insertions(+), 4 deletions(-) create mode 100644 fe/fe-core/src/test/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNodeTest.java create mode 100644 regression-test/suites/external_table_p2/maxcompute/write/test_mc_limit_split_optimization.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java index 5f73f5cab8d1a7..17214156c88184 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java @@ -54,6 +54,7 @@ import com.aliyun.odps.PartitionSpec; import com.aliyun.odps.table.configuration.ArrowOptions; import com.aliyun.odps.table.configuration.ArrowOptions.TimestampUnit; +import com.aliyun.odps.table.configuration.SplitOptions; import com.aliyun.odps.table.optimizer.predicate.Predicate; import com.aliyun.odps.table.read.TableBatchReadSession; import com.aliyun.odps.table.read.TableReadSessionBuilder; @@ -100,6 +101,8 @@ public class MaxComputeScanNode extends FileQueryScanNode { private int readTimeout; private int retryTimes; + private boolean onlyPartitionEqualityPredicate = false; + @Setter private SelectedPartitions selectedPartitions = null; @@ -177,6 +180,12 @@ private void createRequiredColumns() { */ TableBatchReadSession createTableBatchReadSession(List requiredPartitionSpecs) throws 
IOException { MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) table.getCatalog(); + return createTableBatchReadSession(requiredPartitionSpecs, mcCatalog.getSplitOption()); + } + + TableBatchReadSession createTableBatchReadSession( + List requiredPartitionSpecs, SplitOptions splitOptions) throws IOException { + MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) table.getCatalog(); readTimeout = mcCatalog.getReadTimeout(); connectTimeout = mcCatalog.getConnectTimeout(); @@ -186,7 +195,7 @@ TableBatchReadSession createTableBatchReadSession(List requiredPa return scanBuilder.identifier(table.getTableIdentifier()) .withSettings(mcCatalog.getSettings()) - .withSplitOptions(mcCatalog.getSplitOption()) + .withSplitOptions(splitOptions) .requiredPartitionColumns(requiredPartitionColumns) .requiredDataColumns(orderedRequiredDataColumns) .withFilterPredicate(filterPredicate) @@ -315,6 +324,33 @@ protected void convertPredicate() { } this.filterPredicate = filterPredicate; } + + this.onlyPartitionEqualityPredicate = checkOnlyPartitionEqualityPredicate(); + } + + private boolean checkOnlyPartitionEqualityPredicate() { + if (conjuncts.isEmpty()) { + return true; + } + Set partitionColumns = + table.getPartitionColumns().stream().map(Column::getName).collect(Collectors.toSet()); + for (Expr expr : conjuncts) { + if (!(expr instanceof BinaryPredicate)) { + return false; + } + BinaryPredicate bp = (BinaryPredicate) expr; + if (bp.getOp() != BinaryPredicate.Operator.EQ) { + return false; + } + if (!(bp.getChild(0) instanceof SlotRef) || !(bp.getChild(1) instanceof LiteralExpr)) { + return false; + } + String colName = ((SlotRef) bp.getChild(0)).getColumnName(); + if (!partitionColumns.contains(colName)) { + return false; + } + } + return true; } private Predicate convertExprToOdpsPredicate(Expr expr) throws AnalysisException { @@ -576,14 +612,23 @@ protected Map getLocationProperties() throws UserException { private List 
getSplitByTableSession(TableBatchReadSession tableBatchReadSession) throws IOException { List result = new ArrayList<>(); + + long t0 = System.currentTimeMillis(); String scanSessionSerialize = serializeSession(tableBatchReadSession); + long t1 = System.currentTimeMillis(); + LOG.info("MaxComputeScanNode getSplitByTableSession: serializeSession cost {} ms, " + + "serialized size: {} bytes", t1 - t0, scanSessionSerialize.length()); + InputSplitAssigner assigner = tableBatchReadSession.getInputSplitAssigner(); + long t2 = System.currentTimeMillis(); + LOG.info("MaxComputeScanNode getSplitByTableSession: getInputSplitAssigner cost {} ms", t2 - t1); + long modificationTime = table.getOdpsTable().getLastDataModifiedTime().getTime(); MaxComputeExternalCatalog mcCatalog = (MaxComputeExternalCatalog) table.getCatalog(); if (mcCatalog.getSplitStrategy().equals(MCProperties.SPLIT_BY_BYTE_SIZE_STRATEGY)) { - + long t3 = System.currentTimeMillis(); for (com.aliyun.odps.table.read.split.InputSplit split : assigner.getAllSplits()) { MaxComputeSplit maxComputeSplit = new MaxComputeSplit(BYTE_SIZE_PATH, @@ -599,7 +644,10 @@ private List getSplitByTableSession(TableBatchReadSession tableBatchReadS result.add(maxComputeSplit); } + LOG.info("MaxComputeScanNode getSplitByTableSession: byte_size getAllSplits+build cost {} ms, " + + "splits size: {}", System.currentTimeMillis() - t3, result.size()); } else { + long t3 = System.currentTimeMillis(); long totalRowCount = assigner.getTotalRowCount(); long recordsPerSplit = mcCatalog.getSplitRowCount(); @@ -619,17 +667,27 @@ private List getSplitByTableSession(TableBatchReadSession tableBatchReadS result.add(maxComputeSplit); } + LOG.info("MaxComputeScanNode getSplitByTableSession: row_offset getSplitByRowOffset+build cost {} ms, " + + "splits size: {}, totalRowCount: {}", System.currentTimeMillis() - t3, result.size(), + totalRowCount); } + return result; } @Override public List getSplits(int numBackends) throws UserException { + long 
startTime = System.currentTimeMillis(); List result = new ArrayList<>(); com.aliyun.odps.Table odpsTable = table.getOdpsTable(); + long getOdpsTableTime = System.currentTimeMillis(); + LOG.info("MaxComputeScanNode getSplits: getOdpsTable cost {} ms", getOdpsTableTime - startTime); + if (desc.getSlots().isEmpty() || odpsTable.getFileNum() <= 0) { return result; } + long getFileNumTime = System.currentTimeMillis(); + LOG.info("MaxComputeScanNode getSplits: getFileNum cost {} ms", getFileNumTime - getOdpsTableTime); createRequiredColumns(); @@ -649,11 +707,71 @@ public List getSplits(int numBackends) throws UserException { } try { - TableBatchReadSession tableBatchReadSession = createTableBatchReadSession(requiredPartitionSpecs); - result = getSplitByTableSession(tableBatchReadSession); + long beforeSession = System.currentTimeMillis(); + if (sessionVariable.enableMcLimitSplitOptimization + && onlyPartitionEqualityPredicate && hasLimit()) { + result = getSplitsWithLimitOptimization(requiredPartitionSpecs); + } else { + TableBatchReadSession tableBatchReadSession = createTableBatchReadSession(requiredPartitionSpecs); + long afterSession = System.currentTimeMillis(); + LOG.info("MaxComputeScanNode getSplits: createTableBatchReadSession cost {} ms, " + + "partitionSpecs size: {}", afterSession - beforeSession, requiredPartitionSpecs.size()); + + result = getSplitByTableSession(tableBatchReadSession); + long afterSplit = System.currentTimeMillis(); + LOG.info("MaxComputeScanNode getSplits: getSplitByTableSession cost {} ms, " + + "splits size: {}", afterSplit - afterSession, result.size()); + } } catch (IOException e) { throw new RuntimeException(e); } + LOG.info("MaxComputeScanNode getSplits: total cost {} ms", System.currentTimeMillis() - startTime); + return result; + } + + private List getSplitsWithLimitOptimization( + List requiredPartitionSpecs) throws IOException { + long startTime = System.currentTimeMillis(); + + SplitOptions rowOffsetOptions = 
SplitOptions.newBuilder() + .SplitByRowOffset() + .withCrossPartition(false) + .build(); + + TableBatchReadSession tableBatchReadSession = + createTableBatchReadSession(requiredPartitionSpecs, rowOffsetOptions); + long afterSession = System.currentTimeMillis(); + LOG.info("MaxComputeScanNode getSplitsWithLimitOptimization: " + + "createTableBatchReadSession cost {} ms", afterSession - startTime); + + String scanSessionSerialize = serializeSession(tableBatchReadSession); + InputSplitAssigner assigner = tableBatchReadSession.getInputSplitAssigner(); + long totalRowCount = assigner.getTotalRowCount(); + + LOG.info("MaxComputeScanNode getSplitsWithLimitOptimization: " + + "totalRowCount={}, limit={}", totalRowCount, getLimit()); + + List result = new ArrayList<>(); + if (totalRowCount <= 0) { + return result; + } + + long rowsToRead = Math.min(getLimit(), totalRowCount); + long modificationTime = table.getOdpsTable().getLastDataModifiedTime().getTime(); + com.aliyun.odps.table.read.split.InputSplit split = + assigner.getSplitByRowOffset(0, rowsToRead); + + MaxComputeSplit maxComputeSplit = new MaxComputeSplit( + ROW_OFFSET_PATH, 0, rowsToRead, totalRowCount, + modificationTime, null, Collections.emptyList()); + maxComputeSplit.scanSerialize = scanSessionSerialize; + maxComputeSplit.splitType = SplitType.ROW_OFFSET; + maxComputeSplit.sessionId = split.getSessionId(); + result.add(maxComputeSplit); + + LOG.info("MaxComputeScanNode getSplitsWithLimitOptimization: " + + "total cost {} ms, 1 split with {} rows", + System.currentTimeMillis() - startTime, rowsToRead); return result; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 3fc7d3a76340aa..892dcf6ae5cb39 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -2787,6 +2787,8 @@ public boolean 
isEnableHboNonStrictMatchingMode() { public static final String IGNORE_RUNTIME_FILTER_IDS = "ignore_runtime_filter_ids"; public static final String ENABLE_EXTERNAL_TABLE_BATCH_MODE = "enable_external_table_batch_mode"; + + public static final String ENABLE_MC_LIMIT_SPLIT_OPTIMIZATION = "enable_mc_limit_split_optimization"; @VariableMgr.VarAttr( name = ENABLE_EXTERNAL_TABLE_BATCH_MODE, fuzzy = true, @@ -2794,6 +2796,17 @@ public boolean isEnableHboNonStrictMatchingMode() { needForward = true) public boolean enableExternalTableBatchMode = true; + @VariableMgr.VarAttr( + name = ENABLE_MC_LIMIT_SPLIT_OPTIMIZATION, + fuzzy = true, + description = {"开启 MaxCompute 表 LIMIT 查询的 split 优化。当查询仅包含分区等值条件且带有 LIMIT 时," + + "使用 row_offset 策略减少 split 数量以加速查询。", + "Enable split optimization for LIMIT queries on MaxCompute tables. " + + "When the query contains only partition equality predicates with LIMIT, " + + "use row_offset strategy to reduce split count for faster query execution."}, + needForward = true) + public boolean enableMcLimitSplitOptimization = false; + @VariableMgr.VarAttr(name = SKEW_REWRITE_AGG_BUCKET_NUM, needForward = true, description = {"bucketNum 参数控制 count(distinct) 倾斜优化的数据分布。决定不同值在 worker 间的分配方式," + "值越大越能处理极端倾斜但增加 shuffle 开销,值越小网络开销越低但可能无法完全解决倾斜。", diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNodeTest.java new file mode 100644 index 00000000000000..5713785008c100 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNodeTest.java @@ -0,0 +1,397 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.maxcompute.source; + +import org.apache.doris.analysis.BinaryPredicate; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.InPredicate; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.StringLiteral; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.analysis.TupleId; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog; +import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable; +import org.apache.doris.datasource.maxcompute.source.MaxComputeSplit.SplitType; +import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions; +import org.apache.doris.planner.PlanNode; +import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.qe.SessionVariable; +import org.apache.doris.spi.Split; + +import com.aliyun.odps.table.DataFormat; +import com.aliyun.odps.table.DataSchema; +import com.aliyun.odps.table.SessionStatus; +import com.aliyun.odps.table.TableIdentifier; +import com.aliyun.odps.table.read.TableBatchReadSession; +import com.aliyun.odps.table.read.split.InputSplitAssigner; +import com.google.common.collect.Lists; +import org.junit.Assert; +import org.junit.Before; +import 
org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.List; + +@RunWith(MockitoJUnitRunner.class) +public class MaxComputeScanNodeTest { + + @Mock + private MaxComputeExternalTable table; + + @Mock + private MaxComputeExternalCatalog catalog; + + @Mock + private com.aliyun.odps.Table odpsTable; + + private SessionVariable sv; + private TupleDescriptor desc; + private MaxComputeScanNode node; + + private List partitionColumns; + + @Before + public void setUp() { + partitionColumns = Arrays.asList( + new Column("dt", PrimitiveType.VARCHAR), + new Column("hr", PrimitiveType.VARCHAR) + ); + Mockito.when(table.getPartitionColumns()).thenReturn(partitionColumns); + Mockito.when(table.getCatalog()).thenReturn(catalog); + Mockito.when(table.getOdpsTable()).thenReturn(odpsTable); + + desc = Mockito.mock(TupleDescriptor.class); + Mockito.when(desc.getTable()).thenReturn(table); + Mockito.when(desc.getId()).thenReturn(new TupleId(0)); + Mockito.when(desc.getSlots()).thenReturn(new ArrayList<>()); + + sv = new SessionVariable(); + node = new MaxComputeScanNode(new PlanNodeId(0), desc, + SelectedPartitions.NOT_PRUNED, false, sv); + } + + // ==================== Reflection Helpers ==================== + + private void setConjuncts(PlanNode target, List conjuncts) throws Exception { + Field f = PlanNode.class.getDeclaredField("conjuncts"); + f.setAccessible(true); + f.set(target, conjuncts); + } + + private void setLimit(PlanNode target, long limit) throws Exception { + Field f = PlanNode.class.getDeclaredField("limit"); + f.setAccessible(true); + f.setLong(target, limit); + } + + private void setOnlyPartitionEqualityPredicate(MaxComputeScanNode target, boolean 
value) throws Exception { + Field f = MaxComputeScanNode.class.getDeclaredField("onlyPartitionEqualityPredicate"); + f.setAccessible(true); + f.setBoolean(target, value); + } + + private boolean invokeCheckOnlyPartitionEqualityPredicate(MaxComputeScanNode target) throws Exception { + Method m = MaxComputeScanNode.class.getDeclaredMethod("checkOnlyPartitionEqualityPredicate"); + m.setAccessible(true); + return (boolean) m.invoke(target); + } + + // ==================== Group 1: checkOnlyPartitionEqualityPredicate ==================== + + @Test + public void testCheckOnlyPartEq_emptyConjuncts() throws Exception { + setConjuncts(node, new ArrayList<>()); + Assert.assertTrue(invokeCheckOnlyPartitionEqualityPredicate(node)); + } + + @Test + public void testCheckOnlyPartEq_singlePartitionEquality() throws Exception { + SlotRef dtSlot = new SlotRef(null, "dt"); + StringLiteral val = new StringLiteral("2026-02-26"); + BinaryPredicate eq = new BinaryPredicate(BinaryPredicate.Operator.EQ, dtSlot, val); + setConjuncts(node, Lists.newArrayList(eq)); + Assert.assertTrue(invokeCheckOnlyPartitionEqualityPredicate(node)); + } + + @Test + public void testCheckOnlyPartEq_multiPartitionEquality() throws Exception { + SlotRef dtSlot = new SlotRef(null, "dt"); + SlotRef hrSlot = new SlotRef(null, "hr"); + BinaryPredicate eq1 = new BinaryPredicate(BinaryPredicate.Operator.EQ, dtSlot, new StringLiteral("x")); + BinaryPredicate eq2 = new BinaryPredicate(BinaryPredicate.Operator.EQ, hrSlot, new StringLiteral("10")); + setConjuncts(node, Lists.newArrayList(eq1, eq2)); + Assert.assertTrue(invokeCheckOnlyPartitionEqualityPredicate(node)); + } + + @Test + public void testCheckOnlyPartEq_nonPartitionColumn() throws Exception { + SlotRef statusSlot = new SlotRef(null, "status"); + BinaryPredicate eq = new BinaryPredicate(BinaryPredicate.Operator.EQ, statusSlot, new StringLiteral("active")); + setConjuncts(node, Lists.newArrayList(eq)); + 
Assert.assertFalse(invokeCheckOnlyPartitionEqualityPredicate(node)); + } + + @Test + public void testCheckOnlyPartEq_nonEqOperator() throws Exception { + SlotRef dtSlot = new SlotRef(null, "dt"); + BinaryPredicate gt = new BinaryPredicate(BinaryPredicate.Operator.GT, dtSlot, new StringLiteral("2026-01-01")); + setConjuncts(node, Lists.newArrayList(gt)); + Assert.assertFalse(invokeCheckOnlyPartitionEqualityPredicate(node)); + } + + @Test + public void testCheckOnlyPartEq_inPredicate() throws Exception { + SlotRef dtSlot = new SlotRef(null, "dt"); + List inList = Lists.newArrayList(new StringLiteral("a"), new StringLiteral("b")); + InPredicate inPred = new InPredicate(dtSlot, inList, false); + setConjuncts(node, Lists.newArrayList(inPred)); + Assert.assertFalse(invokeCheckOnlyPartitionEqualityPredicate(node)); + } + + @Test + public void testCheckOnlyPartEq_leftSideNotSlotRef() throws Exception { + StringLiteral left = new StringLiteral("x"); + StringLiteral right = new StringLiteral("x"); + BinaryPredicate eq = new BinaryPredicate(BinaryPredicate.Operator.EQ, left, right); + setConjuncts(node, Lists.newArrayList(eq)); + Assert.assertFalse(invokeCheckOnlyPartitionEqualityPredicate(node)); + } + + @Test + public void testCheckOnlyPartEq_rightSideNotLiteral() throws Exception { + SlotRef dtSlot = new SlotRef(null, "dt"); + SlotRef hrSlot = new SlotRef(null, "hr"); + BinaryPredicate eq = new BinaryPredicate(BinaryPredicate.Operator.EQ, dtSlot, hrSlot); + setConjuncts(node, Lists.newArrayList(eq)); + Assert.assertFalse(invokeCheckOnlyPartitionEqualityPredicate(node)); + } + + // ==================== Serializable Stub for TableBatchReadSession ==================== + + private static class StubTableBatchReadSession implements TableBatchReadSession { + private static final long serialVersionUID = 1L; + private transient InputSplitAssigner assigner; + + StubTableBatchReadSession(InputSplitAssigner assigner) { + this.assigner = assigner; + } + + @Override + public 
InputSplitAssigner getInputSplitAssigner() throws IOException { + return assigner; + } + + @Override + public DataSchema readSchema() { + return null; + } + + @Override + public boolean supportsDataFormat(DataFormat dataFormat) { + return false; + } + + @Override + public String getId() { + return "stub-session"; + } + + @Override + public TableIdentifier getTableIdentifier() { + return null; + } + + @Override + public SessionStatus getStatus() { + return SessionStatus.NORMAL; + } + + @Override + public String toJson() { + return "{}"; + } + } + + // ==================== Mock Session Helper ==================== + + private MaxComputeScanNode createSpyNodeWithMockSession(long totalRowCount) throws Exception { + MaxComputeScanNode spyNode = Mockito.spy(node); + + InputSplitAssigner mockAssigner = Mockito.mock(InputSplitAssigner.class); + com.aliyun.odps.table.read.split.InputSplit mockInputSplit = + Mockito.mock(com.aliyun.odps.table.read.split.InputSplit.class); + + Mockito.when(mockAssigner.getTotalRowCount()).thenReturn(totalRowCount); + Mockito.when(mockAssigner.getSplitByRowOffset(Mockito.anyLong(), Mockito.anyLong())) + .thenReturn(mockInputSplit); + Mockito.when(mockInputSplit.getSessionId()).thenReturn("test-session-id"); + + StubTableBatchReadSession stubSession = new StubTableBatchReadSession(mockAssigner); + + Mockito.doReturn(stubSession).when(spyNode) + .createTableBatchReadSession(Mockito.anyList(), Mockito.any( + com.aliyun.odps.table.configuration.SplitOptions.class)); + Mockito.doReturn(stubSession).when(spyNode) + .createTableBatchReadSession(Mockito.anyList()); + + Mockito.when(odpsTable.getLastDataModifiedTime()).thenReturn(new Date(1000L)); + + return spyNode; + } + + // ==================== Group 2: getSplitsWithLimitOptimization ==================== + + private List invokeGetSplitsWithLimitOptimization( + MaxComputeScanNode target) throws Exception { + Method m = MaxComputeScanNode.class.getDeclaredMethod( + "getSplitsWithLimitOptimization", 
List.class); + m.setAccessible(true); + @SuppressWarnings("unchecked") + List result = (List) m.invoke(target, Collections.emptyList()); + return result; + } + + @Test + public void testLimitOpt_limitLessThanTotal() throws Exception { + MaxComputeScanNode spyNode = createSpyNodeWithMockSession(10000L); + setLimit(spyNode, 100L); + + List result = invokeGetSplitsWithLimitOptimization(spyNode); + + Assert.assertEquals(1, result.size()); + MaxComputeSplit split = (MaxComputeSplit) result.get(0); + Assert.assertEquals(SplitType.ROW_OFFSET, split.splitType); + Assert.assertEquals(100L, split.getLength()); + } + + @Test + public void testLimitOpt_limitGreaterThanTotal() throws Exception { + MaxComputeScanNode spyNode = createSpyNodeWithMockSession(200L); + setLimit(spyNode, 50000L); + + List result = invokeGetSplitsWithLimitOptimization(spyNode); + + Assert.assertEquals(1, result.size()); + MaxComputeSplit split = (MaxComputeSplit) result.get(0); + Assert.assertEquals(SplitType.ROW_OFFSET, split.splitType); + Assert.assertEquals(200L, split.getLength()); + } + + @Test + public void testLimitOpt_totalRowCountZero() throws Exception { + MaxComputeScanNode spyNode = createSpyNodeWithMockSession(0L); + setLimit(spyNode, 100L); + + List result = invokeGetSplitsWithLimitOptimization(spyNode); + + Assert.assertTrue(result.isEmpty()); + } + + // ==================== Group 3: getSplits gating conditions ==================== + + private MaxComputeScanNode createSpyNodeForGetSplits(long totalRowCount) throws Exception { + // Need non-empty slots so getSplits doesn't return early + SlotDescriptor mockSlotDesc = Mockito.mock(SlotDescriptor.class); + Column dataCol = new Column("value", PrimitiveType.VARCHAR); + Mockito.when(mockSlotDesc.getColumn()).thenReturn(dataCol); + Mockito.when(desc.getSlots()).thenReturn(Lists.newArrayList(mockSlotDesc)); + + // Need fileNum > 0 + Mockito.when(odpsTable.getFileNum()).thenReturn(10L); + + // For normal path: use row_count strategy + 
Mockito.when(catalog.getSplitStrategy()).thenReturn("row_count"); + Mockito.when(catalog.getSplitRowCount()).thenReturn(totalRowCount); + + // Need table.getColumns() for createRequiredColumns() + List allColumns = Lists.newArrayList( + new Column("dt", PrimitiveType.VARCHAR), + new Column("hr", PrimitiveType.VARCHAR), + new Column("value", PrimitiveType.VARCHAR) + ); + Mockito.when(table.getColumns()).thenReturn(allColumns); + + return createSpyNodeWithMockSession(totalRowCount); + } + + @Test + public void testGetSplits_allConditionsMet_optimizationPath() throws Exception { + MaxComputeScanNode spyNode = createSpyNodeForGetSplits(10000L); + sv.enableMcLimitSplitOptimization = true; + setOnlyPartitionEqualityPredicate(spyNode, true); + setLimit(spyNode, 100L); + + List result = spyNode.getSplits(1); + + Assert.assertEquals(1, result.size()); + MaxComputeSplit split = (MaxComputeSplit) result.get(0); + Assert.assertEquals(SplitType.ROW_OFFSET, split.splitType); + Assert.assertEquals(100L, split.getLength()); + } + + @Test + public void testGetSplits_optimizationDisabled_normalPath() throws Exception { + MaxComputeScanNode spyNode = createSpyNodeForGetSplits(1000L); + sv.enableMcLimitSplitOptimization = false; + setOnlyPartitionEqualityPredicate(spyNode, true); + setLimit(spyNode, 100L); + + List result = spyNode.getSplits(1); + + // Normal path with row_count strategy: totalRowCount=1000, splitRowCount=1000 → 1 split + // but the split length equals splitRowCount, not limit + Assert.assertFalse(result.isEmpty()); + } + + @Test + public void testGetSplits_nonPartitionPredicate_normalPath() throws Exception { + MaxComputeScanNode spyNode = createSpyNodeForGetSplits(1000L); + sv.enableMcLimitSplitOptimization = true; + setOnlyPartitionEqualityPredicate(spyNode, false); + setLimit(spyNode, 100L); + + List result = spyNode.getSplits(1); + + Assert.assertFalse(result.isEmpty()); + } + + @Test + public void testGetSplits_noLimit_normalPath() throws Exception { + 
MaxComputeScanNode spyNode = createSpyNodeForGetSplits(1000L); + sv.enableMcLimitSplitOptimization = true; + setOnlyPartitionEqualityPredicate(spyNode, true); + // limit defaults to -1 (no limit), don't set it + + List result = spyNode.getSplits(1); + + Assert.assertFalse(result.isEmpty()); + } +} diff --git a/regression-test/suites/external_table_p2/maxcompute/write/test_mc_limit_split_optimization.groovy b/regression-test/suites/external_table_p2/maxcompute/write/test_mc_limit_split_optimization.groovy new file mode 100644 index 00000000000000..14a70d833a0cff --- /dev/null +++ b/regression-test/suites/external_table_p2/maxcompute/write/test_mc_limit_split_optimization.groovy @@ -0,0 +1,257 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_mc_limit_split_optimization", "p2,external,maxcompute,external_remote,external_remote_maxcompute") { + String enabled = context.config.otherConfigs.get("enableMaxComputeTest") + if (enabled == null || !enabled.equalsIgnoreCase("true")) { + logger.info("disable MaxCompute test.") + return + } + + String ak = context.config.otherConfigs.get("ak") + String sk = context.config.otherConfigs.get("sk") + String mc_catalog_name = "test_mc_limit_split_opt" + String defaultProject = "mc_doris_test_write" + + sql """drop catalog if exists ${mc_catalog_name}""" + sql """ + CREATE CATALOG IF NOT EXISTS ${mc_catalog_name} PROPERTIES ( + "type" = "max_compute", + "mc.default.project" = "${defaultProject}", + "mc.access_key" = "${ak}", + "mc.secret_key" = "${sk}", + "mc.endpoint" = "http://service.cn-beijing-vpc.maxcompute.aliyun-inc.com/api", + "mc.quota" = "pay-as-you-go", + "mc.enable.namespace.schema" = "true" + ); + """ + + sql """switch ${mc_catalog_name}""" + + def uuid = UUID.randomUUID().toString().replace("-", "").substring(0, 8) + String db = "mc_limit_opt_${uuid}" + + sql """drop database if exists ${db}""" + sql """create database ${db}""" + sql """use ${db}""" + + try { + // ==================== Step 1: Create partition table and insert data ==================== + String tb = "many_part_${uuid}" + sql """DROP TABLE IF EXISTS ${tb}""" + sql """ + CREATE TABLE ${tb} ( + id INT, + name STRING, + pt STRING + ) + PARTITION BY (pt)() + """ + + // Generate 50 partitions, each with 20 rows = 1000 rows total + sql """ + INSERT INTO ${tb} + SELECT + c1 * 50 + c2 + 1 AS id, + CONCAT('name_', CAST(c1 * 50 + c2 + 1 AS STRING)) AS name, + CAST(c2 + 1 AS STRING) AS pt + FROM (SELECT 1) t + LATERAL VIEW EXPLODE_NUMBERS(20) t1 AS c1 + LATERAL VIEW EXPLODE_NUMBERS(50) t2 AS c2 + """ + + // Verify total row count + def totalCount = sql """SELECT count(*) FROM ${tb}""" + logger.info("Total rows inserted: ${totalCount}") + assert totalCount[0][0] == 1000 + + // 
==================== Step 2: Multi-partition column table ==================== + String tb2 = "multi_part_${uuid}" + sql """DROP TABLE IF EXISTS ${tb2}""" + sql """ + CREATE TABLE ${tb2} ( + id INT, + val STRING, + dt STRING, + region STRING + ) + PARTITION BY (dt, region)() + """ + + // 5 dt x 4 region = 20 partitions, 5 rows each = 100 rows + sql """ + INSERT INTO ${tb2} + SELECT + c1 * 20 + c2 * 4 + c3 + 1 AS id, + CONCAT('val_', CAST(c1 * 20 + c2 * 4 + c3 + 1 AS STRING)) AS val, + CONCAT('2026-01-0', CAST(c2 + 1 AS STRING)) AS dt, + CONCAT('r', CAST(c3 + 1 AS STRING)) AS region + FROM (SELECT 1) t + LATERAL VIEW EXPLODE_NUMBERS(5) t1 AS c1 + LATERAL VIEW EXPLODE_NUMBERS(5) t2 AS c2 + LATERAL VIEW EXPLODE_NUMBERS(4) t3 AS c3 + """ + + def totalCount2 = sql """SELECT count(*) FROM ${tb2}""" + logger.info("Multi-part total rows: ${totalCount2}") + assert totalCount2[0][0] == 100 + + // ==================== Step 3: Test single-partition table ==================== + // Helper: run query with opt ON and OFF, assert results are equal + def compareWithAndWithoutOpt = { String queryLabel, String query -> + sql """set enable_mc_limit_split_optimization = true""" + def resultOn = sql "${query}" + sql """set enable_mc_limit_split_optimization = false""" + def resultOff = sql "${query}" + logger.info("${queryLabel}: opt_on=${resultOn.size()} rows, opt_off=${resultOff.size()} rows") + assert resultOn == resultOff : "${queryLabel}: results differ between opt ON and OFF" + } + + // --- Case 1: partition equality + LIMIT (optimization should kick in) --- + compareWithAndWithoutOpt("case1_part_eq_limit", + "SELECT * FROM ${tb} WHERE pt = '1' ORDER BY id LIMIT 5") + + // --- Case 2: partition equality + small LIMIT --- + compareWithAndWithoutOpt("case2_part_eq_limit1", + "SELECT * FROM ${tb} WHERE pt = '10' ORDER BY id LIMIT 1") + + // --- Case 3: partition equality + LIMIT larger than partition data --- + compareWithAndWithoutOpt("case3_part_eq_limit_large", + "SELECT * FROM 
${tb} WHERE pt = '5' ORDER BY id LIMIT 1000") + + // --- Case 4: no predicate + LIMIT (empty conjuncts → optimization eligible) --- + compareWithAndWithoutOpt("case4_no_pred_limit", + "SELECT * FROM ${tb} ORDER BY id LIMIT 10") + + // --- Case 5: non-partition predicate + LIMIT (optimization should NOT kick in) --- + compareWithAndWithoutOpt("case5_non_part_pred_limit", + "SELECT * FROM ${tb} WHERE name = 'name_1' ORDER BY id LIMIT 5") + + // --- Case 6: partition equality + non-partition predicate + LIMIT (mixed → no opt) --- + compareWithAndWithoutOpt("case6_mixed_pred_limit", + "SELECT * FROM ${tb} WHERE pt = '1' AND name = 'name_1' ORDER BY id LIMIT 5") + + // --- Case 7: partition range predicate + LIMIT (non-equality → no opt) --- + compareWithAndWithoutOpt("case7_range_pred_limit", + "SELECT * FROM ${tb} WHERE pt > '3' ORDER BY id LIMIT 10") + + // --- Case 8: partition IN predicate + LIMIT (IN is not EQ → no opt) --- + compareWithAndWithoutOpt("case8_in_pred_limit", + "SELECT * FROM ${tb} WHERE pt IN ('1', '2', '3') ORDER BY id LIMIT 10") + + // --- Case 9: partition equality + no LIMIT (no opt) --- + compareWithAndWithoutOpt("case9_part_eq_no_limit", + "SELECT * FROM ${tb} WHERE pt = '1' ORDER BY id") + + // --- Case 10: count(*) with partition equality + LIMIT --- + compareWithAndWithoutOpt("case10_count_part_eq_limit", + "SELECT count(*) FROM ${tb} WHERE pt = '1' LIMIT 1") + + // ==================== Step 4: Test multi-partition column table ==================== + + // --- Case 11: both partition columns equality + LIMIT (opt eligible) --- + compareWithAndWithoutOpt("case11_multi_part_eq_limit", + "SELECT * FROM ${tb2} WHERE dt = '2026-01-01' AND region = 'r1' ORDER BY id LIMIT 3") + + // --- Case 12: single partition column equality + LIMIT (opt eligible) --- + compareWithAndWithoutOpt("case12_single_part_eq_limit", + "SELECT * FROM ${tb2} WHERE dt = '2026-01-03' ORDER BY id LIMIT 5") + + // --- Case 13: partition equality + non-partition predicate + 
LIMIT (no opt) --- + compareWithAndWithoutOpt("case13_multi_mixed_limit", + "SELECT * FROM ${tb2} WHERE dt = '2026-01-01' AND val = 'val_1' ORDER BY id LIMIT 5") + + // --- Case 14: no predicate + LIMIT on multi-part table --- + compareWithAndWithoutOpt("case14_multi_no_pred_limit", + "SELECT * FROM ${tb2} ORDER BY id LIMIT 10") + + // --- Case 15: partition range on multi-part + LIMIT (no opt) --- + compareWithAndWithoutOpt("case15_multi_range_limit", + "SELECT * FROM ${tb2} WHERE dt >= '2026-01-03' ORDER BY id LIMIT 10") + + // ==================== Step 4b: Complex queries (JOIN / aggregation / subquery) ==================== + + // --- Case 16: self-JOIN with partition equality + LIMIT --- + compareWithAndWithoutOpt("case16_self_join_limit", + "SELECT a.id, a.name, b.name AS name2 FROM ${tb} a JOIN ${tb} b ON a.id = b.id WHERE a.pt = '1' ORDER BY a.id LIMIT 5") + + // --- Case 17: JOIN between two MC tables with partition equality + LIMIT --- + compareWithAndWithoutOpt("case17_cross_table_join_limit", + "SELECT a.id, a.name, b.val FROM ${tb} a JOIN ${tb2} b ON a.id = b.id WHERE a.pt = '1' AND b.dt = '2026-01-01' ORDER BY a.id LIMIT 5") + + // --- Case 18: SUM + GROUP BY with partition equality + LIMIT --- + compareWithAndWithoutOpt("case18_sum_group_limit", + "SELECT pt, SUM(id) AS total_id, COUNT(*) AS cnt FROM ${tb} WHERE pt = '1' GROUP BY pt LIMIT 5") + + // --- Case 19: aggregation across all partitions + LIMIT --- + compareWithAndWithoutOpt("case19_agg_all_part_limit", + "SELECT pt, COUNT(*) AS cnt, MIN(id) AS min_id, MAX(id) AS max_id FROM ${tb} GROUP BY pt ORDER BY pt LIMIT 10") + + // --- Case 20: SUM + GROUP BY on multi-part table with partition equality + LIMIT --- + compareWithAndWithoutOpt("case20_multi_part_agg_limit", + "SELECT dt, region, SUM(id) AS total_id FROM ${tb2} WHERE dt = '2026-01-01' GROUP BY dt, region ORDER BY region LIMIT 5") + + // --- Case 21: subquery with partition equality + LIMIT --- + 
compareWithAndWithoutOpt("case21_subquery_limit", + "SELECT * FROM (SELECT id, name, pt FROM ${tb} WHERE pt = '2') sub ORDER BY id LIMIT 5") + + // --- Case 22: JOIN + aggregation + LIMIT --- + compareWithAndWithoutOpt("case22_join_agg_limit", + "SELECT a.pt, COUNT(*) AS cnt, SUM(b.id) AS sum_b_id FROM ${tb} a JOIN ${tb2} b ON a.id = b.id WHERE a.pt = '1' GROUP BY a.pt LIMIT 5") + + // --- Case 23: UNION ALL with partition equality + LIMIT --- + compareWithAndWithoutOpt("case23_union_limit", + "SELECT * FROM (SELECT id, name FROM ${tb} WHERE pt = '1' UNION ALL SELECT id, name FROM ${tb} WHERE pt = '2') u ORDER BY id LIMIT 10") + + // --- Case 24: window function with partition equality + LIMIT --- + compareWithAndWithoutOpt("case24_window_func_limit", + "SELECT id, name, ROW_NUMBER() OVER (ORDER BY id) AS rn FROM ${tb} WHERE pt = '1' ORDER BY id LIMIT 5") + + // --- Case 25: HAVING + aggregation with partition equality + LIMIT --- + compareWithAndWithoutOpt("case25_having_limit", + "SELECT dt, COUNT(*) AS cnt FROM ${tb2} WHERE dt = '2026-01-01' GROUP BY dt HAVING COUNT(*) > 0 ORDER BY dt LIMIT 5") + + // ==================== Step 5: Verify optimization ON produces correct data ==================== + // Spot-check: opt ON result must match known data + sql """set enable_mc_limit_split_optimization = true""" + + // Single partition has exactly 20 rows + def partCount = sql """SELECT count(*) FROM ${tb} WHERE pt = '1'""" + assert partCount[0][0] == 20 + + // LIMIT 5 on a single partition should return exactly 5 rows + def limitResult = sql """SELECT * FROM ${tb} WHERE pt = '1' ORDER BY id LIMIT 5""" + assert limitResult.size() == 5 + + // LIMIT larger than data should return all rows in that partition + def limitLargeResult = sql """SELECT * FROM ${tb} WHERE pt = '1' ORDER BY id LIMIT 100""" + assert limitLargeResult.size() == 20 + + // Multi-part: dt='2026-01-01' AND region='r1' → 5 rows + def multiPartResult = sql """SELECT count(*) FROM ${tb2} WHERE dt = 
'2026-01-01' AND region = 'r1'""" + assert multiPartResult[0][0] == 5 + + def multiLimitResult = sql """SELECT * FROM ${tb2} WHERE dt = '2026-01-01' AND region = 'r1' ORDER BY id LIMIT 3""" + assert multiLimitResult.size() == 3 + + // Reset session variable + sql """set enable_mc_limit_split_optimization = false""" + + } finally { + sql """drop database if exists ${mc_catalog_name}.${db}""" + } +} From dfb60731327e1a8cdbf45d7ee8d744f3318bc2ba Mon Sep 17 00:00:00 2001 From: morningman Date: Fri, 27 Feb 2026 23:24:31 +0800 Subject: [PATCH 2/3] fix typo: remove stray quote in defaultProject string --- .../maxcompute/write/test_mc_limit_split_optimization.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/regression-test/suites/external_table_p2/maxcompute/write/test_mc_limit_split_optimization.groovy b/regression-test/suites/external_table_p2/maxcompute/write/test_mc_limit_split_optimization.groovy index 14a70d833a0cff..d6c78a335e61e7 100644 --- a/regression-test/suites/external_table_p2/maxcompute/write/test_mc_limit_split_optimization.groovy +++ b/regression-test/suites/external_table_p2/maxcompute/write/test_mc_limit_split_optimization.groovy @@ -25,7 +25,7 @@ suite("test_mc_limit_split_optimization", "p2,external,maxcompute,external_remot String ak = context.config.otherConfigs.get("ak") String sk = context.config.otherConfigs.get("sk") String mc_catalog_name = "test_mc_limit_split_opt" - String defaultProject = "mc_doris_test_write"" + String defaultProject = "mc_doris_test_write" sql """drop catalog if exists ${mc_catalog_name}""" sql """ From e46f793bfafe4e81ca257cc5df01afd60ac4ebb7 Mon Sep 17 00:00:00 2001 From: morningman Date: Sat, 28 Feb 2026 11:06:24 +0800 Subject: [PATCH 3/3] support IN predicate on partition columns in limit split optimization --- .../maxcompute/source/MaxComputeScanNode.java | 42 ++++++++++++------ .../source/MaxComputeScanNodeTest.java | 43 ++++++++++++++++++- .../test_mc_limit_split_optimization.groovy | 11 ++++- 3 files changed, 82 insertions(+), 14 deletions(-) diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java index 17214156c88184..898022b2dcf953 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java @@ -335,18 +335,36 @@ private boolean checkOnlyPartitionEqualityPredicate() { Set partitionColumns = table.getPartitionColumns().stream().map(Column::getName).collect(Collectors.toSet()); for (Expr expr : conjuncts) { - if (!(expr instanceof BinaryPredicate)) { - return false; - } - BinaryPredicate bp = (BinaryPredicate) expr; - if (bp.getOp() != BinaryPredicate.Operator.EQ) { - return false; - } - if (!(bp.getChild(0) instanceof SlotRef) || !(bp.getChild(1) instanceof LiteralExpr)) { - return false; - } - String colName = ((SlotRef) bp.getChild(0)).getColumnName(); - if (!partitionColumns.contains(colName)) { + if (expr instanceof BinaryPredicate) { + BinaryPredicate bp = (BinaryPredicate) expr; + if (bp.getOp() != BinaryPredicate.Operator.EQ) { + return false; + } + if (!(bp.getChild(0) instanceof SlotRef) || !(bp.getChild(1) instanceof LiteralExpr)) { + return false; + } + String colName = ((SlotRef) bp.getChild(0)).getColumnName(); + if (!partitionColumns.contains(colName)) { + return false; + } + } else if (expr instanceof InPredicate) { + InPredicate inPredicate = (InPredicate) expr; + if (inPredicate.isNotIn()) { + return false; + } + if (!(inPredicate.getChild(0) instanceof SlotRef)) { + return false; + } + String colName = ((SlotRef) inPredicate.getChild(0)).getColumnName(); + if (!partitionColumns.contains(colName)) { + return false; + } + for (int i = 1; i < inPredicate.getChildren().size(); i++) { + if (!(inPredicate.getChild(i) instanceof LiteralExpr)) { + return false; + } + } + } else { return false; } } diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNodeTest.java index 5713785008c100..b00adb1a2e63b9 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNodeTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNodeTest.java @@ -168,14 +168,55 @@ public void testCheckOnlyPartEq_nonEqOperator() throws Exception { } @Test - public void testCheckOnlyPartEq_inPredicate() throws Exception { + public void testCheckOnlyPartEq_inPredicateOnPartitionColumn() throws Exception { SlotRef dtSlot = new SlotRef(null, "dt"); List inList = Lists.newArrayList(new StringLiteral("a"), new StringLiteral("b")); InPredicate inPred = new InPredicate(dtSlot, inList, false); setConjuncts(node, Lists.newArrayList(inPred)); + Assert.assertTrue(invokeCheckOnlyPartitionEqualityPredicate(node)); + } + + @Test + public void testCheckOnlyPartEq_notInPredicate() throws Exception { + SlotRef dtSlot = new SlotRef(null, "dt"); + List inList = Lists.newArrayList(new StringLiteral("a"), new StringLiteral("b")); + InPredicate notInPred = new InPredicate(dtSlot, inList, true); + setConjuncts(node, Lists.newArrayList(notInPred)); Assert.assertFalse(invokeCheckOnlyPartitionEqualityPredicate(node)); } + @Test + public void testCheckOnlyPartEq_inPredicateOnNonPartitionColumn() throws Exception { + SlotRef statusSlot = new SlotRef(null, "status"); + List inList = Lists.newArrayList(new StringLiteral("a"), new StringLiteral("b")); + InPredicate inPred = new InPredicate(statusSlot, inList, false); + setConjuncts(node, Lists.newArrayList(inPred)); + Assert.assertFalse(invokeCheckOnlyPartitionEqualityPredicate(node)); + } + + @Test + public void testCheckOnlyPartEq_inPredicateWithNonLiteralValue() throws Exception { + SlotRef dtSlot = new SlotRef(null, "dt"); + SlotRef hrSlot = new 
SlotRef(null, "hr"); + List inList = Lists.newArrayList(hrSlot); + InPredicate inPred = new InPredicate(dtSlot, inList, false); + setConjuncts(node, Lists.newArrayList(inPred)); + Assert.assertFalse(invokeCheckOnlyPartitionEqualityPredicate(node)); + } + + @Test + public void testCheckOnlyPartEq_mixedEqAndInOnPartitionColumns() throws Exception { + SlotRef dtSlot = new SlotRef(null, "dt"); + BinaryPredicate eq = new BinaryPredicate(BinaryPredicate.Operator.EQ, dtSlot, new StringLiteral("2026-01-01")); + + SlotRef hrSlot = new SlotRef(null, "hr"); + List inList = Lists.newArrayList(new StringLiteral("10"), new StringLiteral("11")); + InPredicate inPred = new InPredicate(hrSlot, inList, false); + + setConjuncts(node, Lists.newArrayList(eq, inPred)); + Assert.assertTrue(invokeCheckOnlyPartitionEqualityPredicate(node)); + } + @Test public void testCheckOnlyPartEq_leftSideNotSlotRef() throws Exception { StringLiteral left = new StringLiteral("x"); diff --git a/regression-test/suites/external_table_p2/maxcompute/write/test_mc_limit_split_optimization.groovy b/regression-test/suites/external_table_p2/maxcompute/write/test_mc_limit_split_optimization.groovy index d6c78a335e61e7..e2e4972e36a35d 100644 --- a/regression-test/suites/external_table_p2/maxcompute/write/test_mc_limit_split_optimization.groovy +++ b/regression-test/suites/external_table_p2/maxcompute/write/test_mc_limit_split_optimization.groovy @@ -26,6 +26,7 @@ suite("test_mc_limit_split_optimization", "p2,external,maxcompute,external_remot String sk = context.config.otherConfigs.get("sk") String mc_catalog_name = "test_mc_limit_split_opt" String defaultProject = "mc_doris_test_write" + // String defaultProject = "doris_test_schema" sql """drop catalog if exists ${mc_catalog_name}""" sql """ @@ -149,10 +150,18 @@ suite("test_mc_limit_split_optimization", "p2,external,maxcompute,external_remot compareWithAndWithoutOpt("case7_range_pred_limit", "SELECT * FROM ${tb} WHERE pt > '3' ORDER BY id LIMIT 10") - // --- 
Case 8: partition IN predicate + LIMIT (IN is not EQ → no opt) --- + // --- Case 8: partition IN predicate + LIMIT (optimization should kick in) --- compareWithAndWithoutOpt("case8_in_pred_limit", "SELECT * FROM ${tb} WHERE pt IN ('1', '2', '3') ORDER BY id LIMIT 10") + // --- Case 8b: partition NOT IN predicate + LIMIT (no opt) --- + compareWithAndWithoutOpt("case8b_not_in_pred_limit", + "SELECT * FROM ${tb} WHERE pt NOT IN ('1', '2') ORDER BY id LIMIT 10") + + // --- Case 8c: partition IN + partition EQ mixed + LIMIT (opt eligible) --- + compareWithAndWithoutOpt("case8c_in_and_eq_limit", + "SELECT * FROM ${tb2} WHERE dt IN ('2026-01-01', '2026-01-02') AND region = 'r1' ORDER BY id LIMIT 5") + // --- Case 9: partition equality + no LIMIT (no opt) --- compareWithAndWithoutOpt("case9_part_eq_no_limit", "SELECT * FROM ${tb} WHERE pt = '1' ORDER BY id")