diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index 57ecfa41..a1c35a8f 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -248,6 +248,7 @@ mod tests { use paimon::spec::{ BinaryRow, DataType, Datum, IntType, PredicateBuilder, Schema as PaimonSchema, TableSchema, }; + use paimon::table::Table; use std::fs; use tempfile::tempdir; use test_utils::{local_file_path, test_data_file, write_int_parquet_file}; @@ -298,6 +299,7 @@ mod tests { Identifier::new("test_db", "test_table"), "/tmp/test-table".to_string(), table_schema, + None, ) } @@ -329,6 +331,7 @@ mod tests { Identifier::new("default", "t"), table_path, table_schema, + None, ); let split = paimon::DataSplitBuilder::new() diff --git a/crates/paimon/src/api/resource_paths.rs b/crates/paimon/src/api/resource_paths.rs index 1cbc88f7..9345310a 100644 --- a/crates/paimon/src/api/resource_paths.rs +++ b/crates/paimon/src/api/resource_paths.rs @@ -131,6 +131,18 @@ impl ResourcePaths { pub fn rename_table(&self) -> String { format!("{}/{}/rename", self.base_path, Self::TABLES) } + + /// Get the commit table endpoint path. 
+ pub fn commit_table(&self, database_name: &str, table_name: &str) -> String { + format!( + "{}/{}/{}/{}/{}/commit", + self.base_path, + Self::DATABASES, + RESTUtil::encode_string(database_name), + Self::TABLES, + RESTUtil::encode_string(table_name) + ) + } } #[cfg(test)] diff --git a/crates/paimon/src/api/rest_api.rs b/crates/paimon/src/api/rest_api.rs index 00a32b8d..8a3416ed 100644 --- a/crates/paimon/src/api/rest_api.rs +++ b/crates/paimon/src/api/rest_api.rs @@ -25,7 +25,7 @@ use std::collections::HashMap; use crate::api::rest_client::HttpClient; use crate::catalog::Identifier; use crate::common::{CatalogOptions, Options}; -use crate::spec::Schema; +use crate::spec::{PartitionStatistics, Schema, Snapshot}; use crate::Result; use super::api_request::{ @@ -391,4 +391,32 @@ impl RESTApi { let path = self.resource_paths.table_token(database, table); self.client.get(&path, None::<&[(&str, &str)]>).await } + + // ==================== Commit Operations ==================== + + /// Commit a snapshot for a table. + /// + /// Corresponds to Python `RESTApi.commit_snapshot`. 
+ pub async fn commit_snapshot( + &self, + identifier: &Identifier, + table_uuid: &str, + snapshot: &Snapshot, + statistics: &[PartitionStatistics], + ) -> Result { + let database = identifier.database(); + let table = identifier.object(); + validate_non_empty_multi(&[(database, "database name"), (table, "table name")])?; + let path = self.resource_paths.commit_table(database, table); + let request = serde_json::json!({ + "tableUuid": table_uuid, + "snapshot": snapshot, + "statistics": statistics, + }); + let resp: serde_json::Value = self.client.post(&path, &request).await?; + Ok(resp + .get("success") + .and_then(|v| v.as_bool()) + .unwrap_or(false)) + } } diff --git a/crates/paimon/src/catalog/filesystem.rs b/crates/paimon/src/catalog/filesystem.rs index 313b6ef6..a061e8ab 100644 --- a/crates/paimon/src/catalog/filesystem.rs +++ b/crates/paimon/src/catalog/filesystem.rs @@ -327,6 +327,7 @@ impl Catalog for FileSystemCatalog { identifier.clone(), table_path, schema, + None, )) } diff --git a/crates/paimon/src/catalog/rest/rest_catalog.rs b/crates/paimon/src/catalog/rest/rest_catalog.rs index e5d023ff..1da2aa16 100644 --- a/crates/paimon/src/catalog/rest/rest_catalog.rs +++ b/crates/paimon/src/catalog/rest/rest_catalog.rs @@ -21,6 +21,7 @@ //! a Paimon REST catalog server for database and table CRUD operations. use std::collections::HashMap; +use std::sync::Arc; use async_trait::async_trait; @@ -32,7 +33,7 @@ use crate::common::{CatalogOptions, Options}; use crate::error::Error; use crate::io::FileIO; use crate::spec::{Schema, SchemaChange, TableSchema}; -use crate::table::Table; +use crate::table::{RESTEnv, Table}; use crate::Result; use super::rest_token_file_io::RESTTokenFileIO; @@ -44,8 +45,8 @@ use super::rest_token_file_io::RESTTokenFileIO; /// /// Corresponds to Python `RESTCatalog` in `pypaimon/catalog/rest/rest_catalog.py`. pub struct RESTCatalog { - /// The REST API client. - api: RESTApi, + /// The REST API client (shared with RESTEnv). 
+ api: Arc, /// Catalog configuration options. options: Options, /// Warehouse path. @@ -71,7 +72,7 @@ impl RESTCatalog { message: format!("Missing required option: {}", CatalogOptions::WAREHOUSE), })?; - let api = RESTApi::new(options.clone(), config_required).await?; + let api = Arc::new(RESTApi::new(options.clone(), config_required).await?); let data_token_enabled = api .options() @@ -232,6 +233,15 @@ impl Catalog for RESTCatalog { source: None, })?; + // Extract table uuid for RESTEnv + let uuid = response.id.ok_or_else(|| Error::DataInvalid { + message: format!( + "Table {} response missing id (uuid)", + identifier.full_name() + ), + source: None, + })?; + // Build FileIO based on data_token_enabled and is_external // TODO Support token cache and direct oss access let file_io = if self.data_token_enabled && !is_external { @@ -244,11 +254,14 @@ impl Catalog for RESTCatalog { FileIO::from_path(&table_path)?.build()? }; + let rest_env = RESTEnv::new(identifier.clone(), uuid, self.api.clone()); + Ok(Table::new( file_io, identifier.clone(), table_path, table_schema, + Some(rest_env), )) } diff --git a/crates/paimon/src/lib.rs b/crates/paimon/src/lib.rs index a68ba6b8..3867d69b 100644 --- a/crates/paimon/src/lib.rs +++ b/crates/paimon/src/lib.rs @@ -42,6 +42,7 @@ pub use catalog::CatalogFactory; pub use catalog::FileSystemCatalog; pub use table::{ - DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, ReadBuilder, RowRange, - SnapshotManager, Table, TableRead, TableScan, TagManager, + CommitMessage, DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, RESTEnv, + RESTSnapshotCommit, ReadBuilder, RenamingSnapshotCommit, RowRange, SnapshotCommit, + SnapshotManager, Table, TableCommit, TableRead, TableScan, TagManager, WriteBuilder, }; diff --git a/crates/paimon/src/spec/binary_row.rs b/crates/paimon/src/spec/binary_row.rs index 1621476c..0ba6b782 100644 --- a/crates/paimon/src/spec/binary_row.rs +++ b/crates/paimon/src/spec/binary_row.rs @@ 
-19,6 +19,7 @@ //! and BinaryRowBuilder for constructing BinaryRow instances. use crate::spec::murmur_hash::hash_by_words; +use crate::spec::{DataType, Datum}; use serde::{Deserialize, Serialize}; pub const EMPTY_BINARY_ROW: BinaryRow = BinaryRow::new(0); @@ -523,6 +524,87 @@ impl BinaryRowBuilder { serialized.extend_from_slice(&self.data); serialized } + + /// Write a Datum value at the given position, dispatching by type. + pub fn write_datum(&mut self, pos: usize, datum: &Datum, data_type: &DataType) { + match datum { + Datum::Bool(v) => self.write_boolean(pos, *v), + Datum::TinyInt(v) => self.write_byte(pos, *v), + Datum::SmallInt(v) => self.write_short(pos, *v), + Datum::Int(v) | Datum::Date(v) | Datum::Time(v) => self.write_int(pos, *v), + Datum::Long(v) => self.write_long(pos, *v), + Datum::Float(v) => self.write_float(pos, *v), + Datum::Double(v) => self.write_double(pos, *v), + Datum::Timestamp { millis, nanos } => { + let precision = match data_type { + DataType::Timestamp(ts) => ts.precision(), + _ => 3, + }; + if precision <= 3 { + self.write_timestamp_compact(pos, *millis); + } else { + self.write_timestamp_non_compact(pos, *millis, *nanos); + } + } + Datum::LocalZonedTimestamp { millis, nanos } => { + let precision = match data_type { + DataType::LocalZonedTimestamp(ts) => ts.precision(), + _ => 3, + }; + if precision <= 3 { + self.write_timestamp_compact(pos, *millis); + } else { + self.write_timestamp_non_compact(pos, *millis, *nanos); + } + } + Datum::Decimal { + unscaled, + precision, + .. 
+ } => { + if *precision <= 18 { + self.write_decimal_compact(pos, *unscaled as i64); + } else { + self.write_decimal_var_len(pos, *unscaled); + } + } + Datum::String(s) => { + if s.len() <= 7 { + self.write_string_inline(pos, s); + } else { + self.write_string(pos, s); + } + } + Datum::Bytes(b) => { + if b.len() <= 7 { + self.write_binary_inline(pos, b); + } else { + self.write_binary(pos, b); + } + } + } + } +} + +/// Build a serialized BinaryRow from optional Datum values. +/// Returns empty vec if all values are None. +pub fn datums_to_binary_row(datums: &[(&Option, &DataType)]) -> Vec { + if datums.iter().all(|(d, _)| d.is_none()) { + return vec![]; + } + let arity = datums.len() as i32; + let mut builder = BinaryRowBuilder::new(arity); + for (pos, (datum_opt, data_type)) in datums.iter().enumerate() { + match datum_opt { + Some(datum) => { + builder.write_datum(pos, datum, data_type); + } + None => { + builder.set_null_at(pos); + } + } + } + builder.build_serialized() } #[cfg(test)] @@ -756,6 +838,73 @@ mod tests { assert_eq!(nano, 0); } + #[test] + fn test_write_datum_int_and_string() { + let mut builder = BinaryRowBuilder::new(2); + builder.write_datum( + 0, + &Datum::Int(42), + &DataType::Int(crate::spec::IntType::new()), + ); + builder.write_datum( + 1, + &Datum::String("hello".to_string()), + &DataType::VarChar(crate::spec::VarCharType::string_type()), + ); + let row = builder.build(); + assert_eq!(row.get_int(0).unwrap(), 42); + assert_eq!(row.get_string(1).unwrap(), "hello"); + } + + #[test] + fn test_write_datum_long_string() { + let mut builder = BinaryRowBuilder::new(1); + builder.write_datum( + 0, + &Datum::String("long_string_value".to_string()), + &DataType::VarChar(crate::spec::VarCharType::string_type()), + ); + let row = builder.build(); + assert_eq!(row.get_string(0).unwrap(), "long_string_value"); + } + + #[test] + fn test_datums_to_binary_row_roundtrip() { + let d1 = Some(Datum::Int(100)); + let d2 = Some(Datum::String("abc".to_string())); 
+ let dt1 = DataType::Int(crate::spec::IntType::new()); + let dt2 = DataType::VarChar(crate::spec::VarCharType::string_type()); + let datums = vec![(&d1, &dt1), (&d2, &dt2)]; + let bytes = datums_to_binary_row(&datums); + assert!(!bytes.is_empty()); + let row = BinaryRow::from_serialized_bytes(&bytes).unwrap(); + assert_eq!(row.get_int(0).unwrap(), 100); + assert_eq!(row.get_string(1).unwrap(), "abc"); + } + + #[test] + fn test_datums_to_binary_row_all_none() { + let d1: Option = None; + let dt1 = DataType::Int(crate::spec::IntType::new()); + let datums = vec![(&d1, &dt1)]; + let bytes = datums_to_binary_row(&datums); + assert!(bytes.is_empty()); + } + + #[test] + fn test_datums_to_binary_row_mixed_null() { + let d1 = Some(Datum::Int(7)); + let d2: Option = None; + let dt1 = DataType::Int(crate::spec::IntType::new()); + let dt2 = DataType::Int(crate::spec::IntType::new()); + let datums = vec![(&d1, &dt1), (&d2, &dt2)]; + let bytes = datums_to_binary_row(&datums); + assert!(!bytes.is_empty()); + let row = BinaryRow::from_serialized_bytes(&bytes).unwrap(); + assert_eq!(row.get_int(0).unwrap(), 7); + assert!(row.is_null_at(1)); + } + #[test] fn test_get_timestamp_non_compact() { let epoch_millis: i64 = 1_704_067_200_123; diff --git a/crates/paimon/src/spec/core_options.rs b/crates/paimon/src/spec/core_options.rs index 93c3fec1..17993d9e 100644 --- a/crates/paimon/src/spec/core_options.rs +++ b/crates/paimon/src/spec/core_options.rs @@ -26,6 +26,17 @@ const PARTITION_DEFAULT_NAME_OPTION: &str = "partition.default-name"; const PARTITION_LEGACY_NAME_OPTION: &str = "partition.legacy-name"; const BUCKET_KEY_OPTION: &str = "bucket-key"; const BUCKET_FUNCTION_TYPE_OPTION: &str = "bucket-function.type"; +const BUCKET_OPTION: &str = "bucket"; +const DEFAULT_BUCKET: i32 = 1; +const COMMIT_MAX_RETRIES_OPTION: &str = "commit.max-retries"; +const COMMIT_TIMEOUT_OPTION: &str = "commit.timeout"; +const COMMIT_MIN_RETRY_WAIT_OPTION: &str = "commit.min-retry-wait"; +const 
COMMIT_MAX_RETRY_WAIT_OPTION: &str = "commit.max-retry-wait"; +const ROW_TRACKING_ENABLED_OPTION: &str = "row-tracking.enabled"; +const DEFAULT_COMMIT_MAX_RETRIES: u32 = 10; +const DEFAULT_COMMIT_TIMEOUT_MS: u64 = 120_000; +const DEFAULT_COMMIT_MIN_RETRY_WAIT_MS: u64 = 1_000; +const DEFAULT_COMMIT_MAX_RETRY_WAIT_MS: u64 = 10_000; pub const SCAN_SNAPSHOT_ID_OPTION: &str = "scan.snapshot-id"; pub const SCAN_TIMESTAMP_MILLIS_OPTION: &str = "scan.timestamp-millis"; pub const SCAN_TAG_NAME_OPTION: &str = "scan.tag-name"; @@ -200,6 +211,49 @@ impl<'a> CoreOptions<'a> { .map(|v| v.split(',').map(|s| s.trim().to_string()).collect()) } + pub fn commit_max_retries(&self) -> u32 { + self.options + .get(COMMIT_MAX_RETRIES_OPTION) + .and_then(|v| v.parse().ok()) + .unwrap_or(DEFAULT_COMMIT_MAX_RETRIES) + } + + pub fn commit_timeout_ms(&self) -> u64 { + self.options + .get(COMMIT_TIMEOUT_OPTION) + .and_then(|v| v.parse().ok()) + .unwrap_or(DEFAULT_COMMIT_TIMEOUT_MS) + } + + pub fn commit_min_retry_wait_ms(&self) -> u64 { + self.options + .get(COMMIT_MIN_RETRY_WAIT_OPTION) + .and_then(|v| v.parse().ok()) + .unwrap_or(DEFAULT_COMMIT_MIN_RETRY_WAIT_MS) + } + + pub fn commit_max_retry_wait_ms(&self) -> u64 { + self.options + .get(COMMIT_MAX_RETRY_WAIT_OPTION) + .and_then(|v| v.parse().ok()) + .unwrap_or(DEFAULT_COMMIT_MAX_RETRY_WAIT_MS) + } + + pub fn row_tracking_enabled(&self) -> bool { + self.options + .get(ROW_TRACKING_ENABLED_OPTION) + .map(|v| v.eq_ignore_ascii_case("true")) + .unwrap_or(false) + } + + /// Number of buckets for the table. Default is 1. + pub fn bucket(&self) -> i32 { + self.options + .get(BUCKET_OPTION) + .and_then(|v| v.parse().ok()) + .unwrap_or(DEFAULT_BUCKET) + } + /// Whether the bucket function type is the default hash-based function. 
/// /// Only the default function (`Math.abs(hash % numBuckets)`) is supported @@ -363,6 +417,37 @@ mod tests { } } + #[test] + fn test_commit_options_defaults() { + let options = HashMap::new(); + let core = CoreOptions::new(&options); + assert_eq!(core.bucket(), 1); + assert_eq!(core.commit_max_retries(), 10); + assert_eq!(core.commit_timeout_ms(), 120_000); + assert_eq!(core.commit_min_retry_wait_ms(), 1_000); + assert_eq!(core.commit_max_retry_wait_ms(), 10_000); + assert!(!core.row_tracking_enabled()); + } + + #[test] + fn test_commit_options_custom() { + let options = HashMap::from([ + (BUCKET_OPTION.to_string(), "4".to_string()), + (COMMIT_MAX_RETRIES_OPTION.to_string(), "20".to_string()), + (COMMIT_TIMEOUT_OPTION.to_string(), "60000".to_string()), + (COMMIT_MIN_RETRY_WAIT_OPTION.to_string(), "500".to_string()), + (COMMIT_MAX_RETRY_WAIT_OPTION.to_string(), "5000".to_string()), + (ROW_TRACKING_ENABLED_OPTION.to_string(), "true".to_string()), + ]); + let core = CoreOptions::new(&options); + assert_eq!(core.bucket(), 4); + assert_eq!(core.commit_max_retries(), 20); + assert_eq!(core.commit_timeout_ms(), 60_000); + assert_eq!(core.commit_min_retry_wait_ms(), 500); + assert_eq!(core.commit_max_retry_wait_ms(), 5_000); + assert!(core.row_tracking_enabled()); + } + #[test] fn test_try_time_travel_selector_normalizes_valid_selector() { let tag_options = HashMap::from([(SCAN_TAG_NAME_OPTION.to_string(), "tag1".to_string())]); diff --git a/crates/paimon/src/spec/manifest.rs b/crates/paimon/src/spec/manifest.rs index 1893a7df..84f36291 100644 --- a/crates/paimon/src/spec/manifest.rs +++ b/crates/paimon/src/spec/manifest.rs @@ -17,6 +17,7 @@ use crate::io::FileIO; use crate::spec::manifest_entry::ManifestEntry; +use crate::spec::manifest_entry::MANIFEST_ENTRY_SCHEMA; use serde_avro_fast::object_container_file_encoding::Reader; use snafu::ResultExt; @@ -32,13 +33,6 @@ pub struct Manifest; impl Manifest { /// Read manifest entries from a file. 
- /// - /// # Arguments - /// * `file_io` - FileIO instance for reading files - /// * `path` - Path to the manifest file - /// - /// # Returns - /// A vector of ManifestEntry records pub async fn read(file_io: &FileIO, path: &str) -> Result> { let input_file = file_io.new_input(path)?; @@ -51,12 +45,6 @@ impl Manifest { } /// Read manifest entries from bytes. - /// - /// # Arguments - /// * `bytes` - Avro-encoded manifest file content - /// - /// # Returns - /// A vector of ManifestEntry records fn read_from_bytes(bytes: &[u8]) -> Result> { let mut reader = Reader::from_slice(bytes).whatever_context::<_, crate::Error>("read manifest avro")?; @@ -65,6 +53,13 @@ impl Manifest { .collect::, _>>() .whatever_context::<_, crate::Error>("deserialize manifest entry") } + + /// Write manifest entries to a file. + pub async fn write(file_io: &FileIO, path: &str, entries: &[ManifestEntry]) -> Result<()> { + let bytes = crate::spec::to_avro_bytes(MANIFEST_ENTRY_SCHEMA, entries)?; + let output = file_io.new_output(path)?; + output.write(bytes::Bytes::from(bytes)).await + } } #[cfg(test)] diff --git a/crates/paimon/src/spec/manifest_entry.rs b/crates/paimon/src/spec/manifest_entry.rs index d7aa32c6..cbe295b2 100644 --- a/crates/paimon/src/spec/manifest_entry.rs +++ b/crates/paimon/src/spec/manifest_entry.rs @@ -35,7 +35,7 @@ pub struct Identifier { /// Entry of a manifest file, representing an addition / deletion of a data file. /// Impl Reference: -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct ManifestEntry { #[serde(rename = "_KIND")] kind: FileKind, @@ -124,4 +124,79 @@ impl ManifestEntry { version, } } + + /// Return a copy with a different kind. + pub fn with_kind(mut self, kind: FileKind) -> Self { + self.kind = kind; + self + } + + /// Return a copy with sequence numbers set on the file. 
+ pub fn with_sequence_number(mut self, min_seq: i64, max_seq: i64) -> Self { + self.file.min_sequence_number = min_seq; + self.file.max_sequence_number = max_seq; + self + } + + /// Return a copy with first_row_id set on the file. + pub fn with_first_row_id(mut self, first_row_id: i64) -> Self { + self.file.first_row_id = Some(first_row_id); + self + } } + +/// Avro schema for ManifestEntry (used in manifest files). +pub const MANIFEST_ENTRY_SCHEMA: &str = r#"["null", { + "type": "record", + "name": "record", + "namespace": "org.apache.paimon.avro.generated", + "fields": [ + {"name": "_KIND", "type": "int"}, + {"name": "_PARTITION", "type": "bytes"}, + {"name": "_BUCKET", "type": "int"}, + {"name": "_TOTAL_BUCKETS", "type": "int"}, + {"name": "_FILE", "type": ["null", { + "type": "record", + "name": "record__FILE", + "fields": [ + {"name": "_FILE_NAME", "type": "string"}, + {"name": "_FILE_SIZE", "type": "long"}, + {"name": "_ROW_COUNT", "type": "long"}, + {"name": "_MIN_KEY", "type": "bytes"}, + {"name": "_MAX_KEY", "type": "bytes"}, + {"name": "_KEY_STATS", "type": ["null", { + "type": "record", + "name": "record__FILE__KEY_STATS", + "fields": [ + {"name": "_MIN_VALUES", "type": "bytes"}, + {"name": "_MAX_VALUES", "type": "bytes"}, + {"name": "_NULL_COUNTS", "type": ["null", {"type": "array", "items": ["null", "long"]}], "default": null} + ] + }], "default": null}, + {"name": "_VALUE_STATS", "type": ["null", { + "type": "record", + "name": "record__FILE__VALUE_STATS", + "fields": [ + {"name": "_MIN_VALUES", "type": "bytes"}, + {"name": "_MAX_VALUES", "type": "bytes"}, + {"name": "_NULL_COUNTS", "type": ["null", {"type": "array", "items": ["null", "long"]}], "default": null} + ] + }], "default": null}, + {"name": "_MIN_SEQUENCE_NUMBER", "type": "long"}, + {"name": "_MAX_SEQUENCE_NUMBER", "type": "long"}, + {"name": "_SCHEMA_ID", "type": "long"}, + {"name": "_LEVEL", "type": "int"}, + {"name": "_EXTRA_FILES", "type": {"type": "array", "items": "string"}}, + 
{"name": "_CREATION_TIME", "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}], "default": null}, + {"name": "_DELETE_ROW_COUNT", "type": ["null", "long"], "default": null}, + {"name": "_EMBEDDED_FILE_INDEX", "type": ["null", "bytes"], "default": null}, + {"name": "_FILE_SOURCE", "type": ["null", "int"], "default": null}, + {"name": "_VALUE_STATS_COLS", "type": ["null", {"type": "array", "items": "string"}], "default": null}, + {"name": "_EXTERNAL_PATH", "type": ["null", "string"], "default": null}, + {"name": "_FIRST_ROW_ID", "type": ["null", "long"], "default": null}, + {"name": "_WRITE_COLS", "type": ["null", {"type": "array", "items": "string"}], "default": null} + ] + }], "default": null}, + {"name": "_VERSION", "type": "int"} + ] +}]"#; diff --git a/crates/paimon/src/spec/manifest_file_meta.rs b/crates/paimon/src/spec/manifest_file_meta.rs index 36f92b9b..b74389eb 100644 --- a/crates/paimon/src/spec/manifest_file_meta.rs +++ b/crates/paimon/src/spec/manifest_file_meta.rs @@ -115,6 +115,30 @@ impl ManifestFileMeta { } } +/// Avro schema for ManifestFileMeta (used in manifest-list files). 
+pub const MANIFEST_FILE_META_SCHEMA: &str = r#"["null", { + "type": "record", + "name": "record", + "namespace": "org.apache.paimon.avro.generated", + "fields": [ + {"name": "_VERSION", "type": "int"}, + {"name": "_FILE_NAME", "type": "string"}, + {"name": "_FILE_SIZE", "type": "long"}, + {"name": "_NUM_ADDED_FILES", "type": "long"}, + {"name": "_NUM_DELETED_FILES", "type": "long"}, + {"name": "_PARTITION_STATS", "type": ["null", { + "type": "record", + "name": "record__PARTITION_STATS", + "fields": [ + {"name": "_MIN_VALUES", "type": "bytes"}, + {"name": "_MAX_VALUES", "type": "bytes"}, + {"name": "_NULL_COUNTS", "type": ["null", {"type": "array", "items": ["null", "long"]}], "default": null} + ] + }], "default": null}, + {"name": "_SCHEMA_ID", "type": "long"} + ] +}]"#; + impl Display for ManifestFileMeta { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( diff --git a/crates/paimon/src/spec/manifest_list.rs b/crates/paimon/src/spec/manifest_list.rs new file mode 100644 index 00000000..050edf46 --- /dev/null +++ b/crates/paimon/src/spec/manifest_list.rs @@ -0,0 +1,120 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::io::FileIO; +use crate::spec::manifest_file_meta::MANIFEST_FILE_META_SCHEMA; +use crate::spec::ManifestFileMeta; +use crate::Result; + +/// Manifest list file reader and writer. +/// +/// A manifest list file contains a list of ManifestFileMeta records in Avro format. +/// Each record describes a manifest file. +/// +/// Impl Reference: +pub struct ManifestList; + +impl ManifestList { + /// Read manifest file metas from a manifest list file. + pub async fn read(file_io: &FileIO, path: &str) -> Result> { + let input = file_io.new_input(path)?; + if !input.exists().await? { + return Ok(Vec::new()); + } + let content = input.read().await?; + crate::spec::from_avro_bytes(&content) + } + + /// Write manifest file metas to a manifest list file. + pub async fn write(file_io: &FileIO, path: &str, metas: &[ManifestFileMeta]) -> Result<()> { + let bytes = crate::spec::to_avro_bytes(MANIFEST_FILE_META_SCHEMA, metas)?; + let output = file_io.new_output(path)?; + output.write(bytes::Bytes::from(bytes)).await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::io::FileIOBuilder; + use crate::spec::stats::BinaryTableStats; + + fn test_file_io() -> FileIO { + FileIOBuilder::new("memory").build().unwrap() + } + + #[tokio::test] + async fn test_manifest_list_roundtrip() { + let file_io = test_file_io(); + let path = "memory:/test_manifest_list_roundtrip/manifest-list-0"; + file_io + .mkdirs("memory:/test_manifest_list_roundtrip/") + .await + .unwrap(); + + let value_bytes = vec![ + 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 49, 0, 0, 0, 0, 0, 0, 129, + ]; + let original = vec![ + ManifestFileMeta::new( + "manifest-a".to_string(), + 1024, + 5, + 2, + BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), vec![Some(1)]), + 0, + ), + ManifestFileMeta::new( + "manifest-b".to_string(), + 2048, + 10, + 0, + BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), vec![Some(3)]), + 1, + ), + ]; + + ManifestList::write(&file_io, 
path, &original) + .await + .unwrap(); + let decoded = ManifestList::read(&file_io, path).await.unwrap(); + assert_eq!(original, decoded); + } + + #[tokio::test] + async fn test_manifest_list_read_nonexistent() { + let file_io = test_file_io(); + let result = ManifestList::read(&file_io, "memory:/nonexistent/manifest-list") + .await + .unwrap(); + assert!(result.is_empty()); + } + + #[tokio::test] + async fn test_manifest_list_write_empty() { + let file_io = test_file_io(); + let path = "memory:/test_manifest_list_empty/manifest-list-0"; + file_io + .mkdirs("memory:/test_manifest_list_empty/") + .await + .unwrap(); + + ManifestList::write(&file_io, path, &[]).await.unwrap(); + let decoded = ManifestList::read(&file_io, path).await.unwrap(); + assert!(decoded.is_empty()); + } +} diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs index 50402a32..f30df9f3 100644 --- a/crates/paimon/src/spec/mod.rs +++ b/crates/paimon/src/spec/mod.rs @@ -53,8 +53,11 @@ pub use manifest_common::FileKind; mod manifest_entry; pub use manifest_entry::Identifier; pub use manifest_entry::ManifestEntry; +mod manifest_list; +pub use manifest_list::ManifestList; mod objects_file; pub use objects_file::from_avro_bytes; +pub use objects_file::to_avro_bytes; pub(crate) mod stats; mod types; pub use types::*; @@ -67,3 +70,5 @@ pub use predicate::{ field_idx_to_partition_idx, Datum, Predicate, PredicateBuilder, PredicateOperator, }; pub(crate) mod murmur_hash; +mod partition_statistics; +pub use partition_statistics::PartitionStatistics; diff --git a/crates/paimon/src/spec/objects_file.rs b/crates/paimon/src/spec/objects_file.rs index 171de1d7..864ea002 100644 --- a/crates/paimon/src/spec/objects_file.rs +++ b/crates/paimon/src/spec/objects_file.rs @@ -16,7 +16,8 @@ // under the License. 
use serde::de::DeserializeOwned; -use serde_avro_fast::object_container_file_encoding::Reader; +use serde::Serialize; +use serde_avro_fast::object_container_file_encoding::{Compression, Reader}; use snafu::ResultExt; pub fn from_avro_bytes(bytes: &[u8]) -> crate::Result> { @@ -28,15 +29,112 @@ pub fn from_avro_bytes(bytes: &[u8]) -> crate::Result("deserialize avro records") } +/// Serialize records into Avro Object Container File bytes. +/// +/// The `schema_json` must be a valid Avro schema JSON string that matches +/// the serde serialization layout of `T`. +pub fn to_avro_bytes(schema_json: &str, records: &[T]) -> crate::Result> { + let schema: serde_avro_fast::Schema = + schema_json + .parse() + .map_err( + |e: serde_avro_fast::schema::SchemaError| crate::Error::DataInvalid { + message: format!("invalid avro schema: {e}"), + source: Some(Box::new(e)), + }, + )?; + serde_avro_fast::object_container_file_encoding::write_all( + &schema, + Compression::Null, + Vec::new(), + records.iter(), + ) + .map_err(|e| crate::Error::DataInvalid { + message: format!("avro serialization failed: {e}"), + source: Some(Box::new(e)), + }) +} + #[cfg(test)] mod tests { + use super::*; use crate::spec::manifest_common::FileKind; - use crate::spec::manifest_entry::ManifestEntry; - use crate::spec::objects_file::from_avro_bytes; + use crate::spec::manifest_entry::{ManifestEntry, MANIFEST_ENTRY_SCHEMA}; + use crate::spec::manifest_file_meta::MANIFEST_FILE_META_SCHEMA; use crate::spec::stats::BinaryTableStats; use crate::spec::{DataFileMeta, ManifestFileMeta}; use chrono::{DateTime, Utc}; + #[test] + fn test_roundtrip_manifest_file_meta() { + let value_bytes = vec![ + 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 49, 0, 0, 0, 0, 0, 0, 129, + ]; + let original = vec![ManifestFileMeta::new( + "manifest-test-0".to_string(), + 1024, + 5, + 2, + BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), vec![Some(1)]), + 0, + )]; + let bytes = 
to_avro_bytes(MANIFEST_FILE_META_SCHEMA, &original).unwrap(); + let decoded = from_avro_bytes::(&bytes).unwrap(); + assert_eq!(original, decoded); + } + + #[test] + fn test_roundtrip_manifest_entry() { + let value_bytes = vec![ + 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 49, 0, 0, 0, 0, 0, 0, 129, 1, 0, 0, 0, 0, 0, 0, 0, + ]; + let single_value = vec![0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0]; + let original = vec![ManifestEntry::new( + FileKind::Add, + single_value.clone(), + 1, + 10, + DataFileMeta { + file_name: "test.parquet".to_string(), + file_size: 100, + row_count: 50, + min_key: single_value.clone(), + max_key: single_value.clone(), + key_stats: BinaryTableStats::new( + value_bytes.clone(), + value_bytes.clone(), + vec![Some(1), Some(2)], + ), + value_stats: BinaryTableStats::new( + value_bytes.clone(), + value_bytes.clone(), + vec![Some(1), Some(2)], + ), + min_sequence_number: 1, + max_sequence_number: 50, + schema_id: 0, + level: 0, + extra_files: vec![], + creation_time: Some( + "2024-09-06T07:45:55.039+00:00" + .parse::>() + .unwrap(), + ), + delete_row_count: Some(0), + embedded_index: None, + first_row_id: None, + write_cols: None, + external_path: None, + file_source: None, + value_stats_cols: None, + }, + 2, + )]; + let bytes = to_avro_bytes(MANIFEST_ENTRY_SCHEMA, &original).unwrap(); + let decoded = from_avro_bytes::(&bytes).unwrap(); + assert_eq!(original, decoded); + } + #[tokio::test] async fn test_read_manifest_list() { let workdir = diff --git a/crates/paimon/src/spec/partition_statistics.rs b/crates/paimon/src/spec/partition_statistics.rs new file mode 100644 index 00000000..30f05793 --- /dev/null +++ b/crates/paimon/src/spec/partition_statistics.rs @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +/// Partition-level statistics for snapshot commits. +/// +/// Reference: [org.apache.paimon.partition.PartitionStatistics](https://github.com/apache/paimon) +/// and [pypaimon snapshot_commit.py PartitionStatistics](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/snapshot/snapshot_commit.py) +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct PartitionStatistics { + pub spec: HashMap, + pub record_count: i64, + pub file_size_in_bytes: i64, + pub file_count: i64, + pub last_file_creation_time: u64, + pub total_buckets: i32, +} diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs index 39d6aeec..62ab3fe1 100644 --- a/crates/paimon/src/spec/schema.rs +++ b/crates/paimon/src/spec/schema.rs @@ -86,6 +86,13 @@ impl TableSchema { &self.partition_keys } + pub fn partition_fields(&self) -> Vec { + self.partition_keys + .iter() + .filter_map(|key| self.fields.iter().find(|f| f.name() == key).cloned()) + .collect() + } + pub fn primary_keys(&self) -> &[String] { &self.primary_keys } diff --git a/crates/paimon/src/spec/snapshot.rs b/crates/paimon/src/spec/snapshot.rs index 28dc92db..013211b6 100644 --- a/crates/paimon/src/spec/snapshot.rs +++ b/crates/paimon/src/spec/snapshot.rs @@ -92,6 +92,10 @@ pub struct Snapshot { 
#[builder(default = None)] #[serde(skip_serializing_if = "Option::is_none")] statistics: Option, + /// next row id for row tracking + #[builder(default = None)] + #[serde(skip_serializing_if = "Option::is_none")] + next_row_id: Option, } impl Snapshot { @@ -190,6 +194,18 @@ impl Snapshot { pub fn statistics(&self) -> Option<&str> { self.statistics.as_deref() } + + /// Get the next row id of this snapshot. + #[inline] + pub fn next_row_id(&self) -> Option { + self.next_row_id + } + + /// Get the commit kind of this snapshot. + #[inline] + pub fn commit_kind(&self) -> &CommitKind { + &self.commit_kind + } } #[cfg(test)] diff --git a/crates/paimon/src/table/commit_message.rs b/crates/paimon/src/table/commit_message.rs new file mode 100644 index 00000000..4e9c4edc --- /dev/null +++ b/crates/paimon/src/table/commit_message.rs @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::spec::DataFileMeta; + +/// A commit message representing new files to be committed for a specific partition and bucket. 
+/// +/// Reference: [org.apache.paimon.table.sink.CommitMessage](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageImpl.java) +#[derive(Debug, Clone)] +pub struct CommitMessage { + /// Binary row bytes for the partition. + pub partition: Vec, + /// Bucket id. + pub bucket: i32, + /// New data files to be added. + pub new_files: Vec, +} + +impl CommitMessage { + pub fn new(partition: Vec, bucket: i32, new_files: Vec) -> Self { + Self { + partition, + bucket, + new_files, + } + } +} diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index 1b343241..c17ebbc7 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -19,31 +19,41 @@ pub(crate) mod bin_pack; mod bucket_filter; +mod commit_message; #[cfg(feature = "fulltext")] mod full_text_search_builder; pub(crate) mod global_index_scanner; mod read_builder; +pub(crate) mod rest_env; pub(crate) mod row_id_predicate; pub(crate) mod schema_manager; +pub(crate) mod snapshot_commit; mod snapshot_manager; mod source; mod stats_filter; +pub(crate) mod table_commit; mod table_scan; mod tag_manager; +mod write_builder; use crate::Result; use arrow_array::RecordBatch; +pub use commit_message::CommitMessage; #[cfg(feature = "fulltext")] pub use full_text_search_builder::FullTextSearchBuilder; use futures::stream::BoxStream; pub use read_builder::{ReadBuilder, TableRead}; +pub use rest_env::RESTEnv; pub use schema_manager::SchemaManager; +pub use snapshot_commit::{RESTSnapshotCommit, RenamingSnapshotCommit, SnapshotCommit}; pub use snapshot_manager::SnapshotManager; pub use source::{ merge_row_ranges, DataSplit, DataSplitBuilder, DeletionFile, PartitionBucket, Plan, RowRange, }; +pub use table_commit::TableCommit; pub use table_scan::TableScan; pub use tag_manager::TagManager; +pub use write_builder::WriteBuilder; use crate::catalog::Identifier; use crate::io::FileIO; @@ -58,6 +68,7 @@ pub struct Table { 
location: String, schema: TableSchema, schema_manager: SchemaManager, + rest_env: Option, } impl Table { @@ -67,6 +78,7 @@ impl Table { identifier: Identifier, location: String, schema: TableSchema, + rest_env: Option, ) -> Self { let schema_manager = SchemaManager::new(file_io.clone(), location.clone()); Self { @@ -75,6 +87,7 @@ impl Table { location, schema, schema_manager, + rest_env, } } @@ -118,6 +131,13 @@ impl Table { FullTextSearchBuilder::new(self) } + /// Create a write builder for write/commit. + /// + /// Reference: [pypaimon FileStoreTable.new_write_builder](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/table/file_store_table.py). + pub fn new_write_builder(&self) -> WriteBuilder<'_> { + WriteBuilder::new(self) + } + /// Create a copy of this table with extra options merged into the schema. pub fn copy_with_options(&self, extra: HashMap) -> Self { Self { @@ -126,6 +146,7 @@ impl Table { location: self.location.clone(), schema: self.schema.copy_with_options(extra), schema_manager: self.schema_manager.clone(), + rest_env: self.rest_env.clone(), } } } diff --git a/crates/paimon/src/table/read_builder.rs b/crates/paimon/src/table/read_builder.rs index 6712b0e8..63d9917c 100644 --- a/crates/paimon/src/table/read_builder.rs +++ b/crates/paimon/src/table/read_builder.rs @@ -410,6 +410,7 @@ mod tests { Identifier::new("default", "t"), table_path, table_schema, + None, ); let split = DataSplitBuilder::new() @@ -469,6 +470,7 @@ mod tests { Identifier::new("default", "t"), table_path, table_schema, + None, ); let split = DataSplitBuilder::new() @@ -526,6 +528,7 @@ mod tests { Identifier::new("default", "t"), table_path, table_schema, + None, ); let split = DataSplitBuilder::new() @@ -586,6 +589,7 @@ mod tests { Identifier::new("default", "t"), table_path, table_schema, + None, ); let split = DataSplitBuilder::new() @@ -637,6 +641,7 @@ mod tests { Identifier::new("default", "t"), "/tmp/test".to_string(), table_schema, + None, ); let mut 
builder = table.new_read_builder(); @@ -692,6 +697,7 @@ mod tests { Identifier::new("default", "t"), "/tmp/test".to_string(), table_schema, + None, ); let mut builder = table.new_read_builder(); diff --git a/crates/paimon/src/table/rest_env.rs b/crates/paimon/src/table/rest_env.rs new file mode 100644 index 00000000..bd1b42ac --- /dev/null +++ b/crates/paimon/src/table/rest_env.rs @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! REST environment for creating RESTSnapshotCommit instances. + +use crate::api::rest_api::RESTApi; +use crate::catalog::Identifier; +use crate::table::snapshot_commit::{RESTSnapshotCommit, SnapshotCommit}; +use std::sync::Arc; + +/// REST environment that holds the REST API client, identifier, and uuid +/// needed to create a `RESTSnapshotCommit`. +#[derive(Clone)] +pub struct RESTEnv { + identifier: Identifier, + uuid: String, + api: Arc, +} + +impl std::fmt::Debug for RESTEnv { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RESTEnv") + .field("identifier", &self.identifier) + .field("uuid", &self.uuid) + .finish() + } +} + +impl RESTEnv { + /// Create a new RESTEnv. 
+ pub fn new(identifier: Identifier, uuid: String, api: Arc) -> Self { + Self { + identifier, + uuid, + api, + } + } + + /// Create a `RESTSnapshotCommit` from this environment. + pub fn snapshot_commit(&self) -> Arc { + Arc::new(RESTSnapshotCommit::new( + self.api.clone(), + self.identifier.clone(), + self.uuid.clone(), + )) + } +} diff --git a/crates/paimon/src/table/snapshot_commit.rs b/crates/paimon/src/table/snapshot_commit.rs new file mode 100644 index 00000000..9d141f79 --- /dev/null +++ b/crates/paimon/src/table/snapshot_commit.rs @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! SnapshotCommit abstraction for atomic snapshot commits. +//! +//! Reference: [pypaimon snapshot_commit.py](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/snapshot/snapshot_commit.py) + +use crate::api::rest_api::RESTApi; +use crate::catalog::Identifier; +use crate::spec::{PartitionStatistics, Snapshot}; +use crate::table::SnapshotManager; +use crate::Result; +use async_trait::async_trait; +use std::sync::Arc; + +/// Interface to commit a snapshot atomically. 
+/// +/// Two implementations: +/// - `RenamingSnapshotCommit` — file system atomic rename +/// - `RESTSnapshotCommit` — via Catalog API (e.g. REST) +#[async_trait] +pub trait SnapshotCommit: Send + Sync { + /// Commit the given snapshot. Returns true if successful, false if + /// another writer won the race. + async fn commit(&self, snapshot: &Snapshot, statistics: &[PartitionStatistics]) + -> Result; +} + +/// A SnapshotCommit using file renaming to commit. +/// +/// Reference: [pypaimon RenamingSnapshotCommit](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/snapshot/renaming_snapshot_commit.py) +pub struct RenamingSnapshotCommit { + snapshot_manager: SnapshotManager, +} + +impl RenamingSnapshotCommit { + pub fn new(snapshot_manager: SnapshotManager) -> Self { + Self { snapshot_manager } + } +} + +#[async_trait] +impl SnapshotCommit for RenamingSnapshotCommit { + async fn commit( + &self, + snapshot: &Snapshot, + _statistics: &[PartitionStatistics], + ) -> Result { + // statistics are not used in file system mode (same as Python) + self.snapshot_manager.commit_snapshot(snapshot).await + } +} + +/// A SnapshotCommit using REST API to commit (e.g. REST Catalog). 
+/// +/// Reference: [pypaimon RESTSnapshotCommit](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/snapshot/catalog_snapshot_commit.py) +pub struct RESTSnapshotCommit { + api: Arc, + identifier: Identifier, + uuid: String, +} + +impl RESTSnapshotCommit { + pub fn new(api: Arc, identifier: Identifier, uuid: String) -> Self { + Self { + api, + identifier, + uuid, + } + } +} + +#[async_trait] +impl SnapshotCommit for RESTSnapshotCommit { + async fn commit( + &self, + snapshot: &Snapshot, + statistics: &[PartitionStatistics], + ) -> Result { + self.api + .commit_snapshot(&self.identifier, &self.uuid, snapshot, statistics) + .await + } +} diff --git a/crates/paimon/src/table/snapshot_manager.rs b/crates/paimon/src/table/snapshot_manager.rs index 6d8eab28..abff248e 100644 --- a/crates/paimon/src/table/snapshot_manager.rs +++ b/crates/paimon/src/table/snapshot_manager.rs @@ -45,6 +45,10 @@ impl SnapshotManager { } } + pub fn file_io(&self) -> &FileIO { + &self.file_io + } + /// Path to the snapshot directory (e.g. `table_path/snapshot`). pub fn snapshot_dir(&self) -> String { format!("{}/{}", self.table_path, SNAPSHOT_DIR) @@ -65,14 +69,22 @@ impl SnapshotManager { format!("{}/snapshot-{}", self.snapshot_dir(), snapshot_id) } + /// Path to the manifest directory. + pub fn manifest_dir(&self) -> String { + format!("{}/manifest", self.table_path) + } + + /// Path to a manifest file. + pub fn manifest_path(&self, manifest_name: &str) -> String { + format!("{}/{}", self.manifest_dir(), manifest_name) + } + /// Read a hint file and return the id, or None if the file does not exist, /// is being deleted, or contains invalid content. /// /// Reference: [HintFileUtils.readHint](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/utils/HintFileUtils.java) async fn read_hint(&self, path: &str) -> Option { let input = self.file_io.new_input(path).ok()?; - // Try to read directly without exists() check to avoid TOCTOU race. 
- // The file may be deleted or overwritten concurrently. let content = input.read().await.ok()?; let id_str = str::from_utf8(&content).ok()?; id_str.trim().parse().ok() @@ -138,7 +150,7 @@ impl SnapshotManager { self.find_by_list_files(i64::min).await } - /// Get a snapshot by id. Returns an error if the snapshot file does not exist. + /// Get a snapshot by id. pub async fn get_snapshot(&self, snapshot_id: i64) -> crate::Result { let snapshot_path = self.snapshot_path(snapshot_id); let snap_input = self.file_io.new_input(&snapshot_path)?; @@ -176,6 +188,68 @@ impl SnapshotManager { Ok(Some(snapshot)) } + /// Atomically commit a snapshot. + /// + /// Writes the snapshot JSON to the target path. Returns `false` if the + /// target already exists (another writer won the race). + /// + /// On file systems that support atomic rename, we write to a temp file + /// first then rename. On backends where rename is not supported (e.g. + /// memory, object stores), we fall back to a direct write after an + /// existence check. + pub async fn commit_snapshot(&self, snapshot: &Snapshot) -> crate::Result { + let target_path = self.snapshot_path(snapshot.id()); + + let json = serde_json::to_string(snapshot).map_err(|e| crate::Error::DataInvalid { + message: format!("failed to serialize snapshot: {e}"), + source: Some(Box::new(e)), + })?; + + // Try rename-based atomic commit first, fall back to check-and-write. + // + // TODO: opendal's rename uses POSIX semantics which silently overwrites the target. + // The exists() check below narrows the race window but does not eliminate it. + // Java Paimon uses `lock.runWithLock(() -> !fileIO.exists(newPath) && callable.call())` + // for full mutual exclusion. We need an external lock mechanism (like Java's Lock + // interface) for backends without atomic rename-no-replace support. 
+ let tmp_path = format!("{}.tmp-{}", target_path, uuid::Uuid::new_v4()); + let output = self.file_io.new_output(&tmp_path)?; + output.write(bytes::Bytes::from(json.clone())).await?; + + // Check before rename to avoid silent overwrite (opendal uses POSIX rename semantics) + if self.file_io.exists(&target_path).await? { + let _ = self.file_io.delete_file(&tmp_path).await; + return Ok(false); + } + + match self.file_io.rename(&tmp_path, &target_path).await { + Ok(()) => {} + Err(_) => { + // Rename not supported (e.g. memory/object store). + // Clean up temp file, then check-and-write. + let _ = self.file_io.delete_file(&tmp_path).await; + if self.file_io.exists(&target_path).await? { + return Ok(false); + } + let output = self.file_io.new_output(&target_path)?; + output.write(bytes::Bytes::from(json)).await?; + } + } + + // Update LATEST hint (best-effort) + let _ = self.write_latest_hint(snapshot.id()).await; + Ok(true) + } + + /// Update the LATEST hint file. + pub async fn write_latest_hint(&self, snapshot_id: i64) -> crate::Result<()> { + let hint_path = self.latest_hint_path(); + let output = self.file_io.new_output(&hint_path)?; + output + .write(bytes::Bytes::from(snapshot_id.to_string())) + .await + } + /// Returns the snapshot whose commit time is earlier than or equal to the given /// `timestamp_millis`. If no such snapshot exists, returns None. /// @@ -196,7 +270,6 @@ impl SnapshotManager { None => return Ok(None), }; - // If the earliest snapshot is already after the timestamp, no match. 
if (earliest_snapshot.time_millis() as i64) > timestamp_millis { return Ok(None); } @@ -220,3 +293,77 @@ impl SnapshotManager { Ok(result) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::io::FileIOBuilder; + use crate::spec::CommitKind; + + fn test_file_io() -> FileIO { + FileIOBuilder::new("memory").build().unwrap() + } + + async fn setup(table_path: &str) -> (FileIO, SnapshotManager) { + let file_io = test_file_io(); + file_io + .mkdirs(&format!("{table_path}/snapshot/")) + .await + .unwrap(); + let sm = SnapshotManager::new(file_io.clone(), table_path.to_string()); + (file_io, sm) + } + + fn test_snapshot(id: i64) -> Snapshot { + Snapshot::builder() + .version(3) + .id(id) + .schema_id(0) + .base_manifest_list("base-list".to_string()) + .delta_manifest_list("delta-list".to_string()) + .commit_user("test-user".to_string()) + .commit_identifier(0) + .commit_kind(CommitKind::APPEND) + .time_millis(1000 * id as u64) + .build() + } + + #[tokio::test] + async fn test_commit_snapshot_first() { + let (_, sm) = setup("memory:/test_commit_first").await; + let snap = test_snapshot(1); + let result = sm.commit_snapshot(&snap).await.unwrap(); + assert!(result); + + let loaded = sm.get_snapshot(1).await.unwrap(); + assert_eq!(loaded.id(), 1); + } + + #[tokio::test] + async fn test_commit_snapshot_already_exists() { + let (_, sm) = setup("memory:/test_commit_exists").await; + let snap = test_snapshot(1); + assert!(sm.commit_snapshot(&snap).await.unwrap()); + // Second commit to same id should return false + let result = sm.commit_snapshot(&snap).await.unwrap(); + assert!(!result); + } + + #[tokio::test] + async fn test_commit_updates_latest_hint() { + let (_, sm) = setup("memory:/test_commit_hint").await; + let snap = test_snapshot(1); + sm.commit_snapshot(&snap).await.unwrap(); + + let latest_id = sm.get_latest_snapshot_id().await.unwrap(); + assert_eq!(latest_id, Some(1)); + } + + #[tokio::test] + async fn test_write_latest_hint() { + let (_, sm) = 
setup("memory:/test_write_hint").await; + sm.write_latest_hint(42).await.unwrap(); + let hint = sm.read_hint(&sm.latest_hint_path()).await; + assert_eq!(hint, Some(42)); + } +} diff --git a/crates/paimon/src/table/table_commit.rs b/crates/paimon/src/table/table_commit.rs new file mode 100644 index 00000000..bc7f5c11 --- /dev/null +++ b/crates/paimon/src/table/table_commit.rs @@ -0,0 +1,995 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Table commit logic for Paimon write operations. +//! +//! Reference: [org.apache.paimon.operation.FileStoreCommitImpl](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java) +//! 
and [pypaimon table_commit.py / file_store_commit.py](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/) + +use crate::io::FileIO; +use crate::spec::stats::BinaryTableStats; +use crate::spec::FileKind; +use crate::spec::{ + datums_to_binary_row, extract_datum, BinaryRow, CommitKind, CoreOptions, Datum, Manifest, + ManifestEntry, ManifestFileMeta, ManifestList, PartitionStatistics, Predicate, + PredicateBuilder, Snapshot, +}; +use crate::table::commit_message::CommitMessage; +use crate::table::snapshot_commit::SnapshotCommit; +use crate::table::{SnapshotManager, Table, TableScan}; +use crate::Result; +use std::collections::HashMap; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; + +/// Batch commit identifier (i64::MAX), same as Python's BATCH_COMMIT_IDENTIFIER. +const BATCH_COMMIT_IDENTIFIER: i64 = i64::MAX; + +/// Table commit logic for Paimon write operations. +/// +/// Provides atomic commit functionality including append, overwrite and truncate +pub struct TableCommit { + table: Table, + snapshot_manager: SnapshotManager, + snapshot_commit: Arc, + commit_user: String, + total_buckets: i32, + overwrite_partition: Option>, + // commit config + commit_max_retries: u32, + commit_timeout_ms: u64, + commit_min_retry_wait_ms: u64, + commit_max_retry_wait_ms: u64, + row_tracking_enabled: bool, +} + +impl TableCommit { + pub fn new( + table: Table, + commit_user: String, + overwrite_partition: Option>, + ) -> Self { + let snapshot_manager = SnapshotManager::new(table.file_io.clone(), table.location.clone()); + let snapshot_commit = if let Some(env) = &table.rest_env { + env.snapshot_commit() + } else { + Arc::new(crate::table::snapshot_commit::RenamingSnapshotCommit::new( + snapshot_manager.clone(), + )) + }; + let core_options = CoreOptions::new(table.schema().options()); + let total_buckets = core_options.bucket(); + let commit_max_retries = core_options.commit_max_retries(); + let commit_timeout_ms = 
core_options.commit_timeout_ms(); + let commit_min_retry_wait_ms = core_options.commit_min_retry_wait_ms(); + let commit_max_retry_wait_ms = core_options.commit_max_retry_wait_ms(); + let row_tracking_enabled = core_options.row_tracking_enabled(); + Self { + table, + snapshot_manager, + snapshot_commit, + commit_user, + total_buckets, + overwrite_partition, + commit_max_retries, + commit_timeout_ms, + commit_min_retry_wait_ms, + commit_max_retry_wait_ms, + row_tracking_enabled, + } + } + + /// Commit new files. Uses OVERWRITE mode if overwrite_partition was set + /// in the constructor, otherwise uses APPEND mode. + pub async fn commit(&self, commit_messages: Vec) -> Result<()> { + if commit_messages.is_empty() { + return Ok(()); + } + + let commit_entries = self.messages_to_entries(&commit_messages); + + if let Some(overwrite_partition) = &self.overwrite_partition { + let partition_predicate = if overwrite_partition.is_empty() { + None + } else { + Some(self.build_partition_predicate(overwrite_partition)?) + }; + self.try_commit( + CommitKind::OVERWRITE, + CommitEntriesPlan::Overwrite { + partition_predicate, + new_entries: commit_entries, + }, + ) + .await + } else { + self.try_commit( + CommitKind::APPEND, + CommitEntriesPlan::Static(commit_entries), + ) + .await + } + } + + /// Build a partition predicate from key-value pairs. + fn build_partition_predicate(&self, partition: &HashMap) -> Result { + let pb = PredicateBuilder::new(&self.table.schema().partition_fields()); + let predicates: Vec = partition + .iter() + .map(|(key, value)| pb.equal(key, value.clone())) + .collect::>>()?; + Ok(Predicate::and(predicates)) + } + + /// Drop specific partitions (OVERWRITE with only deletes). 
+ pub async fn truncate_partitions(&self, partitions: Vec>) -> Result<()> { + if partitions.is_empty() { + return Ok(()); + } + + let predicates: Vec = partitions + .iter() + .map(|p| self.build_partition_predicate(p)) + .collect::>>()?; + + self.try_commit( + CommitKind::OVERWRITE, + CommitEntriesPlan::Overwrite { + partition_predicate: Some(Predicate::or(predicates)), + new_entries: vec![], + }, + ) + .await + } + + /// Truncate the entire table (OVERWRITE with no filter, only deletes). + pub async fn truncate_table(&self) -> Result<()> { + self.try_commit( + CommitKind::OVERWRITE, + CommitEntriesPlan::Overwrite { + partition_predicate: None, + new_entries: vec![], + }, + ) + .await + } + + /// Try to commit with retries. + async fn try_commit(&self, commit_kind: CommitKind, plan: CommitEntriesPlan) -> Result<()> { + let mut retry_count = 0u32; + let mut last_snapshot_for_dup_check: Option = None; + let start_time_ms = current_time_millis(); + + loop { + let latest_snapshot = self.snapshot_manager.get_latest_snapshot().await?; + let commit_entries = self.resolve_commit_entries(&plan, &latest_snapshot).await?; + + if commit_entries.is_empty() { + break; + } + + // Check for duplicate commit (idempotency on retry) + if self + .is_duplicate_commit(&last_snapshot_for_dup_check, &latest_snapshot, &commit_kind) + .await + { + break; + } + + let result = self + .try_commit_once(&commit_kind, commit_entries, &latest_snapshot) + .await?; + + match result { + true => break, + false => { + last_snapshot_for_dup_check = latest_snapshot; + } + } + + let elapsed_ms = current_time_millis() - start_time_ms; + if elapsed_ms > self.commit_timeout_ms || retry_count >= self.commit_max_retries { + let snap_id = last_snapshot_for_dup_check + .as_ref() + .map(|s| s.id() + 1) + .unwrap_or(1); + return Err(crate::Error::DataInvalid { + message: format!( + "Commit failed for snapshot {} after {} millis with {} retries, \ + there may exist commit conflicts between multiple jobs.", + 
snap_id, elapsed_ms, retry_count + ), + source: None, + }); + } + + self.commit_retry_wait(retry_count).await; + retry_count += 1; + } + + Ok(()) + } + + /// Single commit attempt. + async fn try_commit_once( + &self, + commit_kind: &CommitKind, + mut commit_entries: Vec, + latest_snapshot: &Option, + ) -> Result { + let new_snapshot_id = latest_snapshot.as_ref().map(|s| s.id() + 1).unwrap_or(1); + + // Row tracking + let mut next_row_id: Option = None; + if self.row_tracking_enabled { + commit_entries = self.assign_snapshot_id(new_snapshot_id, commit_entries); + let first_row_id_start = latest_snapshot + .as_ref() + .and_then(|s| s.next_row_id()) + .unwrap_or(0); + let (assigned, nrid) = + self.assign_row_tracking_meta(first_row_id_start, commit_entries); + commit_entries = assigned; + next_row_id = Some(nrid); + } + + let file_io = self.snapshot_manager.file_io(); + let manifest_dir = self.snapshot_manager.manifest_dir(); + + let unique_id = uuid::Uuid::new_v4(); + let base_manifest_list_name = format!("manifest-list-{unique_id}-0"); + let delta_manifest_list_name = format!("manifest-list-{unique_id}-1"); + let new_manifest_name = format!("manifest-{}-0", uuid::Uuid::new_v4()); + + let base_manifest_list_path = format!("{manifest_dir}/{base_manifest_list_name}"); + let delta_manifest_list_path = format!("{manifest_dir}/{delta_manifest_list_name}"); + let new_manifest_path = format!("{manifest_dir}/{new_manifest_name}"); + + // Write manifest file + let new_manifest_file_meta = self + .write_manifest_file( + file_io, + &new_manifest_path, + &new_manifest_name, + &commit_entries, + ) + .await?; + + // Write delta manifest list + ManifestList::write( + file_io, + &delta_manifest_list_path, + &[new_manifest_file_meta], + ) + .await?; + + // Read existing manifests (base + delta from previous snapshot) and write base manifest list + let mut total_record_count: i64 = 0; + let existing_manifest_files = if let Some(snap) = latest_snapshot { + let base_path = 
format!("{manifest_dir}/{}", snap.base_manifest_list()); + let delta_path = format!("{manifest_dir}/{}", snap.delta_manifest_list()); + let base_files = ManifestList::read(file_io, &base_path).await?; + let delta_files = ManifestList::read(file_io, &delta_path).await?; + if let Some(prev) = snap.total_record_count() { + total_record_count += prev; + } + let mut all = base_files; + all.extend(delta_files); + all + } else { + vec![] + }; + + ManifestList::write(file_io, &base_manifest_list_path, &existing_manifest_files).await?; + + // Calculate delta record count + let mut delta_record_count: i64 = 0; + for entry in &commit_entries { + match entry.kind() { + FileKind::Add => delta_record_count += entry.file().row_count, + FileKind::Delete => delta_record_count -= entry.file().row_count, + } + } + total_record_count += delta_record_count; + + let snapshot = Snapshot::builder() + .version(3) + .id(new_snapshot_id) + .schema_id(self.table.schema().id()) + .base_manifest_list(base_manifest_list_name) + .delta_manifest_list(delta_manifest_list_name) + .commit_user(self.commit_user.clone()) + .commit_identifier(BATCH_COMMIT_IDENTIFIER) + .commit_kind(commit_kind.clone()) + .time_millis(current_time_millis()) + .total_record_count(Some(total_record_count)) + .delta_record_count(Some(delta_record_count)) + .next_row_id(next_row_id) + .build(); + + let statistics = self.generate_partition_statistics(&commit_entries)?; + + self.snapshot_commit.commit(&snapshot, &statistics).await + } + + /// Write a manifest file and return its metadata. 
+ async fn write_manifest_file( + &self, + file_io: &FileIO, + path: &str, + file_name: &str, + entries: &[ManifestEntry], + ) -> Result { + Manifest::write(file_io, path, entries).await?; + + let mut added_file_count: i64 = 0; + let mut deleted_file_count: i64 = 0; + for entry in entries { + match entry.kind() { + FileKind::Add => added_file_count += 1, + FileKind::Delete => deleted_file_count += 1, + } + } + + // Get file size + let status = file_io.get_status(path).await?; + + let partition_stats = self.compute_partition_stats(entries)?; + + Ok(ManifestFileMeta::new( + file_name.to_string(), + status.size as i64, + added_file_count, + deleted_file_count, + partition_stats, + self.table.schema().id(), + )) + } + + /// Check if this commit was already completed (idempotency). + async fn is_duplicate_commit( + &self, + last_snapshot_for_dup_check: &Option, + latest_snapshot: &Option, + commit_kind: &CommitKind, + ) -> bool { + if let (Some(prev_snap), Some(latest)) = (last_snapshot_for_dup_check, latest_snapshot) { + let start_id = prev_snap.id() + 1; + for snapshot_id in start_id..=latest.id() { + if let Ok(snap) = self.snapshot_manager.get_snapshot(snapshot_id).await { + if snap.commit_user() == self.commit_user && snap.commit_kind() == commit_kind { + return true; + } + } + } + } + false + } + + /// Resolve commit entries based on the plan type. + async fn resolve_commit_entries( + &self, + plan: &CommitEntriesPlan, + latest_snapshot: &Option, + ) -> Result> { + match plan { + CommitEntriesPlan::Static(entries) => Ok(entries.clone()), + CommitEntriesPlan::Overwrite { + partition_predicate, + new_entries, + } => { + self.generate_overwrite_entries( + latest_snapshot, + partition_predicate.as_ref(), + new_entries, + ) + .await + } + } + } + + /// Generate overwrite entries: DELETE existing + ADD new. 
+ async fn generate_overwrite_entries( + &self, + latest_snapshot: &Option, + partition_predicate: Option<&Predicate>, + new_entries: &[ManifestEntry], + ) -> Result> { + let mut entries = Vec::new(); + + if let Some(snap) = latest_snapshot { + let scan = TableScan::new( + &self.table, + partition_predicate.cloned(), + vec![], + None, + None, + None, + ); + let current_entries = scan.plan_manifest_entries(snap).await?; + for entry in current_entries { + entries.push(entry.with_kind(FileKind::Delete)); + } + } + + entries.extend(new_entries.iter().cloned()); + Ok(entries) + } + + /// Assign snapshot ID as sequence number to entries. + fn assign_snapshot_id( + &self, + snapshot_id: i64, + entries: Vec, + ) -> Vec { + entries + .into_iter() + .map(|e| e.with_sequence_number(snapshot_id, snapshot_id)) + .collect() + } + + /// Assign row tracking metadata to new files. + fn assign_row_tracking_meta( + &self, + first_row_id_start: i64, + entries: Vec, + ) -> (Vec, i64) { + let mut result = Vec::with_capacity(entries.len()); + let mut start = first_row_id_start; + + for entry in entries { + if *entry.kind() == FileKind::Add + && entry.file().file_source == Some(0) // APPEND + && entry.file().first_row_id.is_none() + { + let row_count = entry.file().row_count; + result.push(entry.with_first_row_id(start)); + start += row_count; + } else { + result.push(entry); + } + } + + (result, start) + } + + /// Exponential backoff with jitter. + async fn commit_retry_wait(&self, retry_count: u32) { + let base_wait = self + .commit_min_retry_wait_ms + .saturating_mul(2u64.saturating_pow(retry_count)); + let wait = base_wait.min(self.commit_max_retry_wait_ms); + // Simple jitter: add up to 20% of wait time + let jitter = (wait as f64 * 0.2 * rand_f64()) as u64; + let total_wait = wait + jitter; + tokio::time::sleep(std::time::Duration::from_millis(total_wait)).await; + } + + /// Compute partition stats (min/max/null_counts) across all entries. 
+ fn compute_partition_stats(&self, entries: &[ManifestEntry]) -> Result<BinaryTableStats> {
+ let partition_fields = self.table.schema().partition_fields();
+ let num_fields = partition_fields.len();
+
+ if num_fields == 0 || entries.is_empty() {
+ return Ok(BinaryTableStats::new(vec![], vec![], vec![]));
+ }
+
+ let data_types: Vec<_> = partition_fields
+ .iter()
+ .map(|f| f.data_type().clone())
+ .collect();
+ let mut mins: Vec<Option<Datum>> = vec![None; num_fields];
+ let mut maxs: Vec<Option<Datum>> = vec![None; num_fields];
+ let mut null_counts: Vec<i64> = vec![0; num_fields];
+
+ for entry in entries {
+ let partition_bytes = entry.partition();
+ if partition_bytes.is_empty() {
+ continue;
+ }
+ let row = BinaryRow::from_serialized_bytes(partition_bytes)?;
+ for i in 0..num_fields {
+ match extract_datum(&row, i, &data_types[i])? {
+ Some(datum) => {
+ mins[i] = Some(match mins[i].take() {
+ Some(cur) if cur <= datum => cur,
+ Some(_) => datum.clone(),
+ None => datum.clone(),
+ });
+ maxs[i] = Some(match maxs[i].take() {
+ Some(cur) if cur >= datum => cur,
+ Some(_) => datum,
+ None => datum,
+ });
+ }
+ None => {
+ null_counts[i] += 1;
+ }
+ }
+ }
+ }
+
+ let min_datums: Vec<_> = mins.iter().zip(data_types.iter()).collect();
+ let max_datums: Vec<_> = maxs.iter().zip(data_types.iter()).collect();
+
+ let min_bytes = datums_to_binary_row(&min_datums);
+ let max_bytes = datums_to_binary_row(&max_datums);
+ let null_counts = null_counts.into_iter().map(Some).collect();
+
+ Ok(BinaryTableStats::new(min_bytes, max_bytes, null_counts))
+ }
+
+ /// Generate per-partition statistics from commit entries. 
+ ///
+ /// Reference: [pypaimon FileStoreCommit._generate_partition_statistics](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/file_store_commit.py)
+ fn generate_partition_statistics(
+ &self,
+ entries: &[ManifestEntry],
+ ) -> Result<Vec<PartitionStatistics>> {
+ let partition_fields = self.table.schema().partition_fields();
+ let data_types: Vec<_> = partition_fields
+ .iter()
+ .map(|f| f.data_type().clone())
+ .collect();
+ let partition_keys: Vec<_> = self
+ .table
+ .schema()
+ .partition_keys()
+ .iter()
+ .map(|s| s.to_string())
+ .collect();
+
+ let mut stats_map: HashMap<Vec<u8>, PartitionStatistics> = HashMap::new();
+
+ for entry in entries {
+ let partition_bytes = entry.partition().to_vec();
+ let is_add = *entry.kind() == FileKind::Add;
+ let sign: i64 = if is_add { 1 } else { -1 };
+
+ let file = entry.file();
+ let file_creation_time = file
+ .creation_time
+ .map(|t| t.timestamp_millis() as u64)
+ .unwrap_or_else(current_time_millis);
+
+ let stats = stats_map.entry(partition_bytes.clone()).or_insert_with(|| {
+ // Parse partition spec from BinaryRow
+ let spec = self
+ .parse_partition_spec(&partition_bytes, &partition_keys, &data_types)
+ .unwrap_or_default();
+ PartitionStatistics {
+ spec,
+ record_count: 0,
+ file_size_in_bytes: 0,
+ file_count: 0,
+ last_file_creation_time: 0,
+ total_buckets: entry.total_buckets(),
+ }
+ });
+
+ stats.record_count += sign * file.row_count;
+ stats.file_size_in_bytes += sign * file.file_size;
+ stats.file_count += sign;
+ stats.last_file_creation_time = stats.last_file_creation_time.max(file_creation_time);
+ }
+
+ Ok(stats_map.into_values().collect())
+ }
+
+ /// Parse partition BinaryRow bytes into a HashMap. 
+ fn parse_partition_spec(
+ &self,
+ partition_bytes: &[u8],
+ partition_keys: &[String],
+ data_types: &[crate::spec::DataType],
+ ) -> Result<HashMap<String, String>> {
+ let mut spec = HashMap::new();
+ if partition_bytes.is_empty() || partition_keys.is_empty() {
+ return Ok(spec);
+ }
+ let row = BinaryRow::from_serialized_bytes(partition_bytes)?;
+ for (i, key) in partition_keys.iter().enumerate() {
+ if let Some(datum) = extract_datum(&row, i, &data_types[i])? {
+ spec.insert(key.clone(), datum.to_string());
+ }
+ }
+ Ok(spec)
+ }
+
+ /// Convert commit messages to manifest entries (ADD kind).
+ fn messages_to_entries(&self, messages: &[CommitMessage]) -> Vec<ManifestEntry> {
+ messages
+ .iter()
+ .flat_map(|msg| {
+ msg.new_files.iter().map(move |file| {
+ ManifestEntry::new(
+ FileKind::Add,
+ msg.partition.clone(),
+ msg.bucket,
+ self.total_buckets,
+ file.clone(),
+ 2,
+ )
+ })
+ })
+ .collect()
+ }
+}
+
+/// Plan for resolving commit entries.
+enum CommitEntriesPlan {
+ /// Static entries (for APPEND).
+ Static(Vec<ManifestEntry>),
+ /// Overwrite with optional partition predicate.
+ Overwrite {
+ partition_predicate: Option<Predicate>,
+ new_entries: Vec<ManifestEntry>,
+ },
+}
+
+fn current_time_millis() -> u64 {
+ SystemTime::now()
+ .duration_since(UNIX_EPOCH)
+ .unwrap_or_default()
+ .as_millis() as u64
+}
+
+/// Random f64 in [0, 1) using RandomState for per-process entropy. 
+fn rand_f64() -> f64 { + use std::collections::hash_map::RandomState; + use std::hash::{BuildHasher, Hasher}; + let mut hasher = RandomState::new().build_hasher(); + hasher.write_u64( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() as u64, + ); + (hasher.finish() as f64) / (u64::MAX as f64) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::catalog::Identifier; + use crate::io::FileIOBuilder; + use crate::spec::stats::BinaryTableStats; + use crate::spec::{BinaryRowBuilder, DataFileMeta, ManifestList, TableSchema}; + use chrono::{DateTime, Utc}; + + fn test_file_io() -> FileIO { + FileIOBuilder::new("memory").build().unwrap() + } + + fn test_schema() -> TableSchema { + use crate::spec::{DataType, IntType, Schema, VarCharType}; + let schema = Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("name", DataType::VarChar(VarCharType::string_type())) + .build() + .unwrap(); + TableSchema::new(0, &schema) + } + + fn test_partitioned_schema() -> TableSchema { + use crate::spec::{DataType, IntType, Schema, VarCharType}; + let schema = Schema::builder() + .column("pt", DataType::VarChar(VarCharType::string_type())) + .column("id", DataType::Int(IntType::new())) + .partition_keys(["pt"]) + .build() + .unwrap(); + TableSchema::new(0, &schema) + } + + fn test_table(file_io: &FileIO, table_path: &str) -> Table { + Table::new( + file_io.clone(), + Identifier::new("default", "test_table"), + table_path.to_string(), + test_schema(), + None, + ) + } + + fn test_partitioned_table(file_io: &FileIO, table_path: &str) -> Table { + Table::new( + file_io.clone(), + Identifier::new("default", "test_table"), + table_path.to_string(), + test_partitioned_schema(), + None, + ) + } + + fn test_data_file(name: &str, row_count: i64) -> DataFileMeta { + DataFileMeta { + file_name: name.to_string(), + file_size: 1024, + row_count, + min_key: vec![], + max_key: vec![], + key_stats: BinaryTableStats::new(vec![], vec![], 
vec![]), + value_stats: BinaryTableStats::new(vec![], vec![], vec![]), + min_sequence_number: 0, + max_sequence_number: 0, + schema_id: 0, + level: 0, + extra_files: vec![], + creation_time: Some( + "2024-09-06T07:45:55.039+00:00" + .parse::>() + .unwrap(), + ), + delete_row_count: Some(0), + embedded_index: None, + first_row_id: None, + write_cols: None, + external_path: None, + file_source: None, + value_stats_cols: None, + } + } + + fn setup_commit(file_io: &FileIO, table_path: &str) -> TableCommit { + let table = test_table(file_io, table_path); + TableCommit::new(table, "test-user".to_string(), None) + } + + fn setup_partitioned_commit(file_io: &FileIO, table_path: &str) -> TableCommit { + let table = test_partitioned_table(file_io, table_path); + TableCommit::new(table, "test-user".to_string(), None) + } + + fn partition_bytes(pt: &str) -> Vec { + use crate::spec::{DataType, VarCharType}; + let datum = Datum::String(pt.to_string()); + let dt = DataType::VarChar(VarCharType::string_type()); + let datums = vec![(&datum, &dt)]; + BinaryRow::from_datums(&datums).unwrap(); + let mut builder = BinaryRowBuilder::new(1); + if pt.len() <= 7 { + builder.write_string_inline(0, pt); + } else { + builder.write_string(0, pt); + } + builder.build_serialized() + } + + async fn setup_dirs(file_io: &FileIO, table_path: &str) { + file_io + .mkdirs(&format!("{table_path}/snapshot/")) + .await + .unwrap(); + file_io + .mkdirs(&format!("{table_path}/manifest/")) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_append_commit() { + let file_io = test_file_io(); + let table_path = "memory:/test_append_commit"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_commit(&file_io, table_path); + + let messages = vec![CommitMessage::new( + vec![], + 0, + vec![test_data_file("data-0.parquet", 100)], + )]; + + commit.commit(messages).await.unwrap(); + + // Verify snapshot was created + let snap_manager = SnapshotManager::new(file_io.clone(), 
table_path.to_string()); + let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap(); + assert_eq!(snapshot.id(), 1); + assert_eq!(snapshot.commit_identifier(), BATCH_COMMIT_IDENTIFIER); + assert_eq!(snapshot.total_record_count(), Some(100)); + assert_eq!(snapshot.delta_record_count(), Some(100)); + + // Verify manifest list was written + let manifest_dir = format!("{table_path}/manifest"); + let delta_path = format!("{manifest_dir}/{}", snapshot.delta_manifest_list()); + let delta_metas = ManifestList::read(&file_io, &delta_path).await.unwrap(); + assert_eq!(delta_metas.len(), 1); + assert_eq!(delta_metas[0].num_added_files(), 1); + + // Verify manifest entries + let manifest_path = format!("{manifest_dir}/{}", delta_metas[0].file_name()); + let entries = Manifest::read(&file_io, &manifest_path).await.unwrap(); + assert_eq!(entries.len(), 1); + assert_eq!(*entries[0].kind(), FileKind::Add); + assert_eq!(entries[0].file().file_name, "data-0.parquet"); + } + + #[tokio::test] + async fn test_multiple_appends() { + let file_io = test_file_io(); + let table_path = "memory:/test_multiple_appends"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_commit(&file_io, table_path); + + // First commit + commit + .commit(vec![CommitMessage::new( + vec![], + 0, + vec![test_data_file("data-0.parquet", 100)], + )]) + .await + .unwrap(); + + // Second commit + commit + .commit(vec![CommitMessage::new( + vec![], + 0, + vec![test_data_file("data-1.parquet", 200)], + )]) + .await + .unwrap(); + + let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); + let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap(); + assert_eq!(snapshot.id(), 2); + assert_eq!(snapshot.total_record_count(), Some(300)); + assert_eq!(snapshot.delta_record_count(), Some(200)); + } + + #[tokio::test] + async fn test_empty_commit_is_noop() { + let file_io = test_file_io(); + let table_path = "memory:/test_empty_commit"; + 
setup_dirs(&file_io, table_path).await; + + let commit = setup_commit(&file_io, table_path); + commit.commit(vec![]).await.unwrap(); + + let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); + let snapshot = snap_manager.get_latest_snapshot().await.unwrap(); + assert!(snapshot.is_none()); + } + + #[tokio::test] + async fn test_truncate_table() { + let file_io = test_file_io(); + let table_path = "memory:/test_truncate"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_commit(&file_io, table_path); + + // Append some data first + commit + .commit(vec![CommitMessage::new( + vec![], + 0, + vec![test_data_file("data-0.parquet", 100)], + )]) + .await + .unwrap(); + + // Truncate + commit.truncate_table().await.unwrap(); + + let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); + let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap(); + assert_eq!(snapshot.id(), 2); + assert_eq!(snapshot.commit_kind(), &CommitKind::OVERWRITE); + assert_eq!(snapshot.total_record_count(), Some(0)); + assert_eq!(snapshot.delta_record_count(), Some(-100)); + } + + #[tokio::test] + async fn test_overwrite_partition() { + let file_io = test_file_io(); + let table_path = "memory:/test_overwrite_partition"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_partitioned_commit(&file_io, table_path); + + // Append data for partition "a" and "b" + commit + .commit(vec![ + CommitMessage::new( + partition_bytes("a"), + 0, + vec![test_data_file("data-a.parquet", 100)], + ), + CommitMessage::new( + partition_bytes("b"), + 0, + vec![test_data_file("data-b.parquet", 200)], + ), + ]) + .await + .unwrap(); + + // Overwrite partition "a" with new data + let mut overwrite_partition = HashMap::new(); + overwrite_partition.insert("pt".to_string(), Datum::String("a".to_string())); + + let table = test_partitioned_table(&file_io, table_path); + let overwrite_commit = + TableCommit::new(table, 
"test-user".to_string(), Some(overwrite_partition)); + + overwrite_commit + .commit(vec![CommitMessage::new( + partition_bytes("a"), + 0, + vec![test_data_file("data-a2.parquet", 50)], + )]) + .await + .unwrap(); + + let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); + let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap(); + assert_eq!(snapshot.id(), 2); + assert_eq!(snapshot.commit_kind(), &CommitKind::OVERWRITE); + // 300 - 100 (delete a) + 50 (add a2) = 250 + assert_eq!(snapshot.total_record_count(), Some(250)); + } + + #[tokio::test] + async fn test_drop_partitions() { + let file_io = test_file_io(); + let table_path = "memory:/test_drop_partitions"; + setup_dirs(&file_io, table_path).await; + + let commit = setup_partitioned_commit(&file_io, table_path); + + // Append data for partitions "a", "b", "c" + commit + .commit(vec![ + CommitMessage::new( + partition_bytes("a"), + 0, + vec![test_data_file("data-a.parquet", 100)], + ), + CommitMessage::new( + partition_bytes("b"), + 0, + vec![test_data_file("data-b.parquet", 200)], + ), + CommitMessage::new( + partition_bytes("c"), + 0, + vec![test_data_file("data-c.parquet", 300)], + ), + ]) + .await + .unwrap(); + + // Drop partitions "a" and "c" + let partitions = vec![ + HashMap::from([("pt".to_string(), Datum::String("a".to_string()))]), + HashMap::from([("pt".to_string(), Datum::String("c".to_string()))]), + ]; + commit.truncate_partitions(partitions).await.unwrap(); + + let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); + let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap(); + assert_eq!(snapshot.id(), 2); + assert_eq!(snapshot.commit_kind(), &CommitKind::OVERWRITE); + // 600 - 100 (a) - 300 (c) = 200 + assert_eq!(snapshot.total_record_count(), Some(200)); + } +} diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs index e21f00b7..06fe87a0 100644 --- 
a/crates/paimon/src/table/table_scan.rs +++ b/crates/paimon/src/table/table_scan.rs @@ -440,33 +440,23 @@ impl<'a> TableScan<'a> { limited_splits } - async fn plan_snapshot(&self, snapshot: Snapshot) -> crate::Result { + /// Read all manifest entries from a snapshot, applying filters and merging. + /// + /// This is the shared entry point used by both `plan_snapshot` (scan) and + /// `TableCommit` (overwrite). Filters include partition predicate, data + /// predicates, and bucket predicate. + pub(crate) async fn plan_manifest_entries( + &self, + snapshot: &Snapshot, + ) -> crate::Result> { let file_io = self.table.file_io(); let table_path = self.table.location(); let core_options = CoreOptions::new(self.table.schema().options()); let deletion_vectors_enabled = core_options.deletion_vectors_enabled(); let data_evolution_enabled = core_options.data_evolution_enabled(); - let target_split_size = core_options.source_split_target_size(); - let open_file_cost = core_options.source_split_open_file_cost(); - // Resolve partition fields for manifest-file-level stats pruning. - let partition_keys = self.table.schema().partition_keys(); - let partition_fields: Vec = self - .table - .schema() - .partition_keys() - .iter() - .filter_map(|key| { - self.table - .schema() - .fields() - .iter() - .find(|f| f.name() == key) - .cloned() - }) - .collect(); + let partition_fields = self.table.schema().partition_fields(); - // Data-evolution tables must not prune data files independently. let pushdown_data_predicates = if data_evolution_enabled { &[][..] } else { @@ -475,8 +465,6 @@ impl<'a> TableScan<'a> { let has_primary_keys = !self.table.schema().primary_keys().is_empty(); - // Compute bucket predicate and key fields for per-entry bucket pruning. - // Only supported for the default bucket function (MurmurHash3-based). 
let bucket_key_fields: Vec = if self.bucket_predicate.is_none() || !core_options.is_default_bucket_function() { Vec::new() @@ -509,7 +497,7 @@ impl<'a> TableScan<'a> { let entries = read_all_manifest_entries( file_io, table_path, - &snapshot, + snapshot, deletion_vectors_enabled, has_primary_keys, self.partition_predicate.as_ref(), @@ -521,7 +509,19 @@ impl<'a> TableScan<'a> { &bucket_key_fields, ) .await?; - let entries = merge_manifest_entries(entries); + Ok(merge_manifest_entries(entries)) + } + + async fn plan_snapshot(&self, snapshot: Snapshot) -> crate::Result { + let file_io = self.table.file_io(); + let table_path = self.table.location(); + let core_options = CoreOptions::new(self.table.schema().options()); + let data_evolution_enabled = core_options.data_evolution_enabled(); + let target_split_size = core_options.source_split_target_size(); + let open_file_cost = core_options.source_split_open_file_cost(); + let partition_keys = self.table.schema().partition_keys(); + + let entries = self.plan_manifest_entries(&snapshot).await?; if entries.is_empty() { return Ok(Plan::new(Vec::new())); } diff --git a/crates/paimon/src/table/write_builder.rs b/crates/paimon/src/table/write_builder.rs new file mode 100644 index 00000000..d6458cf8 --- /dev/null +++ b/crates/paimon/src/table/write_builder.rs @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! WriteBuilder for table write API. +//! +//! Reference: [pypaimon WriteBuilder](https://github.com/apache/paimon/blob/master/paimon-python/pypaimon/write/write_builder.py) + +use crate::spec::Datum; +use crate::table::{Table, TableCommit}; +use std::collections::HashMap; +use uuid::Uuid; + +/// Builder for creating table writers and committers. +/// +/// Provides `new_write` (TODO) and `new_commit` methods, with optional +/// `overwrite` support for partition-level overwrites. +pub struct WriteBuilder<'a> { + table: &'a Table, + commit_user: String, + overwrite_partition: Option>, +} + +impl<'a> WriteBuilder<'a> { + pub fn new(table: &'a Table) -> Self { + Self { + table, + commit_user: Uuid::new_v4().to_string(), + overwrite_partition: None, + } + } + + /// Set overwrite mode. If `partition` is None, overwrites the entire table. + /// If `partition` is Some, overwrites only the specified partition. + pub fn overwrite(&mut self, partition: Option>) -> &mut Self { + self.overwrite_partition = Some(partition.unwrap_or_default()); + self + } + + /// Create a new TableCommit for committing write results. + pub fn new_commit(&self) -> TableCommit { + TableCommit::new( + self.table.clone(), + self.commit_user.clone(), + self.overwrite_partition.clone(), + ) + } + + // TODO: pub fn new_write(&self) -> TableWrite { ... 
} +} diff --git a/crates/paimon/src/tantivy/directory.rs b/crates/paimon/src/tantivy/directory.rs index 32c3058a..22440a60 100644 --- a/crates/paimon/src/tantivy/directory.rs +++ b/crates/paimon/src/tantivy/directory.rs @@ -368,9 +368,14 @@ mod tests { #[tokio::test] async fn test_read_file_from_archive() { let dir = make_test_dir().await; - // Read any file from the archive and verify it's non-empty. - let first_path = dir.files.keys().next().unwrap().clone(); - let handle = dir.get_file_handle(&first_path).unwrap(); + // Find a non-empty file (some tantivy index files can be 0 bytes). + let non_empty_path = dir + .files + .iter() + .find(|(_, meta)| meta.length > 0) + .map(|(p, _)| p.clone()) + .expect("archive should contain at least one non-empty file"); + let handle = dir.get_file_handle(&non_empty_path).unwrap(); assert!(handle.len() > 0); let data = handle.read_bytes(0..handle.len()).unwrap(); assert_eq!(data.len(), handle.len());