Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 3 additions & 16 deletions bindings/cpp/include/fluss.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -434,10 +434,6 @@ struct Column {
struct Schema {
std::vector<Column> columns;
std::vector<std::string> primary_keys;
/// When set (via FromArrow), the table's column types come from this Arrow
/// schema instead of `columns` — the only way to express nested MAP/ROW
/// columns. `columns` stays empty in that case.
std::shared_ptr<arrow::Schema> arrow_schema;

class Builder {
public:
Expand All @@ -459,15 +455,6 @@ struct Schema {
};

/// Returns a default-constructed `Builder`.
static Builder NewBuilder() { return Builder(); }

/// Creates a Schema whose column types are described by an Arrow schema.
///
/// Intended for tables with nested MAP/ROW columns (`arrow::map()`,
/// `arrow::struct_()`). `columns` is left empty; `Admin::CreateTable` routes
/// Arrow-backed schemas through the C Data Interface automatically.
///
/// @param arrow_schema  Arrow schema to store in the result.
/// @param primary_keys  Primary-key column names (defaults to none).
/// @return A Schema holding `primary_keys` and `arrow_schema`, with no `columns`.
static Schema FromArrow(std::shared_ptr<arrow::Schema> arrow_schema,
                        std::vector<std::string> primary_keys = {}) {
    Schema result{};  // value-initialised: `columns` stays empty
    result.primary_keys = std::move(primary_keys);
    result.arrow_schema = std::move(arrow_schema);
    return result;
}
};

struct TableDescriptor {
Expand Down Expand Up @@ -1457,9 +1444,9 @@ class Admin {

bool Available() const;

/// Creates a table. For nested MAP/ROW columns, build `descriptor`'s schema
/// via `Schema::FromArrow(...)` — `CreateTable` routes Arrow-backed schemas
/// through the C Data Interface automatically.
/// Creates a table. Column types — including nested ARRAY/MAP/ROW built
/// with `DataType::Array`/`Map`/`Row` — are carried to the server exactly
/// as declared (precision, scale, length, nullability, and field names).
Result CreateTable(const TablePath& table_path, const TableDescriptor& descriptor,
bool ignore_if_exists = false);

Expand Down
30 changes: 0 additions & 30 deletions bindings/cpp/src/admin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,36 +60,6 @@ Result Admin::CreateTable(const TablePath& table_path, const TableDescriptor& de
}

auto ffi_path = utils::to_ffi_table_path(table_path);

// A MAP/ROW column can't go through the flat FFI encoding, so the schema is
// sent over Arrow instead (explicit via FromArrow, or lowered from native
// columns). Rust derives the columns from it, so the flat columns are dropped
// here; primary keys / metadata still come from the descriptor.
std::shared_ptr<arrow::Schema> arrow_schema = descriptor.schema.arrow_schema;
if (!arrow_schema) {
for (const auto& col : descriptor.schema.columns) {
if (detail::is_compound(col.data_type)) {
arrow_schema = detail::columns_to_arrow_schema(descriptor.schema.columns);
break;
}
}
}

if (arrow_schema) {
TableDescriptor arrow_desc = descriptor;
arrow_desc.schema.columns.clear();
auto ffi_desc = utils::to_ffi_table_descriptor(arrow_desc);
size_t schema_ptr = 0;
try {
schema_ptr = detail::export_arrow_schema(*arrow_schema);
} catch (const std::exception& e) {
return utils::make_client_error(e.what());
}
auto ffi_result =
admin_->create_table_arrow(ffi_path, ffi_desc, schema_ptr, ignore_if_exists);
return utils::from_ffi_result(ffi_result);
}

auto ffi_desc = utils::to_ffi_table_descriptor(descriptor);
auto ffi_result = admin_->create_table(ffi_path, ffi_desc, ignore_if_exists);
return utils::from_ffi_result(ffi_result);
Expand Down
259 changes: 132 additions & 127 deletions bindings/cpp/src/ffi_converter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,77 +28,159 @@
namespace fluss {
namespace utils {

/// Compact FFI representation of a (possibly nested) array type.
///
/// `nesting` counts the number of ARRAY wrappers stripped to reach the leaf
/// element type. `leaf_type`/`leaf_precision`/`leaf_scale` describe that leaf
/// scalar. A non-array input produces a zero-initialised value (nesting == 0).
/// `array_nullability` has `nesting + 1` entries: one per ARRAY wrapper
/// (outermost first) plus a trailing entry for the leaf scalar's nullability.
///
/// Using a flat representation — rather than serialising a recursive
/// `DataType` — keeps the cxx bridge contract small while preserving schema
/// fidelity across the FFI boundary when paired with rebuild_array_type().
/// Flattened view of an `ARRAY<ARRAY<...<scalar>>>` element type, used by the
/// data-writer path: `nesting` counts the ARRAY wrappers stripped to reach the
/// leaf scalar, which `leaf_*` describe. A non-array input yields `nesting == 0`
/// and callers use the type's own id/precision/scale.
struct FlattenedArrayType {
    /// Number of ARRAY wrappers stripped to reach the leaf; 0 for non-array input.
    int32_t nesting = 0;
    /// Type id of the leaf, stored as an integer.
    int32_t leaf_type = 0;
    /// Precision reported by the leaf type.
    int32_t leaf_precision = 0;
    /// Scale reported by the leaf type.
    int32_t leaf_scale = 0;
    /// One flag per ARRAY wrapper (outermost first) followed by one for the
    /// leaf; 1 means nullable, 0 means not nullable.
    std::vector<uint8_t> array_nullability;
};

/// Collapses an `ARRAY<ARRAY<...<leaf>>>` DataType into a FlattenedArrayType.
///
/// Outcomes:
///  - `data_type` is not an ARRAY: a zero-valued FlattenedArrayType is
///    returned (`nesting == 0`, no nullability entries); callers must use the
///    column's own id/precision/scale instead.
///  - `data_type` is an ARRAY but the `element_type()` chain ends in null:
///    a zero-valued result is returned so the caller can reject the schema.
///  - Otherwise `nesting >= 1`, `array_nullability` holds `nesting + 1`
///    entries (wrappers outermost first, leaf last) and `leaf_*` describe the
///    innermost non-array type.
inline FlattenedArrayType flatten_array_type(const DataType& data_type) {
    FlattenedArrayType flat;

    // Peel ARRAY wrappers one at a time, recording each wrapper's nullability.
    const DataType* node = &data_type;
    for (; node != nullptr && node->id() == TypeId::Array; node = node->element_type()) {
        ++flat.nesting;
        flat.array_nullability.push_back(node->nullable() ? 1 : 0);
    }

    if (node == nullptr) {
        // The chain broke before reaching a leaf: discard the partial result.
        return FlattenedArrayType{};
    }
    if (flat.nesting == 0) {
        // Not an array at all; `flat` is still zero-valued.
        return flat;
    }

    flat.leaf_type = static_cast<int32_t>(node->id());
    flat.leaf_precision = node->precision();
    flat.leaf_scale = node->scale();
    flat.array_nullability.push_back(node->nullable() ? 1 : 0);
    return flat;
}

/// Inverse of flatten_array_type: rebuilds an `ARRAY<ARRAY<...<leaf>>>` type
/// from the compact flat form. Requires `flat.nesting >= 1`; callers handle
/// the `nesting == 0` case by using a plain scalar DataType directly.
/// `array_nullability` must have `nesting + 1` entries (last = leaf).
inline DataType rebuild_array_type(const FlattenedArrayType& flat) {
bool leaf_nullable = (static_cast<size_t>(flat.nesting) < flat.array_nullability.size())
? (flat.array_nullability[static_cast<size_t>(flat.nesting)] != 0)
: true;
DataType dt(static_cast<TypeId>(flat.leaf_type), flat.leaf_precision, flat.leaf_scale,
leaf_nullable);
for (int32_t i = flat.nesting - 1; i >= 0; --i) {
bool nullable = (static_cast<size_t>(i) < flat.array_nullability.size())
? (flat.array_nullability[static_cast<size_t>(i)] != 0)
: true;
auto arr = DataType::Array(std::move(dt));
if (!nullable) {
arr = arr.NotNull();
/// Serialize a type tree into a preorder node arena (see `FfiTypeNode`): the
/// node itself, then ARRAY's element / MAP's key+value / ROW's field subtrees.
/// `field_name` is set only for ROW field nodes. Inverse of `nodes_to_data_type`.
inline void data_type_to_nodes(const DataType& dt, const std::string& field_name,
rust::Vec<ffi::FfiTypeNode>& out) {
ffi::FfiTypeNode node;
node.type_id = static_cast<int32_t>(dt.id());
node.nullable = dt.nullable();
node.precision = dt.precision();
node.scale = dt.scale();
node.field_name = rust::String(field_name);
switch (dt.id()) {
case TypeId::Array:
node.child_count = 1;
break;
case TypeId::Map:
node.child_count = 2;
break;
case TypeId::Row:
node.child_count = static_cast<uint32_t>(dt.field_count());
break;
default:
node.child_count = 0;
break;
}
out.push_back(std::move(node));

switch (dt.id()) {
case TypeId::Array:
data_type_to_nodes(*dt.element_type(), "", out);
break;
case TypeId::Map:
data_type_to_nodes(*dt.key_type(), "", out);
data_type_to_nodes(*dt.value_type(), "", out);
break;
case TypeId::Row:
for (size_t i = 0; i < dt.field_count(); ++i) {
data_type_to_nodes(*dt.field_type(i), dt.field_name(i), out);
}
break;
default:
break;
}
}

/// Converts a leaf node of the type tree into the matching scalar DataType.
///
/// Timestamp/TimestampLtz take their precision from `node.precision`, Decimal
/// takes `node.precision` and `node.scale`, and Char/Binary take their length
/// from `node.precision`. All other scalars ignore those fields.
///
/// @throws std::runtime_error if `node.type_id` is not one of the scalar ids
///         handled here.
inline DataType scalar_node_to_data_type(const ffi::FfiTypeNode& node) {
    const auto kind = static_cast<TypeId>(node.type_id);
    switch (kind) {
        // Scalars that carry parameters.
        case TypeId::Decimal:
            return DataType::Decimal(node.precision, node.scale);
        case TypeId::Timestamp:
            return DataType::Timestamp(node.precision);
        case TypeId::TimestampLtz:
            return DataType::TimestampLtz(node.precision);
        case TypeId::Char:
            return DataType::Char(node.precision);
        case TypeId::Binary:
            return DataType::Binary(node.precision);

        // Parameterless scalars.
        case TypeId::Boolean:
            return DataType::Boolean();
        case TypeId::TinyInt:
            return DataType::TinyInt();
        case TypeId::SmallInt:
            return DataType::SmallInt();
        case TypeId::Int:
            return DataType::Int();
        case TypeId::BigInt:
            return DataType::BigInt();
        case TypeId::Float:
            return DataType::Float();
        case TypeId::Double:
            return DataType::Double();
        case TypeId::String:
            return DataType::String();
        case TypeId::Bytes:
            return DataType::Bytes();
        case TypeId::Date:
            return DataType::Date();
        case TypeId::Time:
            return DataType::Time();

        default:
            break;
    }
    throw std::runtime_error("Unknown scalar type id in type tree: " +
                             std::to_string(node.type_id));
}

/// Inverse of `data_type_to_nodes`: reconstruct one type from the arena,
/// advancing `cursor` past the nodes it consumes.
///
/// The node at `cursor` is read first. An ARRAY then consumes one child
/// subtree, a MAP two (key, then value), a ROW `child_count` subtrees whose
/// names come from each child's `field_name`, and anything else is decoded as
/// a scalar. The node's `nullable` flag is applied to the finished type.
///
/// @param nodes   Preorder node arena.
/// @param cursor  In/out index of the next unread node.
/// @return The reconstructed type.
/// @throws std::runtime_error if the arena ends early or a leaf has an
///         unknown scalar type id.
inline DataType nodes_to_data_type(const rust::Vec<ffi::FfiTypeNode>& nodes, size_t& cursor) {
    if (cursor >= nodes.size()) {
        throw std::runtime_error("type tree ended before all nodes were read");
    }
    const ffi::FfiTypeNode& node = nodes[cursor++];

    // Build the type without nullability first; every branch returns.
    DataType dt = [&]() -> DataType {
        switch (static_cast<TypeId>(node.type_id)) {
            case TypeId::Array: {
                DataType element = nodes_to_data_type(nodes, cursor);
                return DataType::Array(std::move(element));
            }
            case TypeId::Map: {
                DataType key = nodes_to_data_type(nodes, cursor);
                DataType value = nodes_to_data_type(nodes, cursor);
                return DataType::Map(std::move(key), std::move(value));
            }
            case TypeId::Row: {
                std::vector<std::pair<std::string, DataType>> fields;
                fields.reserve(node.child_count);
                for (uint32_t i = 0; i < node.child_count; ++i) {
                    // Checked here because the field name is read from the
                    // child node before recursing into it.
                    if (cursor >= nodes.size()) {
                        throw std::runtime_error("ROW field missing from type tree");
                    }
                    std::string fname(nodes[cursor].field_name);
                    DataType ftype = nodes_to_data_type(nodes, cursor);
                    fields.emplace_back(std::move(fname), std::move(ftype));
                }
                return DataType::Row(std::move(fields));
            }
            default:
                return scalar_node_to_data_type(node);
        }
    }();

    if (!node.nullable) {
        dt = dt.NotNull();
    }
    return dt;
}
Expand Down Expand Up @@ -168,25 +250,8 @@ inline ffi::FfiConfig to_ffi_config(const Configuration& config) {
/// Converts a public `Column` into its FFI representation.
///
/// Copies the name and comment, the top-level type id / nullability /
/// precision / scale, the flattened ARRAY description produced by
/// `flatten_array_type`, and the preorder type-node arena produced by
/// `data_type_to_nodes`.
///
/// NOTE(review): the flat `array_*` / `element_*` fields and `type_nodes`
/// both describe the column's type; this looks like two encodings written
/// side by side — confirm which one the receiving side reads.
inline ffi::FfiColumn to_ffi_column(const Column& col) {
ffi::FfiColumn ffi_col;
ffi_col.name = rust::String(col.name);
ffi_col.data_type = static_cast<int32_t>(col.data_type.id());
ffi_col.nullable = col.data_type.nullable();
ffi_col.comment = rust::String(col.comment);
ffi_col.precision = col.data_type.precision();
ffi_col.scale = col.data_type.scale();
// Flat ARRAY description: nesting is 0 and the list is empty for non-arrays.
auto flat = flatten_array_type(col.data_type);
ffi_col.array_nesting = flat.nesting;
for (auto nullable : flat.array_nullability) {
ffi_col.array_nullability.push_back(nullable);
}
// A leaf_type of 0 is treated the same as "no leaf": element fields are zeroed.
if (flat.nesting > 0 && flat.leaf_type != 0) {
ffi_col.element_data_type = flat.leaf_type;
ffi_col.element_precision = flat.leaf_precision;
ffi_col.element_scale = flat.leaf_scale;
} else {
ffi_col.element_data_type = 0;
ffi_col.element_precision = 0;
ffi_col.element_scale = 0;
}
// Full type tree; the root node is emitted with an empty field name.
data_type_to_nodes(col.data_type, "", ffi_col.type_nodes);
return ffi_col;
}

Expand Down Expand Up @@ -251,72 +316,12 @@ inline ffi::FfiTableDescriptor to_ffi_table_descriptor(const TableDescriptor& de
}

/// Rebuilds a public `Column` (name, data type, comment) from its FFI form.
///
/// Throws std::runtime_error when the ARRAY description or the type-node
/// arena in `ffi_col` is malformed (see the individual checks below).
///
/// NOTE(review): as shown here the body contains two decoders — one driven by
/// the flat `element_*` / `array_*` fields and one driven by `type_nodes` —
/// and `dt` appears to be declared more than once while the braces do not
/// obviously balance. This looks like two versions of the function
/// interleaved; confirm which one is intended before relying on it.
inline Column from_ffi_column(const ffi::FfiColumn& ffi_col) {
auto type_id = static_cast<TypeId>(ffi_col.data_type);
if (type_id == TypeId::Array) {
// Leaf types accepted by the flat ARRAY decoding.
auto is_supported_leaf_type = [](int32_t leaf_type) {
switch (static_cast<TypeId>(leaf_type)) {
case TypeId::Boolean:
case TypeId::TinyInt:
case TypeId::SmallInt:
case TypeId::Int:
case TypeId::BigInt:
case TypeId::Float:
case TypeId::Double:
case TypeId::String:
case TypeId::Bytes:
case TypeId::Date:
case TypeId::Time:
case TypeId::Timestamp:
case TypeId::TimestampLtz:
case TypeId::Decimal:
case TypeId::Char:
case TypeId::Binary:
return true;
default:
return false;
}
};
// ROW/MAP element schema can't pass through the flat FFI column; give the
// array a non-null element of the right kind so element_type() is safe to deref.
auto element_id = static_cast<TypeId>(ffi_col.element_data_type);
if (element_id == TypeId::Map || element_id == TypeId::Row) {
return Column{std::string(ffi_col.name), DataType::Array(DataType(element_id)),
std::string(ffi_col.comment)};
}
// Validation of the flat ARRAY description; each failure names the column.
if (ffi_col.element_data_type == 0) {
throw std::runtime_error("Malformed ARRAY column '" + std::string(ffi_col.name) +
"': missing element_data_type");
}
if (ffi_col.array_nesting < 0) {
throw std::runtime_error("Malformed ARRAY column '" + std::string(ffi_col.name) +
"': array_nesting must be non-negative");
}
if (ffi_col.element_data_type == static_cast<int32_t>(TypeId::Array)) {
throw std::runtime_error("Malformed ARRAY column '" + std::string(ffi_col.name) +
"': leaf element_data_type cannot be ARRAY");
}
if (!is_supported_leaf_type(ffi_col.element_data_type)) {
throw std::runtime_error("Malformed ARRAY column '" + std::string(ffi_col.name) +
"': unsupported leaf element_data_type " +
std::to_string(ffi_col.element_data_type));
}

// An array_nesting of 0 on an ARRAY column is treated as a single level.
int32_t nesting = ffi_col.array_nesting > 0 ? ffi_col.array_nesting : 1;
std::vector<uint8_t> array_nullability;
for (auto nullable : ffi_col.array_nullability) {
array_nullability.push_back(nullable);
}
auto dt = rebuild_array_type(
FlattenedArrayType{
nesting,
ffi_col.element_data_type,
ffi_col.element_precision,
ffi_col.element_scale,
std::move(array_nullability),
});
return Column{std::string(ffi_col.name), std::move(dt), std::string(ffi_col.comment)};
// Type-node decoding: rebuild the type from the arena and require that every
// node was consumed.
size_t cursor = 0;
DataType dt = nodes_to_data_type(ffi_col.type_nodes, cursor);
if (cursor != ffi_col.type_nodes.size()) {
throw std::runtime_error("Column '" + std::string(ffi_col.name) +
"': type tree has trailing nodes");
}
// Flat scalar decoding from the column's own id/precision/scale/nullable.
DataType dt(type_id, ffi_col.precision, ffi_col.scale, ffi_col.nullable);
return Column{std::string(ffi_col.name), std::move(dt), std::string(ffi_col.comment)};
}

Expand Down
Loading
Loading