From 9711a75daa28fdfe7ea12f51580249a917ba3665 Mon Sep 17 00:00:00 2001 From: liaoxin Date: Sat, 28 Feb 2026 14:49:00 +0000 Subject: [PATCH 1/3] [opt](csv reader) optimize stream load CSV read performance Cache nullable string column pointers per-batch to eliminate per-row assert_cast, inline the write path to bypass StringSerDe layer, and pre-reserve ColumnStr/NullMap capacity to reduce realloc overhead. --- be/src/vec/exec/format/csv/csv_reader.cpp | 66 ++++++++++++++++++----- be/src/vec/exec/format/csv/csv_reader.h | 10 ++++ 2 files changed, 64 insertions(+), 12 deletions(-) diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp index 4496f300d769c6..2643e4c04f61d0 100644 --- a/be/src/vec/exec/format/csv/csv_reader.cpp +++ b/be/src/vec/exec/format/csv/csv_reader.cpp @@ -48,6 +48,7 @@ #include "vec/core/block.h" #include "vec/core/column_with_type_and_name.h" #include "vec/data_types/data_type_factory.hpp" +#include "vec/data_types/serde/data_type_string_serde.h" #include "vec/exec/format/file_reader/new_plain_binary_line_reader.h" #include "vec/exec/format/file_reader/new_plain_text_line_reader.h" #include "vec/exec/scan/scanner.h" @@ -355,6 +356,7 @@ Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { block->set_columns(std::move(mutate_columns)); } else { auto columns = block->mutate_columns(); + _nullable_str_col_cache.clear(); while (rows < batch_size && !_line_reader_eof) { const uint8_t* ptr = nullptr; size_t size = 0; @@ -643,6 +645,32 @@ Status CsvReader::_fill_dest_columns(const Slice& line, Block* block, std::vector& columns, size_t* rows) { bool is_success = false; + // Initialize cached column pointers on first call per batch to avoid per-row assert_cast. + // Also pre-reserve capacity for offsets/chars/null_map to eliminate realloc during row loop. + if (UNLIKELY(_nullable_str_col_cache.empty())) { + _nullable_str_col_cache.resize(_file_slot_descs.size()); + _has_escape_char = (_options.escape_char != 0); + const size_t batch_size = std::max(_state->batch_size(), (int)_MIN_BATCH_SIZE); + for (int i = 0; i < _file_slot_descs.size(); ++i) { + if (_use_nullable_string_opt[i]) { + IColumn* col_ptr = + _is_load + ? columns[i].get() + : const_cast(block->get_by_position(_file_slot_idx_map[i]) + .column.get()); + auto& null_col = assert_cast(*col_ptr); + auto* str_col = assert_cast*>(&null_col.get_nested_column()); + auto* null_map = &null_col.get_null_map_data(); + _nullable_str_col_cache[i].nested_str_col = str_col; + _nullable_str_col_cache[i].null_map = null_map; + // Pre-reserve to avoid repeated realloc inside insert_data/push_back. + str_col->get_offsets().reserve(str_col->get_offsets().size() + batch_size); + str_col->get_chars().reserve(str_col->get_chars().size() + batch_size * 64); + null_map->reserve(null_map->size() + batch_size); + } + } + } + RETURN_IF_ERROR(_line_split_to_values(line, &is_success)); if (UNLIKELY(!is_success)) { // If not success, which means we met an invalid row, filter this row and return. @@ -656,20 +684,34 @@ Status CsvReader::_fill_dest_columns(const Slice& line, Block* block, ? _split_values[col_idx] : Slice(_options.null_format, _options.null_len); - IColumn* col_ptr = columns[i].get(); - if (!_is_load) { - // block is a Block*, and get_by_position returns a ColumnPtr, - // which is a const pointer. Therefore, using const_cast is permissible. - col_ptr = const_cast( - block->get_by_position(_file_slot_idx_map[i]).column.get()); - } - if (_use_nullable_string_opt[i]) { - // For load task, we always read "string" from file. - // So serdes[i] here must be DataTypeNullableSerDe, and DataTypeNullableSerDe -> nested_serde must be DataTypeStringSerDe. - // So we use deserialize_nullable_string and stringSerDe to reduce virtual function calls. - RETURN_IF_ERROR(_deserialize_nullable_string(*col_ptr, value)); + // Inline fast path: bypass StringSerDe and per-row assert_cast entirely. + auto& cache = _nullable_str_col_cache[i]; + if (_empty_field_as_null && value.size == 0) { + cache.nested_str_col->insert_default(); + cache.null_map->push_back(1); + continue; + } + if (_options.null_len > 0 && + !(_options.converted_from_string && value.trim_double_quotes())) { + if (value.compare(Slice(_options.null_format, _options.null_len)) == 0) { + cache.nested_str_col->insert_default(); + cache.null_map->push_back(1); + continue; + } + } + if (UNLIKELY(_has_escape_char)) { + escape_string_for_csv(value.data, &value.size, _options.escape_char, + _options.quote_char); + } + cache.nested_str_col->insert_data(value.data, value.size); + cache.null_map->push_back(0); } else { + IColumn* col_ptr = columns[i].get(); + if (!_is_load) { + col_ptr = const_cast( + block->get_by_position(_file_slot_idx_map[i]).column.get()); + } RETURN_IF_ERROR(_deserialize_one_cell(_serdes[i], *col_ptr, value)); } } diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h index e452b8a7af29db..89d4b42626671c 100644 --- a/be/src/vec/exec/format/csv/csv_reader.h +++ b/be/src/vec/exec/format/csv/csv_reader.h @@ -34,6 +34,8 @@ #include "io/file_factory.h" #include "io/fs/file_reader_writer_fwd.h" #include "util/slice.h" +#include "vec/columns/column_nullable.h" +#include "vec/columns/column_string.h" #include "vec/data_types/data_type.h" #include "vec/exec/format/file_reader/new_plain_text_line_reader.h" #include "vec/exec/format/generic_reader.h" @@ -283,6 +285,14 @@ class CsvReader : public GenericReader { // save source text which have been splitted. std::vector _split_values; std::vector _use_nullable_string_opt; + + // Cached column pointers for nullable string fast path, avoiding per-row assert_cast. + struct NullableStringColumnCache { + ColumnStr* nested_str_col = nullptr; + NullMap* null_map = nullptr; + }; + std::vector _nullable_str_col_cache; + bool _has_escape_char = false; }; } // namespace vectorized #include "common/compile_check_end.h" From 19a8736cdf706af42ad4a73eea9622bbe1a2d1bf Mon Sep 17 00:00:00 2001 From: liaoxin Date: Wed, 11 Mar 2026 15:35:34 +0000 Subject: [PATCH 2/3] [opt](csv reader) address code review feedback - Add DCHECK bounds check before cache access for safety - Add comment explaining 64-byte string length estimate - Add comment for code structure in else block - Remove unused data_type_string_serde.h include --- be/src/format/csv/csv_reader.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/be/src/format/csv/csv_reader.cpp b/be/src/format/csv/csv_reader.cpp index 5428404b21ca65..0c0242814991e3 100644 --- a/be/src/format/csv/csv_reader.cpp +++ b/be/src/format/csv/csv_reader.cpp @@ -36,7 +36,6 @@ #include "core/block/block.h" #include "core/block/column_with_type_and_name.h" #include "core/data_type/data_type_factory.hpp" -#include "core/data_type/serde/data_type_string_serde.h" #include "exec/scan/scanner.h" #include "format/file_reader/new_plain_binary_line_reader.h" #include "format/file_reader/new_plain_text_line_reader.h" @@ -661,6 +660,7 @@ Status CsvReader::_fill_dest_columns(const Slice& line, Block* block, _nullable_str_col_cache[i].nested_str_col = str_col; _nullable_str_col_cache[i].null_map = null_map; // Pre-reserve to avoid repeated realloc inside insert_data/push_back. + // Estimate 64 bytes per string as a reasonable average for typical CSV data. str_col->get_offsets().reserve(str_col->get_offsets().size() + batch_size); str_col->get_chars().reserve(str_col->get_chars().size() + batch_size * 64); null_map->reserve(null_map->size() + batch_size); @@ -691,6 +691,7 @@ Status CsvReader::_fill_dest_columns(const Slice& line, Block* block, if (_use_nullable_string_opt[i]) { // Inline fast path: bypass StringSerDe and per-row assert_cast entirely. + DCHECK_LT(i, _nullable_str_col_cache.size()); auto& cache = _nullable_str_col_cache[i]; if (_empty_field_as_null && value.size == 0) { cache.nested_str_col->insert_default(); @@ -712,6 +713,7 @@ Status CsvReader::_fill_dest_columns(const Slice& line, Block* block, cache.nested_str_col->insert_data(value.data, value.size); cache.null_map->push_back(0); } else { + // Non-optimized path: col_ptr only needed here since fast path uses cached pointers IColumn* col_ptr = columns[i].get(); if (!_is_load) { col_ptr = const_cast( From 75cf95f0a1df0820460e05e9cbbb5a2030f37890 Mon Sep 17 00:00:00 2001 From: liaoxin Date: Wed, 11 Mar 2026 16:12:15 +0000 Subject: [PATCH 3/3] [opt](csv reader) fix GitHub review issues - Remove dead col_ptr computation that was executed on every row - Fix C-style cast to use std::max template parameter --- be/src/format/csv/csv_reader.cpp | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/be/src/format/csv/csv_reader.cpp b/be/src/format/csv/csv_reader.cpp index 0c0242814991e3..9c763235e797bb 100644 --- a/be/src/format/csv/csv_reader.cpp +++ b/be/src/format/csv/csv_reader.cpp @@ -646,7 +646,7 @@ Status CsvReader::_fill_dest_columns(const Slice& line, Block* block, if (UNLIKELY(_nullable_str_col_cache.empty())) { _nullable_str_col_cache.resize(_file_slot_descs.size()); _has_escape_char = (_options.escape_char != 0); - const size_t batch_size = std::max(_state->batch_size(), (int)_MIN_BATCH_SIZE); + const size_t batch_size = std::max(_state->batch_size(), _MIN_BATCH_SIZE); for (int i = 0; i < _file_slot_descs.size(); ++i) { if (_use_nullable_string_opt[i]) { IColumn* col_ptr = @@ -681,14 +681,6 @@ Status CsvReader::_fill_dest_columns(const Slice& line, Block* block, ? _split_values[col_idx] : Slice(_options.null_format, _options.null_len); - IColumn* col_ptr = columns[i].get(); - if (!_is_load) { - // block is a Block*, and get_by_position returns a ColumnPtr, - // which is a const pointer. Therefore, using const_cast is permissible. - col_ptr = const_cast( - block->get_by_position(_file_slot_idx_map[i]).column.get()); - } - if (_use_nullable_string_opt[i]) { // Inline fast path: bypass StringSerDe and per-row assert_cast entirely. DCHECK_LT(i, _nullable_str_col_cache.size());