Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions be/src/exec/operator/olap_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -975,16 +975,6 @@ Status OlapScanLocalState::open(RuntimeState* state) {
RETURN_IF_ERROR(virtual_column_expr_ctx->open(state));

_slot_id_to_virtual_column_expr[slot_desc->id()] = virtual_column_expr_ctx;
_slot_id_to_col_type[slot_desc->id()] = slot_desc->get_data_type_ptr();
int col_pos = p.intermediate_row_desc().get_column_id(slot_desc->id());
if (col_pos < 0) {
return Status::InternalError(
"Invalid virtual slot, can not find its information. Slot desc:\n{}\nRow "
"desc:\n{}",
slot_desc->debug_string(), p.row_desc().debug_string());
} else {
_slot_id_to_index_in_block[slot_desc->id()] = col_pos;
}
}
}

Expand Down
3 changes: 0 additions & 3 deletions be/src/exec/operator/olap_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -324,9 +324,6 @@ class OlapScanLocalState final : public ScanLocalState<OlapScanLocalState> {
std::vector<TabletReadSource> _read_sources;

std::map<SlotId, VExprContextSPtr> _slot_id_to_virtual_column_expr;
std::map<SlotId, size_t> _slot_id_to_index_in_block;
// this map is needed for scanner opening.
std::map<SlotId, DataTypePtr> _slot_id_to_col_type;

// ---- Runtime-filter partition pruning ----
// Attaches this per-instance pruner to the shared parse result owned by
Expand Down
28 changes: 14 additions & 14 deletions be/src/exec/scan/olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,11 @@ OlapScanner::OlapScanner(ScanLocalStateBase* parent, OlapScanner::Params&& param
.rs_splits {},
.return_columns {},
.output_columns {},
.filled_columns {},
.common_expr_ctxs_push_down {},
.topn_filter_source_node_ids {},
.key_group_cluster_key_idxes {},
.virtual_column_exprs {},
.vir_cid_to_idx_in_block {},
.vir_col_idx_to_type {},
.score_runtime {},
.collection_statistics {},
.ann_topn_runtime {},
Expand Down Expand Up @@ -173,8 +172,6 @@ Status OlapScanner::_prepare_impl() {
_slot_id_to_virtual_column_expr[pair.first] = context;
}

_slot_id_to_index_in_block = local_state->_slot_id_to_index_in_block;
_slot_id_to_col_type = local_state->_slot_id_to_col_type;
_score_runtime = local_state->_score_runtime;
// All scanners share the same ann_topn_runtime.
_ann_topn_runtime = local_state->_ann_topn_runtime;
Expand Down Expand Up @@ -330,15 +327,14 @@ Status OlapScanner::_init_tablet_reader_params(
push_down_agg_type != TPushAggOp::COUNT_ON_INDEX);
}

_tablet_reader_params.filled_columns.clear();
RETURN_IF_ERROR(_init_variant_columns());
RETURN_IF_ERROR(_init_return_columns());

_tablet_reader_params.push_down_agg_type_opt = _local_state->get_push_down_agg_type();

_tablet_reader_params.common_expr_ctxs_push_down = _common_expr_ctxs_push_down;
_tablet_reader_params.virtual_column_exprs = _virtual_column_exprs;
_tablet_reader_params.vir_cid_to_idx_in_block = _vir_cid_to_idx_in_block;
_tablet_reader_params.vir_col_idx_to_type = _vir_col_idx_to_type;
_tablet_reader_params.score_runtime = _score_runtime;
_tablet_reader_params.output_columns = ((OlapScanLocalState*)_local_state)->_output_column_ids;
_tablet_reader_params.ann_topn_runtime = _ann_topn_runtime;
Expand Down Expand Up @@ -598,17 +594,21 @@ Status OlapScanner::_init_return_columns() {
if (slot->get_virtual_column_expr()) {
ColumnId virtual_column_cid = index;
_virtual_column_exprs[virtual_column_cid] = _slot_id_to_virtual_column_expr[slot->id()];
size_t idx_in_block = _slot_id_to_index_in_block[slot->id()];
_vir_cid_to_idx_in_block[virtual_column_cid] = idx_in_block;
_vir_col_idx_to_type[idx_in_block] = _slot_id_to_col_type[slot->id()];

VLOG_DEBUG << fmt::format(
"Virtual column, slot id: {}, cid {}, column index: {}, type: {}", slot->id(),
virtual_column_cid, _vir_cid_to_idx_in_block[virtual_column_cid],
_vir_col_idx_to_type[idx_in_block]->get_name());

VLOG_DEBUG << fmt::format("Virtual column, slot id: {}, cid {}, type: {}", slot->id(),
virtual_column_cid, slot->get_data_type_ptr()->get_name());
}

const auto& column = tablet_schema->column(index);
auto* olap_local_state = static_cast<OlapScanLocalState*>(_local_state);
const auto& olap_scan_node = olap_local_state->olap_scan_node();
if (olap_scan_node.__isset.filled_key_column_slot_ids &&
olap_scan_node.filled_key_column_slot_ids.contains(slot->id())) {
DORIS_CHECK(column.is_key());
if (_tablet_reader_params.direct_mode) {
_tablet_reader_params.filled_columns.insert(index);
}
}
int32_t unique_id =
column.unique_id() >= 0 ? column.unique_id() : column.parent_unique_id();
if (!slot->all_access_paths().empty()) {
Expand Down
10 changes: 2 additions & 8 deletions be/src/exec/scan/olap_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
#pragma once

#include <gen_cpp/PaloInternalService_types.h>
#include <stdint.h>

#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <string>
Expand Down Expand Up @@ -113,17 +113,11 @@ class OlapScanner : public Scanner {

std::unordered_set<uint32_t> _tablet_columns_convert_to_null_set;

// This three fields are copied from OlapScanLocalState.
// This field is copied from OlapScanLocalState.
std::map<SlotId, VExprContextSPtr> _slot_id_to_virtual_column_expr;
std::map<SlotId, size_t> _slot_id_to_index_in_block;
std::map<SlotId, DataTypePtr> _slot_id_to_col_type;

// ColumnId of virtual column to its expr context
std::map<ColumnId, VExprContextSPtr> _virtual_column_exprs;
// ColumnId of virtual column to its index in block
std::map<ColumnId, size_t> _vir_cid_to_idx_in_block;
// The idx of vir_col in block to its data type.
std::map<size_t, DataTypePtr> _vir_col_idx_to_type;
std::shared_ptr<ScoreRuntime> _score_runtime;

std::shared_ptr<segment_v2::AnnTopNRuntime> _ann_topn_runtime;
Expand Down
11 changes: 5 additions & 6 deletions be/src/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -362,18 +362,17 @@ void ScannerScheduler::_make_sure_virtual_col_is_materialized(
continue;
}

std::vector<std::string> vcid_to_idx;

for (const auto& pair : olap_scanner->_vir_cid_to_idx_in_block) {
vcid_to_idx.push_back(fmt::format("{}-{}", pair.first, pair.second));
std::vector<ColumnId> virtual_column_ids;
for (const auto& pair : olap_scanner->_virtual_column_exprs) {
virtual_column_ids.push_back(pair.first);
}

std::string error_msg = fmt::format(
"Column in idx {} is nothing, block columns {}, normal_columns "
"{}, "
"vir_cid_to_idx_in_block_msg {}",
"virtual_column_ids [{}]",
idx, free_block->columns(), olap_scanner->_return_columns.size(),
fmt::format("_vir_cid_to_idx_in_block:[{}]", fmt::join(vcid_to_idx, ",")));
fmt::join(virtual_column_ids, ","));
throw doris::Exception(ErrorCode::INTERNAL_ERROR, error_msg);
}
#endif
Expand Down
22 changes: 10 additions & 12 deletions be/src/storage/iterator/vcollect_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,22 +284,20 @@ Status VCollectIterator::_topn_next(Block* block) {
}

auto clone_block = block->clone_empty();
// Initialize virtual slot columns by schema (avoid runtime type checks):
// use _reader_context.vir_col_idx_to_type to construct real columns for those positions.
if (!_reader->_reader_context.vir_col_idx_to_type.empty()) {
const auto& idx_to_type = _reader->_reader_context.vir_col_idx_to_type;
for (const auto& kv : idx_to_type) {
size_t idx = kv.first;
if (idx < clone_block.columns()) {
clone_block.get_by_position(idx).column = kv.second->create_column();
}
}
}
// Initialize virtual slot columns by schema (avoid runtime type checks).
for (const auto& [cid, expr_ctx] : _reader->_reader_context.virtual_column_exprs) {
auto it = std::find(_reader->_return_columns.begin(), _reader->_return_columns.end(), cid);
DORIS_CHECK(it != _reader->_return_columns.end());
auto idx = cast_set<size_t>(std::distance(_reader->_return_columns.begin(), it));
DORIS_CHECK(idx < clone_block.columns());
clone_block.get_by_position(idx).column = expr_ctx->root()->data_type()->create_column();
}
const size_t clone_block_columns = clone_block.columns();
MutableBlock mutable_block = MutableBlock::build_mutable_block(std::move(clone_block));

const std::vector<uint32_t>* sort_columns = _reader->_reader_context.read_orderby_key_columns;
for (auto column_idx : *sort_columns) {
DORIS_CHECK(column_idx < clone_block.columns());
DORIS_CHECK(column_idx < clone_block_columns);
}
size_t first_sort_column_idx = (*sort_columns)[0];

Expand Down
4 changes: 2 additions & 2 deletions be/src/storage/iterators.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <cstddef>
#include <memory>
#include <set>

#include "common/status.h"
#include "core/block/block.h"
Expand Down Expand Up @@ -129,6 +130,7 @@ class StorageReadOptions {
io::IOContext io_ctx;
VExprContextSPtrs common_expr_ctxs_push_down;
const std::set<int32_t>* output_columns = nullptr;
std::set<ColumnId> filled_columns;
// runtime state
RuntimeState* runtime_state = nullptr;
RowsetId rowset_id;
Expand All @@ -147,8 +149,6 @@ class StorageReadOptions {

std::map<ColumnId, VExprContextSPtr> virtual_column_exprs;
std::shared_ptr<segment_v2::AnnTopNRuntime> ann_topn_runtime;
std::map<ColumnId, size_t> vir_cid_to_idx_in_block;
std::map<size_t, DataTypePtr> vir_col_idx_to_type;

std::map<int32_t, TColumnAccessPaths> all_access_paths;
std::map<int32_t, TColumnAccessPaths> predicate_access_paths;
Expand Down
4 changes: 2 additions & 2 deletions be/src/storage/rowset/beta_rowset_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,13 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
_read_options.stats = _stats;
_read_options.push_down_agg_type_opt = _read_context->push_down_agg_type_opt;
_read_options.common_expr_ctxs_push_down = _read_context->common_expr_ctxs_push_down;
DORIS_CHECK(_read_context->return_columns != nullptr);
_read_options.virtual_column_exprs = _read_context->virtual_column_exprs;

_read_options.all_access_paths = _read_context->all_access_paths;
_read_options.predicate_access_paths = _read_context->predicate_access_paths;

_read_options.ann_topn_runtime = _read_context->ann_topn_runtime;
_read_options.vir_cid_to_idx_in_block = _read_context->vir_cid_to_idx_in_block;
_read_options.vir_col_idx_to_type = _read_context->vir_col_idx_to_type;
_read_options.score_runtime = _read_context->score_runtime;
_read_options.collection_statistics = _read_context->collection_statistics;
_read_options.rowset_id = _rowset->rowset_id();
Expand Down Expand Up @@ -151,6 +150,7 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
// create segment iterators
VLOG_NOTICE << "read columns size: " << read_columns.size();
_input_schema = std::make_shared<Schema>(_read_context->tablet_schema->columns(), read_columns);
_read_options.filled_columns = _read_context->filled_columns;
// output_schema only contains return_columns (excludes extra columns like delete-predicate columns).
// It is used by merge/union iterators to determine how many columns to copy to the output block.
_output_schema = std::make_shared<Schema>(_read_context->tablet_schema->columns(),
Expand Down
7 changes: 2 additions & 5 deletions be/src/storage/rowset/rowset_reader_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#ifndef DORIS_BE_SRC_OLAP_ROWSET_ROWSET_READER_CONTEXT_H
#define DORIS_BE_SRC_OLAP_ROWSET_ROWSET_READER_CONTEXT_H

#include <set>
#include <vector>

#include "exprs/score_runtime.h"
Expand Down Expand Up @@ -76,9 +77,6 @@ struct RowsetReaderContext {
// Effective adaptive batch size byte budget. 0 means disabled internally.
size_t preferred_block_size_bytes = 8388608UL;

// Points to the "true" output column list before non-direct-mode expansion.
// Used by BlockReader to map expanded storage columns back to the requested output layout.
const std::vector<ColumnId>* origin_return_columns = nullptr;
bool is_unique = false;
//record row num merged in generic iterator
uint64_t* merged_rows = nullptr;
Expand All @@ -89,14 +87,13 @@ struct RowsetReaderContext {
RowIdConversion* rowid_conversion = nullptr;
bool is_key_column_group = false;
const std::set<int32_t>* output_columns = nullptr;
std::set<ColumnId> filled_columns;
RowsetId rowset_id;
// slots that cast may be eliminated in storage layer
std::map<std::string, DataTypePtr> target_cast_type_for_variants;
int64_t ttl_seconds = 0;

std::map<ColumnId, VExprContextSPtr> virtual_column_exprs;
std::map<ColumnId, size_t> vir_cid_to_idx_in_block;
std::map<size_t, DataTypePtr> vir_col_idx_to_type;

std::map<int32_t, TColumnAccessPaths> all_access_paths;
std::map<int32_t, TColumnAccessPaths> predicate_access_paths;
Expand Down
Loading
Loading