diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index 846bfdf1c125dd..c78cd3111ac618 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -448,7 +448,7 @@ Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* blo RETURN_IF_ERROR(local_state._executor.get_result(state, block, eos)); local_state.make_nullable_output_key(block); // dispose the having clause, should not be execute in prestreaming agg - RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, block->columns())); + RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block)); local_state.do_agg_limit(block, eos); return Status::OK(); } diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp b/be/src/pipeline/exec/analytic_source_operator.cpp index 5170711acc3220..86188f45fafbb1 100644 --- a/be/src/pipeline/exec/analytic_source_operator.cpp +++ b/be/src/pipeline/exec/analytic_source_operator.cpp @@ -60,8 +60,7 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* state, vectorized::Block local_state._shared_state->blocks_buffer.pop(); output_rows = output_block->rows(); //if buffer have no data and sink not eos, block reading and wait for signal again - RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, output_block, - output_block->columns())); + RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, output_block)); if (local_state._shared_state->blocks_buffer.empty() && !local_state._shared_state->sink_eos) { // add this mutex to check, as in some case maybe is doing block(), and the sink is doing set eos. diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp b/be/src/pipeline/exec/assert_num_rows_operator.cpp index 47a97e0af649bf..dca10ec02b8a84 100644 --- a/be/src/pipeline/exec/assert_num_rows_operator.cpp +++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp @@ -120,7 +120,7 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc _node_id, to_string_lambda(_assertion), _desired_num_rows, num_rows_returned, _subquery_string); } - RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, block->columns())); + RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block)); return Status::OK(); } diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp index 9cafe75ec76c7a..fc383fe8e27fd8 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp @@ -403,7 +403,7 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState* state, vectorized::Bloc local_state._make_nullable_output_key(block); if (!_is_streaming_preagg) { // dispose the having clause, should not be execute in prestreaming agg - RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, block->columns())); + RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block)); } local_state.add_num_rows_returned(block->rows()); // If the limit is not reached, it is important to ensure that _aggregated_block is empty diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp index cb2c0df540c942..bb54947423bb4f 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp +++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp @@ -382,7 +382,7 @@ Status HashJoinProbeLocalState::filter_data_and_build_output(RuntimeState* state } { SCOPED_TIMER(_join_filter_timer); - RETURN_IF_ERROR(filter_block(_conjuncts, temp_block, temp_block->columns())); + RETURN_IF_ERROR(filter_block(_conjuncts, temp_block)); } RETURN_IF_ERROR(_build_output_block(temp_block, output_block)); diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp index 74f0b4c5a36f5d..5f455789894ca6 100644 --- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp @@ -654,9 +654,8 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, vectorized::Block { SCOPED_TIMER(local_state._join_filter_timer); - RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, - &local_state._join_block, - local_state._join_block.columns())); + RETURN_IF_ERROR( + local_state.filter_block(local_state._conjuncts, &local_state._join_block)); } RETURN_IF_ERROR(local_state._build_output_block(&local_state._join_block, block)); } diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp index 13d9b5a58517ca..b1bdd08e3c375b 100644 --- a/be/src/pipeline/exec/operator.cpp +++ b/be/src/pipeline/exec/operator.cpp @@ -290,8 +290,8 @@ void PipelineXLocalStateBase::clear_origin_block() { } Status PipelineXLocalStateBase::filter_block(const vectorized::VExprContextSPtrs& expr_contexts, - vectorized::Block* block, size_t column_to_keep) { - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(expr_contexts, block, column_to_keep)); + vectorized::Block* block) { + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(expr_contexts, block, block->columns())); _estimate_memory_usage += vectorized::VExprContext::get_memory_usage(expr_contexts); return Status::OK(); diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 10e780c07f7710..d6244f850c04ca 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -266,7 +266,7 @@ class PipelineXLocalStateBase { virtual std::vector execution_dependencies() { return {}; } Status filter_block(const vectorized::VExprContextSPtrs& expr_contexts, - vectorized::Block* block, size_t column_to_keep); + vectorized::Block* block); int64_t& estimate_memory_usage() { return _estimate_memory_usage; } diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp b/be/src/pipeline/exec/partition_sort_source_operator.cpp index 127498c18e30cb..795b1a223c54b7 100644 --- a/be/src/pipeline/exec/partition_sort_source_operator.cpp +++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp @@ -74,8 +74,7 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized:: if (!output_block->empty()) { //if buffer have no data and sink not eos, block reading and wait for signal again - RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, output_block, - output_block->columns())); + RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, output_block)); local_state._num_rows_returned += output_block->rows(); } return Status::OK(); diff --git a/be/src/pipeline/exec/rec_cte_scan_operator.h b/be/src/pipeline/exec/rec_cte_scan_operator.h index 5b03766c1639ef..6579d72957f2c8 100644 --- a/be/src/pipeline/exec/rec_cte_scan_operator.h +++ b/be/src/pipeline/exec/rec_cte_scan_operator.h @@ -77,7 +77,7 @@ class RecCTEScanOperatorX final : public OperatorX { return Status::OK(); } *block = std::move(local_state._blocks.back()); - RETURN_IF_ERROR(local_state.filter_block(local_state.conjuncts(), block, block->columns())); + RETURN_IF_ERROR(local_state.filter_block(local_state.conjuncts(), block)); local_state._blocks.pop_back(); return Status::OK(); } diff --git a/be/src/pipeline/exec/rec_cte_source_operator.h b/be/src/pipeline/exec/rec_cte_source_operator.h index 7f42495116e004..0bd5810614692f 100644 --- a/be/src/pipeline/exec/rec_cte_source_operator.h +++ b/be/src/pipeline/exec/rec_cte_source_operator.h @@ -121,8 +121,7 @@ class RecCTESourceOperatorX : public OperatorX { *eos = true; } else { block->swap(ctx->blocks.back()); - RETURN_IF_ERROR( - local_state.filter_block(local_state.conjuncts(), block, block->columns())); + RETURN_IF_ERROR(local_state.filter_block(local_state.conjuncts(), block)); ctx->blocks.pop_back(); } } diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp b/be/src/pipeline/exec/schema_scan_operator.cpp index 79987c001de112..362f32699a3a8f 100644 --- a/be/src/pipeline/exec/schema_scan_operator.cpp +++ b/be/src/pipeline/exec/schema_scan_operator.cpp @@ -257,8 +257,8 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* bl *src_block.safe_get_by_position(_slot_offsets[i]).column, 0, src_block.rows()); } - RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, - _dest_tuple_desc->slots().size())); + DCHECK_EQ(block->columns(), _dest_tuple_desc->slots().size()); + RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block)); src_block.clear(); } } while (block->rows() == 0 && !*eos); diff --git a/be/src/pipeline/exec/select_operator.h b/be/src/pipeline/exec/select_operator.h index f033c7c0de81a8..bcdb8924068212 100644 --- a/be/src/pipeline/exec/select_operator.h +++ b/be/src/pipeline/exec/select_operator.h @@ -47,7 +47,7 @@ class SelectOperatorX final : public StreamingOperatorX { auto& local_state = get_local_state(state); SCOPED_TIMER(local_state.exec_time_counter()); RETURN_IF_CANCELLED(state); - RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, block->columns())); + RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block)); local_state.reached_limit(block, eos); return Status::OK(); } diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_operator.cpp index fe9aef1662a6e3..898be928121a81 100644 --- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp @@ -1024,7 +1024,7 @@ Status StreamingAggOperatorX::pull(RuntimeState* state, vectorized::Block* block RETURN_IF_ERROR(local_state._get_results_with_serialized_key(state, block, eos)); local_state.make_nullable_output_key(block); // dispose the having clause, should not be execute in prestreaming agg - RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, block->columns())); + RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block)); } local_state.reached_limit(block, eos);