From b061abdbf771a18bf32bd898dfd6edd252b39211 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Wed, 24 Jun 2026 17:55:19 +0100 Subject: [PATCH 1/4] [rust] Support subscribing to primary-key table changelog --- crates/examples/Cargo.toml | 4 + crates/examples/src/example_kv_changelog.rs | 96 +++++ .../src/client/table/log_fetch_buffer.rs | 145 ++++++-- crates/fluss/src/client/table/scanner.rs | 84 +++-- crates/fluss/src/record/arrow.rs | 329 +++++++++++++++--- .../fluss/tests/integration/kv_changelog.rs | 170 +++++++++ crates/fluss/tests/test_fluss.rs | 1 + website/docs/user-guide/rust/api-reference.md | 21 +- .../rust/example/primary-key-tables.md | 42 +++ 9 files changed, 798 insertions(+), 94 deletions(-) create mode 100644 crates/examples/src/example_kv_changelog.rs create mode 100644 crates/fluss/tests/integration/kv_changelog.rs diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml index 7203095b..b335e1e4 100644 --- a/crates/examples/Cargo.toml +++ b/crates/examples/Cargo.toml @@ -41,6 +41,10 @@ path = "src/example_table.rs" name = "example-upsert-lookup" path = "src/example_kv_table.rs" +[[example]] +name = "example-kv-changelog" +path = "src/example_kv_changelog.rs" + [[example]] name = "example-partitioned-upsert-lookup" path = "src/example_partitioned_kv_table.rs" diff --git a/crates/examples/src/example_kv_changelog.rs b/crates/examples/src/example_kv_changelog.rs new file mode 100644 index 00000000..1554e66a --- /dev/null +++ b/crates/examples/src/example_kv_changelog.rs @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use clap::Parser; +use fluss::client::{EARLIEST_OFFSET, FlussConnection}; +use fluss::config::Config; +use fluss::error::Result; +use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; +use fluss::row::{DataGetters, GenericRow}; +use std::time::Duration; + +#[tokio::main] +#[allow(dead_code)] +pub async fn main() -> Result<()> { + let mut config = Config::parse(); + config.bootstrap_servers = "127.0.0.1:9123".to_string(); + + let conn = FlussConnection::new(config).await?; + let admin = conn.get_admin()?; + + // A single-bucket primary key table keeps the changelog on one bucket and in + // order, which makes the CDC output easy to follow. + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("id", DataTypes::int()) + .column("name", DataTypes::string()) + .primary_key(vec!["id"]) + .build()?, + ) + .distributed_by(Some(1), vec!["id".to_string()]) + .build()?; + + let table_path = TablePath::new("fluss", "rust_kv_changelog_example"); + admin + .create_table(&table_path, &table_descriptor, true) + .await?; + + let table = conn.get_table(&table_path).await?; + let upsert_writer = table.new_upsert()?.create_writer()?; + + // Insert three keys (+I), update one (-U / +U) and delete one (-D). 
+ for (id, name) in [(1, "alice"), (2, "bob"), (3, "carol")] { + let mut row = GenericRow::new(2); + row.set_field(0, id); + row.set_field(1, name); + upsert_writer.upsert(&row)?; + } + let mut updated = GenericRow::new(2); + updated.set_field(0, 2); + updated.set_field(1, "bob-v2"); + upsert_writer.upsert(&updated)?; + + let mut deleted = GenericRow::new(2); + deleted.set_field(0, 3); + upsert_writer.delete(&deleted)?; + upsert_writer.flush().await?; + + // Subscribe to the changelog from the start and print each CDC event until + // we reach the end of the log. + let log_scanner = table.new_scan().create_log_scanner()?; + log_scanner.subscribe(0, EARLIEST_OFFSET).await?; + + println!("Changelog (change_type id name):"); + loop { + let records = log_scanner.poll(Duration::from_secs(3)).await?; + if records.is_empty() { + break; + } + for record in records { + let row = record.row(); + println!( + " {} {} {}", + record.change_type().short_string(), + row.get_int(0)?, + row.get_string(1)?, + ); + } + } + + Ok(()) +} diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs index 9d45abad..ea188111 100644 --- a/crates/fluss/src/client/table/log_fetch_buffer.rs +++ b/crates/fluss/src/client/table/log_fetch_buffer.rs @@ -24,7 +24,7 @@ use crate::client::table::remote_log::{ use crate::error::{ApiError, Error, Result}; use crate::metadata::TableBucket; use crate::record::{ - LogRecordBatch, LogRecordIterator, LogRecordsBatches, ReadContext, ScanRecord, + ChangeType, LogRecordBatch, LogRecordIterator, LogRecordsBatches, ReadContext, ScanRecord, }; use std::{ collections::{HashMap, VecDeque}, @@ -463,6 +463,38 @@ impl DefaultCompletedFetch { } } + /// Pulls the next decoded record into `out`, returning whether one was + /// pushed. A fetch error is cached and surfaced on a later call rather than + /// propagated mid-batch (mirrors one iteration of Java + /// `CompletedFetch.fetchRecord`). 
+ fn fetch_one_record(&mut self, out: &mut Vec) -> bool { + if self.cached_record_error.is_none() { + self.corrupt_last_record = true; + match self.next_fetched_record() { + Ok(Some(record)) => { + self.corrupt_last_record = false; + self.last_record = Some(record); + } + Ok(None) => { + self.corrupt_last_record = false; + self.last_record = None; + } + Err(e) => { + self.cached_record_error = Some(e.to_string()); + } + } + } + + let Some(record) = self.last_record.take() else { + return false; + }; + + self.next_fetch_offset = record.offset() + 1; + self.records_read += 1; + out.push(record); + true + } + fn fetch_error(&self) -> Error { let mut message = format!( "Received exception when fetching the next record from {table_bucket}. If needed, please back to past the record to continue scanning.", @@ -556,30 +588,20 @@ impl CompletedFetch for DefaultCompletedFetch { let mut scan_records = Vec::new(); for _ in 0..max_records { - if self.cached_record_error.is_none() { - self.corrupt_last_record = true; - match self.next_fetched_record() { - Ok(Some(record)) => { - self.corrupt_last_record = false; - self.last_record = Some(record); - } - Ok(None) => { - self.corrupt_last_record = false; - self.last_record = None; - } - Err(e) => { - self.cached_record_error = Some(e.to_string()); - } - } - } - - let Some(record) = self.last_record.take() else { + if !self.fetch_one_record(&mut scan_records) { break; - }; + } + } - self.next_fetch_offset = record.offset() + 1; - self.records_read += 1; - scan_records.push(record); + // Keep a -U paired with its +U in the same poll batch: a KV changelog + // writes the pair as consecutive records, and splitting it across polls + // would expose an orphaned -U to retract-based CDC consumers. Mirrors + // Java's `CompletedFetch.fetchRecords`. 
+ if scan_records + .last() + .is_some_and(|record| *record.change_type() == ChangeType::UpdateBefore) + { + self.fetch_one_record(&mut scan_records); } if self.cached_record_error.is_some() && scan_records.is_empty() { @@ -835,7 +857,10 @@ mod tests { DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, }; use crate::metadata::{DataField, DataTypes, PhysicalTablePath, RowType, TablePath}; - use crate::record::{MemoryLogRecordsArrowBuilder, ReadContext, to_arrow_schema}; + use crate::record::{ + ATTRIBUTES_OFFSET, LENGTH_LENGTH, LENGTH_OFFSET, LOG_OVERHEAD, + MemoryLogRecordsArrowBuilder, RECORDS_OFFSET, ReadContext, to_arrow_schema, + }; use crate::row::GenericRow; use crate::test_utils::build_table_info; use std::sync::Arc; @@ -944,4 +969,76 @@ mod tests { Ok(()) } + + /// A `-U`/`+U` pair must not be split across polls: even when `max_records` + /// falls between them, `fetch_records` pulls the matching `+U` so the batch + /// ends on a complete pair (mirrors Java `CompletedFetch.fetchRecords`). + #[test] + fn fetch_records_keeps_update_before_after_pair_together() -> Result<()> { + let row_type = RowType::new(vec![DataField::new("id", DataTypes::int(), None)]); + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1)); + let physical_table_path = Arc::new(PhysicalTablePath::of(Arc::new(table_path))); + + let mut builder = MemoryLogRecordsArrowBuilder::new( + 1, + &row_type, + false, + ArrowCompressionInfo { + compression_type: ArrowCompressionType::None, + compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, + }, + usize::MAX, + Arc::new(ArrowCompressionRatioEstimator::default()), + )?; + for id in [10_i32, 20, 20] { + let mut row = GenericRow::new(1); + row.set_field(0, id); + let record = WriteRecord::for_append( + Arc::clone(&table_info), + physical_table_path.clone(), + 1, + &row, + ); + builder.append(&record)?; + } + let append_only = builder.build()?; + + // Synthesize a changelog 
batch carrying +I, -U, +U. + let change_types = [ + ChangeType::Insert, + ChangeType::UpdateBefore, + ChangeType::UpdateAfter, + ]; + let mut data = Vec::with_capacity(append_only.len() + change_types.len()); + data.extend_from_slice(&append_only[..RECORDS_OFFSET]); + data.extend(change_types.iter().map(ChangeType::to_byte_value)); + data.extend_from_slice(&append_only[RECORDS_OFFSET..]); + data[ATTRIBUTES_OFFSET] = 0; + let new_len = ((data.len() - LOG_OVERHEAD) as i32).to_le_bytes(); + data[LENGTH_OFFSET..LENGTH_OFFSET + LENGTH_LENGTH].copy_from_slice(&new_len); + + let read_context = ReadContext::new(to_arrow_schema(&row_type)?, Arc::new(row_type), false); + let mut fetch = DefaultCompletedFetch::new( + TableBucket::new(1, 0), + LogRecordsBatches::new(data.clone()), + data.len(), + read_context, + 0, + 0, + ); + + // max_records=2 cuts between -U (offset 1) and +U (offset 2); the pairing + // guard pulls the +U so the batch ends on a complete pair. + let records = fetch.fetch_records(2)?; + assert_eq!(records.len(), 3, "pair guard should append the matching +U"); + assert_eq!(*records[0].change_type(), ChangeType::Insert); + assert_eq!(*records[1].change_type(), ChangeType::UpdateBefore); + assert_eq!(*records[2].change_type(), ChangeType::UpdateAfter); + + let rest = fetch.fetch_records(10)?; + assert!(rest.is_empty(), "all records consumed"); + + Ok(()) + } } diff --git a/crates/fluss/src/client/table/scanner.rs b/crates/fluss/src/client/table/scanner.rs index b36b0e42..287fcd06 100644 --- a/crates/fluss/src/client/table/scanner.rs +++ b/crates/fluss/src/client/table/scanner.rs @@ -316,9 +316,16 @@ impl<'a> TableScan<'a> { Ok(self) } + /// Creates a record-mode log scanner, polled for individual [`ScanRecord`]s. + /// + /// Works on log tables and on primary-key (KV) tables. 
For a primary-key + /// table this subscribes to its CDC changelog: each [`ScanRecord`] carries a + /// [`ChangeType`](crate::record::ChangeType) — `+I` (insert), `-U` + /// (update-before), `+U` (update-after) or `-D` (delete). A log table yields + /// `+A` (append-only) for every record. Requires the ARROW log format. pub fn create_log_scanner(self) -> Result { self.reject_limit("LogScanner")?; - validate_scan_support(&self.table_info.table_path, &self.table_info)?; + validate_scan_support_inner(&self.table_info.table_path, &self.table_info, true)?; let inner = LogScannerInner::new( &self.table_info, self.metadata.clone(), @@ -331,6 +338,12 @@ impl<'a> TableScan<'a> { }) } + /// Creates a batch-mode log scanner that yields Arrow `RecordBatch`es. + /// + /// Log tables only. Primary-key tables are rejected because the Arrow batch + /// path carries no per-record change types; read a primary-key table's + /// changelog with [`create_log_scanner`](Self::create_log_scanner) instead. + /// Requires the ARROW log format. pub fn create_record_batch_log_scanner(self) -> Result { self.reject_limit("RecordBatchLogScanner")?; validate_scan_support(&self.table_info.table_path, &self.table_info)?; @@ -2225,10 +2238,29 @@ impl BucketScanStatus { } } +/// Validates that `table_info` can be scanned as a log, rejecting primary-key +/// tables (the default for batch-mode scans). fn validate_scan_support(table_path: &TablePath, table_info: &TableInfo) -> Result<()> { - if table_info.schema.primary_key().is_some() { + validate_scan_support_inner(table_path, table_info, false) +} + +/// Validates that `table_info` can be scanned as a log. ARROW log format is +/// required; INDEXED is not supported by the client decoder. +/// +/// When `allow_primary_key` is set, a primary-key table is accepted and its +/// changelog is read as a CDC stream (each record carries a `ChangeType`). 
It is +/// cleared for the Arrow batch path, which has no slot for per-record change +/// types, so reading a changelog there would silently drop them. +fn validate_scan_support_inner( + table_path: &TablePath, + table_info: &TableInfo, + allow_primary_key: bool, +) -> Result<()> { + if !allow_primary_key && table_info.schema.primary_key().is_some() { return Err(UnsupportedOperation { - message: format!("Table {table_path} is not a Log Table and doesn't support scan."), + message: format!( + "Batch-mode log scan does not support primary-key table {table_path}; use create_log_scanner() to read its changelog record by record" + ), }); } @@ -2236,7 +2268,7 @@ fn validate_scan_support(table_path: &TablePath, table_info: &TableInfo) -> Resu if LogFormat::ARROW != log_format { return Err(UnsupportedOperation { message: format!( - "Scan is only supported for ARROW format and table {table_path} uses {log_format} format" + "Log scan is only supported for ARROW log format, but table {table_path} uses {log_format} format" ), }); } @@ -2529,35 +2561,33 @@ mod tests { #[test] fn test_validate_scan_support() { - // Primary key table + // Record mode (allow_primary_key = true): a primary-key table's changelog + // is scannable on ARROW or the default format. let (table_info, table_path) = create_test_table_info(true, Some("ARROW")); - let result = validate_scan_support(&table_path, &table_info); + assert!(validate_scan_support_inner(&table_path, &table_info, true).is_ok()); + let (table_info, table_path) = create_test_table_info(true, None); + assert!(validate_scan_support_inner(&table_path, &table_info, true).is_ok()); - assert!(result.is_err()); - let err = result.unwrap_err(); - assert!(matches!(err, UnsupportedOperation { .. 
})); - assert!(err.to_string().contains( - format!("Table {table_path} is not a Log Table and doesn't support scan.").as_str() - )); - - // Indexed format - let (table_info, table_path) = create_test_table_info(false, Some("INDEXED")); - let result = validate_scan_support(&table_path, &table_info); - - assert!(result.is_err()); - let err = result.unwrap_err(); + // Batch mode (allow_primary_key = false): a primary-key table is rejected. + let (table_info, table_path) = create_test_table_info(true, Some("ARROW")); + let err = validate_scan_support(&table_path, &table_info).unwrap_err(); assert!(matches!(err, UnsupportedOperation { .. })); - assert!(err.to_string().contains(format!("Scan is only supported for ARROW format and table {table_path} uses INDEXED format").as_str())); + assert!(err.to_string().contains("primary-key table")); + + // INDEXED is unsupported regardless of table type or mode. + for allow_primary_key in [false, true] { + let (table_info, table_path) = create_test_table_info(false, Some("INDEXED")); + let err = validate_scan_support_inner(&table_path, &table_info, allow_primary_key) + .unwrap_err(); + assert!(matches!(err, UnsupportedOperation { .. })); + assert!(err.to_string().contains("ARROW log format")); + } - // Default format + // Log tables scan on ARROW or the default format. 
let (table_info, table_path) = create_test_table_info(false, None); - let result = validate_scan_support(&table_path, &table_info); - assert!(result.is_ok()); - - // Arrow format + assert!(validate_scan_support(&table_path, &table_info).is_ok()); let (table_info, table_path) = create_test_table_info(false, Some("ARROW")); - let result = validate_scan_support(&table_path, &table_info); - assert!(result.is_ok()); + assert!(validate_scan_support(&table_path, &table_info).is_ok()); } #[tokio::test] diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index 33a0f6f4..ff610689 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -84,6 +84,12 @@ pub const RECORD_BATCH_HEADER_SIZE: usize = RECORDS_OFFSET; pub const ARROW_CHANGETYPE_OFFSET: usize = RECORD_BATCH_HEADER_SIZE; pub const LOG_OVERHEAD: usize = LENGTH_OFFSET + LENGTH_LENGTH; +/// Bit 0 of the attributes byte. When set, the batch is append-only and carries +/// no change-type vector; when clear, a `record_count`-byte change-type vector +/// precedes the Arrow IPC payload (the changelog of a primary-key table). Shares +/// the wire layout of the Java client's `DefaultLogRecordBatch`. +pub const APPEND_ONLY_FLAG_MASK: u8 = 0x01; + /// Maximum batch size matches Java's Integer.MAX_VALUE limit. /// Java uses int type for batch size, so max value is 2^31 - 1 = 2,147,483,647 bytes (~2GB). /// This is the implicit limit in FileLogRecords.java and other Java components. @@ -952,6 +958,11 @@ impl LogRecordBatch { self.data[ATTRIBUTES_OFFSET] } + /// Whether this batch is append-only (see [`APPEND_ONLY_FLAG_MASK`]). 
+ fn is_append_only(&self) -> bool { + self.attributes() & APPEND_ONLY_FLAG_MASK != 0 + } + pub fn next_log_offset(&self) -> i64 { self.last_log_offset() + 1 } @@ -990,28 +1001,70 @@ impl LogRecordBatch { LittleEndian::read_i32(&self.data[offset..offset + RECORDS_COUNT_LENGTH]) } + /// Splits the batch body into its per-record change types and the trailing + /// Arrow IPC payload (see [`APPEND_ONLY_FLAG_MASK`] for the layout). + fn decode_change_types(&self) -> Result<(BatchChangeTypes, &[u8])> { + let body = self + .data + .get(RECORDS_OFFSET..) + .ok_or_else(|| Error::UnexpectedError { + message: format!( + "Corrupt log record batch: data length {} is less than RECORDS_OFFSET {}", + self.data.len(), + RECORDS_OFFSET + ), + source: None, + })?; + + if self.is_append_only() { + return Ok((BatchChangeTypes::Uniform(ChangeType::AppendOnly), body)); + } + + let record_count = self.record_count() as usize; + let (change_type_bytes, arrow_data) = + body.split_at_checked(record_count) + .ok_or_else(|| Error::UnexpectedError { + message: format!( + "Corrupt changelog batch: body length {} is smaller than its \ + {record_count}-record change-type vector", + body.len() + ), + source: None, + })?; + + let mut change_types = Vec::with_capacity(record_count); + for &byte in change_type_bytes { + let change_type = + ChangeType::from_byte_value(byte).map_err(|message| Error::UnexpectedError { + message, + source: None, + })?; + change_types.push(change_type); + } + + Ok((BatchChangeTypes::PerRecord(change_types), arrow_data)) + } + pub fn records(&self, read_context: &ReadContext) -> Result { if self.record_count() == 0 { return Ok(LogRecordIterator::empty()); } - let data = &self.data[RECORDS_OFFSET..]; - - let record_batch = read_context.record_batch(data)?; + let (change_types, arrow_data) = self.decode_change_types()?; + let record_batch = read_context.record_batch(arrow_data)?; let arrow_reader = ArrowReader::new_with_fluss_row_type( Arc::new(record_batch), 
read_context.row_type.clone(), read_context.fluss_row_type().cloned(), )?; - let log_record_iterator = LogRecordIterator::Arrow(ArrowLogRecordIterator { - reader: arrow_reader, - base_offset: self.base_log_offset(), - timestamp: self.commit_timestamp(), - row_id: 0, - change_type: ChangeType::AppendOnly, - }); + let iterator = ArrowLogRecordIterator::new( + arrow_reader, + self.base_log_offset(), + self.commit_timestamp(), + change_types, + )?; - Ok(log_record_iterator) + Ok(LogRecordIterator::Arrow(iterator)) } pub fn records_for_remote_log(&self, read_context: &ReadContext) -> Result { @@ -1019,9 +1072,8 @@ impl LogRecordBatch { return Ok(LogRecordIterator::empty()); } - let data = &self.data[RECORDS_OFFSET..]; - - let record_batch = read_context.record_batch_for_remote_log(data)?; + let (change_types, arrow_data) = self.decode_change_types()?; + let record_batch = read_context.record_batch_for_remote_log(arrow_data)?; let log_record_iterator = match record_batch { None => LogRecordIterator::empty(), Some(record_batch) => { @@ -1030,13 +1082,13 @@ impl LogRecordBatch { read_context.row_type.clone(), read_context.fluss_row_type().cloned(), )?; - LogRecordIterator::Arrow(ArrowLogRecordIterator { - reader: arrow_reader, - base_offset: self.base_log_offset(), - timestamp: self.commit_timestamp(), - row_id: 0, - change_type: ChangeType::AppendOnly, - }) + let iterator = ArrowLogRecordIterator::new( + arrow_reader, + self.base_log_offset(), + self.commit_timestamp(), + change_types, + )?; + LogRecordIterator::Arrow(iterator) } }; Ok(log_record_iterator) @@ -1051,18 +1103,9 @@ impl LogRecordBatch { return Ok(RecordBatch::new_empty(read_context.target_schema.clone())); } - let data = self - .data - .get(RECORDS_OFFSET..) 
- .ok_or_else(|| Error::UnexpectedError { - message: format!( - "Corrupt log record batch: data length {} is less than RECORDS_OFFSET {}", - self.data.len(), - RECORDS_OFFSET - ), - source: None, - })?; - read_context.record_batch(data) + // Batch access drops the change-type vector; use `records()` for CDC. + let (_, arrow_data) = self.decode_change_types()?; + read_context.record_batch(arrow_data) } } @@ -1645,24 +1688,63 @@ impl Iterator for LogRecordIterator { } } +/// Per-record change types decoded from a log batch. +/// +/// Append-only batches carry no change-type vector on the wire, so a single +/// `AppendOnly` value covers every record without allocating. Changelog batches +/// (the CDC stream of a primary-key table) decode one change type per record, +/// in record order. +enum BatchChangeTypes { + /// Every record shares this change type (append-only batches). + Uniform(ChangeType), + /// One change type per record, indexed by row id (changelog batches). + PerRecord(Vec), +} + +impl BatchChangeTypes { + fn get(&self, row_id: usize) -> ChangeType { + match self { + BatchChangeTypes::Uniform(change_type) => *change_type, + BatchChangeTypes::PerRecord(change_types) => change_types[row_id], + } + } +} + pub struct ArrowLogRecordIterator { reader: ArrowReader, base_offset: i64, timestamp: i64, row_id: usize, - change_type: ChangeType, + change_types: BatchChangeTypes, } -#[allow(dead_code)] impl ArrowLogRecordIterator { - fn new(reader: ArrowReader, base_offset: i64, timestamp: i64, change_type: ChangeType) -> Self { - Self { + fn new( + reader: ArrowReader, + base_offset: i64, + timestamp: i64, + change_types: BatchChangeTypes, + ) -> Result { + if let BatchChangeTypes::PerRecord(ref change_types) = change_types { + if change_types.len() != reader.row_count() { + return Err(Error::UnexpectedError { + message: format!( + "Changelog batch decode mismatch: {} change types for {} Arrow rows", + change_types.len(), + reader.row_count() + ), + source: None, + 
}); + } + } + + Ok(Self { reader, base_offset, timestamp, row_id: 0, - change_type, - } + change_types, + }) } } @@ -1679,7 +1761,7 @@ impl Iterator for ArrowLogRecordIterator { columnar_row, self.base_offset + self.row_id as i64, self.timestamp, - self.change_type, + self.change_types.get(self.row_id), ); self.row_id += 1; Some(scan_record) @@ -1720,7 +1802,12 @@ pub struct MyVec(pub StreamReader); #[cfg(test)] mod tests { use super::*; - use crate::metadata::{DataField, DataTypes, RowType}; + use crate::client::WriteRecord; + use crate::compression::{ + ArrowCompressionInfo, ArrowCompressionType, DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, + }; + use crate::metadata::{DataField, DataTypes, PhysicalTablePath, RowType, TablePath}; + use crate::row::{DataGetters, GenericRow}; use crate::test_utils::build_table_info; #[test] @@ -2349,4 +2436,164 @@ mod tests { Ok(()) } + + /// Builds an append-only `(id INT, name STRING)` Arrow log batch from `rows`. + /// The writer always emits append-only batches, so changelog tests derive + /// their bytes from this with [`splice_change_type_vector`]. 
+ fn build_append_only_batch(rows: &[(i32, &str)]) -> (RowType, Vec) { + let row_type = RowType::new(vec![ + DataField::new("id".to_string(), DataTypes::int(), None), + DataField::new("name".to_string(), DataTypes::string(), None), + ]); + let table_path = TablePath::new("db".to_string(), "tbl".to_string()); + let table_info = Arc::new(build_table_info(table_path.clone(), 1, 1)); + let physical_table_path = Arc::new(PhysicalTablePath::of(Arc::new(table_path))); + + let mut builder = MemoryLogRecordsArrowBuilder::new( + 1, + &row_type, + false, + ArrowCompressionInfo { + compression_type: ArrowCompressionType::None, + compression_level: DEFAULT_NON_ZSTD_COMPRESSION_LEVEL, + }, + usize::MAX, + Arc::new(ArrowCompressionRatioEstimator::default()), + ) + .unwrap(); + + for (id, name) in rows { + let mut row = GenericRow::new(2); + row.set_field(0, *id); + row.set_field(1, *name); + let record = WriteRecord::for_append( + Arc::clone(&table_info), + physical_table_path.clone(), + 1, + &row, + ); + builder.append(&record).unwrap(); + } + + (row_type, builder.build().unwrap()) + } + + /// Turns an append-only batch into a wire-valid changelog batch: clears the + /// append-only flag, splices one change-type byte per record between the + /// header and the Arrow payload, then fixes up the length field and CRC. 
+ fn splice_change_type_vector(append_only: &[u8], change_types: &[ChangeType]) -> Vec { + let mut data = append_only.to_vec(); + data[ATTRIBUTES_OFFSET] = 0; + let change_bytes = change_types.iter().map(|ct| ct.to_byte_value()); + data.splice(RECORDS_OFFSET..RECORDS_OFFSET, change_bytes); + + let new_length = (data.len() - LOG_OVERHEAD) as i32; + data[LENGTH_OFFSET..LENGTH_OFFSET + LENGTH_LENGTH] + .copy_from_slice(&new_length.to_le_bytes()); + + let crc = crc32c(&data[SCHEMA_ID_OFFSET..]); + data[CRC_OFFSET..CRC_OFFSET + CRC_LENGTH].copy_from_slice(&crc.to_le_bytes()); + data + } + + #[test] + fn decode_changelog_batch_applies_per_record_change_types() -> Result<()> { + let (row_type, append_only) = + build_append_only_batch(&[(1, "alice"), (2, "bob"), (3, "carol")]); + let read_context = ReadContext::new(to_arrow_schema(&row_type)?, Arc::new(row_type), false); + + // Append-only batch: every record decodes as AppendOnly (regression guard). + let batch = LogRecordsBatches::new(append_only.clone()) + .next() + .expect("append-only batch")?; + assert!(batch.is_append_only()); + let records: Vec<_> = batch.records(&read_context)?.collect(); + assert_eq!(records.len(), 3); + assert!( + records + .iter() + .all(|r| *r.change_type() == ChangeType::AppendOnly) + ); + + // Changelog variant: the spliced change-type vector drives per-record types. + let change_types = [ + ChangeType::Insert, + ChangeType::UpdateAfter, + ChangeType::Delete, + ]; + let changelog = splice_change_type_vector(&append_only, &change_types); + let batch = LogRecordsBatches::new(changelog) + .next() + .expect("changelog batch")?; + assert!(!batch.is_append_only()); + assert_eq!(batch.record_count(), 3); + + let records: Vec<_> = batch.records(&read_context)?.collect(); + let got: Vec = records.iter().map(|r| *r.change_type()).collect(); + assert_eq!(got, change_types.to_vec()); + + // The row payload and offsets survive the splice unchanged. 
+ let mut ids = Vec::new(); + for record in &records { + ids.push(record.row().get_int(0)?); + } + assert_eq!(ids, vec![1, 2, 3]); + let offsets: Vec = records.iter().map(|r| r.offset()).collect(); + assert_eq!(offsets, vec![0, 1, 2]); + + // Batch-level access skips the change-type vector and still decodes rows. + let batch = LogRecordsBatches::new(splice_change_type_vector(&append_only, &change_types)) + .next() + .expect("changelog batch")?; + assert_eq!(batch.record_batch(&read_context)?.num_rows(), 3); + + Ok(()) + } + + #[test] + fn decode_changelog_batch_rejects_invalid_change_type_byte() { + let (row_type, append_only) = build_append_only_batch(&[(1, "a"), (2, "b")]); + let read_context = ReadContext::new( + to_arrow_schema(&row_type).unwrap(), + Arc::new(row_type), + false, + ); + + let mut changelog = + splice_change_type_vector(&append_only, &[ChangeType::Insert, ChangeType::Insert]); + // Corrupt the second change-type byte to an out-of-range value. + changelog[RECORDS_OFFSET + 1] = 99; + + let batch = LogRecordBatch::new(Bytes::from(changelog)); + let err = batch + .records(&read_context) + .err() + .expect("expected decode to reject an invalid change-type byte"); + assert!(matches!(err, Error::UnexpectedError { .. })); + assert!(err.to_string().contains("change type")); + } + + #[test] + fn decode_changelog_batch_rejects_truncated_change_type_vector() { + let (row_type, append_only) = build_append_only_batch(&[(1, "a"), (2, "b")]); + let read_context = ReadContext::new( + to_arrow_schema(&row_type).unwrap(), + Arc::new(row_type), + false, + ); + + // Clear the append-only flag, then cut the body shorter than the + // record_count change-type bytes the decoder now expects. 
+ let mut data = append_only; + data[ATTRIBUTES_OFFSET] = 0; + data.truncate(RECORDS_OFFSET + 1); + + let batch = LogRecordBatch::new(Bytes::from(data)); + assert_eq!(batch.record_count(), 2); + let err = batch + .records(&read_context) + .err() + .expect("expected decode to reject a truncated change-type vector"); + assert!(matches!(err, Error::UnexpectedError { .. })); + } } diff --git a/crates/fluss/tests/integration/kv_changelog.rs b/crates/fluss/tests/integration/kv_changelog.rs new file mode 100644 index 00000000..30a85fd3 --- /dev/null +++ b/crates/fluss/tests/integration/kv_changelog.rs @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#[cfg(test)] +mod kv_changelog_test { + use crate::integration::utils::{create_table, get_shared_cluster}; + use fluss::client::EARLIEST_OFFSET; + use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; + use fluss::record::{ChangeType, ScanRecord}; + use fluss::row::{DataGetters, GenericRow}; + use std::time::Duration; + + /// Subscribing to a primary-key (KV) table yields its CDC changelog. 
+ /// + /// With the default `FULL` changelog image: upserting a new key emits `+I`, + /// overwriting an existing key emits `-U` (old image) then `+U` (new image), + /// and a delete emits `-D` (old image). A single bucket keeps the changelog + /// offsets contiguous so the sequence is deterministic. This exercises the + /// per-record change-type vector decode on a real cluster end to end. + #[tokio::test] + async fn subscribe_kv_table_changelog() { + let cluster = get_shared_cluster(); + let connection = cluster.get_fluss_connection().await; + let admin = connection.get_admin().expect("Failed to get admin"); + + let table_path = TablePath::new("fluss", "test_kv_changelog_subscribe"); + + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("id", DataTypes::int()) + .column("name", DataTypes::string()) + .primary_key(vec!["id"]) + .build() + .expect("Failed to build schema"), + ) + .distributed_by(Some(1), vec!["id".to_string()]) + .build() + .expect("Failed to build table"); + + create_table(&admin, &table_path, &table_descriptor).await; + + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + let upsert_writer = table + .new_upsert() + .expect("Failed to create upsert") + .create_writer() + .expect("Failed to create writer"); + + let make_row = |id: i32, name: &'static str| { + let mut row = GenericRow::new(2); + row.set_field(0, id); + row.set_field(1, name); + row + }; + + // Await each write so the changelog offsets are produced in a fixed order. 
+ // +I (1, "alice") + upsert_writer + .upsert(&make_row(1, "alice")) + .expect("upsert (1, alice)") + .await + .expect("ack (1, alice)"); + // +I (2, "bob") + upsert_writer + .upsert(&make_row(2, "bob")) + .expect("upsert (2, bob)") + .await + .expect("ack (2, bob)"); + // Overwrite id=1 -> -U (1, "alice") then +U (1, "alice2") + upsert_writer + .upsert(&make_row(1, "alice2")) + .expect("update (1, alice2)") + .await + .expect("ack (1, alice2)"); + // -D (2, "bob") + let mut delete_key = GenericRow::new(2); + delete_key.set_field(0, 2); + upsert_writer + .delete(&delete_key) + .expect("delete (2)") + .await + .expect("ack delete (2)"); + + // Subscribe to the single bucket from the earliest changelog offset. + let log_scanner = table + .new_scan() + .create_log_scanner() + .expect("Failed to create log scanner"); + log_scanner + .subscribe(0, EARLIEST_OFFSET) + .await + .expect("Failed to subscribe to bucket 0"); + + // Poll until the whole changelog is collected (or time out). + let mut records: Vec<ScanRecord> = Vec::new(); + let start = std::time::Instant::now(); + while records.len() < 5 && start.elapsed() < Duration::from_secs(15) { + let polled = log_scanner + .poll(Duration::from_millis(500)) + .await + .expect("Failed to poll changelog"); + records.extend(polled); + } + + assert_eq!(records.len(), 5, "expected 5 changelog records"); + + let change_types: Vec<ChangeType> = records.iter().map(|r| *r.change_type()).collect(); + assert_eq!( + change_types, + vec![ + ChangeType::Insert, + ChangeType::Insert, + ChangeType::UpdateBefore, + ChangeType::UpdateAfter, + ChangeType::Delete, + ], + "unexpected change-type sequence" + ); + + // Single bucket -> contiguous, in-order offsets. + let offsets: Vec<i64> = records.iter().map(|r| r.offset()).collect(); + assert_eq!(offsets, vec![0, 1, 2, 3, 4]); + + // Each record carries the correct before/after row image. 
+ let rows: Vec<(i32, String)> = records + .iter() + .map(|r| { + let row = r.row(); + ( + row.get_int(0).unwrap(), + row.get_string(1).unwrap().to_string(), + ) + }) + .collect(); + assert_eq!( + rows, + vec![ + (1, "alice".to_string()), // +I + (2, "bob".to_string()), // +I + (1, "alice".to_string()), // -U (old image) + (1, "alice2".to_string()), // +U (new image) + (2, "bob".to_string()), // -D (old image) + ] + ); + + admin + .drop_table(&table_path, false) + .await + .expect("Failed to drop table"); + } +} diff --git a/crates/fluss/tests/test_fluss.rs b/crates/fluss/tests/test_fluss.rs index 2d2bd152..eb1e4070 100644 --- a/crates/fluss/tests/test_fluss.rs +++ b/crates/fluss/tests/test_fluss.rs @@ -23,6 +23,7 @@ mod integration { mod admin; mod batch_scanner; mod fluss_cluster; + mod kv_changelog; mod kv_table; mod log_table; mod record_batch_log_reader; diff --git a/website/docs/user-guide/rust/api-reference.md b/website/docs/user-guide/rust/api-reference.md index 49c54823..09618e5a 100644 --- a/website/docs/user-guide/rust/api-reference.md +++ b/website/docs/user-guide/rust/api-reference.md @@ -145,8 +145,8 @@ series are shared by `AppendWriter` (log tables) and `UpsertWriter` (PK tables). 
| `fn project(self, indices: &[usize]) -> Result` | Project columns by index | | `fn project_by_name(self, names: &[&str]) -> Result` | Project columns by name | | `fn limit(self, n: i32) -> Result` | Set a row limit (enables `create_bucket_batch_scanner`; rejected by log scanners) | -| `fn create_log_scanner(self) -> Result` | Create a record-based log scanner | -| `fn create_record_batch_log_scanner(self) -> Result` | Create an Arrow batch-based log scanner | +| `fn create_log_scanner(self) -> Result` | Create a record-based log scanner; on a primary-key table, subscribes to its CDC changelog (per-record `ChangeType`) | +| `fn create_record_batch_log_scanner(self) -> Result` | Create an Arrow batch-based log scanner (log tables only — no per-record change types) | | `fn create_bucket_batch_scanner(self, bucket: TableBucket) -> Result` | Bounded scan of one bucket (requires `limit`; runs on first `next_batch`) | ## `LogScanner` @@ -245,6 +245,23 @@ server-deduplicated state); yields a single batch of at most `n` rows. | `fn timestamp(&self) -> i64` | Record timestamp | | `fn change_type(&self) -> &ChangeType` | Change type (AppendOnly, Insert, etc.) | +## `ChangeType` + +The kind of change a `ScanRecord` represents. Log (append-only) tables always +produce `AppendOnly`; scanning a primary-key table's changelog produces the CDC +variants below. + +| Variant | Symbol | Meaning | +|----------------|--------|---------------------------------------------| +| `AppendOnly` | `+A` | Append-only record (log tables) | +| `Insert` | `+I` | A new primary key was inserted | +| `UpdateBefore` | `-U` | Row image *before* an update | +| `UpdateAfter` | `+U` | Row image *after* an update | +| `Delete` | `-D` | A primary key was deleted (carries old row) | + +With the default `FULL` changelog mode (`table.changelog.image`), updating an +existing key emits `-U` then `+U`; the `WAL` mode emits only `+U`. 
+ ## `ScanRecords` | Method | Description | diff --git a/website/docs/user-guide/rust/example/primary-key-tables.md b/website/docs/user-guide/rust/example/primary-key-tables.md index 01836e29..b3d03823 100644 --- a/website/docs/user-guide/rust/example/primary-key-tables.md +++ b/website/docs/user-guide/rust/example/primary-key-tables.md @@ -121,6 +121,48 @@ let batch = result.to_record_batch()?; println!("Rows: {}", batch.num_rows()); ``` +## Subscribing to the Changelog (CDC) + +Every primary key table maintains a changelog of its row-level changes. Read it +with a record-mode `LogScanner` — the same API used for [log tables](./log-tables.md) — +to stream CDC events. Each `ScanRecord` carries a `ChangeType`: + +- `+I` — a new key was inserted +- `-U` / `+U` — the old and new images of an updated key +- `-D` — a key was deleted (the record carries the old row) + +```rust +use fluss::client::EARLIEST_OFFSET; +use std::time::Duration; + +let table = conn.get_table(&table_path).await?; +let log_scanner = table.new_scan().create_log_scanner()?; + +// Subscribe to every bucket from the start of the changelog. +let num_buckets = table.get_table_info().get_num_buckets(); +for bucket in 0..num_buckets { + log_scanner.subscribe(bucket, EARLIEST_OFFSET).await?; +} + +loop { + let records = log_scanner.poll(Duration::from_secs(1)).await?; + for record in records { + let row = record.row(); + println!( + "{} id={} name={}", + record.change_type().short_string(), // +I / -U / +U / -D + row.get_int(0)?, + row.get_string(1)?, + ); + } +} +``` + +With the default changelog mode (`'table.changelog.image' = 'FULL'`), updating a +key emits a `-U`/`+U` pair and deleting it emits `-D`; the `WAL` mode emits only +`+U` on update. To resume from a previously committed position rather than the +start, `subscribe` at a specific offset instead of `EARLIEST_OFFSET`. 
+ ## Prefix Lookup To fetch all rows sharing a common primary-key prefix (by choosing a bucket key that's a strict prefix of the primary key), see [Prefix Lookup](./prefix-lookup.md). From 93189e45c7c32b61a88d67f0530eb192d4a104d3 Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Wed, 24 Jun 2026 20:46:08 +0100 Subject: [PATCH 2/4] [python] Support subscribing to primary-key table changelog --- bindings/python/example/kv_changelog.py | 103 ++++++++++++++++ bindings/python/fluss/__init__.pyi | 9 ++ bindings/python/test/test_kv_changelog.py | 112 ++++++++++++++++++ .../docs/user-guide/python/api-reference.md | 4 +- .../python/example/primary-key-tables.md | 22 ++++ 5 files changed, 248 insertions(+), 2 deletions(-) create mode 100644 bindings/python/example/kv_changelog.py create mode 100644 bindings/python/test/test_kv_changelog.py diff --git a/bindings/python/example/kv_changelog.py b/bindings/python/example/kv_changelog.py new file mode 100644 index 00000000..bc129c6d --- /dev/null +++ b/bindings/python/example/kv_changelog.py @@ -0,0 +1,103 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Primary-key table changelog (CDC) example. 
+ +Subscribe to a primary-key table's changelog and print each row-level change as +a ``+I`` / ``-U`` / ``+U`` / ``-D`` event. + +Run standalone against a local cluster: + + python example/kv_changelog.py + +Or point it at a specific cluster: + + FLUSS_BOOTSTRAP_SERVERS=host:port python example/kv_changelog.py +""" + +import asyncio +import os +from typing import Optional + +import pyarrow as pa + +import fluss + +DEFAULT_BOOTSTRAP_SERVERS = "127.0.0.1:9123" + + +async def main(bootstrap_servers: Optional[str] = None): + bootstrap_servers = bootstrap_servers or os.environ.get( + "FLUSS_BOOTSTRAP_SERVERS", DEFAULT_BOOTSTRAP_SERVERS + ) + + config = fluss.Config({"bootstrap.servers": bootstrap_servers}) + conn = await fluss.FlussConnection.create(config) + try: + await _run(conn) + finally: + await conn.close() + print("\nConnection closed") + + +async def _run(conn): + schema = fluss.Schema( + pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())]), + primary_keys=["id"], + ) + # A single bucket keeps the changelog on one bucket and in order, which makes + # the CDC output easy to follow. + table_descriptor = fluss.TableDescriptor(schema, bucket_count=1) + + admin = conn.get_admin() + table_path = fluss.TablePath("fluss", "example_kv_changelog") + await admin.drop_table(table_path, ignore_if_not_exists=True) + await admin.create_table(table_path, table_descriptor, ignore_if_exists=True) + print(f"Created PK table: {table_path}") + + table = await conn.get_table(table_path) + writer = table.new_upsert().create_writer() + + # Insert three keys (+I), update one (-U / +U) and delete one (-D). + for id_, name in [(1, "alice"), (2, "bob"), (3, "carol")]: + writer.upsert({"id": id_, "name": name}) + writer.upsert({"id": 2, "name": "bob-v2"}) + writer.delete({"id": 3}) + await writer.flush() + + # Subscribe from the start of the changelog and print each CDC event until we + # reach the end of the log. 
+ scanner = await table.new_scan().create_log_scanner() + scanner.subscribe(bucket_id=0, start_offset=fluss.EARLIEST_OFFSET) + + print("Changelog (change_type id name):") + while True: + records = await scanner.poll(3000) + if records.is_empty(): + break + for record in records: + print( + f" {record.change_type.short_string()} " + f"{record.row['id']} {record.row['name']}" + ) + + await admin.drop_table(table_path, ignore_if_not_exists=True) + print(f"\nDropped table: {table_path}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi index 2ba50a42..3f7e94b4 100644 --- a/bindings/python/fluss/__init__.pyi +++ b/bindings/python/fluss/__init__.pyi @@ -537,6 +537,11 @@ class TableScan: Use this scanner with `poll()` to get individual records with metadata (offset, timestamp, change_type). + Works on log tables and on primary-key (KV) tables. For a primary-key + table it subscribes to the CDC changelog: each record's ``change_type`` + is ``+I`` (insert), ``-U`` (update-before), ``+U`` (update-after) or + ``-D`` (delete). A log table yields ``+A`` (append-only). + Returns: LogScanner for record-by-record scanning with `poll()` """ @@ -547,6 +552,10 @@ class TableScan: Use this scanner with `poll_arrow()` to get Arrow Tables, or with `poll_record_batch()` to get individual batches with metadata. + Log tables only. Primary-key tables are rejected because the Arrow batch + path carries no per-record change types; read a primary-key table's + changelog with `create_log_scanner()` instead. 
+ Returns: LogScanner for batch-based scanning with `poll_arrow()` or `poll_record_batch()` """ diff --git a/bindings/python/test/test_kv_changelog.py b/bindings/python/test/test_kv_changelog.py new file mode 100644 index 00000000..3df3ca44 --- /dev/null +++ b/bindings/python/test/test_kv_changelog.py @@ -0,0 +1,112 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Integration tests for primary-key (KV) table changelog (CDC) scanning. + +Mirrors crates/fluss/tests/integration/kv_changelog.rs. +""" + +import time + +import pyarrow as pa +import pytest + +import fluss + + +async def test_subscribe_kv_table_changelog(connection, admin, wait_for_table_ready): + """A record-mode scanner over a PK table yields its CDC changelog. + + With the default FULL changelog image: inserting a new key emits +I, + overwriting an existing key emits -U (old image) then +U (new image), and a + delete emits -D (old image). A single bucket keeps the offsets contiguous. 
+ """ + table_path = fluss.TablePath("fluss", "py_test_kv_changelog") + await admin.drop_table(table_path, ignore_if_not_exists=True) + + schema = fluss.Schema( + pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())]), + primary_keys=["id"], + ) + table_descriptor = fluss.TableDescriptor(schema, bucket_count=1) + await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) + await wait_for_table_ready(table_path) + + table = await connection.get_table(table_path) + writer = table.new_upsert().create_writer() + + # Await each write so the changelog offsets are produced in a fixed order. + await writer.upsert({"id": 1, "name": "alice"}).wait() # +I + await writer.upsert({"id": 2, "name": "bob"}).wait() # +I + await writer.upsert({"id": 1, "name": "alice2"}).wait() # -U / +U + await writer.delete({"id": 2}).wait() # -D + + scanner = await table.new_scan().create_log_scanner() + scanner.subscribe(bucket_id=0, start_offset=fluss.EARLIEST_OFFSET) + + records = await _poll_records(scanner, expected_count=5) + assert len(records) == 5 + + records.sort(key=lambda r: r.offset) + assert [r.offset for r in records] == [0, 1, 2, 3, 4] + assert [r.change_type for r in records] == [ + fluss.ChangeType.Insert, + fluss.ChangeType.Insert, + fluss.ChangeType.UpdateBefore, + fluss.ChangeType.UpdateAfter, + fluss.ChangeType.Delete, + ] + assert [(r.row["id"], r.row["name"]) for r in records] == [ + (1, "alice"), # +I + (2, "bob"), # +I + (1, "alice"), # -U (old image) + (1, "alice2"), # +U (new image) + (2, "bob"), # -D (old image) + ] + + await admin.drop_table(table_path, ignore_if_not_exists=False) + + +async def test_record_batch_scanner_rejects_primary_key(connection, admin): + """The Arrow batch scanner carries no per-record change types, so it rejects + primary-key tables (mirrors the core / Java restriction).""" + table_path = fluss.TablePath("fluss", "py_test_kv_changelog_batch_reject") + await admin.drop_table(table_path, 
ignore_if_not_exists=True) + + schema = fluss.Schema( + pa.schema([pa.field("id", pa.int32()), pa.field("name", pa.string())]), + primary_keys=["id"], + ) + await admin.create_table( + table_path, fluss.TableDescriptor(schema), ignore_if_exists=False + ) + + table = await connection.get_table(table_path) + with pytest.raises(fluss.FlussError): + await table.new_scan().create_record_batch_log_scanner() + + await admin.drop_table(table_path, ignore_if_not_exists=False) + + +async def _poll_records(scanner, expected_count, timeout_s=10): + """Poll a record-based scanner until expected_count records are collected.""" + collected = [] + deadline = time.monotonic() + timeout_s + while len(collected) < expected_count and time.monotonic() < deadline: + records = await scanner.poll(5000) + collected.extend(records) + return collected diff --git a/website/docs/user-guide/python/api-reference.md b/website/docs/user-guide/python/api-reference.md index eeac5acc..54d8af66 100644 --- a/website/docs/user-guide/python/api-reference.md +++ b/website/docs/user-guide/python/api-reference.md @@ -97,8 +97,8 @@ Supports `async with` statement (async context manager). | `.project(indices) -> TableScan` | Project columns by index | | `.project_by_name(names) -> TableScan` | Project columns by name | | `.limit(n) -> TableScan` | Set a positive row limit (enables `create_bucket_batch_scanner`; rejected by log scanners) | -| `await .create_log_scanner() -> LogScanner` | Create record-based scanner (for `poll()`) | -| `await .create_record_batch_log_scanner() -> LogScanner` | Create batch-based scanner (for `poll_arrow()`, `to_arrow()`, etc.) 
| +| `await .create_log_scanner() -> LogScanner` | Create record-based scanner (for `poll()`); on a primary-key table, subscribes to its CDC changelog (per-record `change_type`) | +| `await .create_record_batch_log_scanner() -> LogScanner` | Create batch-based scanner (for `poll_arrow()`, `to_arrow()`, etc.); log tables only — no per-record change types | | `.create_bucket_batch_scanner(bucket) -> BatchScanner` | Bounded scan of one bucket (requires `limit`; runs on first `next_batch()`) | ## `TableAppend` diff --git a/website/docs/user-guide/python/example/primary-key-tables.md b/website/docs/user-guide/python/example/primary-key-tables.md index 3cbb8d57..ca8a82a0 100644 --- a/website/docs/user-guide/python/example/primary-key-tables.md +++ b/website/docs/user-guide/python/example/primary-key-tables.md @@ -60,6 +60,28 @@ partial_writer.upsert({"id": 1, "age": 27}) # only updates age await partial_writer.flush() ``` +## Subscribing to the Changelog (CDC) + +Every primary key table maintains a changelog of its row-level changes. Read it with a record-mode scanner — the same API used for [log tables](./log-tables.md) — to stream CDC events. Each `ScanRecord` carries a `change_type`: `+I` (insert), `-U` / `+U` (the old and new images of an update), and `-D` (delete, carrying the old row). + +```python +table = await conn.get_table(table_path) +scanner = await table.new_scan().create_log_scanner() + +# Subscribe to every bucket from the start of the changelog. 
+num_buckets = (await admin.get_table_info(table_path)).num_buckets +scanner.subscribe_buckets({i: fluss.EARLIEST_OFFSET for i in range(num_buckets)}) + +while True: + records = await scanner.poll(3000) + if records.is_empty(): + break + for record in records: + print(record.change_type.short_string(), record.row) # +I / -U / +U / -D +``` + +With the default changelog mode (`'table.changelog.image' = 'FULL'`), updating a key emits a `-U`/`+U` pair and deleting it emits `-D`; the `WAL` mode emits only `+U` on update. To resume from a committed position instead of the start, `subscribe` at a specific offset. + ## Limit Scan To read up to `n` rows of a bucket's current state without supplying keys, use a batch scanner. The server returns the deduplicated current rows as Arrow batches — convenient for previews or DataFusion sources. From a06ae7955ecd525eefb9ea36a099af87c19dc9dd Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Wed, 24 Jun 2026 21:35:15 +0100 Subject: [PATCH 3/4] [cpp] Support subscribing to primary-key table changelog --- bindings/cpp/CMakeLists.txt | 6 + .../cpp/examples/kv_changelog_example.cpp | 130 +++++++++++++ bindings/cpp/include/fluss.hpp | 14 ++ bindings/cpp/test/test_kv_changelog.cpp | 182 ++++++++++++++++++ website/docs/user-guide/cpp/api-reference.md | 4 +- .../cpp/example/primary-key-tables.md | 30 +++ 6 files changed, 364 insertions(+), 2 deletions(-) create mode 100644 bindings/cpp/examples/kv_changelog_example.cpp create mode 100644 bindings/cpp/test/test_kv_changelog.cpp diff --git a/bindings/cpp/CMakeLists.txt b/bindings/cpp/CMakeLists.txt index 44407ac8..c72b3675 100644 --- a/bindings/cpp/CMakeLists.txt +++ b/bindings/cpp/CMakeLists.txt @@ -236,6 +236,12 @@ target_link_libraries(fluss_cpp_kv_example PRIVATE Arrow::arrow_shared) target_compile_definitions(fluss_cpp_kv_example PRIVATE ARROW_FOUND) target_include_directories(fluss_cpp_kv_example PUBLIC ${CPP_INCLUDE_DIR}) +add_executable(fluss_cpp_kv_changelog_example 
examples/kv_changelog_example.cpp) +target_link_libraries(fluss_cpp_kv_changelog_example PRIVATE fluss_cpp) +target_link_libraries(fluss_cpp_kv_changelog_example PRIVATE Arrow::arrow_shared) +target_compile_definitions(fluss_cpp_kv_changelog_example PRIVATE ARROW_FOUND) +target_include_directories(fluss_cpp_kv_changelog_example PUBLIC ${CPP_INCLUDE_DIR}) + if (CARGO_TARGET_DIR) set_target_properties(fluss_cpp PROPERTIES ADDITIONAL_CLEAN_FILES "${CARGO_TARGET_DIR}" diff --git a/bindings/cpp/examples/kv_changelog_example.cpp b/bindings/cpp/examples/kv_changelog_example.cpp new file mode 100644 index 00000000..f3a6ab91 --- /dev/null +++ b/bindings/cpp/examples/kv_changelog_example.cpp @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Primary-key table changelog (CDC) example: subscribe to a KV table's +// changelog and print each row-level change as a +I / -U / +U / -D event. 
+ +#include +#include + +#include "fluss.hpp" + +static void check(const char* step, const fluss::Result& r) { + if (!r.Ok()) { + std::cerr << step << " failed: code=" << r.error_code << " msg=" << r.error_message + << std::endl; + std::exit(1); + } +} + +static const char* change_symbol(fluss::ChangeType ct) { + switch (ct) { + case fluss::ChangeType::AppendOnly: + return "+A"; + case fluss::ChangeType::Insert: + return "+I"; + case fluss::ChangeType::UpdateBefore: + return "-U"; + case fluss::ChangeType::UpdateAfter: + return "+U"; + case fluss::ChangeType::Delete: + return "-D"; + } + return "?"; +} + +int main() { + fluss::Configuration config; + config.bootstrap_servers = "127.0.0.1:9123"; + + fluss::Connection conn; + check("create", fluss::Connection::Create(config, conn)); + + fluss::Admin admin; + check("get_admin", conn.GetAdmin(admin)); + + fluss::TablePath table_path("fluss", "kv_changelog_cpp"); + admin.DropTable(table_path, true); + + // A single bucket keeps the changelog on one bucket and in order, which + // makes the CDC output easy to follow. + auto schema = fluss::Schema::NewBuilder() + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .SetPrimaryKeys({"id"}) + .Build(); + + auto descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetBucketCount(1) + .SetComment("cpp kv changelog example") + .Build(); + + check("create_table", admin.CreateTable(table_path, descriptor, false)); + std::cout << "Created PK table: " << table_path.ToString() << std::endl; + + fluss::Table table; + check("get_table", conn.GetTable(table_path, table)); + + fluss::UpsertWriter writer; + check("new_upsert_writer", table.NewUpsert().CreateWriter(writer)); + + // Insert three keys (+I), update one (-U / +U) and delete one (-D). 
+ for (const auto& kv : {std::pair{1, "alice"}, + {2, "bob"}, {3, "carol"}}) { + fluss::GenericRow row(2); + row.SetInt32(0, kv.first); + row.SetString(1, kv.second); + check("upsert", writer.Upsert(row)); + } + { + fluss::GenericRow row(2); + row.SetInt32(0, 2); + row.SetString(1, "bob-v2"); + check("update", writer.Upsert(row)); + } + { + fluss::GenericRow del(2); + del.SetInt32(0, 3); + check("delete", writer.Delete(del)); + } + check("flush", writer.Flush()); + + // Subscribe from the start of the changelog and print each CDC event until + // we reach the end of the log. + auto table_scan = table.NewScan(); + fluss::LogScanner log_scanner; + check("create_log_scanner", table_scan.CreateLogScanner(log_scanner)); + check("subscribe", log_scanner.Subscribe(0, fluss::EARLIEST_OFFSET)); + + std::cout << "Changelog (change_type id name):" << std::endl; + while (true) { + fluss::ScanRecords records; + check("poll", log_scanner.Poll(3000, records)); + if (records.IsEmpty()) { + break; + } + for (auto rec : records) { + std::cout << " " << change_symbol(rec.change_type) << " " << rec.row.GetInt32(0) << " " + << rec.row.GetString(1) << std::endl; + } + } + + check("drop_table", admin.DropTable(table_path, true)); + std::cout << "\nKV changelog example completed successfully!" << std::endl; + return 0; +} diff --git a/bindings/cpp/include/fluss.hpp b/bindings/cpp/include/fluss.hpp index 6987e683..9e727d6d 100644 --- a/bindings/cpp/include/fluss.hpp +++ b/bindings/cpp/include/fluss.hpp @@ -1630,7 +1630,21 @@ class TableScan { TableScan& Limit(int32_t row_number); + /// Creates a record-mode log scanner, polled for individual `ScanRecord`s. + /// + /// Works on log tables and on primary-key (KV) tables. For a primary-key + /// table this subscribes to its CDC changelog: each `ScanRecord` carries a + /// `ChangeType` -- `+I` (insert), `-U` (update-before), `+U` (update-after) + /// or `-D` (delete). A log table yields `+A` (append-only). Requires the + /// ARROW log format. 
Result CreateLogScanner(LogScanner& out); + + /// Creates a batch-mode log scanner that yields Arrow record batches. + /// + /// Log tables only. Primary-key tables are rejected because the Arrow batch + /// path carries no per-record change types; read a primary-key table's + /// changelog with `CreateLogScanner()` instead. Requires the ARROW log + /// format. Result CreateRecordBatchLogScanner(LogScanner& out); Result CreateBucketBatchScanner(const TableBucket& bucket, BatchScanner& out); diff --git a/bindings/cpp/test/test_kv_changelog.cpp b/bindings/cpp/test/test_kv_changelog.cpp new file mode 100644 index 00000000..8be2aca8 --- /dev/null +++ b/bindings/cpp/test/test_kv_changelog.cpp @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// Integration tests for primary-key (KV) table changelog (CDC) scanning. +// Mirrors crates/fluss/tests/integration/kv_changelog.rs. 
+ +#include + +#include +#include +#include +#include + +#include "test_utils.h" + +class KvChangelogTest : public ::testing::Test { + protected: + fluss::Admin& admin() { return fluss_test::FlussTestEnvironment::Instance()->GetAdmin(); } + + fluss::Connection& connection() { + return fluss_test::FlussTestEnvironment::Instance()->GetConnection(); + } +}; + +// A record-mode scanner over a PK table yields its CDC changelog. With the +// default FULL changelog image: inserting a new key emits +I, overwriting an +// existing key emits -U (old image) then +U (new image), and a delete emits -D +// (old image). A single bucket keeps the offsets contiguous. +TEST_F(KvChangelogTest, SubscribeKvTableChangelog) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_kv_changelog_cpp"); + + auto schema = fluss::Schema::NewBuilder() + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .SetPrimaryKeys({"id"}) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetBucketCount(1) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + + auto table_upsert = table.NewUpsert(); + fluss::UpsertWriter upsert_writer; + ASSERT_OK(table_upsert.CreateWriter(upsert_writer)); + + // Await each write so the changelog offsets are produced in a fixed order. 
+ { // +I (1, alice) + fluss::GenericRow row(2); + row.SetInt32(0, 1); + row.SetString(1, "alice"); + fluss::WriteResult wr; + ASSERT_OK(upsert_writer.Upsert(row, wr)); + ASSERT_OK(wr.Wait()); + } + { // +I (2, bob) + fluss::GenericRow row(2); + row.SetInt32(0, 2); + row.SetString(1, "bob"); + fluss::WriteResult wr; + ASSERT_OK(upsert_writer.Upsert(row, wr)); + ASSERT_OK(wr.Wait()); + } + { // overwrite id=1 -> -U (1, alice) then +U (1, alice2) + fluss::GenericRow row(2); + row.SetInt32(0, 1); + row.SetString(1, "alice2"); + fluss::WriteResult wr; + ASSERT_OK(upsert_writer.Upsert(row, wr)); + ASSERT_OK(wr.Wait()); + } + { // -D (2, bob) + fluss::GenericRow del(2); + del.SetInt32(0, 2); + fluss::WriteResult wr; + ASSERT_OK(upsert_writer.Delete(del, wr)); + ASSERT_OK(wr.Wait()); + } + + auto table_scan = table.NewScan(); + fluss::LogScanner log_scanner; + ASSERT_OK(table_scan.CreateLogScanner(log_scanner)); + ASSERT_OK(log_scanner.Subscribe(0, fluss::EARLIEST_OFFSET)); + + struct Decoded { + int64_t offset; + fluss::ChangeType change_type; + int32_t id; + std::string name; + }; + + std::vector<Decoded> records; + fluss_test::PollRecords( + log_scanner, 5, + [](const fluss::ScanRecord& rec) { + return Decoded{rec.offset, rec.change_type, rec.row.GetInt32(0), + std::string(rec.row.GetString(1))}; + }, + records); + + ASSERT_EQ(records.size(), 5u); + std::sort(records.begin(), records.end(), + [](const Decoded& a, const Decoded& b) { return a.offset < b.offset; }); + + const std::vector<fluss::ChangeType> expected_types = { + fluss::ChangeType::Insert, fluss::ChangeType::Insert, fluss::ChangeType::UpdateBefore, + fluss::ChangeType::UpdateAfter, fluss::ChangeType::Delete}; + const std::vector<std::pair<int32_t, std::string>> expected_rows = { + {1, "alice"}, // +I + {2, "bob"}, // +I + {1, "alice"}, // -U (old image) + {1, "alice2"}, // +U (new image) + {2, "bob"}, // -D (old image) + }; + + for (size_t i = 0; i < records.size(); ++i) { + EXPECT_EQ(records[i].offset, static_cast<int64_t>(i)); + 
EXPECT_EQ(static_cast<int>(records[i].change_type), static_cast<int>(expected_types[i])) + << "change_type mismatch at " << i; + EXPECT_EQ(records[i].id, expected_rows[i].first) << "id mismatch at " << i; + EXPECT_EQ(records[i].name, expected_rows[i].second) << "name mismatch at " << i; + } + + ASSERT_OK(adm.DropTable(table_path, false)); +} + +// The Arrow batch scanner carries no per-record change types, so it rejects +// primary-key tables (mirrors the core / Java restriction). +TEST_F(KvChangelogTest, RecordBatchScannerRejectsPrimaryKey) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_kv_changelog_batch_reject_cpp"); + + auto schema = fluss::Schema::NewBuilder() + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .SetPrimaryKeys({"id"}) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + + auto table_scan = table.NewScan(); + fluss::LogScanner batch_scanner; + auto result = table_scan.CreateRecordBatchLogScanner(batch_scanner); + EXPECT_FALSE(result.Ok()) << "batch scanner should reject a primary-key table"; + + ASSERT_OK(adm.DropTable(table_path, false)); +} diff --git a/website/docs/user-guide/cpp/api-reference.md b/website/docs/user-guide/cpp/api-reference.md index c845010f..c5640a89 100644 --- a/website/docs/user-guide/cpp/api-reference.md +++ b/website/docs/user-guide/cpp/api-reference.md @@ -151,8 +151,8 @@ Complete API reference for the Fluss C++ client. 
| `ProjectByIndex(std::vector column_indices) -> TableScan&` | Project columns by index | | `ProjectByName(std::vector column_names) -> TableScan&` | Project columns by name | | `Limit(int32_t row_number) -> TableScan&` | Set a positive row limit (enables `CreateBucketBatchScanner`; rejected by log scanners) | -| `CreateLogScanner(LogScanner& out) -> Result` | Create a record-based log scanner | -| `CreateRecordBatchLogScanner(LogScanner& out) -> Result` | Create an Arrow RecordBatch-based log scanner | +| `CreateLogScanner(LogScanner& out) -> Result` | Create a record-based log scanner; on a primary-key table, subscribes to its CDC changelog (per-record `change_type`) | +| `CreateRecordBatchLogScanner(LogScanner& out) -> Result` | Create an Arrow RecordBatch-based log scanner (log tables only — no per-record change types) | | `CreateBucketBatchScanner(const TableBucket& bucket, BatchScanner& out) -> Result` | Bounded scan of one bucket (requires `Limit`) | ## `AppendWriter` diff --git a/website/docs/user-guide/cpp/example/primary-key-tables.md b/website/docs/user-guide/cpp/example/primary-key-tables.md index f0dfad90..1e5aa744 100644 --- a/website/docs/user-guide/cpp/example/primary-key-tables.md +++ b/website/docs/user-guide/cpp/example/primary-key-tables.md @@ -131,6 +131,36 @@ if (result.Found()) { } ``` +## Subscribing to the Changelog (CDC) + +Every primary key table maintains a changelog of its row-level changes. Read it with a record-mode `LogScanner` — the same API used for [log tables](./log-tables.md) — to stream CDC events. Each `ScanRecord` carries a `change_type`: `+I` (insert), `-U` / `+U` (the old and new images of an update), and `-D` (delete, carrying the old row). + +```cpp +fluss::LogScanner log_scanner; +table.NewScan().CreateLogScanner(log_scanner); + +// Subscribe to every bucket from the start of the changelog. 
+int32_t num_buckets = table.GetTableInfo().num_buckets; +for (int32_t bucket_id = 0; bucket_id < num_buckets; ++bucket_id) { + log_scanner.Subscribe(bucket_id, fluss::EARLIEST_OFFSET); +} + +while (true) { + fluss::ScanRecords records; + log_scanner.Poll(3000, records); + if (records.IsEmpty()) { + break; // caught up to the end of the changelog + } + for (auto rec : records) { + // rec.change_type is +I / -U / +U / -D (see ChangeTypeShortString in the API reference) + std::cout << ChangeTypeShortString(rec.change_type) << " " << rec.row.GetInt32(0) << " " + << rec.row.GetString(1) << std::endl; + } +} +``` + +With the default changelog mode (`'table.changelog.image' = 'FULL'`), updating a key emits a `-U`/`+U` pair and deleting it emits `-D`; the `WAL` mode emits only `+U` on update. To resume from a committed position instead of the start, `Subscribe` at a specific offset. + ## Limit Scan To read up to `n` rows of a bucket's current state without supplying keys, use a batch scanner. The server returns the deduplicated current rows as Arrow batches, convenient for previews or analytics. 
From 3676aed87a0482a70e953b3eac999b250cb4c4dd Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Thu, 25 Jun 2026 10:50:11 +0100 Subject: [PATCH 4/4] address comments --- crates/fluss/src/client/table/log_fetch_buffer.rs | 4 ++-- crates/fluss/src/record/arrow.rs | 13 ++++++++++--- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/crates/fluss/src/client/table/log_fetch_buffer.rs b/crates/fluss/src/client/table/log_fetch_buffer.rs index ea188111..dee68c95 100644 --- a/crates/fluss/src/client/table/log_fetch_buffer.rs +++ b/crates/fluss/src/client/table/log_fetch_buffer.rs @@ -858,7 +858,7 @@ mod tests { }; use crate::metadata::{DataField, DataTypes, PhysicalTablePath, RowType, TablePath}; use crate::record::{ - ATTRIBUTES_OFFSET, LENGTH_LENGTH, LENGTH_OFFSET, LOG_OVERHEAD, + APPEND_ONLY_FLAG_MASK, ATTRIBUTES_OFFSET, LENGTH_LENGTH, LENGTH_OFFSET, LOG_OVERHEAD, MemoryLogRecordsArrowBuilder, RECORDS_OFFSET, ReadContext, to_arrow_schema, }; use crate::row::GenericRow; @@ -1014,7 +1014,7 @@ mod tests { data.extend_from_slice(&append_only[..RECORDS_OFFSET]); data.extend(change_types.iter().map(ChangeType::to_byte_value)); data.extend_from_slice(&append_only[RECORDS_OFFSET..]); - data[ATTRIBUTES_OFFSET] = 0; + data[ATTRIBUTES_OFFSET] &= !APPEND_ONLY_FLAG_MASK; let new_len = ((data.len() - LOG_OVERHEAD) as i32).to_le_bytes(); data[LENGTH_OFFSET..LENGTH_OFFSET + LENGTH_LENGTH].copy_from_slice(&new_len); diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index ff610689..025fe0d6 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -1020,7 +1020,14 @@ impl LogRecordBatch { return Ok((BatchChangeTypes::Uniform(ChangeType::AppendOnly), body)); } - let record_count = self.record_count() as usize; + let record_count = self.record_count(); + if record_count < 0 { + return Err(Error::UnexpectedError { + message: format!("Corrupt changelog batch: negative record count {record_count}"), + source: None, + }); 
+ } + let record_count = record_count as usize; let (change_type_bytes, arrow_data) = body.split_at_checked(record_count) .ok_or_else(|| Error::UnexpectedError { @@ -2483,7 +2490,7 @@ mod tests { /// header and the Arrow payload, then fixes up the length field and CRC. fn splice_change_type_vector(append_only: &[u8], change_types: &[ChangeType]) -> Vec { let mut data = append_only.to_vec(); - data[ATTRIBUTES_OFFSET] = 0; + data[ATTRIBUTES_OFFSET] &= !APPEND_ONLY_FLAG_MASK; let change_bytes = change_types.iter().map(|ct| ct.to_byte_value()); data.splice(RECORDS_OFFSET..RECORDS_OFFSET, change_bytes); @@ -2585,7 +2592,7 @@ mod tests { // Clear the append-only flag, then cut the body shorter than the // record_count change-type bytes the decoder now expects. let mut data = append_only; - data[ATTRIBUTES_OFFSET] = 0; + data[ATTRIBUTES_OFFSET] &= !APPEND_ONLY_FLAG_MASK; data.truncate(RECORDS_OFFSET + 1); let batch = LogRecordBatch::new(Bytes::from(data));