Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/aggregation_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* blo
RETURN_IF_ERROR(local_state._executor.get_result(state, block, eos));
local_state.make_nullable_output_key(block);
// dispose the having clause, should not be execute in prestreaming agg
RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, block->columns()));
RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block));
local_state.do_agg_limit(block, eos);
return Status::OK();
}
Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/analytic_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState* state, vectorized::Block
local_state._shared_state->blocks_buffer.pop();
output_rows = output_block->rows();
//if buffer have no data and sink not eos, block reading and wait for signal again
RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, output_block,
output_block->columns()));
RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, output_block));
if (local_state._shared_state->blocks_buffer.empty() &&
!local_state._shared_state->sink_eos) {
// add this mutex to check, as in some case maybe is doing block(), and the sink is doing set eos.
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/assert_num_rows_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ Status AssertNumRowsOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
_node_id, to_string_lambda(_assertion), _desired_num_rows, num_rows_returned,
_subquery_string);
}
RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, block->columns()));
RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block));
return Status::OK();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ Status DistinctStreamingAggOperatorX::pull(RuntimeState* state, vectorized::Bloc
local_state._make_nullable_output_key(block);
if (!_is_streaming_preagg) {
// dispose the having clause, should not be execute in prestreaming agg
RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, block->columns()));
RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block));
}
local_state.add_num_rows_returned(block->rows());
// If the limit is not reached, it is important to ensure that _aggregated_block is empty
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/hashjoin_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ Status HashJoinProbeLocalState::filter_data_and_build_output(RuntimeState* state
}
{
SCOPED_TIMER(_join_filter_timer);
RETURN_IF_ERROR(filter_block(_conjuncts, temp_block, temp_block->columns()));
RETURN_IF_ERROR(filter_block(_conjuncts, temp_block));
}

RETURN_IF_ERROR(_build_output_block(temp_block, output_block));
Expand Down
5 changes: 2 additions & 3 deletions be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -654,9 +654,8 @@ Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, vectorized::Block
{
SCOPED_TIMER(local_state._join_filter_timer);

RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts,
&local_state._join_block,
local_state._join_block.columns()));
RETURN_IF_ERROR(
local_state.filter_block(local_state._conjuncts, &local_state._join_block));
}
RETURN_IF_ERROR(local_state._build_output_block(&local_state._join_block, block));
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,8 @@ void PipelineXLocalStateBase::clear_origin_block() {
}

Status PipelineXLocalStateBase::filter_block(const vectorized::VExprContextSPtrs& expr_contexts,
vectorized::Block* block, size_t column_to_keep) {
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(expr_contexts, block, column_to_keep));
vectorized::Block* block) {
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(expr_contexts, block, block->columns()));

_estimate_memory_usage += vectorized::VExprContext::get_memory_usage(expr_contexts);
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ class PipelineXLocalStateBase {
virtual std::vector<Dependency*> execution_dependencies() { return {}; }

Status filter_block(const vectorized::VExprContextSPtrs& expr_contexts,
vectorized::Block* block, size_t column_to_keep);
vectorized::Block* block);

int64_t& estimate_memory_usage() { return _estimate_memory_usage; }

Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/partition_sort_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, vectorized::

if (!output_block->empty()) {
//if buffer have no data and sink not eos, block reading and wait for signal again
RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, output_block,
output_block->columns()));
RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, output_block));
local_state._num_rows_returned += output_block->rows();
}
return Status::OK();
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/rec_cte_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class RecCTEScanOperatorX final : public OperatorX<RecCTEScanLocalState> {
return Status::OK();
}
*block = std::move(local_state._blocks.back());
RETURN_IF_ERROR(local_state.filter_block(local_state.conjuncts(), block, block->columns()));
RETURN_IF_ERROR(local_state.filter_block(local_state.conjuncts(), block));
local_state._blocks.pop_back();
return Status::OK();
}
Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/rec_cte_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,7 @@ class RecCTESourceOperatorX : public OperatorX<RecCTESourceLocalState> {
*eos = true;
} else {
block->swap(ctx->blocks.back());
RETURN_IF_ERROR(
local_state.filter_block(local_state.conjuncts(), block, block->columns()));
RETURN_IF_ERROR(local_state.filter_block(local_state.conjuncts(), block));
ctx->blocks.pop_back();
}
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/schema_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,8 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* bl
*src_block.safe_get_by_position(_slot_offsets[i]).column, 0,
src_block.rows());
}
RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block,
_dest_tuple_desc->slots().size()));
DCHECK_EQ(block->columns(), _dest_tuple_desc->slots().size());
RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block));
src_block.clear();
}
} while (block->rows() == 0 && !*eos);
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/select_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class SelectOperatorX final : public StreamingOperatorX<SelectLocalState> {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, block->columns()));
RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block));
local_state.reached_limit(block, eos);
return Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/streaming_aggregation_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1024,7 +1024,7 @@ Status StreamingAggOperatorX::pull(RuntimeState* state, vectorized::Block* block
RETURN_IF_ERROR(local_state._get_results_with_serialized_key(state, block, eos));
local_state.make_nullable_output_key(block);
// dispose the having clause, should not be execute in prestreaming agg
RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block, block->columns()));
RETURN_IF_ERROR(local_state.filter_block(local_state._conjuncts, block));
}
local_state.reached_limit(block, eos);

Expand Down
Loading