From 74cc7c6e220bfcc8d7ee24069ced759452aa79cc Mon Sep 17 00:00:00 2001 From: Mryange Date: Sat, 28 Feb 2026 10:33:28 +0800 Subject: [PATCH] upd --- .../pipeline/exec/analytic_sink_operator.cpp | 5 +---- be/src/pipeline/exec/analytic_sink_operator.h | 3 ++- .../exec/nested_loop_join_probe_operator.h | 19 +++++-------------- be/src/vec/runtime/partitioner.cpp | 16 +++++++--------- be/src/vec/runtime/partitioner.h | 7 ------- 5 files changed, 15 insertions(+), 35 deletions(-) diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index 33ba9053103fad..a924b57487b13c 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -780,8 +780,6 @@ Status AnalyticSinkOperatorX::_add_input_block(doris::RuntimeState* state, size_t block_rows = input_block->rows(); local_state._input_total_rows += block_rows; - // record origin columns, maybe be after this, could cast some column but no need to output - auto column_to_keep = input_block->columns(); { SCOPED_TIMER(local_state._compute_agg_data_timer); //insert _agg_input_columns, execute calculate for its, and those columns maybe could remove have used data @@ -817,7 +815,6 @@ Status AnalyticSinkOperatorX::_add_input_block(doris::RuntimeState* state, local_state._range_result_columns[i].get(), block_rows)); } } - vectorized::Block::erase_useless_column(input_block, column_to_keep); COUNTER_UPDATE(local_state._memory_used_counter, input_block->allocated_bytes()); COUNTER_UPDATE(local_state._blocks_memory_usage, input_block->allocated_bytes()); local_state._input_blocks.emplace_back(std::move(*input_block)); @@ -894,7 +891,7 @@ size_t AnalyticSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool eos return local_state._reserve_mem_size; } -Status AnalyticSinkOperatorX::_insert_range_column(vectorized::Block* block, +Status AnalyticSinkOperatorX::_insert_range_column(const vectorized::Block* block, const vectorized::VExprContextSPtr& expr, vectorized::IColumn* dst_column, size_t length) { vectorized::ColumnPtr column; diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index c4168a33c4acb2..62c4c6774ddf7b 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -230,7 +230,8 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorXget_by_position(i).column->assume_mutable()->clear(); \ - } Status init(RuntimeState* state, LocalStateInfo& info) override; Status open(RuntimeState* state) override; Status close(RuntimeState* state) override; @@ -83,7 +79,6 @@ class NestedLoopJoinProbeLocalState final void _append_probe_data_with_null(vectorized::Block& block) const; template void _do_filtering_and_update_visited_flags_impl(vectorized::Block* block, - uint32_t column_to_keep, size_t build_block_idx, size_t processed_blocks_num, bool materialize, Filter& filter) { @@ -127,17 +122,15 @@ class NestedLoopJoinProbeLocalState final if (materialize) { SCOPED_TIMER(_filtered_by_join_conjuncts_timer); - vectorized::Block::filter_block_internal(block, filter, column_to_keep); + vectorized::Block::filter_block_internal(block, filter); } else { - CLEAR_BLOCK + block->clear_column_data(); } } // need exception safety template Status _do_filtering_and_update_visited_flags(vectorized::Block* block, bool materialize) { - // The number of columns will not exceed the range of u32. - uint32_t column_to_keep = cast_set(block->columns()); // If we need to set visited flags for build side, // 1. Execute conjuncts and get a column with bool type to do filtering. // 2. Use bool column to update build-side visited flags. @@ -155,7 +148,7 @@ class NestedLoopJoinProbeLocalState final } if (can_filter_all) { - CLEAR_BLOCK + block->clear_column_data(); std::stack empty1; _probe_offset_stack.swap(empty1); @@ -164,8 +157,7 @@ class NestedLoopJoinProbeLocalState final } else { _do_filtering_and_update_visited_flags_impl( - block, column_to_keep, build_block_idx, processed_blocks_num, materialize, - filter); + block, build_block_idx, processed_blocks_num, materialize, filter); } } else if (block->rows() > 0) { if constexpr (SetBuildSideFlag) { @@ -189,10 +181,9 @@ class NestedLoopJoinProbeLocalState final 1); } if (!materialize) { - CLEAR_BLOCK + block->clear_column_data(); } } - vectorized::Block::erase_useless_column(block, column_to_keep); return Status::OK(); } diff --git a/be/src/vec/runtime/partitioner.cpp b/be/src/vec/runtime/partitioner.cpp index 5095c7a7dbbd37..28ae2c96d09d52 100644 --- a/be/src/vec/runtime/partitioner.cpp +++ b/be/src/vec/runtime/partitioner.cpp @@ -32,16 +32,16 @@ Status Crc32HashPartitioner::do_partitioning(RuntimeState* state, Bl size_t rows = block->rows(); if (rows > 0) { - auto column_to_keep = block->columns(); - - int result_size = cast_set(_partition_expr_ctxs.size()); - std::vector result(result_size); + Columns columns(_partition_expr_ctxs.size()); + for (size_t i = 0; i < _partition_expr_ctxs.size(); ++i) { + RETURN_IF_ERROR(_partition_expr_ctxs[i]->execute(block, columns[i])); + } _initialize_hash_vals(rows); auto* __restrict hashes = _hash_vals.data(); - RETURN_IF_ERROR(_get_partition_column_result(block, result)); - for (int j = 0; j < result_size; ++j) { - const auto& [col, is_const] = unpack_if_const(block->get_by_position(result[j]).column); + + for (int j = 0; j < _partition_expr_ctxs.size(); ++j) { + const auto& [col, is_const] = unpack_if_const(columns[j]); if (is_const) { continue; } @@ -51,8 +51,6 @@ Status Crc32HashPartitioner::do_partitioning(RuntimeState* state, Bl for (size_t i = 0; i < rows; i++) { hashes[i] = ChannelIds()(hashes[i], _partition_count); } - - Block::erase_useless_column(block, column_to_keep); } return Status::OK(); } diff --git a/be/src/vec/runtime/partitioner.h b/be/src/vec/runtime/partitioner.h index dfe0e79f988e01..97f7adaa54ba63 100644 --- a/be/src/vec/runtime/partitioner.h +++ b/be/src/vec/runtime/partitioner.h @@ -81,13 +81,6 @@ class Crc32HashPartitioner : public PartitionerBase { Status clone(RuntimeState* state, std::unique_ptr& partitioner) override; protected: - Status _get_partition_column_result(Block* block, std::vector& result) const { - int counter = 0; - for (auto ctx : _partition_expr_ctxs) { - RETURN_IF_ERROR(ctx->execute(block, &result[counter++])); - } - return Status::OK(); - } Status _clone_expr_ctxs(RuntimeState* state, VExprContextSPtrs& new_partition_expr_ctxs) const { new_partition_expr_ctxs.resize(_partition_expr_ctxs.size());