Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions bindings/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,12 @@ target_link_libraries(fluss_cpp_kv_example PRIVATE Arrow::arrow_shared)
target_compile_definitions(fluss_cpp_kv_example PRIVATE ARROW_FOUND)
target_include_directories(fluss_cpp_kv_example PUBLIC ${CPP_INCLUDE_DIR})

# Primary-key table changelog (CDC) example, built from
# examples/kv_changelog_example.cpp. Linked against the fluss_cpp library and
# the shared Arrow library, compiled with ARROW_FOUND defined.
add_executable(fluss_cpp_kv_changelog_example examples/kv_changelog_example.cpp)
target_link_libraries(fluss_cpp_kv_changelog_example PRIVATE fluss_cpp)
target_link_libraries(fluss_cpp_kv_changelog_example PRIVATE Arrow::arrow_shared)
target_compile_definitions(fluss_cpp_kv_changelog_example PRIVATE ARROW_FOUND)
target_include_directories(fluss_cpp_kv_changelog_example PUBLIC ${CPP_INCLUDE_DIR})

if (CARGO_TARGET_DIR)
set_target_properties(fluss_cpp
PROPERTIES ADDITIONAL_CLEAN_FILES "${CARGO_TARGET_DIR}"
Expand Down
130 changes: 130 additions & 0 deletions bindings/cpp/examples/kv_changelog_example.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Primary-key table changelog (CDC) example: subscribe to a KV table's
// changelog and print each row-level change as a +I / -U / +U / -D event.

#include <iostream>
#include <string>

#include "fluss.hpp"

// Aborts the example when a Fluss call did not succeed.
//
// `step` names the operation for the diagnostic; `r` is the result returned by
// that operation. On success this returns normally. On failure it prints the
// step name, error code and error message to stderr and exits with status 1.
static void check(const char* step, const fluss::Result& r) {
    if (r.Ok()) {
        return;
    }
    std::cerr << step << " failed: code=" << r.error_code << " msg=" << r.error_message
              << std::endl;
    std::exit(1);
}

// Maps a change type to the short symbol printed by this example:
// "+A", "+I", "-U", "+U" or "-D". Any value not covered by the switch
// yields "?".
static const char* change_symbol(fluss::ChangeType ct) {
    const char* symbol = "?";
    // No `default` label on purpose, so the compiler can still warn when a
    // new enumerator is added and not handled here.
    switch (ct) {
        case fluss::ChangeType::AppendOnly:
            symbol = "+A";
            break;
        case fluss::ChangeType::Insert:
            symbol = "+I";
            break;
        case fluss::ChangeType::UpdateBefore:
            symbol = "-U";
            break;
        case fluss::ChangeType::UpdateAfter:
            symbol = "+U";
            break;
        case fluss::ChangeType::Delete:
            symbol = "-D";
            break;
    }
    return symbol;
}

// Runs the KV changelog example end to end:
//   1. connects to the cluster at 127.0.0.1:9123 and (re)creates a
//      single-bucket primary-key table,
//   2. writes three inserts, one update and one delete,
//   3. subscribes to bucket 0 from the earliest offset and prints every
//      changelog record until a poll comes back empty,
//   4. drops the table again.
// Any failing step terminates the process through check(). Returns 0 on
// success.
int main() {
    fluss::Configuration config;
    config.bootstrap_servers = "127.0.0.1:9123";

    fluss::Connection conn;
    check("create", fluss::Connection::Create(config, conn));

    fluss::Admin admin;
    check("get_admin", conn.GetAdmin(admin));

    fluss::TablePath table_path("fluss", "kv_changelog_cpp");
    // Best-effort cleanup of a table left over from an earlier run; the result
    // is intentionally not checked.
    admin.DropTable(table_path, true);

    // A single bucket keeps the changelog on one bucket and in order, which
    // makes the CDC output easy to follow.
    auto schema = fluss::Schema::NewBuilder()
                      .AddColumn("id", fluss::DataType::Int())
                      .AddColumn("name", fluss::DataType::String())
                      .SetPrimaryKeys({"id"})
                      .Build();

    auto descriptor = fluss::TableDescriptor::NewBuilder()
                          .SetSchema(schema)
                          .SetBucketCount(1)
                          .SetComment("cpp kv changelog example")
                          .Build();

    check("create_table", admin.CreateTable(table_path, descriptor, false));
    std::cout << "Created PK table: " << table_path.ToString() << std::endl;

    fluss::Table table;
    check("get_table", conn.GetTable(table_path, table));

    fluss::UpsertWriter writer;
    check("new_upsert_writer", table.NewUpsert().CreateWriter(writer));

    // Insert three keys (+I), update one (-U / +U) and delete one (-D).
    for (const auto& kv : {std::pair<int32_t, const char*>{1, "alice"},
                           {2, "bob"}, {3, "carol"}}) {
        fluss::GenericRow row(2);
        row.SetInt32(0, kv.first);
        row.SetString(1, kv.second);
        check("upsert", writer.Upsert(row));
    }
    {
        fluss::GenericRow row(2);
        row.SetInt32(0, 2);
        row.SetString(1, "bob-v2");
        check("update", writer.Upsert(row));
    }
    {
        // Only the primary-key column is set on the row passed to Delete.
        fluss::GenericRow del(2);
        del.SetInt32(0, 3);
        check("delete", writer.Delete(del));
    }
    check("flush", writer.Flush());

    // Subscribe from the start of the changelog and print each CDC event until
    // we reach the end of the log.
    auto table_scan = table.NewScan();
    fluss::LogScanner log_scanner;
    check("create_log_scanner", table_scan.CreateLogScanner(log_scanner));
    check("subscribe", log_scanner.Subscribe(0, fluss::EARLIEST_OFFSET));

    std::cout << "Changelog (change_type id name):" << std::endl;
    while (true) {
        fluss::ScanRecords records;
        check("poll", log_scanner.Poll(3000, records));
        // An empty poll is treated as "end of the log" for this example.
        if (records.IsEmpty()) {
            break;
        }
        // Bind by const reference so a record is not copied per iteration, and
        // use '\n' so the stream is not flushed once per record.
        for (const auto& rec : records) {
            std::cout << "  " << change_symbol(rec.change_type) << " " << rec.row.GetInt32(0)
                      << " " << rec.row.GetString(1) << '\n';
        }
    }

    check("drop_table", admin.DropTable(table_path, true));
    std::cout << "\nKV changelog example completed successfully!" << std::endl;
    return 0;
}
14 changes: 14 additions & 0 deletions bindings/cpp/include/fluss.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1630,7 +1630,21 @@ class TableScan {

/// NOTE(review): presumably caps the number of rows the scan returns at
/// `row_number` -- confirm against the implementation. The return type is
/// `TableScan&`, so the call can be chained.
TableScan& Limit(int32_t row_number);

/// Creates a record-mode log scanner, polled for individual `ScanRecord`s.
///
/// Works on log tables and on primary-key (KV) tables. For a primary-key
/// table this subscribes to its CDC changelog: each `ScanRecord` carries a
/// `ChangeType` -- `+I` (insert), `-U` (update-before), `+U` (update-after)
/// or `-D` (delete). A log table yields `+A` (append-only). Requires the
/// ARROW log format.
///
/// The scanner is handed back through `out`; callers then subscribe to a
/// bucket on it and poll. Check the returned `Result` with `Ok()` before
/// using `out`; on failure `error_code` and `error_message` describe the
/// problem.
Result CreateLogScanner(LogScanner& out);

/// Creates a batch-mode log scanner that yields Arrow record batches.
///
/// Log tables only. Primary-key tables are rejected because the Arrow batch
/// path carries no per-record change types; read a primary-key table's
/// changelog with `CreateLogScanner()` instead. Requires the ARROW log
/// format.
///
/// The scanner is handed back through `out`. A rejection (for example a
/// primary-key table) is reported through the returned `Result`, whose
/// `Ok()` is then false.
Result CreateRecordBatchLogScanner(LogScanner& out);

/// NOTE(review): presumably creates a `BatchScanner` bound to the single
/// table bucket `bucket` and hands it back through `out`, reporting failure
/// through the returned `Result` -- confirm against the implementation.
Result CreateBucketBatchScanner(const TableBucket& bucket, BatchScanner& out);
Expand Down
182 changes: 182 additions & 0 deletions bindings/cpp/test/test_kv_changelog.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

// Integration tests for primary-key (KV) table changelog (CDC) scanning.
// Mirrors crates/fluss/tests/integration/kv_changelog.rs.

#include <gtest/gtest.h>

#include <algorithm>
#include <string>
#include <utility>
#include <vector>

#include "test_utils.h"

// Fixture for the KV changelog tests. Gives each test access to the Admin and
// Connection objects obtained via fluss_test::FlussTestEnvironment::Instance().
class KvChangelogTest : public ::testing::Test {
   protected:
    // Admin client of the test environment.
    fluss::Admin& admin() {
        auto&& env = fluss_test::FlussTestEnvironment::Instance();
        return env->GetAdmin();
    }

    // Connection of the test environment.
    fluss::Connection& connection() {
        auto&& env = fluss_test::FlussTestEnvironment::Instance();
        return env->GetConnection();
    }
};

// Scanning a primary-key table in record mode returns its CDC changelog. With
// the default FULL changelog image the expected events are:
//   - writing a new key         -> +I
//   - overwriting an existing key -> -U (old image) followed by +U (new image)
//   - deleting a key            -> -D (old image)
// The table uses a single bucket so the offsets are contiguous.
TEST_F(KvChangelogTest, SubscribeKvTableChangelog) {
    auto& fluss_admin = admin();
    auto& fluss_conn = connection();

    fluss::TablePath path("fluss", "test_kv_changelog_cpp");

    auto pk_schema = fluss::Schema::NewBuilder()
                         .AddColumn("id", fluss::DataType::Int())
                         .AddColumn("name", fluss::DataType::String())
                         .SetPrimaryKeys({"id"})
                         .Build();

    auto descriptor = fluss::TableDescriptor::NewBuilder()
                          .SetSchema(pk_schema)
                          .SetBucketCount(1)
                          .SetProperty("table.replication.factor", "1")
                          .Build();

    fluss_test::CreateTable(fluss_admin, path, descriptor);

    fluss::Table table;
    ASSERT_OK(fluss_conn.GetTable(path, table));

    auto upsert_builder = table.NewUpsert();
    fluss::UpsertWriter writer;
    ASSERT_OK(upsert_builder.CreateWriter(writer));

    // Every write is awaited before the next one is issued, so the changelog
    // offsets come out in a fixed order.
    {  // +I (1, alice)
        fluss::GenericRow alice(2);
        alice.SetInt32(0, 1);
        alice.SetString(1, "alice");
        fluss::WriteResult ack;
        ASSERT_OK(writer.Upsert(alice, ack));
        ASSERT_OK(ack.Wait());
    }
    {  // +I (2, bob)
        fluss::GenericRow bob(2);
        bob.SetInt32(0, 2);
        bob.SetString(1, "bob");
        fluss::WriteResult ack;
        ASSERT_OK(writer.Upsert(bob, ack));
        ASSERT_OK(ack.Wait());
    }
    {  // overwrite id=1 -> -U (1, alice) then +U (1, alice2)
        fluss::GenericRow alice_v2(2);
        alice_v2.SetInt32(0, 1);
        alice_v2.SetString(1, "alice2");
        fluss::WriteResult ack;
        ASSERT_OK(writer.Upsert(alice_v2, ack));
        ASSERT_OK(ack.Wait());
    }
    {  // -D (2, bob); only the primary-key column is set on the delete row
        fluss::GenericRow bob_key(2);
        bob_key.SetInt32(0, 2);
        fluss::WriteResult ack;
        ASSERT_OK(writer.Delete(bob_key, ack));
        ASSERT_OK(ack.Wait());
    }

    auto scan = table.NewScan();
    fluss::LogScanner scanner;
    ASSERT_OK(scan.CreateLogScanner(scanner));
    ASSERT_OK(scanner.Subscribe(0, fluss::EARLIEST_OFFSET));

    // Owning copy of the fields checked below, taken from each polled record.
    struct ChangeEvent {
        int64_t offset;
        fluss::ChangeType change_type;
        int32_t id;
        std::string name;
    };

    std::vector<ChangeEvent> events;
    fluss_test::PollRecords(
        scanner, 5,
        [](const fluss::ScanRecord& rec) {
            return ChangeEvent{rec.offset, rec.change_type, rec.row.GetInt32(0),
                               std::string(rec.row.GetString(1))};
        },
        events);

    ASSERT_EQ(events.size(), 5u);
    const auto by_offset = [](const ChangeEvent& lhs, const ChangeEvent& rhs) {
        return lhs.offset < rhs.offset;
    };
    std::sort(events.begin(), events.end(), by_offset);

    // Expected changelog, in offset order.
    struct Expected {
        fluss::ChangeType change_type;
        int32_t id;
        std::string name;
    };
    const std::vector<Expected> expected = {
        {fluss::ChangeType::Insert, 1, "alice"},        // +I
        {fluss::ChangeType::Insert, 2, "bob"},          // +I
        {fluss::ChangeType::UpdateBefore, 1, "alice"},  // -U (old image)
        {fluss::ChangeType::UpdateAfter, 1, "alice2"},  // +U (new image)
        {fluss::ChangeType::Delete, 2, "bob"},          // -D (old image)
    };

    for (size_t i = 0; i < events.size(); ++i) {
        const ChangeEvent& got = events[i];
        const Expected& want = expected[i];
        EXPECT_EQ(got.offset, static_cast<int64_t>(i));
        EXPECT_EQ(static_cast<int>(got.change_type), static_cast<int>(want.change_type))
            << "change_type mismatch at " << i;
        EXPECT_EQ(got.id, want.id) << "id mismatch at " << i;
        EXPECT_EQ(got.name, want.name) << "name mismatch at " << i;
    }

    ASSERT_OK(fluss_admin.DropTable(path, false));
}

// Arrow record batches have no per-record change type, so creating a batch
// scanner on a primary-key table must fail (mirrors the core / Java
// restriction).
TEST_F(KvChangelogTest, RecordBatchScannerRejectsPrimaryKey) {
    auto& fluss_admin = admin();
    auto& fluss_conn = connection();

    fluss::TablePath path("fluss", "test_kv_changelog_batch_reject_cpp");

    auto pk_schema = fluss::Schema::NewBuilder()
                         .AddColumn("id", fluss::DataType::Int())
                         .AddColumn("name", fluss::DataType::String())
                         .SetPrimaryKeys({"id"})
                         .Build();

    auto descriptor = fluss::TableDescriptor::NewBuilder()
                          .SetSchema(pk_schema)
                          .SetProperty("table.replication.factor", "1")
                          .Build();

    fluss_test::CreateTable(fluss_admin, path, descriptor);

    fluss::Table table;
    ASSERT_OK(fluss_conn.GetTable(path, table));

    auto scan = table.NewScan();
    fluss::LogScanner scanner;
    const fluss::Result status = scan.CreateRecordBatchLogScanner(scanner);
    EXPECT_FALSE(status.Ok()) << "batch scanner should reject a primary-key table";

    ASSERT_OK(fluss_admin.DropTable(path, false));
}
Loading
Loading