Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions be/src/olap/rowset/segment_v2/condition_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,32 @@ void ConditionCache::insert(const CacheKey& key, std::shared_ptr<std::vector<boo
result->capacity(), result->capacity(), CachePriority::NORMAL));
}

// Looks up the cached filter result for an external-table key.
// On success stores a handle for the entry in *handle and returns true;
// returns false when the encoded key is empty or no entry is cached.
bool ConditionCache::lookup(const ExternalCacheKey& key, ConditionCacheHandle* handle) {
    auto encoded_key = key.encode();
    if (encoded_key.empty()) {
        return false;
    }
    if (auto* entry = LRUCachePolicy::lookup(encoded_key); entry != nullptr) {
        // Hand the LRU entry over to the caller through the handle.
        *handle = ConditionCacheHandle(this, entry);
        return true;
    }
    return false;
}

void ConditionCache::insert(const ExternalCacheKey& key,
std::shared_ptr<std::vector<bool>> result) {
auto encoded = key.encode();
if (encoded.empty()) {
return;
}
std::unique_ptr<ConditionCache::CacheValue> cache_value_ptr =
std::make_unique<ConditionCache::CacheValue>();
cache_value_ptr->filter_result = result;

ConditionCacheHandle(this, LRUCachePolicy::insert(encoded, (void*)cache_value_ptr.release(),
result->capacity(), result->capacity(),
CachePriority::NORMAL));
}

} // namespace doris::segment_v2
34 changes: 34 additions & 0 deletions be/src/olap/rowset/segment_v2/condition_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,36 @@ class ConditionCache : public LRUCachePolicy {
std::shared_ptr<std::vector<bool>> filter_result;
};

// Cache key for external tables (Hive ORC/Parquet).
// Identifies one cached filter result by file identity (path, modification
// time, file size), a digest value, and a (start_offset, size) pair.
// NOTE(review): start_offset/size presumably describe the scanned split of
// the file -- confirm against the caller that builds the key.
struct ExternalCacheKey {
    ExternalCacheKey(const std::string& path_, int64_t modification_time_, int64_t file_size_,
                     uint64_t digest_, int64_t start_offset_, int64_t size_)
            : path(path_),
              modification_time(modification_time_),
              file_size(file_size_),
              digest(digest_),
              start_offset(start_offset_),
              size(size_) {}

    std::string path;
    int64_t modification_time;
    int64_t file_size;
    uint64_t digest;
    int64_t start_offset;
    int64_t size;

    // Serializes the key as the path bytes followed by the raw in-memory
    // bytes of the five 8-byte fields, in declaration order
    // (modification_time, file_size, digest, start_offset, size).
    // The result is therefore always path.size() + 40 bytes long.
    [[nodiscard]] std::string encode() const {
        std::string encoded;
        encoded.reserve(path.size() + 5 * sizeof(int64_t));
        encoded.append(path);

        // Appends the object representation of one fixed-width field.
        const auto append_raw = [&encoded](const auto& field) {
            encoded.append(reinterpret_cast<const char*>(&field), sizeof(field));
        };
        append_raw(modification_time);
        append_raw(file_size);
        append_raw(digest);
        append_raw(start_offset);
        append_raw(size);
        return encoded;
    }
};

// Create global instance of this class
static ConditionCache* create_global_cache(size_t capacity, uint32_t num_shards = 16) {
auto* res = new ConditionCache(capacity, num_shards);
Expand All @@ -90,6 +120,10 @@ class ConditionCache : public LRUCachePolicy {
bool lookup(const CacheKey& key, ConditionCacheHandle* handle);

void insert(const CacheKey& key, std::shared_ptr<std::vector<bool>> filter_result);

bool lookup(const ExternalCacheKey& key, ConditionCacheHandle* handle);

void insert(const ExternalCacheKey& key, std::shared_ptr<std::vector<bool>> filter_result);
};

class ConditionCacheHandle {
Expand Down
64 changes: 64 additions & 0 deletions be/src/olap/rowset/segment_v2/row_ranges.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,52 @@ class RowRanges {
*result = std::move(tmp_range);
}

// Calculates the exception (set difference) of the two specified RowRanges objects: left \ right.
// The result contains all row indexes that are in the left ranges but NOT in the right ranges.
// For example:
// [100, 300) \ [150, 200) = [100, 150), [200, 300)
// [100, 300) \ [0, 150) = [150, 300)
// [100, 300) \ [250, 400) = [100, 250)
// [100, 200) \ [200, 300) = [100, 200)
// [100, 300) \ [0, 400) = <EMPTY>
// [100, 200), [300, 400) \ [150, 350) = [100, 150), [350, 400)
static void ranges_exception(const RowRanges& left, const RowRanges& right, RowRanges* result) {
RowRanges tmp_range;
int right_index = 0;
for (auto it1 = left._ranges.begin(); it1 != left._ranges.end(); ++it1) {
int64_t current_from = it1->from();
int64_t current_to = it1->to();
for (int i = right_index; i < right._ranges.size(); ++i) {
const RowRange& range2 = right._ranges[i];
if (current_from >= current_to) {
// Current range fully consumed
break;
}
if (current_to <= range2.from()) {
// Current remaining range is entirely before range2, no more subtraction needed
break;
}
if (current_from >= range2.to()) {
// range2 is entirely before the current remaining range, advance right_index
right_index = i + 1;
continue;
}
// There is overlap between [current_from, current_to) and range2
if (current_from < range2.from()) {
// Left portion before the overlap: [current_from, range2.from())
tmp_range.add(RowRange(current_from, range2.from()));
}
// Advance current_from past the overlap
current_from = range2.to();
}
// Add whatever remains of the current left range
if (current_from < current_to) {
tmp_range.add(RowRange(current_from, current_to));
}
}
*result = std::move(tmp_range);
}

static roaring::Roaring ranges_to_roaring(const RowRanges& ranges) {
roaring::Roaring result;
for (auto it = ranges._ranges.begin(); it != ranges._ranges.end(); ++it) {
Expand Down Expand Up @@ -275,6 +321,24 @@ class RowRanges {
_count += range_to_add.count();
}

// Returns the row index (within the original row space) of the pos-th element
// across all ranges. For example, if ranges are [0,3000) and [8000,11000),
// pos=0 returns 0, pos=2999 returns 2999, pos=3000 returns 8000.
//
// `pos` must satisfy 0 <= pos < _count. Both bounds are enforced with
// DORIS_CHECK so that a violation is caught in every build type instead of
// producing a bogus row index.
int64_t get_row_index_by_pos(int64_t pos) const {
    // A negative pos would otherwise wrap to a huge value when stored in size_t.
    DORIS_CHECK(pos >= 0);
    DORIS_CHECK(pos < _count);
    size_t remaining = static_cast<size_t>(pos);
    for (const auto& range : _ranges) {
        const size_t range_len = range.count();
        if (remaining < range_len) {
            return range.from() + remaining;
        }
        // pos lies beyond this range; skip over its rows.
        remaining -= range_len;
    }
    // Only reachable if _count disagrees with the summed length of _ranges.
    // DCHECK carries the diagnostic message in debug builds; DORIS_CHECK makes
    // sure release builds fail as well rather than silently returning -1.
    DCHECK(false) << "pos " << pos << " is out of bounds for RowRanges with count " << _count;
    DORIS_CHECK(false);
    return -1;
}

uint64_t get_digest(uint64_t seed) const {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Bug risk] DCHECK(false) vanishes in RELEASE builds, causing silent return of -1 for out-of-bounds positions. Per AGENTS.md coding standards, invariant violations should crash rather than silently continue.

If pos >= _count is truly an invariant that should never be violated, this should use DORIS_CHECK(false) (or equivalently, DORIS_CHECK(pos < _count) at the function entry) to catch violations in all build types. A silent return of -1 in production could cause incorrect granule marking and data correctness issues.

// Suggested: replace DCHECK with DORIS_CHECK
DORIS_CHECK(false) << "pos " << pos << " is out of bounds for RowRanges with count " << _count;

for (auto range : _ranges) {
seed = range.get_digest(seed);
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/memory/cache_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class CachePolicy {
{"ForUTCacheNumber", CacheType::FOR_UT_CACHE_NUMBER},
{"QueryCache", CacheType::QUERY_CACHE},
{"TabletColumnObjectPool", CacheType::TABLET_COLUMN_OBJECT_POOL},
};
{"ConditionCache", CacheType::CONDITION_CACHE}};

static CacheType string_to_type(std::string type) {
if (StringToType.contains(type)) {
Expand Down
25 changes: 24 additions & 1 deletion be/src/vec/exec/format/generic_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ namespace doris::vectorized {

class Block;
class VSlotRef;

// State shared between FileScanner and a file reader for condition cache integration.
//   MISS: the reader fills filter_result granule by granule while evaluating predicates.
//   HIT:  the reader skips every granule whose filter_result entry is false.
struct ConditionCacheContext {
    // True when filter_result holds a previously cached result (HIT).
    bool is_hit {false};
    // One flag per granule: true means the granule has surviving rows.
    std::shared_ptr<std::vector<bool>> filter_result {};
    // Number of rows covered by one granule.
    static constexpr int GRANULE_SIZE = 2048;
};

// This a reader interface for all file readers.
// A GenericReader is responsible for reading a file and return
// a set of blocks with specified schema,
Expand Down Expand Up @@ -103,7 +113,20 @@ class GenericReader : public ProfileCollector {
bool _fill_all_columns = false;
TPushAggOp::type _push_down_agg_type {};

// For TopN queries, rows will be read according to row ids produced by TopN result.
public:
// Pass condition cache context to the reader for HIT/MISS tracking.
// The default implementation has an empty body and ignores ctx; a reader that
// takes part in the condition cache has to override it to keep the context.
virtual void set_condition_cache_context(std::shared_ptr<ConditionCacheContext> ctx) {}

// Returns the total number of rows the reader will produce.
// Used to pre-allocate condition cache with the correct number of granules.
// The default implementation returns 0.
virtual int64_t get_total_rows() const { return 0; }

// Returns true if this reader has delete operations (e.g. Iceberg position/equality deletes,
// Hive ACID deletes). Used to disable condition cache when deletes are present, since cached
// granule results may become stale if delete files change between queries.
// The default implementation returns false.
virtual bool has_delete_operations() const { return false; }

protected:
bool _read_by_rows = false;
std::list<int64_t> _row_ids;

Expand Down
90 changes: 90 additions & 0 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include <algorithm>
#include <cctype>
#include <limits>
#include <list>

#include "vec/exprs/vdirect_in_predicate.h"
Expand Down Expand Up @@ -281,6 +282,8 @@ void OrcReader::_collect_profile_before_close() {
if (_file_input_stream != nullptr) {
_file_input_stream->collect_profile_before_close();
}
COUNTER_UPDATE(_orc_profile.condition_cache_filtered_rows_counter,
_statistics.condition_cache_filtered_rows);
}
}

Expand Down Expand Up @@ -320,6 +323,8 @@ void OrcReader::_init_profile() {
ADD_COUNTER_WITH_LEVEL(_profile, "FileFooterReadCalls", TUnit::UNIT, 1);
_orc_profile.file_footer_hit_cache =
ADD_COUNTER_WITH_LEVEL(_profile, "FileFooterHitCache", TUnit::UNIT, 1);
_orc_profile.condition_cache_filtered_rows_counter =
ADD_COUNTER_WITH_LEVEL(_profile, "ConditionCacheFilteredRows", TUnit::UNIT, 1);
}
}

Expand Down Expand Up @@ -2223,6 +2228,35 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
return Status::OK();
}

// On a condition cache HIT, skips the run of granules marked false that starts
// at the next read position, before the next batch is read.
//   - If no granule with surviving rows remains, sets *eof = true and
//     *read_rows = 0 and counts the rest of the file as filtered.
//   - Otherwise seeks the row reader to the first row of the next surviving
//     granule (only when that row lies ahead of the current position).
// Does nothing when there is no context or the context is not a HIT.
void OrcReader::_filter_rows_by_condition_cache(size_t* read_rows, bool* eof) {
    if (!_condition_cache_ctx || !_condition_cache_ctx->is_hit) {
        return;
    }

    // _current_read_position is where the *next* batch will start, unlike
    // _last_read_row_number, which is the start of the batch read most recently
    // (it is set after nextBatch returns).
    auto& granule_has_rows = *_condition_cache_ctx->filter_result;
    const auto granule_count = granule_has_rows.size();
    uint64_t next_granule = _current_read_position / ConditionCacheContext::GRANULE_SIZE;
    for (; next_granule < granule_count; ++next_granule) {
        if (granule_has_rows[next_granule]) {
            break;
        }
    }

    if (next_granule >= granule_count) {
        // The cache is pre-allocated with the total number of granules, so
        // running off its end means no surviving rows remain in this file.
        *eof = true;
        *read_rows = 0;
        _statistics.condition_cache_filtered_rows += get_total_rows() - _current_read_position;
        return;
    }

    const uint64_t first_surviving_row = next_granule * ConditionCacheContext::GRANULE_SIZE;
    if (first_surviving_row <= _current_read_position) {
        // Already inside a granule that has surviving rows; nothing to skip.
        return;
    }
    _row_reader->seekToRow(first_surviving_row);
    _statistics.condition_cache_filtered_rows += first_surviving_row - _current_read_position;
    _current_read_position = first_surviving_row;
}

Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eof) {
if (_io_ctx && _io_ctx->should_stop) {
*eof = true;
Expand Down Expand Up @@ -2264,12 +2298,19 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo
// reset decimal_scale_params_index;
_decimal_scale_params_index = 0;
try {
_filter_rows_by_condition_cache(read_rows, eof);
if (*eof) {
return Status::OK();
}
rr = _row_reader->nextBatch(*_batch, block);
if (rr == 0 || _batch->numElements == 0) {
*eof = true;
*read_rows = 0;
return Status::OK();
}
// After nextBatch(), getRowNumber() returns the start of the batch just read.
_last_read_row_number = _row_reader->getRowNumber();
_current_read_position = _last_read_row_number + rr;
} catch (std::exception& e) {
std::string _err_msg = e.what();
if (_io_ctx && _io_ctx->should_stop && _err_msg == "stop") {
Expand All @@ -2283,6 +2324,24 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo
}
}

// Condition cache MISS: mark granules with surviving rows (lazy-read path).
// This is done here (after nextBatch) instead of in the filter() callback because
// getRowNumber() only returns the correct batch-start row after nextBatch() returns.
if (_condition_cache_ctx && !_condition_cache_ctx->is_hit && _filter) {
auto& cache = *_condition_cache_ctx->filter_result;
auto* filter_data = _filter->data();
size_t filter_size = _filter->size();
for (size_t i = 0; i < filter_size; i++) {
if (filter_data[i]) {
size_t granule =
(_last_read_row_number + i) / ConditionCacheContext::GRANULE_SIZE;
if (granule < cache.size()) {
cache[granule] = true;
}
}
}
}

std::vector<orc::ColumnVectorBatch*> batch_vec;
_fill_batch_vec(batch_vec, _batch.get(), 0);

Expand Down Expand Up @@ -2363,12 +2422,19 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo
// reset decimal_scale_params_index;
_decimal_scale_params_index = 0;
try {
_filter_rows_by_condition_cache(read_rows, eof);
if (*eof) {
return Status::OK();
}
rr = _row_reader->nextBatch(*_batch, block);
if (rr == 0 || _batch->numElements == 0) {
*eof = true;
*read_rows = 0;
return Status::OK();
}
// After nextBatch(), getRowNumber() returns the start of the batch just read.
_last_read_row_number = _row_reader->getRowNumber();
_current_read_position = _last_read_row_number + rr;
} catch (std::exception& e) {
std::string _err_msg = e.what();
if (_io_ctx && _io_ctx->should_stop && _err_msg == "stop") {
Expand Down Expand Up @@ -2480,6 +2546,23 @@ Status OrcReader::_get_next_block_impl(Block* block, size_t* read_rows, bool* eo
bool can_filter_all = false;
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
filter_conjuncts, &filters, block, &result_filter, &can_filter_all));

// Condition cache MISS: mark granules with surviving rows (non-lazy path)
if (_condition_cache_ctx && !_condition_cache_ctx->is_hit) {
auto& cache = *_condition_cache_ctx->filter_result;
auto* filter_data = result_filter.data();
size_t num_rows = block->rows();
for (size_t i = 0; i < num_rows; i++) {
if (filter_data[i]) {
size_t granule = (_last_read_row_number + i) /
ConditionCacheContext::GRANULE_SIZE;
if (granule < cache.size()) {
cache[granule] = true;
}
}
}
}

if (can_filter_all) {
for (auto& col : columns_to_filter) {
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
Expand Down Expand Up @@ -2697,6 +2780,13 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
sel[new_size] = i;
new_size += result_filter_data[i] ? 1 : 0;
}

// NOTE: Condition cache MISS marking for the lazy-read path is done
// in _get_next_block_impl after nextBatch() returns, where
// _last_read_row_number has been correctly set via getRowNumber().
// We cannot do it here because this callback fires *during* nextBatch()
// when getRowNumber() still returns the previous batch's start row.

_statistics.lazy_read_filtered_rows += static_cast<int64_t>(size - new_size);
data.numElements = new_size;
return Status::OK();
Expand Down
Loading
Loading