diff --git a/be/src/exec/operator/olap_scan_operator.cpp b/be/src/exec/operator/olap_scan_operator.cpp index 6e7946a082b249..ae4fca5c8c2363 100644 --- a/be/src/exec/operator/olap_scan_operator.cpp +++ b/be/src/exec/operator/olap_scan_operator.cpp @@ -975,16 +975,6 @@ Status OlapScanLocalState::open(RuntimeState* state) { RETURN_IF_ERROR(virtual_column_expr_ctx->open(state)); _slot_id_to_virtual_column_expr[slot_desc->id()] = virtual_column_expr_ctx; - _slot_id_to_col_type[slot_desc->id()] = slot_desc->get_data_type_ptr(); - int col_pos = p.intermediate_row_desc().get_column_id(slot_desc->id()); - if (col_pos < 0) { - return Status::InternalError( - "Invalid virtual slot, can not find its information. Slot desc:\n{}\nRow " - "desc:\n{}", - slot_desc->debug_string(), p.row_desc().debug_string()); - } else { - _slot_id_to_index_in_block[slot_desc->id()] = col_pos; - } } } diff --git a/be/src/exec/operator/olap_scan_operator.h b/be/src/exec/operator/olap_scan_operator.h index d22aac75052ff2..7e69b8a33dfb9f 100644 --- a/be/src/exec/operator/olap_scan_operator.h +++ b/be/src/exec/operator/olap_scan_operator.h @@ -324,9 +324,6 @@ class OlapScanLocalState final : public ScanLocalState { std::vector _read_sources; std::map _slot_id_to_virtual_column_expr; - std::map _slot_id_to_index_in_block; - // this map is needed for scanner opening. - std::map _slot_id_to_col_type; // ---- Runtime-filter partition pruning ---- // Attaches this per-instance pruner to the shared parse result owned by diff --git a/be/src/exec/scan/olap_scanner.cpp b/be/src/exec/scan/olap_scanner.cpp index 33f225864da7cf..1475869ea1261a 100644 --- a/be/src/exec/scan/olap_scanner.cpp +++ b/be/src/exec/scan/olap_scanner.cpp @@ -89,12 +89,11 @@ OlapScanner::OlapScanner(ScanLocalStateBase* parent, OlapScanner::Params&& param .rs_splits {}, .return_columns {}, .output_columns {}, + .filled_columns {}, .common_expr_ctxs_push_down {}, .topn_filter_source_node_ids {}, .key_group_cluster_key_idxes {}, .virtual_column_exprs {}, - .vir_cid_to_idx_in_block {}, - .vir_col_idx_to_type {}, .score_runtime {}, .collection_statistics {}, .ann_topn_runtime {}, @@ -173,8 +172,6 @@ Status OlapScanner::_prepare_impl() { _slot_id_to_virtual_column_expr[pair.first] = context; } - _slot_id_to_index_in_block = local_state->_slot_id_to_index_in_block; - _slot_id_to_col_type = local_state->_slot_id_to_col_type; _score_runtime = local_state->_score_runtime; // All scanners share the same ann_topn_runtime. _ann_topn_runtime = local_state->_ann_topn_runtime; @@ -330,6 +327,7 @@ Status OlapScanner::_init_tablet_reader_params( push_down_agg_type != TPushAggOp::COUNT_ON_INDEX); } + _tablet_reader_params.filled_columns.clear(); RETURN_IF_ERROR(_init_variant_columns()); RETURN_IF_ERROR(_init_return_columns()); @@ -337,8 +335,6 @@ Status OlapScanner::_init_tablet_reader_params( _tablet_reader_params.common_expr_ctxs_push_down = _common_expr_ctxs_push_down; _tablet_reader_params.virtual_column_exprs = _virtual_column_exprs; - _tablet_reader_params.vir_cid_to_idx_in_block = _vir_cid_to_idx_in_block; - _tablet_reader_params.vir_col_idx_to_type = _vir_col_idx_to_type; _tablet_reader_params.score_runtime = _score_runtime; _tablet_reader_params.output_columns = ((OlapScanLocalState*)_local_state)->_output_column_ids; _tablet_reader_params.ann_topn_runtime = _ann_topn_runtime; @@ -598,17 +594,21 @@ Status OlapScanner::_init_return_columns() { if (slot->get_virtual_column_expr()) { ColumnId virtual_column_cid = index; _virtual_column_exprs[virtual_column_cid] = _slot_id_to_virtual_column_expr[slot->id()]; - size_t idx_in_block = _slot_id_to_index_in_block[slot->id()]; - _vir_cid_to_idx_in_block[virtual_column_cid] = idx_in_block; - _vir_col_idx_to_type[idx_in_block] = _slot_id_to_col_type[slot->id()]; - - VLOG_DEBUG << fmt::format( - "Virtual column, slot id: {}, cid {}, column index: {}, type: {}", slot->id(), - virtual_column_cid, _vir_cid_to_idx_in_block[virtual_column_cid], - _vir_col_idx_to_type[idx_in_block]->get_name()); + + VLOG_DEBUG << fmt::format("Virtual column, slot id: {}, cid {}, type: {}", slot->id(), + virtual_column_cid, slot->get_data_type_ptr()->get_name()); } const auto& column = tablet_schema->column(index); + auto* olap_local_state = static_cast(_local_state); + const auto& olap_scan_node = olap_local_state->olap_scan_node(); + if (olap_scan_node.__isset.filled_key_column_slot_ids && + olap_scan_node.filled_key_column_slot_ids.contains(slot->id())) { + DORIS_CHECK(column.is_key()); + if (_tablet_reader_params.direct_mode) { + _tablet_reader_params.filled_columns.insert(index); + } + } int32_t unique_id = column.unique_id() >= 0 ? column.unique_id() : column.parent_unique_id(); if (!slot->all_access_paths().empty()) { diff --git a/be/src/exec/scan/olap_scanner.h b/be/src/exec/scan/olap_scanner.h index e7c9598b864573..a9bc9b933e705f 100644 --- a/be/src/exec/scan/olap_scanner.h +++ b/be/src/exec/scan/olap_scanner.h @@ -18,9 +18,9 @@ #pragma once #include -#include #include +#include #include #include #include @@ -113,17 +113,11 @@ class OlapScanner : public Scanner { std::unordered_set _tablet_columns_convert_to_null_set; - // This three fields are copied from OlapScanLocalState. + // This field is copied from OlapScanLocalState. std::map _slot_id_to_virtual_column_expr; - std::map _slot_id_to_index_in_block; - std::map _slot_id_to_col_type; // ColumnId of virtual column to its expr context std::map _virtual_column_exprs; - // ColumnId of virtual column to its index in block - std::map _vir_cid_to_idx_in_block; - // The idx of vir_col in block to its data type. - std::map _vir_col_idx_to_type; std::shared_ptr _score_runtime; std::shared_ptr _ann_topn_runtime; diff --git a/be/src/exec/scan/scanner_scheduler.cpp b/be/src/exec/scan/scanner_scheduler.cpp index 7e1deb2bae7f54..230a54048a2813 100644 --- a/be/src/exec/scan/scanner_scheduler.cpp +++ b/be/src/exec/scan/scanner_scheduler.cpp @@ -362,18 +362,17 @@ void ScannerScheduler::_make_sure_virtual_col_is_materialized( continue; } - std::vector vcid_to_idx; - - for (const auto& pair : olap_scanner->_vir_cid_to_idx_in_block) { - vcid_to_idx.push_back(fmt::format("{}-{}", pair.first, pair.second)); + std::vector virtual_column_ids; + for (const auto& pair : olap_scanner->_virtual_column_exprs) { + virtual_column_ids.push_back(pair.first); } std::string error_msg = fmt::format( "Column in idx {} is nothing, block columns {}, normal_columns " "{}, " - "vir_cid_to_idx_in_block_msg {}", + "virtual_column_ids [{}]", idx, free_block->columns(), olap_scanner->_return_columns.size(), - fmt::format("_vir_cid_to_idx_in_block:[{}]", fmt::join(vcid_to_idx, ","))); + fmt::join(virtual_column_ids, ",")); throw doris::Exception(ErrorCode::INTERNAL_ERROR, error_msg); } #endif diff --git a/be/src/storage/iterator/vcollect_iterator.cpp b/be/src/storage/iterator/vcollect_iterator.cpp index 11c74a6cf7affc..e0e56375852ec6 100644 --- a/be/src/storage/iterator/vcollect_iterator.cpp +++ b/be/src/storage/iterator/vcollect_iterator.cpp @@ -284,22 +284,20 @@ Status VCollectIterator::_topn_next(Block* block) { } auto clone_block = block->clone_empty(); - // Initialize virtual slot columns by schema (avoid runtime type checks): - // use _reader_context.vir_col_idx_to_type to construct real columns for those positions. - if (!_reader->_reader_context.vir_col_idx_to_type.empty()) { - const auto& idx_to_type = _reader->_reader_context.vir_col_idx_to_type; - for (const auto& kv : idx_to_type) { - size_t idx = kv.first; - if (idx < clone_block.columns()) { - clone_block.get_by_position(idx).column = kv.second->create_column(); - } - } - } + // Initialize virtual slot columns by schema (avoid runtime type checks). + for (const auto& [cid, expr_ctx] : _reader->_reader_context.virtual_column_exprs) { + auto it = std::find(_reader->_return_columns.begin(), _reader->_return_columns.end(), cid); + DORIS_CHECK(it != _reader->_return_columns.end()); + auto idx = cast_set(std::distance(_reader->_return_columns.begin(), it)); + DORIS_CHECK(idx < clone_block.columns()); + clone_block.get_by_position(idx).column = expr_ctx->root()->data_type()->create_column(); + } + const size_t clone_block_columns = clone_block.columns(); MutableBlock mutable_block = MutableBlock::build_mutable_block(std::move(clone_block)); const std::vector* sort_columns = _reader->_reader_context.read_orderby_key_columns; for (auto column_idx : *sort_columns) { - DORIS_CHECK(column_idx < clone_block.columns()); + DORIS_CHECK(column_idx < clone_block_columns); } size_t first_sort_column_idx = (*sort_columns)[0]; diff --git a/be/src/storage/iterators.h b/be/src/storage/iterators.h index 7c0b78959d79bc..42ec5ccc4ba6a3 100644 --- a/be/src/storage/iterators.h +++ b/be/src/storage/iterators.h @@ -19,6 +19,7 @@ #include #include +#include #include "common/status.h" #include "core/block/block.h" @@ -129,6 +130,7 @@ class StorageReadOptions { io::IOContext io_ctx; VExprContextSPtrs common_expr_ctxs_push_down; const std::set* output_columns = nullptr; + std::set filled_columns; // runtime state RuntimeState* runtime_state = nullptr; RowsetId rowset_id; @@ -147,8 +149,6 @@ class StorageReadOptions { std::map virtual_column_exprs; std::shared_ptr ann_topn_runtime; - std::map vir_cid_to_idx_in_block; - std::map vir_col_idx_to_type; std::map all_access_paths; std::map predicate_access_paths; diff --git a/be/src/storage/rowset/beta_rowset_reader.cpp b/be/src/storage/rowset/beta_rowset_reader.cpp index 717a555264a0d1..f5b0019c449938 100644 --- a/be/src/storage/rowset/beta_rowset_reader.cpp +++ b/be/src/storage/rowset/beta_rowset_reader.cpp @@ -100,14 +100,13 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context _read_options.stats = _stats; _read_options.push_down_agg_type_opt = _read_context->push_down_agg_type_opt; _read_options.common_expr_ctxs_push_down = _read_context->common_expr_ctxs_push_down; + DORIS_CHECK(_read_context->return_columns != nullptr); _read_options.virtual_column_exprs = _read_context->virtual_column_exprs; _read_options.all_access_paths = _read_context->all_access_paths; _read_options.predicate_access_paths = _read_context->predicate_access_paths; _read_options.ann_topn_runtime = _read_context->ann_topn_runtime; - _read_options.vir_cid_to_idx_in_block = _read_context->vir_cid_to_idx_in_block; - _read_options.vir_col_idx_to_type = _read_context->vir_col_idx_to_type; _read_options.score_runtime = _read_context->score_runtime; _read_options.collection_statistics = _read_context->collection_statistics; _read_options.rowset_id = _rowset->rowset_id(); @@ -151,6 +150,7 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context // create segment iterators VLOG_NOTICE << "read columns size: " << read_columns.size(); _input_schema = std::make_shared(_read_context->tablet_schema->columns(), read_columns); + _read_options.filled_columns = _read_context->filled_columns; // output_schema only contains return_columns (excludes extra columns like delete-predicate columns). // It is used by merge/union iterators to determine how many columns to copy to the output block. _output_schema = std::make_shared(_read_context->tablet_schema->columns(), diff --git a/be/src/storage/rowset/rowset_reader_context.h b/be/src/storage/rowset/rowset_reader_context.h index d82488424a64c8..5b53dba5d5a18c 100644 --- a/be/src/storage/rowset/rowset_reader_context.h +++ b/be/src/storage/rowset/rowset_reader_context.h @@ -18,6 +18,7 @@ #ifndef DORIS_BE_SRC_OLAP_ROWSET_ROWSET_READER_CONTEXT_H #define DORIS_BE_SRC_OLAP_ROWSET_ROWSET_READER_CONTEXT_H +#include #include #include "exprs/score_runtime.h" @@ -76,9 +77,6 @@ struct RowsetReaderContext { // Effective adaptive batch size byte budget. 0 means disabled internally. size_t preferred_block_size_bytes = 8388608UL; - // Points to the "true" output column list before non-direct-mode expansion. - // Used by BlockReader to map expanded storage columns back to the requested output layout. - const std::vector* origin_return_columns = nullptr; bool is_unique = false; //record row num merged in generic iterator uint64_t* merged_rows = nullptr; @@ -89,14 +87,13 @@ struct RowsetReaderContext { RowIdConversion* rowid_conversion = nullptr; bool is_key_column_group = false; const std::set* output_columns = nullptr; + std::set filled_columns; RowsetId rowset_id; // slots that cast may be eliminated in storage layer std::map target_cast_type_for_variants; int64_t ttl_seconds = 0; std::map virtual_column_exprs; - std::map vir_cid_to_idx_in_block; - std::map vir_col_idx_to_type; std::map all_access_paths; std::map predicate_access_paths; diff --git a/be/src/storage/segment/segment_iterator.cpp b/be/src/storage/segment/segment_iterator.cpp index 1bf4dc261db777..1481f2d50a740d 100644 --- a/be/src/storage/segment/segment_iterator.cpp +++ b/be/src/storage/segment/segment_iterator.cpp @@ -17,7 +17,6 @@ #include "storage/segment/segment_iterator.h" -#include #include #include #include @@ -25,6 +24,7 @@ #include #include +#include #include #include #include @@ -69,7 +69,6 @@ #include "io/cache/cached_remote_file_reader.h" #include "io/fs/file_reader.h" #include "io/io_common.h" -#include "runtime/descriptors.h" #include "runtime/query_context.h" #include "runtime/runtime_predicate.h" #include "runtime/runtime_state.h" @@ -109,127 +108,11 @@ #include "storage/utils.h" #include "util/concurrency_stats.h" #include "util/defer_op.h" -#include "util/json/path_in_data.h" #include "util/simd/bits.h" namespace doris { using namespace ErrorCode; namespace segment_v2 { -namespace { - -Status tablet_column_id_by_slot(const TabletSchemaSPtr& tablet_schema, const SlotDescriptor* slot, - ColumnId* cid) { - int32_t field_index = -1; - if (slot->type()->get_primitive_type() == PrimitiveType::TYPE_VARIANT) { - field_index = tablet_schema->field_index( - PathInData(tablet_schema->column_by_uid(slot->col_unique_id()).name_lower_case(), - slot->column_paths())); - } else { - field_index = slot->col_unique_id() >= 0 ? tablet_schema->field_index(slot->col_unique_id()) - : tablet_schema->field_index(slot->col_name()); - } - if (field_index < 0) { - return Status::InternalError( - "field name is invalid. field={}, field_name_to_index={}, col_unique_id={}", - slot->col_name(), tablet_schema->get_all_field_names(), slot->col_unique_id()); - } - *cid = field_index; - return Status::OK(); -} - -Status rebind_storage_expr_to_reader_schema( - const StorageReadOptions& opts, const VExprSPtr& expr, - const std::unordered_map& cid_to_pos) { - DORIS_CHECK(expr != nullptr); - - if (expr->is_slot_ref()) { - auto slot_ref = std::static_pointer_cast(expr); - auto* slot = opts.runtime_state->desc_tbl().get_slot_descriptor(slot_ref->slot_id()); - if (slot == nullptr) { - return Status::InternalError("slot {} is not found in descriptor table", - slot_ref->slot_id()); - } - - ColumnId cid = 0; - RETURN_IF_ERROR(tablet_column_id_by_slot(opts.tablet_schema, slot, &cid)); - auto pos_it = cid_to_pos.find(cid); - if (pos_it == cid_to_pos.end()) { - return Status::InternalError("slot {} column {} with cid {} is not in reader schema", - slot_ref->slot_id(), slot->col_name(), cid); - } - slot_ref->set_column_id(cast_set(pos_it->second)); - } else if (expr->is_virtual_slot_ref()) { - auto virtual_slot_ref = std::static_pointer_cast(expr); - auto* slot = - opts.runtime_state->desc_tbl().get_slot_descriptor(virtual_slot_ref->slot_id()); - if (slot == nullptr) { - return Status::InternalError("slot {} is not found in descriptor table", - virtual_slot_ref->slot_id()); - } - - ColumnId cid = 0; - RETURN_IF_ERROR(tablet_column_id_by_slot(opts.tablet_schema, slot, &cid)); - auto pos_it = cid_to_pos.find(cid); - if (pos_it == cid_to_pos.end()) { - return Status::InternalError( - "virtual slot {} column {} with cid {} is not in reader schema", - virtual_slot_ref->slot_id(), slot->col_name(), cid); - } - virtual_slot_ref->set_column_id(cast_set(pos_it->second)); - // A virtual slot has its own output position in the reader block, and its - // materialization expression may also contain real slot refs. Rebind both - // sides so evaluating the virtual expression reads from the same block - // layout used by SegmentIterator. - RETURN_IF_ERROR(rebind_storage_expr_to_reader_schema( - opts, virtual_slot_ref->get_virtual_column_expr(), cid_to_pos)); - } - - for (const auto& child : expr->children()) { - RETURN_IF_ERROR(rebind_storage_expr_to_reader_schema(opts, child, cid_to_pos)); - } - return Status::OK(); -} - -Status rebind_storage_exprs_to_reader_schema(const StorageReadOptions& opts, const Schema& schema, - const VExprContextSPtrs& common_exprs, - std::map& virtual_exprs) { - if (common_exprs.empty() && virtual_exprs.empty()) { - return Status::OK(); - } - DORIS_CHECK(opts.runtime_state != nullptr); - DORIS_CHECK(opts.tablet_schema != nullptr); - - const auto keys_type = opts.tablet_schema->keys_type(); - if (keys_type == KeysType::DUP_KEYS || - (keys_type == KeysType::UNIQUE_KEYS && opts.enable_unique_key_merge_on_write)) { - return Status::OK(); - } - - // Storage exprs are prepared with RowDescriptor, so VSlotRef/VirtualSlotRef column_id points to - // the scan tuple column ordinal. SegmentIterator evaluates cloned exprs on a block built from - // the reader schema instead. AGG_KEYS and non-MOW UNIQUE_KEYS readers may expand the reader - // schema, for example by filling all key columns before merging/aggregating rows, so the scan - // tuple ordinal is not always the same as the runtime block ordinal. - // - // DUP_KEYS and UNIQUE_KEYS MOW use direct readers for query scans, so their reader block keeps - // the scan tuple layout and can skip this per-segment expression-tree traversal. For merge/agg - // readers, the reader schema is the source of truth: map tablet column id to reader-block - // position and rebind every storage expr slot to that position. - std::unordered_map cid_to_pos; - for (size_t pos = 0; pos < schema.num_column_ids(); ++pos) { - cid_to_pos.emplace(schema.column_id(cast_set(pos)), pos); - } - - for (const auto& ctx : common_exprs) { - RETURN_IF_ERROR(rebind_storage_expr_to_reader_schema(opts, ctx->root(), cid_to_pos)); - } - for (const auto& [_, ctx] : virtual_exprs) { - RETURN_IF_ERROR(rebind_storage_expr_to_reader_schema(opts, ctx->root(), cid_to_pos)); - } - return Status::OK(); -} - -} // namespace SegmentIterator::~SegmentIterator() = default; @@ -526,9 +409,9 @@ Status SegmentIterator::_init_impl(const StorageReadOptions& opts) { } _virtual_column_exprs = _opts.virtual_column_exprs; - _vir_cid_to_idx_in_block = _opts.vir_cid_to_idx_in_block; _score_runtime = _opts.score_runtime; _ann_topn_runtime = _opts.ann_topn_runtime; + _init_schema_block_id_map(); if (opts.output_columns != nullptr) { _output_columns = *(opts.output_columns); @@ -578,14 +461,20 @@ Status SegmentIterator::_init_impl(const StorageReadOptions& opts) { RETURN_IF_ERROR(_construct_compound_expr_context()); VLOG_DEBUG << fmt::format( - "Segment iterator init, virtual_column_exprs size: {}, " - "_vir_cid_to_idx_in_block size: {}, common_expr_pushdown size: {}", - _opts.virtual_column_exprs.size(), _opts.vir_cid_to_idx_in_block.size(), - _common_expr_ctxs_push_down.size()); + "Segment iterator init, virtual_column_exprs size: {}, common_expr_pushdown size: {}", + _opts.virtual_column_exprs.size(), _common_expr_ctxs_push_down.size()); _initialize_predicate_results(); return Status::OK(); } +void SegmentIterator::_init_schema_block_id_map() { + _schema_block_id_map.assign(_schema->columns().size(), -1); + for (int i = 0; i < _schema->num_column_ids(); i++) { + auto cid = _schema->column_id(i); + _schema_block_id_map[cid] = i; + } +} + void SegmentIterator::_initialize_predicate_results() { // Initialize from _col_predicates for (auto pred : _col_predicates) { @@ -997,6 +886,7 @@ Status SegmentIterator::_apply_ann_topn_predicate() { VLOG_DEBUG << fmt::format("Try apply ann topn: {}", _ann_topn_runtime->debug_string()); size_t src_col_idx = _ann_topn_runtime->get_src_column_idx(); + // AnnTopNRuntime keeps VSlotRef::column_id(), which is the scan schema ordinal. ColumnId src_cid = _schema->column_id(src_col_idx); IndexIterator* ann_index_iterator = _index_iterators[src_cid].get(); bool has_ann_index = _column_has_ann_index(src_cid); @@ -1263,8 +1153,9 @@ Status SegmentIterator::_extract_common_expr_columns(const VExprSPtr& expr) { auto node_type = expr->node_type(); if (node_type == TExprNodeType::SLOT_REF) { auto slot_expr = std::dynamic_pointer_cast(expr); - _is_common_expr_column[_schema->column_id(slot_expr->column_id())] = true; - _common_expr_columns.insert(_schema->column_id(slot_expr->column_id())); + auto cid = _schema->column_id(slot_expr->column_id()); + _is_common_expr_column[cid] = true; + _common_expr_columns.insert(cid); } else if (node_type == TExprNodeType::VIRTUAL_SLOT_REF) { std::shared_ptr virtual_slot_ref = std::dynamic_pointer_cast(expr); @@ -1489,7 +1380,7 @@ bool SegmentIterator::_need_read_data(ColumnId cid) { return true; } // this is a virtual column, we always need to read data - if (this->_vir_cid_to_idx_in_block.contains(cid)) { + if (_virtual_column_exprs.contains(cid)) { return true; } @@ -1654,8 +1545,8 @@ Status SegmentIterator::_init_return_column_iterators() { } #ifndef NDEBUG - for (auto pair : _vir_cid_to_idx_in_block) { - ColumnId vir_col_cid = pair.first; + for (const auto& entry : _virtual_column_exprs) { + ColumnId vir_col_cid = entry.first; DCHECK(_column_iterators[vir_col_cid] != nullptr) << "Virtual column iterator for " << vir_col_cid << " should not be null"; ColumnIterator* column_iter = _column_iterators[vir_col_cid].get(); @@ -2053,19 +1944,6 @@ Status SegmentIterator::_vec_init_lazy_materialization() { _is_need_short_eval = true; } - // ColumnId to column index in block - // ColumnId will contail all columns in tablet schema, including virtual columns and global rowid column, - _schema_block_id_map.resize(_schema->columns().size(), -1); - // Use cols read by query to initialize _schema_block_id_map. - // We need to know the index of each column in the block. - // There is an assumption here that the columns in the block are in the same order as in the read schema. - // TODO: A probelm is that, delete condition columns will exist in _schema->column_ids but not in block if - // delete column is not read by the query. - for (int i = 0; i < _schema->num_column_ids(); i++) { - auto cid = _schema->column_id(i); - _schema_block_id_map[cid] = i; - } - // Step2: extract columns that can execute expr context _is_common_expr_column.resize(_schema->columns().size(), false); if (!_common_expr_ctxs_push_down.empty()) { @@ -2084,8 +1962,8 @@ Status SegmentIterator::_vec_init_lazy_materialization() { } } - for (auto pair : _vir_cid_to_idx_in_block) { - _columns_to_filter.push_back(cast_set(pair.second)); + for (const auto& entry : _virtual_column_exprs) { + _columns_to_filter.push_back(_schema_block_id_map[entry.first]); } } } @@ -2196,15 +2074,8 @@ bool SegmentIterator::_can_evaluated_by_vectorized(std::shared_ptris_nullable()) { auto nullable_col_ptr = reinterpret_cast(column.get()); nullable_col_ptr->get_null_map_column().insert_many_defaults(num_of_defaults); nullable_col_ptr->get_nested_column_ptr()->insert_many_defaults(num_of_defaults); @@ -2212,6 +2083,23 @@ bool SegmentIterator::_prune_column(ColumnId cid, MutableColumnPtr& column, bool // assert(column->is_const()); column->insert_many_defaults(num_of_defaults); } +} + +bool SegmentIterator::_prune_column(ColumnId cid, MutableColumnPtr& column, + size_t num_of_defaults) { + if (_opts.filled_columns.contains(cid) && !_is_pred_column.empty()) { + DCHECK_EQ(_is_pred_column.size(), _is_common_expr_column.size()); + DCHECK_LT(cid, _is_pred_column.size()); + if (!_virtual_column_exprs.contains(cid) && !_has_delete_predicate(cid) && + !_is_pred_column[cid] && !_is_common_expr_column[cid]) { + _fill_default_column(column, num_of_defaults); + return true; + } + } + if (_need_read_data(cid)) { + return false; + } + _fill_default_column(column, num_of_defaults); return true; } @@ -2220,7 +2108,7 @@ Status SegmentIterator::_read_columns(const std::vector& column_ids, for (auto cid : column_ids) { auto& column = column_block[cid]; size_t rows_read = nrows; - if (_prune_column(cid, column, true, rows_read)) { + if (_prune_column(cid, column, rows_read)) { continue; } RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&rows_read, column)); @@ -2274,7 +2162,7 @@ Status SegmentIterator::_init_current_block(Block* block, } } - for (auto entry : _virtual_column_exprs) { + for (const auto& entry : _virtual_column_exprs) { auto cid = entry.first; current_columns[cid] = ColumnNothing::create(0); current_columns[cid]->reserve(nrows_read_limit); @@ -2308,7 +2196,7 @@ Status SegmentIterator::_output_non_pred_columns(Block* block) { if (loc < block->columns()) { bool column_in_block_is_nothing = check_and_get_column( block->get_by_position(loc).column.get()); - bool column_is_normal = !_vir_cid_to_idx_in_block.contains(cid); + bool column_is_normal = !_virtual_column_exprs.contains(cid); bool return_column_is_nothing = check_and_get_column(_current_return_columns[cid].get()); VLOG_DEBUG << fmt::format( @@ -2377,7 +2265,7 @@ Status SegmentIterator::_read_columns_by_index(uint32_t nrows_read_limit, uint16 VLOG_DEBUG << fmt::format("Column {} no need to read.", cid); continue; } - if (_prune_column(cid, column, true, nrows_read)) { + if (_prune_column(cid, column, nrows_read)) { VLOG_DEBUG << fmt::format("Column {} is pruned. No need to read data.", cid); continue; } @@ -2761,7 +2649,7 @@ Status SegmentIterator::_read_columns_by_rowids(std::vector& read_colu if (_no_need_read_key_data(cid, colunm, select_size)) { continue; } - if (_prune_column(cid, colunm, true, select_size)) { + if (_prune_column(cid, colunm, select_size)) { continue; } @@ -2822,10 +2710,9 @@ Status SegmentIterator::next_batch(Block* block) { if (res.is()) { // Since we have a type check at the caller. // So a replacement of nothing column with real column is needed. - const auto& idx_to_datatype = _opts.vir_col_idx_to_type; - for (const auto& pair : _vir_cid_to_idx_in_block) { - size_t idx = pair.second; - auto type = idx_to_datatype.find(idx)->second; + for (const auto& [cid, expr_ctx] : _virtual_column_exprs) { + auto idx = _schema_block_id_map[cid]; + auto type = expr_ctx->root()->data_type(); block->replace_by_position(idx, type->create_column()); } @@ -3063,12 +2950,12 @@ Status SegmentIterator::_next_batch_internal(Block* block) { if (!_virtual_column_exprs.empty()) { bool use_sel = _is_need_vec_eval || _is_need_short_eval || _is_need_expr_eval; uint16_t* sel_rowid_idx = use_sel ? _sel_rowid_idx.data() : nullptr; - std::vector vir_ctxs; + VExprContextSPtrs vir_ctxs; vir_ctxs.reserve(_virtual_column_exprs.size()); for (auto& [cid, ctx] : _virtual_column_exprs) { - vir_ctxs.push_back(ctx.get()); + vir_ctxs.push_back(ctx); } - _output_index_result_column(vir_ctxs, sel_rowid_idx, _selected_size, block); + _output_index_result_column(vir_ctxs, sel_rowid_idx, _selected_size); } RETURN_IF_ERROR(_materialization_of_virtual_column(block)); if (_opts.read_limit > 0) { @@ -3091,12 +2978,10 @@ void SegmentIterator::_fill_column_nothing() { // Because: // 1. Before each batch, _init_return_columns is called to initialize _current_return_columns, and virtual columns in _current_return_columns are initialized as ColumnNothing. // 2. When select_size == 0, the read method of VirtualColumnIterator will definitely not be called, so the corresponding Column remains a ColumnNothing - for (const auto pair : _vir_cid_to_idx_in_block) { - auto cid = pair.first; - auto pos = pair.second; + for (const auto& [cid, expr_ctx] : _virtual_column_exprs) { [[maybe_unused]] const auto* nothing_col = assert_cast(_current_return_columns[cid].get()); - _current_return_columns[cid] = _opts.vir_col_idx_to_type[pos]->create_column(); + _current_return_columns[cid] = expr_ctx->root()->data_type()->create_column(); } } @@ -3112,17 +2997,15 @@ Status SegmentIterator::_check_output_block(Block* block) { idx, block->columns(), _schema->num_column_ids(), _virtual_column_exprs.size()); } else if (check_and_get_column(entry.column.get())) { if (rows > 0) { - std::vector vcid_to_idx; - for (const auto& pair : _vir_cid_to_idx_in_block) { - vcid_to_idx.push_back(fmt::format("{}-{}", pair.first, pair.second)); + std::vector virtual_column_ids; + for (const auto& pair : _virtual_column_exprs) { + virtual_column_ids.push_back(pair.first); } - std::string vir_cid_to_idx_in_block_msg = - fmt::format("_vir_cid_to_idx_in_block:[{}]", fmt::join(vcid_to_idx, ",")); return Status::InternalError( "Column in idx {} is nothing, block columns {}, normal_columns {}, " - "vir_cid_to_idx_in_block_msg {}", + "virtual_column_ids [{}]", idx, block->columns(), _schema->num_column_ids(), - vir_cid_to_idx_in_block_msg); + fmt::join(virtual_column_ids, ",")); } } else if (entry.column->size() != rows) { return Status::InternalError( @@ -3159,32 +3042,10 @@ Status SegmentIterator::_process_eof(Block* block) { Status SegmentIterator::_process_common_expr(uint16_t* sel_rowid_idx, uint16_t& selected_size, Block* block) { - // Here we just use col0 as row_number indicator. when reach here, we will calculate the predicates first. - // then use the result to reduce our data read(that is, expr push down). there's now row in block means the first - // column is not in common expr. so it's safe to replace it temporarily to provide correct `selected_size`. VLOG_DEBUG << fmt::format("Execute common expr. block rows {}, selected size {}", block->rows(), _selected_size); - bool need_mock_col = block->rows() != selected_size; - MutableColumnPtr col0; - if (need_mock_col) { - col0 = std::move(*block->get_by_position(0).column).mutate(); - block->replace_by_position( - 0, block->get_by_position(0).type->create_column_const_with_default_value( - _selected_size)); - } - - std::vector common_ctxs; - common_ctxs.reserve(_common_expr_ctxs_push_down.size()); - for (auto& ctx : _common_expr_ctxs_push_down) { - common_ctxs.push_back(ctx.get()); - } - _output_index_result_column(common_ctxs, _sel_rowid_idx.data(), _selected_size, block); - RETURN_IF_ERROR(_execute_common_expr(_sel_rowid_idx.data(), _selected_size, block)); - - if (need_mock_col) { - block->replace_by_position(0, std::move(col0)); - } + RETURN_IF_ERROR(_execute_common_expr(sel_rowid_idx, selected_size, block)); VLOG_DEBUG << fmt::format("Execute common expr end. block rows {}, selected size {}", block->rows(), _selected_size); @@ -3195,14 +3056,24 @@ Status SegmentIterator::_execute_common_expr(uint16_t* sel_rowid_idx, uint16_t& Block* block) { SCOPED_RAW_TIMER(&_opts.stats->expr_filter_ns); DCHECK(!_common_expr_ctxs_push_down.empty()); - DCHECK(block->rows() != 0); - int prev_columns = block->columns(); + _output_index_result_column(_common_expr_ctxs_push_down, sel_rowid_idx, selected_size); + uint16_t original_size = selected_size; _opts.stats->expr_cond_input_rows += original_size; - IColumn::Filter filter; - RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( - _common_expr_ctxs_push_down, block, _columns_to_filter, prev_columns, filter)); + // Some output columns may stay empty until after common expr filtering. Use the + // selected row count instead of Block::rows(), which is derived from the first column. + IColumn::Filter filter(selected_size, 1); + bool can_filter_all = false; + auto* __restrict filter_data = filter.data(); + for (const auto& expr_ctx : _common_expr_ctxs_push_down) { + RETURN_IF_ERROR(expr_ctx->execute_filter(block, filter_data, selected_size, false, + &can_filter_all)); + if (can_filter_all) { + break; + } + } + RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(block, _columns_to_filter, filter)); selected_size = _evaluate_common_expr_filter(sel_rowid_idx, selected_size, filter); _opts.stats->rows_expr_cond_filtered += original_size - selected_size; @@ -3251,15 +3122,14 @@ uint16_t SegmentIterator::_evaluate_common_expr_filter(uint16_t* sel_rowid_idx, } } -void SegmentIterator::_output_index_result_column(const std::vector& expr_ctxs, - uint16_t* sel_rowid_idx, uint16_t select_size, - Block* block) { +void SegmentIterator::_output_index_result_column(const VExprContextSPtrs& expr_ctxs, + uint16_t* sel_rowid_idx, uint16_t select_size) { SCOPED_RAW_TIMER(&_opts.stats->output_index_result_column_timer); - if (block->rows() == 0) { + if (select_size == 0) { return; } - for (auto* expr_ctx_ptr : expr_ctxs) { - auto index_ctx = expr_ctx_ptr->get_index_context(); + for (const auto& expr_ctx : expr_ctxs) { + auto index_ctx = expr_ctx->get_index_context(); if (index_ctx == nullptr) { continue; } @@ -3269,7 +3139,7 @@ void SegmentIterator::_output_index_result_column(const std::vectorget_data(); - vec_match_pred.resize(block->rows()); + vec_match_pred.resize(select_size); std::fill(vec_match_pred.begin(), vec_match_pred.end(), 0); const auto& null_bitmap = result_bitmap.get_null_bitmap(); @@ -3281,7 +3151,7 @@ void SegmentIterator::_output_index_result_column(const std::vectorget_data(); - null_map_vec.resize(block->rows()); + null_map_vec.resize(select_size); std::fill(null_map_vec.begin(), null_map_vec.end(), 0); null_map_data = &null_map_column->get_data(); } @@ -3298,7 +3168,7 @@ void SegmentIterator::_output_index_result_column(const std::vectorrows() == vec_match_pred.size()); + DCHECK(select_size == vec_match_pred.size()); if (null_map_column) { index_ctx->set_index_result_column_for_expr( @@ -3386,8 +3256,6 @@ Status SegmentIterator::_construct_compound_expr_context() { context->set_index_context(inverted_index_context); expr_ctx = context; } - RETURN_IF_ERROR(rebind_storage_exprs_to_reader_schema( - _opts, *_schema, _common_expr_ctxs_push_down, _virtual_column_exprs)); return Status::OK(); } @@ -3425,8 +3293,8 @@ void SegmentIterator::_calculate_common_expr_index_exec_status() { for (const auto& vir_child : vir_node->children()) { if (vir_child->is_slot_ref()) { auto* inner_slot_ref = assert_cast(vir_child.get()); - _common_expr_index_exec_status[_schema->column_id( - inner_slot_ref->column_id())][expr.get()] = false; + auto cid = _schema->column_id(inner_slot_ref->column_id()); + _common_expr_index_exec_status[cid][expr.get()] = false; _common_expr_to_slotref_map[root_expr_ctx.get()] [inner_slot_ref->column_id()] = expr.get(); @@ -3443,8 +3311,8 @@ void SegmentIterator::_calculate_common_expr_index_exec_status() { auto expr_without_cast = VExpr::expr_without_cast(child); if (expr_without_cast->is_slot_ref() && expr->op() != TExprOpcode::CAST) { auto* column_slot_ref = assert_cast(expr_without_cast.get()); - _common_expr_index_exec_status[_schema->column_id(column_slot_ref->column_id())] - [expr.get()] = false; + auto cid = _schema->column_id(column_slot_ref->column_id()); + _common_expr_index_exec_status[cid][expr.get()] = false; _common_expr_to_slotref_map[root_expr_ctx.get()][column_slot_ref->column_id()] = expr.get(); } @@ -3546,54 +3414,43 @@ bool SegmentIterator::_can_opt_limit_reads() { // Before get next batch. make sure all virtual columns in block has type ColumnNothing. void SegmentIterator::_init_virtual_columns(Block* block) { - for (const auto& pair : _vir_cid_to_idx_in_block) { - auto& col_with_type_and_name = block->get_by_position(pair.second); + for (const auto& [cid, expr_ctx] : _virtual_column_exprs) { + auto idx = _schema_block_id_map[cid]; + auto& col_with_type_and_name = block->get_by_position(idx); col_with_type_and_name.column = ColumnNothing::create(0); - col_with_type_and_name.type = _opts.vir_col_idx_to_type[pair.second]; + col_with_type_and_name.type = expr_ctx->root()->data_type(); } } Status SegmentIterator::_materialization_of_virtual_column(Block* block) { // Some expr can not process empty block, such as function `element_at`. // So materialize virtual column in advance to avoid errors. - if (block->rows() == 0) { - for (const auto& pair : _vir_cid_to_idx_in_block) { - auto& col_with_type_and_name = block->get_by_position(pair.second); - col_with_type_and_name.column = _opts.vir_col_idx_to_type[pair.second]->create_column(); - col_with_type_and_name.type = _opts.vir_col_idx_to_type[pair.second]; + if (_selected_size == 0) { + for (const auto& [cid, expr_ctx] : _virtual_column_exprs) { + auto idx = _schema_block_id_map[cid]; + auto& col_with_type_and_name = block->get_by_position(idx); + col_with_type_and_name.column = expr_ctx->root()->data_type()->create_column(); + col_with_type_and_name.type = expr_ctx->root()->data_type(); } return Status::OK(); } + if (_virtual_column_exprs.empty()) { + return Status::OK(); + } + DCHECK_EQ(block->rows(), _selected_size); for (const auto& cid_and_expr : _virtual_column_exprs) { auto cid = cid_and_expr.first; auto column_expr = cid_and_expr.second; - size_t idx_in_block = _vir_cid_to_idx_in_block[cid]; - if (block->columns() <= idx_in_block) { - return Status::InternalError( - "Virtual column index {} is out of range, block columns {}, " - "virtual columns size {}, virtual column expr {}", - idx_in_block, block->columns(), _vir_cid_to_idx_in_block.size(), - column_expr->root()->debug_string()); - } else if (block->get_by_position(idx_in_block).column.get() == nullptr) { - return Status::InternalError( - "Virtual column index {} is null, block columns {}, virtual columns size {}, " - "virtual column expr {}", - idx_in_block, block->columns(), _vir_cid_to_idx_in_block.size(), - column_expr->root()->debug_string()); - } - if (check_and_get_column( - block->get_by_position(idx_in_block).column.get())) { + auto materialized_pos = _schema_block_id_map[cid]; + auto& column = block->get_by_position(materialized_pos).column; + if (check_and_get_column(column.get())) { VLOG_DEBUG << fmt::format("Virtual column is doing materialization, cid {}, col idx {}", - cid, idx_in_block); + cid, materialized_pos); ColumnPtr result_column; RETURN_IF_ERROR(column_expr->execute(block, result_column)); - block->replace_by_position(idx_in_block, std::move(result_column)); - if (block->get_by_position(idx_in_block).column->size() == 0) { - LOG_WARNING("Result of expr column {} is empty. cid {}, idx_in_block {}", - column_expr->root()->debug_string(), cid, idx_in_block); - } + block->replace_by_position(materialized_pos, std::move(result_column)); } } return Status::OK(); diff --git a/be/src/storage/segment/segment_iterator.h b/be/src/storage/segment/segment_iterator.h index 8fd143867ed97b..74f628c4f7a29e 100644 --- a/be/src/storage/segment/segment_iterator.h +++ b/be/src/storage/segment/segment_iterator.h @@ -18,9 +18,9 @@ #pragma once #include -#include -#include +#include +#include #include #include #include @@ -273,8 +273,8 @@ class SegmentIterator : public RowwiseIterator { bool _can_evaluated_by_vectorized(std::shared_ptr predicate); + void _init_schema_block_id_map(); [[nodiscard]] Status _extract_common_expr_columns(const VExprSPtr& expr); - // same with _extract_common_expr_columns, but only extract columns that can be used for index [[nodiscard]] Status _execute_common_expr(uint16_t* sel_rowid_idx, uint16_t& selected_size, Block* block); Status _process_common_expr(uint16_t* sel_rowid_idx, uint16_t& selected_size, Block* block); @@ -290,12 +290,12 @@ class SegmentIterator : public RowwiseIterator { bool _check_apply_by_inverted_index(std::shared_ptr pred); - void _output_index_result_column(const std::vector& expr_ctxs, - uint16_t* sel_rowid_idx, uint16_t select_size, Block* block); + void _output_index_result_column(const VExprContextSPtrs& expr_ctxs, uint16_t* sel_rowid_idx, + uint16_t select_size); bool _need_read_data(ColumnId cid); - bool _prune_column(ColumnId cid, MutableColumnPtr& column, bool fill_defaults, - size_t num_of_defaults); + void _fill_default_column(MutableColumnPtr& column, size_t num_of_defaults); + bool _prune_column(ColumnId cid, MutableColumnPtr& column, size_t num_of_defaults); Status _construct_compound_expr_context(); @@ -304,7 +304,7 @@ class SegmentIterator : public RowwiseIterator { for (auto cid : col_ids) { auto ord = key.field(cid) <=> (*_seek_block[cid])[0]; if (ord != std::strong_ordering::equal) { - return ord < 0 ? -1 : 1; + return ord == std::strong_ordering::less ? -1 : 1; } } return 0; @@ -471,7 +471,6 @@ class SegmentIterator : public RowwiseIterator { // cid to virtual column expr std::map _virtual_column_exprs; - std::map _vir_cid_to_idx_in_block; IndexQueryContextPtr _index_query_context; diff --git a/be/src/storage/tablet/tablet_reader.cpp b/be/src/storage/tablet/tablet_reader.cpp index 5a7b9a58bb9420..e1f329b7902660 100644 --- a/be/src/storage/tablet/tablet_reader.cpp +++ b/be/src/storage/tablet/tablet_reader.cpp @@ -184,14 +184,13 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params) { _reader_context.is_key_column_group = read_params.is_key_column_group; _reader_context.common_expr_ctxs_push_down = read_params.common_expr_ctxs_push_down; _reader_context.output_columns = &read_params.output_columns; + _reader_context.filled_columns = read_params.filled_columns; _reader_context.push_down_agg_type_opt = read_params.push_down_agg_type_opt; _reader_context.ttl_seconds = _tablet->ttl_seconds(); _reader_context.score_runtime = read_params.score_runtime; _reader_context.collection_statistics = read_params.collection_statistics; _reader_context.virtual_column_exprs = read_params.virtual_column_exprs; - _reader_context.vir_cid_to_idx_in_block = read_params.vir_cid_to_idx_in_block; - _reader_context.vir_col_idx_to_type = read_params.vir_col_idx_to_type; _reader_context.ann_topn_runtime = read_params.ann_topn_runtime; _reader_context.condition_cache_digest = read_params.condition_cache_digest; @@ -201,10 +200,6 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params) { // Propagate general read limit for DUP_KEYS and UNIQUE_KEYS with MOW _reader_context.general_read_limit = read_params.general_read_limit; - // Preserve the original requested output layout so BlockReader can map expanded storage - // columns (for non-direct AGG/UNIQUE paths) back to the final output block. - _reader_context.origin_return_columns = read_params.origin_return_columns; - return Status::OK(); } diff --git a/be/src/storage/tablet/tablet_reader.h b/be/src/storage/tablet/tablet_reader.h index 911bdf4fe50198..ba398197b5474f 100644 --- a/be/src/storage/tablet/tablet_reader.h +++ b/be/src/storage/tablet/tablet_reader.h @@ -158,6 +158,7 @@ class TabletReader { std::vector return_columns; // output_columns only contain columns in OrderByExprs and outputExprs std::set output_columns; + std::set filled_columns; RuntimeProfile* profile = nullptr; RuntimeState* runtime_state = nullptr; @@ -203,8 +204,6 @@ class TabletReader { int64_t batch_size = -1; std::map virtual_column_exprs; - std::map vir_cid_to_idx_in_block; - std::map vir_col_idx_to_type; std::shared_ptr score_runtime; CollectionStatisticsPtr collection_statistics; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index a4212918c38e9f..ac38a1782102ef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -36,6 +36,7 @@ import org.apache.doris.catalog.AliasFunction; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.Config; @@ -864,15 +865,20 @@ private PlanFragment getPlanFragmentForPhysicalFileScan(PhysicalFileScan fileSca @Override public PlanFragment visitPhysicalOlapScan(PhysicalOlapScan olapScan, PlanTranslatorContext context) { - return computePhysicalOlapScan(olapScan, context); + return computePhysicalOlapScan(olapScan, context, true); } - private PlanFragment computePhysicalOlapScan(PhysicalOlapScan olapScan, PlanTranslatorContext context) { - List slots = olapScan.getOutput(); + private PlanFragment computePhysicalOlapScan(PhysicalOlapScan olapScan, PlanTranslatorContext context, + boolean projectStorageAlignedScanOutput) { + List outputSlots = olapScan.getOutput(); + StorageAlignedScanSlots storageAlignedScanSlots = computeStorageAlignedScanSlots(olapScan); + List slots = storageAlignedScanSlots.scanSlots; OlapTable olapTable = olapScan.getTable(); // generate real output tuple TupleDescriptor tupleDescriptor = generateTupleDesc(slots, olapTable, context); List slotDescriptors = tupleDescriptor.getSlots(); + Map exprIdToSlotDescriptor = slotDescriptors.stream() + .collect(Collectors.toMap(s -> context.findExprId(s.getId()), s -> s)); // put virtual column expr into slot desc Map slotToVirtualColumnMap = olapScan.getSlotToVirtualColumnMap(); @@ -892,6 +898,12 @@ private PlanFragment computePhysicalOlapScan(PhysicalOlapScan olapScan, PlanTran OlapScanNode olapScanNode = new OlapScanNode(context.nextPlanNodeId(), tupleDescriptor, "OlapScanNode", context.getScanContext()); + Set filledKeyColumnSlotIds = storageAlignedScanSlots.filledKeyExprIds.stream() + .map(exprId -> Objects.requireNonNull(exprIdToSlotDescriptor.get(exprId), + "missing filled key slot descriptor for " + exprId)) + .map(slotDescriptor -> slotDescriptor.getId().asInt()) + .collect(Collectors.toSet()); + olapScanNode.setFilledKeyColumnSlotIds(filledKeyColumnSlotIds); olapScanNode.setNereidsId(olapScan.getId()); context.getNereidsIdToPlanNodeIdMap().put(olapScan.getId(), olapScanNode.getId()); @@ -925,15 +937,18 @@ private PlanFragment computePhysicalOlapScan(PhysicalOlapScan olapScan, PlanTran // because it is whole table cardinality and will break block rules. // olapScanNode.setCardinality((long) olapScan.getStats().getRowCount()); if (context.getSessionVariable() != null && context.getSessionVariable().forbidUnknownColStats) { - for (int i = 0; i < slots.size(); i++) { - SlotReference slot = (SlotReference) slots.get(i); + for (Slot outputSlot : outputSlots) { + SlotReference slot = (SlotReference) outputSlot; boolean inVisibleCol = slot.getOriginalColumn().isPresent() && StatisticConstants.shouldIgnoreCol(olapTable, slot.getOriginalColumn().get()); if (olapScan.getStats().findColumnStatistics(slot).isUnKnown() && !isComplexDataType(slot.getDataType()) && !StatisticConstants.isSystemTable(olapTable) && !inVisibleCol) { - context.addUnknownStatsColumn(olapScanNode, slotDescriptors.get(i).getId()); + SlotDescriptor slotDescriptor = Objects.requireNonNull( + exprIdToSlotDescriptor.get(slot.getExprId()), + "missing output slot descriptor for " + slot.getExprId()); + context.addUnknownStatsColumn(olapScanNode, slotDescriptor.getId()); } } } @@ -964,6 +979,14 @@ private PlanFragment computePhysicalOlapScan(PhysicalOlapScan olapScan, PlanTran context.addScanNode(olapScanNode, olapScan); translateRuntimeFilter(olapScan, olapScanNode, context); + if (projectStorageAlignedScanOutput && !storageAlignedScanSlots.filledKeyExprIds.isEmpty()) { + List projectionExprs = outputSlots.stream() + .map(slot -> context.findSlotRef(slot.getExprId())) + .collect(Collectors.toList()); + TupleDescriptor projectionTuple = generateTupleDesc(outputSlots, olapTable, context); + olapScanNode.setProjectList(projectionExprs); + olapScanNode.setOutputTupleDesc(projectionTuple); + } olapScanNode.setPushDownAggNoGrouping(context.getRelationPushAggOp(olapScan.getRelationId())); // Create PlanFragment @@ -985,6 +1008,79 @@ private PlanFragment computePhysicalOlapScan(PhysicalOlapScan olapScan, PlanTran return planFragment; } + private StorageAlignedScanSlots computeStorageAlignedScanSlots(PhysicalOlapScan olapScan) { + if (!shouldAlignScanSlotsToStorageSchema(olapScan)) { + return new StorageAlignedScanSlots(olapScan.getOutput(), Collections.emptySet()); + } + + Set outputExprIds = olapScan.getOutput().stream() + .map(Slot::getExprId) + .collect(Collectors.toSet()); + Map slotByColumnUniqueId = new HashMap<>(); + Map slotByColumnName = new HashMap<>(); + Stream.concat(olapScan.getSelectedIndexOutputs().stream(), olapScan.getOutput().stream()) + .forEach(slot -> { + Optional originalColumn = ((SlotReference) slot).getOriginalColumn(); + if (originalColumn.isPresent()) { + Column column = originalColumn.get(); + if (column.getUniqueId() == Column.COLUMN_UNIQUE_ID_INIT_VALUE) { + slotByColumnName.put(column.getName(), slot); + } else { + slotByColumnUniqueId.put(column.getUniqueId(), slot); + } + } + }); + + List storageSlots = new ArrayList<>(); + Set storageExprIds = new HashSet<>(); + Set filledKeyExprIds = new HashSet<>(); + long selectedIndexId = olapScan.getSelectedIndexId() == -1 + ? olapScan.getTable().getBaseIndexId() + : olapScan.getSelectedIndexId(); + for (Column column : olapScan.getTable().getSchemaByIndexId(selectedIndexId, true)) { + if (!column.isKey()) { + break; + } + Slot slot = column.getUniqueId() == Column.COLUMN_UNIQUE_ID_INIT_VALUE + ? slotByColumnName.get(column.getName()) + : slotByColumnUniqueId.get(column.getUniqueId()); + slot = Objects.requireNonNull(slot, "missing scan slot for storage key column " + column.getName()); + if (storageExprIds.add(slot.getExprId())) { + storageSlots.add(slot); + } + if (!outputExprIds.contains(slot.getExprId())) { + filledKeyExprIds.add(slot.getExprId()); + } + } + for (Slot slot : olapScan.getOutput()) { + if (storageExprIds.add(slot.getExprId())) { + storageSlots.add(slot); + } + } + if (filledKeyExprIds.isEmpty()) { + return new StorageAlignedScanSlots(olapScan.getOutput(), Collections.emptySet()); + } + return new StorageAlignedScanSlots(storageSlots, filledKeyExprIds); + } + + private boolean shouldAlignScanSlotsToStorageSchema(PhysicalOlapScan olapScan) { + KeysType keysType = olapScan.getSelectedIndexId() == -1 + ? olapScan.getTable().getKeysType() + : olapScan.getTable().getIndexMetaByIndexId(olapScan.getSelectedIndexId()).getKeysType(); + return keysType == KeysType.AGG_KEYS + || (keysType == KeysType.UNIQUE_KEYS && !olapScan.getTable().getEnableUniqueKeyMergeOnWrite()); + } + + private static class StorageAlignedScanSlots { + private final List scanSlots; + private final Set filledKeyExprIds; + + StorageAlignedScanSlots(List scanSlots, Set filledKeyExprIds) { + this.scanSlots = scanSlots; + this.filledKeyExprIds = filledKeyExprIds; + } + } + private void translateRuntimeFilter(PhysicalRelation physicalRelation, ScanNode scanNode, PlanTranslatorContext context) { if (context.getRuntimeTranslator().isPresent()) { @@ -2835,7 +2931,7 @@ public PlanFragment visitPhysicalLazyMaterializeTVFScan(PhysicalLazyMaterializeT @Override public PlanFragment visitPhysicalLazyMaterializeOlapScan(PhysicalLazyMaterializeOlapScan lazyScan, PlanTranslatorContext context) { - PlanFragment planFragment = computePhysicalOlapScan(lazyScan.getScan(), context); + PlanFragment planFragment = computePhysicalOlapScan(lazyScan.getScan(), context, false); OlapScanNode olapScanNode = (OlapScanNode) planFragment.getPlanRoot(); // set lazy materialized context olapScanNode.setIsTopnLazyMaterialize(true); @@ -2843,9 +2939,18 @@ public PlanFragment visitPhysicalLazyMaterializeOlapScan(PhysicalLazyMaterialize Set scanIds = lazyScan.getOutput().stream().map(NamedExpression::getExprId) .map(context::findSlotRef).filter(Objects::nonNull).map(SlotRef::getSlotId) .collect(Collectors.toSet()); + olapScanNode.getFilledKeyColumnSlotIds().stream().map(SlotId::new).forEach(scanIds::add); olapScanNode.getTupleDesc().getSlots().removeIf(slot -> !scanIds.contains(slot.getId())); context.createSlotDesc(olapScanNode.getTupleDesc(), lazyScan.getRowId()); + if (!olapScanNode.getFilledKeyColumnSlotIds().isEmpty()) { + List projectionExprs = lazyScan.getOutput().stream() + .map(slot -> context.findSlotRef(slot.getExprId())) + .collect(Collectors.toList()); + TupleDescriptor projectionTuple = generateTupleDesc(lazyScan.getOutput(), lazyScan.getTable(), context); + olapScanNode.setProjectList(projectionExprs); + olapScanNode.setOutputTupleDesc(projectionTuple); + } for (Slot slot : lazyScan.getOutput()) { if (((SlotReference) slot).getOriginalColumn().isPresent()) { olapScanNode.addTopnLazyMaterializeOutputColumns(((SlotReference) slot).getOriginalColumn().get()); @@ -2912,6 +3017,11 @@ private void updateScanSlotsMaterialization(ScanNode scanNode, Set requiredSlotIdSet, Set requiredByProjectSlotIdSet, PlanTranslatorContext context) { Set requiredWithVirtualColumns = Sets.newHashSet(requiredSlotIdSet); + if (scanNode instanceof OlapScanNode) { + ((OlapScanNode) scanNode).getFilledKeyColumnSlotIds().stream() + .map(SlotId::new) + .forEach(requiredWithVirtualColumns::add); + } for (SlotDescriptor virtualSlot : scanNode.getTupleDesc().getSlots()) { Expr virtualColumn = virtualSlot.getVirtualColumn(); if (virtualColumn == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java index 48ff1674709245..3d13e1bfa95587 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalOlapScanToPhysicalOlapScan.java @@ -58,6 +58,7 @@ public Rule build() { convertDistribution(olapScan), olapScan.getPreAggStatus(), olapScan.getOutputByIndex(olapScan.getTable().getBaseIndexId()), + olapScan.getOutputByIndex(olapScan.getSelectedIndexId()), Optional.empty(), olapScan.getLogicalProperties(), null, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java index 9205f008cf4b1d..94661d638d81c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOlapScan.java @@ -66,6 +66,7 @@ public class PhysicalOlapScan extends PhysicalCatalogRelation implements OlapSca private final boolean hasPartitionPredicate; private final PreAggStatus preAggStatus; private final List baseOutputs; + private final List selectedIndexOutputs; private final Optional tableSample; private final ImmutableList operativeSlots; @@ -100,7 +101,7 @@ public PhysicalOlapScan(RelationId id, OlapTable olapTable, List qualifi List annOrderKeys, Optional annLimit) { this(id, olapTable, qualifier, selectedIndexId, selectedTabletIds, selectedPartitionIds, false, distributionSpec, - preAggStatus, baseOutputs, + preAggStatus, baseOutputs, baseOutputs, groupExpression, logicalProperties, null, null, tableSample, operativeSlots, virtualColumns, scoreOrderKeys, scoreLimit, scoreRangeInfo, annOrderKeys, annLimit, ""); @@ -119,7 +120,7 @@ public PhysicalOlapScan(RelationId id, OlapTable olapTable, List qualifi List scoreOrderKeys, Optional scoreLimit, Optional scoreRangeInfo, List annOrderKeys, Optional annLimit) { this(id, olapTable, qualifier, selectedIndexId, selectedTabletIds, selectedPartitionIds, false, - distributionSpec, preAggStatus, baseOutputs, groupExpression, logicalProperties, + distributionSpec, preAggStatus, baseOutputs, baseOutputs, groupExpression, logicalProperties, physicalProperties, statistics, tableSample, operativeSlots, virtualColumns, scoreOrderKeys, scoreLimit, scoreRangeInfo, annOrderKeys, annLimit, ""); } @@ -137,7 +138,7 @@ public PhysicalOlapScan(RelationId id, OlapTable olapTable, List qualifi List scoreOrderKeys, Optional scoreLimit, Optional scoreRangeInfo, List annOrderKeys, Optional annLimit, String tableAlias) { this(id, olapTable, qualifier, selectedIndexId, selectedTabletIds, selectedPartitionIds, false, - distributionSpec, preAggStatus, baseOutputs, groupExpression, logicalProperties, + distributionSpec, preAggStatus, baseOutputs, baseOutputs, groupExpression, logicalProperties, physicalProperties, statistics, tableSample, operativeSlots, virtualColumns, scoreOrderKeys, scoreLimit, scoreRangeInfo, annOrderKeys, annLimit, tableAlias); } @@ -155,20 +156,39 @@ public PhysicalOlapScan(RelationId id, OlapTable olapTable, List qualifi List scoreOrderKeys, Optional scoreLimit, Optional scoreRangeInfo, List annOrderKeys, Optional annLimit, String tableAlias) { this(id, olapTable, qualifier, selectedIndexId, selectedTabletIds, selectedPartitionIds, - hasPartitionPredicate, distributionSpec, preAggStatus, baseOutputs, groupExpression, + hasPartitionPredicate, distributionSpec, preAggStatus, baseOutputs, baseOutputs, groupExpression, logicalProperties, physicalProperties, statistics, tableSample, operativeSlots, virtualColumns, scoreOrderKeys, scoreLimit, scoreRangeInfo, annOrderKeys, annLimit, tableAlias, Optional.empty()); } + /** + * Constructor for PhysicalOlapScan. + */ + public PhysicalOlapScan(RelationId id, OlapTable olapTable, List qualifier, long selectedIndexId, + List selectedTabletIds, List selectedPartitionIds, boolean hasPartitionPredicate, + DistributionSpec distributionSpec, PreAggStatus preAggStatus, List baseOutputs, + List selectedIndexOutputs, Optional groupExpression, + LogicalProperties logicalProperties, PhysicalProperties physicalProperties, Statistics statistics, + Optional tableSample, Collection operativeSlots, + List virtualColumns, List scoreOrderKeys, Optional scoreLimit, + Optional scoreRangeInfo, List annOrderKeys, Optional annLimit, + String tableAlias) { + this(id, olapTable, qualifier, selectedIndexId, selectedTabletIds, selectedPartitionIds, + hasPartitionPredicate, distributionSpec, preAggStatus, baseOutputs, selectedIndexOutputs, + groupExpression, logicalProperties, physicalProperties, statistics, tableSample, operativeSlots, + virtualColumns, scoreOrderKeys, scoreLimit, scoreRangeInfo, annOrderKeys, annLimit, tableAlias, + Optional.empty()); + } + /** * Ultimate constructor for PhysicalOlapScan. */ public PhysicalOlapScan(RelationId id, OlapTable olapTable, List qualifier, long selectedIndexId, List selectedTabletIds, List selectedPartitionIds, boolean hasPartitionPredicate, DistributionSpec distributionSpec, PreAggStatus preAggStatus, List baseOutputs, - Optional groupExpression, LogicalProperties logicalProperties, - PhysicalProperties physicalProperties, Statistics statistics, + List selectedIndexOutputs, Optional groupExpression, + LogicalProperties logicalProperties, PhysicalProperties physicalProperties, Statistics statistics, Optional tableSample, Collection operativeSlots, List virtualColumns, List scoreOrderKeys, Optional scoreLimit, Optional scoreRangeInfo, @@ -183,6 +203,7 @@ public PhysicalOlapScan(RelationId id, OlapTable olapTable, List qualifi this.distributionSpec = distributionSpec; this.preAggStatus = preAggStatus; this.baseOutputs = ImmutableList.copyOf(baseOutputs); + this.selectedIndexOutputs = ImmutableList.copyOf(selectedIndexOutputs); this.tableSample = tableSample; this.operativeSlots = ImmutableList.copyOf(operativeSlots); this.virtualColumns = ImmutableList.copyOf(virtualColumns); @@ -228,9 +249,10 @@ public PhysicalOlapScan withPartitionPrunablePredicates( Optional partitionPrunablePredicates) { return AbstractPlan.copyWithSameId(this, () -> new PhysicalOlapScan(relationId, getTable(), qualifier, selectedIndexId, selectedTabletIds, selectedPartitionIds, hasPartitionPredicate, - distributionSpec, preAggStatus, baseOutputs, groupExpression, getLogicalProperties(), - getPhysicalProperties(), statistics, tableSample, operativeSlots, virtualColumns, scoreOrderKeys, - scoreLimit, scoreRangeInfo, annOrderKeys, annLimit, tableAlias, partitionPrunablePredicates)); + distributionSpec, preAggStatus, baseOutputs, selectedIndexOutputs, groupExpression, + getLogicalProperties(), getPhysicalProperties(), statistics, tableSample, operativeSlots, + virtualColumns, scoreOrderKeys, scoreLimit, scoreRangeInfo, annOrderKeys, annLimit, tableAlias, + partitionPrunablePredicates)); } @Override @@ -256,6 +278,10 @@ public List getBaseOutputs() { return baseOutputs; } + public List getSelectedIndexOutputs() { + return selectedIndexOutputs; + } + public List getVirtualColumns() { return virtualColumns; } @@ -351,6 +377,7 @@ public boolean equals(Object o) { && Objects.equals(selectedPartitionIds, olapScan.selectedPartitionIds) && Objects.equals(preAggStatus, olapScan.preAggStatus) && Objects.equals(baseOutputs, olapScan.baseOutputs) + && Objects.equals(selectedIndexOutputs, olapScan.selectedIndexOutputs) && Objects.equals(tableSample, olapScan.tableSample) && Objects.equals(operativeSlots, olapScan.operativeSlots) && Objects.equals(virtualColumns, olapScan.virtualColumns) @@ -376,8 +403,9 @@ public R accept(PlanVisitor visitor, C context) { public PhysicalOlapScan withGroupExpression(Optional groupExpression) { return AbstractPlan.copyWithSameId(this, () -> new PhysicalOlapScan(relationId, getTable(), qualifier, selectedIndexId, selectedTabletIds, selectedPartitionIds, hasPartitionPredicate, - distributionSpec, preAggStatus, baseOutputs, groupExpression, getLogicalProperties(), null, null, - tableSample, operativeSlots, virtualColumns, scoreOrderKeys, scoreLimit, scoreRangeInfo, + distributionSpec, preAggStatus, baseOutputs, selectedIndexOutputs, groupExpression, + getLogicalProperties(), null, null, tableSample, operativeSlots, virtualColumns, scoreOrderKeys, + scoreLimit, scoreRangeInfo, annOrderKeys, annLimit, tableAlias, partitionPrunablePredicates)); } @@ -386,8 +414,9 @@ public Plan withGroupExprLogicalPropChildren(Optional groupExpr Optional logicalProperties, List children) { return AbstractPlan.copyWithSameId(this, () -> new PhysicalOlapScan(relationId, getTable(), qualifier, selectedIndexId, selectedTabletIds, selectedPartitionIds, hasPartitionPredicate, - distributionSpec, preAggStatus, baseOutputs, groupExpression, logicalProperties.get(), null, null, - tableSample, operativeSlots, virtualColumns, scoreOrderKeys, scoreLimit, scoreRangeInfo, + distributionSpec, preAggStatus, baseOutputs, selectedIndexOutputs, groupExpression, + logicalProperties.get(), null, null, tableSample, operativeSlots, virtualColumns, scoreOrderKeys, + scoreLimit, scoreRangeInfo, annOrderKeys, annLimit, tableAlias, partitionPrunablePredicates)); } @@ -396,9 +425,10 @@ public PhysicalOlapScan withPhysicalPropertiesAndStats( PhysicalProperties physicalProperties, Statistics statistics) { return AbstractPlan.copyWithSameId(this, () -> new PhysicalOlapScan(relationId, getTable(), qualifier, selectedIndexId, selectedTabletIds, selectedPartitionIds, hasPartitionPredicate, - distributionSpec, preAggStatus, baseOutputs, groupExpression, getLogicalProperties(), - physicalProperties, statistics, tableSample, operativeSlots, virtualColumns, scoreOrderKeys, - scoreLimit, scoreRangeInfo, annOrderKeys, annLimit, tableAlias, partitionPrunablePredicates)); + distributionSpec, preAggStatus, baseOutputs, selectedIndexOutputs, groupExpression, + getLogicalProperties(), physicalProperties, statistics, tableSample, operativeSlots, virtualColumns, + scoreOrderKeys, scoreLimit, scoreRangeInfo, annOrderKeys, annLimit, tableAlias, + partitionPrunablePredicates)); } @Override @@ -423,10 +453,10 @@ public Optional getTableSample() { public CatalogRelation withOperativeSlots(Collection operativeSlots) { return AbstractPlan.copyWithSameId(this, () -> new PhysicalOlapScan(relationId, (OlapTable) table, qualifier, selectedIndexId, selectedTabletIds, selectedPartitionIds, hasPartitionPredicate, - distributionSpec, preAggStatus, baseOutputs, groupExpression, getLogicalProperties(), - getPhysicalProperties(), statistics, tableSample, operativeSlots, virtualColumns, scoreOrderKeys, - scoreLimit, - scoreRangeInfo, annOrderKeys, annLimit, tableAlias, partitionPrunablePredicates)); + distributionSpec, preAggStatus, baseOutputs, selectedIndexOutputs, groupExpression, + getLogicalProperties(), getPhysicalProperties(), statistics, tableSample, operativeSlots, + virtualColumns, scoreOrderKeys, scoreLimit, scoreRangeInfo, annOrderKeys, annLimit, tableAlias, + partitionPrunablePredicates)); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 11f8a24cca1ec9..92d3a7361be936 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -178,6 +178,7 @@ public class OlapScanNode extends ScanNode { private SortInfo sortInfo = null; private Set outputColumnUniqueIds = new HashSet<>(); + private Set filledKeyColumnSlotIds = new HashSet<>(); // When scan match sort_info, we can push limit into OlapScanNode. // It's limit for scanner instead of scanNode so we add a new limit. @@ -256,6 +257,14 @@ public void setTableSample(TableSample tSample) { this.tableSample = tSample; } + public void setFilledKeyColumnSlotIds(Set filledKeyColumnSlotIds) { + this.filledKeyColumnSlotIds = filledKeyColumnSlotIds; + } + + public Set getFilledKeyColumnSlotIds() { + return filledKeyColumnSlotIds; + } + public void setNereidsPrunedTabletIds(Set nereidsPrunedTabletIds) { this.nereidsPrunedTabletIds = nereidsPrunedTabletIds; } @@ -1283,6 +1292,9 @@ protected void toThrift(TPlanNode msg) { if (outputColumnUniqueIds != null) { msg.olap_scan_node.setOutputColumnUniqueIds(outputColumnUniqueIds); } + if (!filledKeyColumnSlotIds.isEmpty()) { + msg.olap_scan_node.setFilledKeyColumnSlotIds(filledKeyColumnSlotIds); + } msg.olap_scan_node.setDistributeColumnIds(new ArrayList<>(distributionColumnIds)); diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java index c31f96792f28e7..21c10d07470970 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslatorTest.java @@ -102,9 +102,12 @@ public void testOlapPrune() throws Exception { List qualifier = new ArrayList<>(); qualifier.add("test"); List t1Output = new ArrayList<>(); - SlotReference col1 = new SlotReference("col1", IntegerType.INSTANCE); - SlotReference col2 = new SlotReference("col2", IntegerType.INSTANCE); - SlotReference col3 = new SlotReference("col2", IntegerType.INSTANCE); + SlotReference col1 = SlotReference.fromColumn(StatementScopeIdGenerator.newExprId(), + t1, t1.getBaseSchema().get(0), qualifier); + SlotReference col2 = SlotReference.fromColumn(StatementScopeIdGenerator.newExprId(), + t1, t1.getBaseSchema().get(1), qualifier); + SlotReference col3 = SlotReference.fromColumn(StatementScopeIdGenerator.newExprId(), + t1, t1.getBaseSchema().get(1), qualifier); t1Output.add(col1); t1Output.add(col2); t1Output.add(col3); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index c7954c423af73c..c0248be53412e1 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -996,6 +996,8 @@ struct TOlapScanNode { // Only partitions that are candidates for pruning are included; partitions FE // does not want pruned (e.g. default catch-all) are omitted from this list. 27: optional list partition_boundaries + // Slot ids of storage key columns filled by FE only to align the scan tuple with storage schema. + 28: optional set filled_key_column_slot_ids } struct TEqJoinCondition {