Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion be/src/vec/functions/function_string.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1399,7 +1399,8 @@ void register_function_string(SimpleFunctionFactory& factory) {
factory.register_function<FunctionToBase64>();
factory.register_function<FunctionFromBase64>();
factory.register_function<FunctionSplitPart>();
factory.register_function<FunctionSplitByString>();
factory.register_function<FunctionSplitByString<SplitByStringTwoArgImpl>>();
factory.register_function<FunctionSplitByString<SplitByStringThreeArgImpl>>();
factory.register_function<FunctionCountSubString<FunctionCountSubStringType::TWO_ARGUMENTS>>();
factory.register_function<
FunctionCountSubString<FunctionCountSubStringType::THREE_ARGUMENTS>>();
Expand Down
210 changes: 152 additions & 58 deletions be/src/vec/functions/function_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -2032,39 +2032,17 @@ class FunctionSubstringIndex : public IFunction {
}
};

class FunctionSplitByString : public IFunction {
class SplitByStringExecutor {
public:
static constexpr auto name = "split_by_string";

static FunctionPtr create() { return std::make_shared<FunctionSplitByString>(); }
using NullMapType = PaddedPODArray<UInt8>;

String get_name() const override { return name; }

bool is_variadic() const override { return false; }

size_t get_number_of_arguments() const override { return 2; }

DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
DCHECK(is_string_type(arguments[0]->get_primitive_type()))
<< "first argument for function: " << name << " should be string"
<< " and arguments[0] is " << arguments[0]->get_name();
DCHECK(is_string_type(arguments[1]->get_primitive_type()))
<< "second argument for function: " << name << " should be string"
<< " and arguments[1] is " << arguments[1]->get_name();
return std::make_shared<DataTypeArray>(make_nullable(arguments[0]));
}

Status execute_impl(FunctionContext* /*context*/, Block& block, const ColumnNumbers& arguments,
uint32_t result, size_t input_rows_count) const override {
DCHECK_EQ(arguments.size(), 2);

static Status execute_core(Block& block, const ColumnNumbers& arguments, uint32_t result,
size_t input_rows_count, Int32 limit_value) {
const auto& [src_column, left_const] =
unpack_if_const(block.get_by_position(arguments[0]).column);
const auto& [right_column, right_const] =
unpack_if_const(block.get_by_position(arguments[1]).column);

DataTypePtr right_column_type = block.get_by_position(arguments[1]).type;
DataTypePtr src_column_type = block.get_by_position(arguments[0]).type;
auto dest_column_ptr = ColumnArray::create(make_nullable(src_column_type)->create_column(),
ColumnArray::ColumnOffsets::create());
Expand All @@ -2076,14 +2054,13 @@ class FunctionSplitByString : public IFunction {
auto* dest_nested_column = dest_nullable_col.get_nested_column_ptr().get();

const auto* col_str = assert_cast<const ColumnString*>(src_column.get());

const auto* col_delimiter = assert_cast<const ColumnString*>(right_column.get());

std::visit(
[&](auto src_const, auto delimiter_const) {
_execute<src_const, delimiter_const>(*col_str, *col_delimiter,
*dest_nested_column, dest_offsets,
input_rows_count);
input_rows_count, limit_value);
},
vectorized::make_bool_variant(left_const),
vectorized::make_bool_variant(right_const));
Expand All @@ -2098,9 +2075,9 @@ class FunctionSplitByString : public IFunction {

private:
template <bool src_const, bool delimiter_const>
void _execute(const ColumnString& src_column_string, const ColumnString& delimiter_column,
IColumn& dest_nested_column, ColumnArray::Offsets64& dest_offsets,
size_t size) const {
static void _execute(const ColumnString& src_column_string,
const ColumnString& delimiter_column, IColumn& dest_nested_column,
ColumnArray::Offsets64& dest_offsets, size_t size, Int32 limit_value) {
auto& dest_column_string = assert_cast<ColumnString&>(dest_nested_column);
ColumnString::Chars& column_string_chars = dest_column_string.get_chars();
ColumnString::Offsets& column_string_offsets = dest_column_string.get_offsets();
Expand Down Expand Up @@ -2129,12 +2106,29 @@ class FunctionSplitByString : public IFunction {
}
if (delimiter_ref.size == 0) {
split_empty_delimiter(str_ref, column_string_chars, column_string_offsets,
string_pos, dest_pos);
string_pos, dest_pos, limit_value);
} else {
if constexpr (!delimiter_const) {
search.set_pattern(&delimiter_ref);
}
Int32 split_count = 0;
for (size_t str_pos = 0; str_pos <= str_ref.size;) {
// If limit reached, dump remainder as final token
if (limit_value > 0 && split_count == limit_value - 1) {
const size_t remaining = str_ref.size - str_pos;
const size_t old_size = column_string_chars.size();
if (remaining > 0) {
const size_t new_size = old_size + remaining;
column_string_chars.resize(new_size);
memcpy_small_allow_read_write_overflow15(
column_string_chars.data() + old_size, str_ref.data + str_pos,
remaining);
string_pos += remaining;
}
column_string_offsets.push_back(string_pos);
dest_pos++;
break;
}
const size_t str_offset = str_pos;
const size_t old_size = column_string_chars.size();
// search first match delimter_ref index from src string among str_offset to end
Expand All @@ -2155,6 +2149,7 @@ class FunctionSplitByString : public IFunction {
column_string_offsets.push_back(string_pos);
// array offset + 1
dest_pos++;
split_count++;
// add src string str_pos to next search start
str_pos += split_part_size + delimiter_ref.size;
}
Expand All @@ -2163,44 +2158,143 @@ class FunctionSplitByString : public IFunction {
}
}

void split_empty_delimiter(const StringRef& str_ref, ColumnString::Chars& column_string_chars,
ColumnString::Offsets& column_string_offsets,
ColumnArray::Offset64& string_pos,
ColumnArray::Offset64& dest_pos) const {
static void split_empty_delimiter(const StringRef& str_ref,
ColumnString::Chars& column_string_chars,
ColumnString::Offsets& column_string_offsets,
ColumnArray::Offset64& string_pos,
ColumnArray::Offset64& dest_pos, Int32 limit_value) {
const size_t old_size = column_string_chars.size();
const size_t new_size = old_size + str_ref.size;
column_string_chars.resize(new_size);
memcpy(column_string_chars.data() + old_size, str_ref.data, str_ref.size);
if (simd::VStringFunctions::is_ascii(str_ref)) {
const auto size = str_ref.size;

const auto nested_old_size = column_string_offsets.size();
const auto nested_new_size = nested_old_size + size;
column_string_offsets.resize(nested_new_size);
std::iota(column_string_offsets.data() + nested_old_size,
column_string_offsets.data() + nested_new_size, string_pos + 1);

string_pos += size;
dest_pos += size;
// The above code is equivalent to the code in the following comment.
// for (size_t i = 0; i < str_ref.size; i++) {
// string_pos++;
// column_string_offsets.push_back(string_pos);
// (*dest_nested_null_map).push_back(false);
// dest_pos++;
// }

if (limit_value > 0) {
// With limit: split character by character up to limit-1, then remainder
Int32 split_count = 0;
size_t i = 0;
if (simd::VStringFunctions::is_ascii(str_ref)) {
for (; i < str_ref.size; i++) {
if (split_count == limit_value - 1) {
// remainder
string_pos += str_ref.size - i;
column_string_offsets.push_back(string_pos);
dest_pos++;
return;
}
string_pos++;
column_string_offsets.push_back(string_pos);
dest_pos++;
split_count++;
}
} else {
for (size_t utf8_char_len = 0; i < str_ref.size; i += utf8_char_len) {
utf8_char_len = UTF8_BYTE_LENGTH[(unsigned char)str_ref.data[i]];
if (split_count == limit_value - 1) {
// remainder
string_pos += str_ref.size - i;
column_string_offsets.push_back(string_pos);
dest_pos++;
return;
}
string_pos += utf8_char_len;
column_string_offsets.push_back(string_pos);
dest_pos++;
split_count++;
}
}
} else {
for (size_t i = 0, utf8_char_len = 0; i < str_ref.size; i += utf8_char_len) {
utf8_char_len = UTF8_BYTE_LENGTH[(unsigned char)str_ref.data[i]];
// No limit: original behavior
if (simd::VStringFunctions::is_ascii(str_ref)) {
const auto size = str_ref.size;

const auto nested_old_size = column_string_offsets.size();
const auto nested_new_size = nested_old_size + size;
column_string_offsets.resize(nested_new_size);
std::iota(column_string_offsets.data() + nested_old_size,
column_string_offsets.data() + nested_new_size, string_pos + 1);

string_pos += size;
dest_pos += size;
} else {
for (size_t i = 0, utf8_char_len = 0; i < str_ref.size; i += utf8_char_len) {
utf8_char_len = UTF8_BYTE_LENGTH[(unsigned char)str_ref.data[i]];

string_pos += utf8_char_len;
column_string_offsets.push_back(string_pos);
dest_pos++;
string_pos += utf8_char_len;
column_string_offsets.push_back(string_pos);
dest_pos++;
}
}
}
}
};

struct SplitByStringTwoArgImpl {
static DataTypes get_variadic_argument_types() {
return {std::make_shared<DataTypeString>(), std::make_shared<DataTypeString>()};
}

static Status execute_impl(FunctionContext* /*context*/, Block& block,
const ColumnNumbers& arguments, uint32_t result,
size_t input_rows_count) {
DCHECK_EQ(arguments.size(), 2);
return SplitByStringExecutor::execute_core(block, arguments, result, input_rows_count, -1);
}
};

struct SplitByStringThreeArgImpl {
static DataTypes get_variadic_argument_types() {
return {std::make_shared<DataTypeString>(), std::make_shared<DataTypeString>(),
std::make_shared<DataTypeInt32>()};
}

static Status execute_impl(FunctionContext* /*context*/, Block& block,
const ColumnNumbers& arguments, uint32_t result,
size_t input_rows_count) {
DCHECK_EQ(arguments.size(), 3);
const auto& [limit_column, limit_is_const] =
unpack_if_const(block.get_by_position(arguments[2]).column);
DCHECK(limit_is_const) << "limit argument of split_by_string must be a constant";
auto limit_value = assert_cast<const ColumnInt32&>(*limit_column).get_element(0);
return SplitByStringExecutor::execute_core(block, arguments, result, input_rows_count,
limit_value);
}
};

template <typename Impl>
class FunctionSplitByString : public IFunction {
public:
static constexpr auto name = "split_by_string";

static FunctionPtr create() { return std::make_shared<FunctionSplitByString>(); }

String get_name() const override { return name; }

bool is_variadic() const override { return true; }

size_t get_number_of_arguments() const override {
return get_variadic_argument_types_impl().size();
}

DataTypes get_variadic_argument_types_impl() const override {
return Impl::get_variadic_argument_types();
}

DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
DCHECK(is_string_type(arguments[0]->get_primitive_type()))
<< "first argument for function: " << name << " should be string"
<< " and arguments[0] is " << arguments[0]->get_name();
DCHECK(is_string_type(arguments[1]->get_primitive_type()))
<< "second argument for function: " << name << " should be string"
<< " and arguments[1] is " << arguments[1]->get_name();
return std::make_shared<DataTypeArray>(make_nullable(arguments[0]));
}

Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
uint32_t result, size_t input_rows_count) const override {
return Impl::execute_impl(context, block, arguments, result, input_rows_count);
}
};

enum class FunctionCountSubStringType { TWO_ARGUMENTS, THREE_ARGUMENTS };

template <FunctionCountSubStringType type>
Expand Down
Loading