diff --git a/crates/fluss/src/client/admin.rs b/crates/fluss/src/client/admin.rs index d3d5a5e3..7015e573 100644 --- a/crates/fluss/src/client/admin.rs +++ b/crates/fluss/src/client/admin.rs @@ -18,14 +18,27 @@ use crate::client::metadata::Metadata; use crate::cluster::ServerNode; use crate::metadata::{ - DatabaseDescriptor, DatabaseInfo, JsonSerde, LakeSnapshot, PartitionInfo, PartitionSpec, - PhysicalTablePath, Schema, SchemaInfo, TableBucket, TableDescriptor, TableInfo, TablePath, + AclFilter, AclInfo, AcquireKvSnapshotLeaseResult, ActiveKvSnapshots, AlterConfig, + AlterTableChanges, BucketStatsRequest, ClusterHealth, CreateAclResult, DatabaseDescriptor, + DatabaseInfo, DatabaseSummary, DescribeConfig, DropAclsFilterResult, GoalType, JsonSerde, + KvSnapshotLeaseForTable, KvSnapshotMetadata, LakeSnapshot, LakeSnapshotInfo, LatestKvSnapshots, + PartitionInfo, PartitionSpec, PhysicalTablePath, ProducerOffsets, ProducerTableOffsets, + RebalanceProgress, RegisterProducerResult, RemoteLogManifestEntry, Schema, SchemaInfo, + ServerTag, TableBucket, TableDescriptor, TableInfo, TablePath, TableStats, }; use crate::rpc::message::{ + AcquireKvSnapshotLeaseRequest, AddServerTagRequest, AlterClusterConfigsRequest, + AlterDatabaseRequest, AlterTableRequest, CancelRebalanceRequest, CreateAclsRequest, CreateDatabaseRequest, CreatePartitionRequest, CreateTableRequest, DatabaseExistsRequest, - DropDatabaseRequest, DropPartitionRequest, DropTableRequest, GetDatabaseInfoRequest, - GetLatestLakeSnapshotRequest, GetTableRequest, GetTableSchemaRequestMsg, ListDatabasesRequest, - ListPartitionInfosRequest, ListTablesRequest, TableExistsRequest, + DeleteProducerOffsetsRequest, DescribeClusterConfigsRequest, DropAclsRequest, + DropDatabaseRequest, DropKvSnapshotLeaseRequest, DropPartitionRequest, DropTableRequest, + GetClusterHealthRequest, GetDatabaseInfoRequest, GetKvSnapshotMetadataRequest, + GetLakeSnapshotRequest, GetLatestKvSnapshotsRequest, GetLatestLakeSnapshotRequest, + GetProducerOffsetsRequest, GetTableRequest, GetTableSchemaRequestMsg, GetTableStatsRequest, + ListAclsRequest, ListDatabaseSummariesRequest, ListDatabasesRequest, ListKvSnapshotsRequest, + ListPartitionInfosRequest, ListRebalanceProgressRequest, ListRemoteLogManifestsRequest, + ListTablesRequest, RebalanceRequest, RegisterProducerOffsetsRequest, + ReleaseKvSnapshotLeaseRequest, RemoveServerTagRequest, TableExistsRequest, }; use crate::rpc::message::{ListOffsetsRequest, OffsetSpec}; use crate::rpc::{RpcClient, ServerConnection}; @@ -483,4 +496,388 @@ impl FlussAdmin { } Ok(tasks) } + + /// List database summaries (name, created_time, table_count). + pub async fn list_database_summaries(&self) -> Result> { + let response = self + .admin_gateway() + .await? + .request(ListDatabaseSummariesRequest::new()) + .await?; + Ok(response + .database_summary + .iter() + .map(DatabaseSummary::from_pb) + .collect()) + } + + /// Alter a database: config changes and/or an updated comment. + pub async fn alter_database( + &self, + name: &str, + config_changes: Vec, + comment: Option<&str>, + ignore_if_not_exists: bool, + ) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(AlterDatabaseRequest::new( + name, + ignore_if_not_exists, + config_changes, + comment, + )) + .await?; + Ok(()) + } + + /// Alter a table: config changes plus any combination of add/drop/rename/modify columns. + /// Bundle the column-level edits in [`AlterTableChanges`]. + pub async fn alter_table( + &self, + table_path: &TablePath, + ignore_if_not_exists: bool, + changes: AlterTableChanges, + ) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(AlterTableRequest::new( + table_path, + ignore_if_not_exists, + changes.config_changes, + changes.add_columns, + changes.drop_columns, + changes.rename_columns, + changes.modify_columns, + )) + .await?; + Ok(()) + } + + /// Get table statistics for buckets. Pass empty `target_columns` to request stats for all columns. + pub async fn get_table_stats( + &self, + table_id: TableId, + buckets_req: Vec, + target_columns: Vec, + ) -> Result { + let response = self + .admin_gateway() + .await? + .request(GetTableStatsRequest::new( + table_id, + buckets_req, + target_columns, + )) + .await?; + Ok(TableStats::from_pb(&response)) + } + + /// Get the latest KV snapshots for a table (optionally scoped to one partition). + pub async fn get_latest_kv_snapshots( + &self, + table_path: &TablePath, + partition_name: Option<&str>, + ) -> Result { + let response = self + .admin_gateway() + .await? + .request(GetLatestKvSnapshotsRequest::new(table_path, partition_name)) + .await?; + Ok(LatestKvSnapshots::from_pb(&response)) + } + + /// Get KV snapshot metadata (manifest file list). + pub async fn get_kv_snapshot_metadata( + &self, + table_id: TableId, + partition_id: Option, + bucket_id: BucketId, + snapshot_id: i64, + ) -> Result { + let response = self + .admin_gateway() + .await? + .request(GetKvSnapshotMetadataRequest::new( + table_id, + partition_id, + bucket_id, + snapshot_id, + )) + .await?; + Ok(KvSnapshotMetadata::from_pb(&response)) + } + + /// Acquire a KV snapshot lease. Returns the snapshots the server could not lease. + pub async fn create_kv_snapshot_lease( + &self, + lease_id: &str, + lease_duration_ms: i64, + snapshots_to_lease: Vec, + ) -> Result { + let response = self + .admin_gateway() + .await? + .request(AcquireKvSnapshotLeaseRequest::new( + lease_id, + lease_duration_ms, + snapshots_to_lease, + )) + .await?; + Ok(AcquireKvSnapshotLeaseResult::from_pb(&response)) + } + + /// Get a specific lake snapshot for a table. + pub async fn get_lake_snapshot( + &self, + table_path: &TablePath, + snapshot_id: Option, + readable: Option, + ) -> Result { + let response = self + .admin_gateway() + .await? + .request(GetLakeSnapshotRequest::new( + table_path, + snapshot_id, + readable, + )) + .await?; + Ok(LakeSnapshotInfo::from_pb(&response)) + } + + /// Create ACLs. Returns one result per submitted ACL (success or per-ACL error). + pub async fn create_acls(&self, acls: Vec) -> Result> { + let response = self + .admin_gateway() + .await? + .request(CreateAclsRequest::new(acls)) + .await?; + response + .acl_res + .iter() + .map(CreateAclResult::from_pb) + .collect() + } + + /// List ACLs matching a filter. + pub async fn list_acls(&self, acl_filter: AclFilter) -> Result> { + let response = self + .admin_gateway() + .await? + .request(ListAclsRequest::new(acl_filter)) + .await?; + response.acl.iter().map(AclInfo::from_pb).collect() + } + + /// Drop ACLs matching filters. Returns one result per submitted filter. + pub async fn drop_acls( + &self, + acl_filters: Vec, + ) -> Result> { + let response = self + .admin_gateway() + .await? + .request(DropAclsRequest::new(acl_filters)) + .await?; + response + .filter_results + .iter() + .map(DropAclsFilterResult::from_pb) + .collect() + } + + /// Describe cluster configuration. + pub async fn describe_cluster_configs(&self) -> Result> { + let response = self + .admin_gateway() + .await? + .request(DescribeClusterConfigsRequest::new()) + .await?; + Ok(response + .configs + .iter() + .map(DescribeConfig::from_pb) + .collect()) + } + + /// Alter cluster configuration. + pub async fn alter_cluster_configs(&self, alter_configs: Vec) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(AlterClusterConfigsRequest::new(alter_configs)) + .await?; + Ok(()) + } + + /// Add a tag to servers. + pub async fn add_server_tag(&self, server_ids: Vec, server_tag: ServerTag) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(AddServerTagRequest::new(server_ids, server_tag)) + .await?; + Ok(()) + } + + /// Remove a tag from servers. + pub async fn remove_server_tag( + &self, + server_ids: Vec, + server_tag: ServerTag, + ) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(RemoveServerTagRequest::new(server_ids, server_tag)) + .await?; + Ok(()) + } + + /// Trigger a rebalance. Returns the rebalance id assigned by the server. + pub async fn rebalance(&self, goals: Vec) -> Result { + let response = self + .admin_gateway() + .await? + .request(RebalanceRequest::new(goals)) + .await?; + Ok(response.rebalance_id) + } + + /// List rebalance progress (for a specific rebalance id, or all in-flight ones if `None`). + pub async fn list_rebalance_progress( + &self, + rebalance_id: Option<&str>, + ) -> Result { + let response = self + .admin_gateway() + .await? + .request(ListRebalanceProgressRequest::new(rebalance_id)) + .await?; + RebalanceProgress::from_pb(&response) + } + + /// Cancel a rebalance. + pub async fn cancel_rebalance(&self, rebalance_id: Option<&str>) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(CancelRebalanceRequest::new(rebalance_id)) + .await?; + Ok(()) + } + + /// Register producer offsets. Returns the server-side registration outcome (if any). + pub async fn register_producer_offsets( + &self, + producer_id: &str, + table_offsets: Vec, + ttl_ms: Option, + ) -> Result> { + let response = self + .admin_gateway() + .await? + .request(RegisterProducerOffsetsRequest::new( + producer_id, + table_offsets, + ttl_ms, + )) + .await?; + response + .result + .map(RegisterProducerResult::try_from_i32) + .transpose() + } + + /// Get producer offsets. + pub async fn get_producer_offsets(&self, producer_id: &str) -> Result { + let response = self + .admin_gateway() + .await? + .request(GetProducerOffsetsRequest::new(producer_id)) + .await?; + Ok(ProducerOffsets::from_pb(&response)) + } + + /// Delete producer offsets. + pub async fn delete_producer_offsets(&self, producer_id: &str) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(DeleteProducerOffsetsRequest::new(producer_id)) + .await?; + Ok(()) + } + + /// Get cluster health status. + pub async fn get_cluster_health(&self) -> Result { + let response = self + .admin_gateway() + .await? + .request(GetClusterHealthRequest::new()) + .await?; + ClusterHealth::from_pb(&response) + } + + /// List remote log manifests for a table (optionally scoped to one partition). + pub async fn list_remote_log_manifests( + &self, + table_id: TableId, + partition_id: Option, + ) -> Result> { + let response = self + .admin_gateway() + .await? + .request(ListRemoteLogManifestsRequest::new(table_id, partition_id)) + .await?; + Ok(response + .manifests + .iter() + .map(RemoteLogManifestEntry::from_pb) + .collect()) + } + + /// List active KV snapshots for a table (optionally scoped to one partition). + pub async fn list_kv_snapshots( + &self, + table_id: TableId, + partition_id: Option, + ) -> Result { + let response = self + .admin_gateway() + .await? + .request(ListKvSnapshotsRequest::new(table_id, partition_id)) + .await?; + Ok(ActiveKvSnapshots::from_pb(&response)) + } + + /// Release specific bucket snapshots from a KV snapshot lease. + pub async fn release_kv_snapshot_lease( + &self, + lease_id: &str, + buckets_to_release: Vec, + ) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(ReleaseKvSnapshotLeaseRequest::new( + lease_id, + buckets_to_release, + )) + .await?; + Ok(()) + } + + /// Drop an entire KV snapshot lease. + pub async fn drop_kv_snapshot_lease(&self, lease_id: &str) -> Result<()> { + let _response = self + .admin_gateway() + .await? + .request(DropKvSnapshotLeaseRequest::new(lease_id)) + .await?; + Ok(()) + } } diff --git a/crates/fluss/src/metadata/acl.rs b/crates/fluss/src/metadata/acl.rs index 84ec97af..f69ec952 100644 --- a/crates/fluss/src/metadata/acl.rs +++ b/crates/fluss/src/metadata/acl.rs @@ -16,7 +16,9 @@ // under the License. use crate::error::{Error, Result}; -use crate::proto::{PbAclFilter, PbAclInfo}; +use crate::proto::{ + PbAclFilter, PbAclInfo, PbCreateAclRespInfo, PbDropAclsFilterResult, PbDropAclsMatchingAcl, +}; /// Mirrors Java `org.apache.fluss.security.acl.ResourceType`. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -197,6 +199,76 @@ impl AclFilter { } } +/// One per ACL submitted to `create_acls`: success or a server-side error. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CreateAclResult { + pub acl: AclInfo, + pub error: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AclError { + pub code: i32, + pub message: Option, +} + +impl CreateAclResult { + pub fn from_pb(pb: &PbCreateAclRespInfo) -> Result { + Ok(Self { + acl: AclInfo::from_pb(&pb.acl)?, + error: pb.error_code.map(|code| AclError { + code, + message: pb.error_message.clone(), + }), + }) + } +} + +/// One ACL matched by a filter in `drop_acls`. Reports the bound ACL and any +/// server-side error encountered while dropping it. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DropAclMatchingAcl { + pub acl: AclInfo, + pub error: Option, +} + +impl DropAclMatchingAcl { + pub fn from_pb(pb: &PbDropAclsMatchingAcl) -> Result { + Ok(Self { + acl: AclInfo::from_pb(&pb.acl)?, + error: pb.error_code.map(|code| AclError { + code, + message: pb.error_message.clone(), + }), + }) + } +} + +/// One per filter submitted to `drop_acls`: the matching ACLs that were +/// targeted plus any filter-level error. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DropAclsFilterResult { + pub matching_acls: Vec, + pub error: Option, +} + +impl DropAclsFilterResult { + pub fn from_pb(pb: &PbDropAclsFilterResult) -> Result { + let matching_acls = pb + .matching_acls + .iter() + .map(DropAclMatchingAcl::from_pb) + .collect::>>()?; + Ok(Self { + matching_acls, + error: pb.error_code.map(|code| AclError { + code, + message: pb.error_message.clone(), + }), + }) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/fluss/src/metadata/cluster_health.rs b/crates/fluss/src/metadata/cluster_health.rs new file mode 100644 index 00000000..938d1f5f --- /dev/null +++ b/crates/fluss/src/metadata/cluster_health.rs @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::{Error, Result}; +use crate::proto::GetClusterHealthResponse; + +/// Mirrors Java `org.apache.fluss.client.admin.ClusterHealthStatus`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ClusterHealthStatus { + Green, + Yellow, + Red, + Unknown, +} + +impl ClusterHealthStatus { + pub fn to_i32(self) -> i32 { + match self { + Self::Green => 0, + Self::Yellow => 1, + Self::Red => 2, + Self::Unknown => 3, + } + } + + pub fn try_from_i32(value: i32) -> Result { + match value { + 0 => Ok(Self::Green), + 1 => Ok(Self::Yellow), + 2 => Ok(Self::Red), + 3 => Ok(Self::Unknown), + _ => Err(Error::IllegalArgument { + message: format!("Unsupported ClusterHealthStatus: {value}"), + }), + } + } +} + +/// Result of `get_cluster_health`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ClusterHealth { + pub num_replicas: i32, + pub in_sync_replicas: i32, + pub num_leader_replicas: i32, + pub active_leader_replicas: i32, + pub status: ClusterHealthStatus, +} + +impl ClusterHealth { + pub fn from_pb(pb: &GetClusterHealthResponse) -> Result { + Ok(Self { + num_replicas: pb.num_replicas, + in_sync_replicas: pb.in_sync_replicas, + num_leader_replicas: pb.num_leader_replicas, + active_leader_replicas: pb.active_leader_replicas, + status: ClusterHealthStatus::try_from_i32(pb.status)?, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_cluster_health_status_roundtrip() { + for s in [ + ClusterHealthStatus::Green, + ClusterHealthStatus::Yellow, + ClusterHealthStatus::Red, + ClusterHealthStatus::Unknown, + ] { + assert_eq!(ClusterHealthStatus::try_from_i32(s.to_i32()).unwrap(), s); + } + } + + #[test] + fn test_cluster_health_status_unknown_value() { + assert!(ClusterHealthStatus::try_from_i32(99).is_err()); + } + + #[test] + fn test_cluster_health_from_pb() { + let pb = GetClusterHealthResponse { + num_replicas: 5, + in_sync_replicas: 4, + num_leader_replicas: 3, + active_leader_replicas: 3, + status: 1, + }; + let h = ClusterHealth::from_pb(&pb).unwrap(); + assert_eq!(h.num_replicas, 5); + assert_eq!(h.status, ClusterHealthStatus::Yellow); + } +} diff --git a/crates/fluss/src/metadata/config.rs b/crates/fluss/src/metadata/config.rs index 4e045c45..4dd7d8bc 100644 --- a/crates/fluss/src/metadata/config.rs +++ b/crates/fluss/src/metadata/config.rs @@ -16,7 +16,7 @@ // under the License. use crate::error::{Error, Result}; -use crate::proto::PbAlterConfig; +use crate::proto::{PbAlterConfig, PbDescribeConfig}; /// Mirrors Java `org.apache.fluss.config.cluster.AlterConfigOpType`. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -88,6 +88,33 @@ impl AlterConfig { } } +/// One entry in the response of `describe_cluster_configs`. Mirrors Java's +/// `org.apache.fluss.config.cluster.DescribeConfig`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DescribeConfig { + pub config_key: String, + pub config_value: Option, + pub config_source: String, +} + +impl DescribeConfig { + pub fn from_pb(pb: &PbDescribeConfig) -> Self { + Self { + config_key: pb.config_key.clone(), + config_value: pb.config_value.clone(), + config_source: pb.config_source.clone(), + } + } + + pub fn to_pb(&self) -> PbDescribeConfig { + PbDescribeConfig { + config_key: self.config_key.clone(), + config_value: self.config_value.clone(), + config_source: self.config_source.clone(), + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/fluss/src/metadata/database.rs b/crates/fluss/src/metadata/database.rs index 15fefb54..76fe5114 100644 --- a/crates/fluss/src/metadata/database.rs +++ b/crates/fluss/src/metadata/database.rs @@ -18,6 +18,7 @@ use crate::error::Error::JsonSerdeError; use crate::error::Result; use crate::metadata::JsonSerde; +use crate::proto::PbDatabaseSummary; use serde::{Deserialize, Serialize}; use serde_json::{Value, json}; use std::collections::HashMap; @@ -205,6 +206,24 @@ impl DatabaseDescriptor { } } +/// Lightweight summary of a database returned by `list_database_summaries`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DatabaseSummary { + pub database_name: String, + pub created_time: i64, + pub table_count: i32, +} + +impl DatabaseSummary { + pub fn from_pb(pb: &PbDatabaseSummary) -> Self { + Self { + database_name: pb.database_name.clone(), + created_time: pb.created_time, + table_count: pb.table_count, + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/fluss/src/metadata/kv_snapshot.rs b/crates/fluss/src/metadata/kv_snapshot.rs new file mode 100644 index 00000000..054254ad --- /dev/null +++ b/crates/fluss/src/metadata/kv_snapshot.rs @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::proto::{ + AcquireKvSnapshotLeaseResponse, GetKvSnapshotMetadataResponse, GetLatestKvSnapshotsResponse, + ListKvSnapshotsResponse, PbKvSnapshot, PbRemotePathAndLocalFile, +}; +use crate::{BucketId, PartitionId, TableId}; + +use crate::metadata::KvSnapshotLeaseForTable; + +/// Per-bucket KV snapshot info. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct KvSnapshot { + pub bucket_id: BucketId, + pub snapshot_id: Option, + pub log_offset: Option, +} + +impl KvSnapshot { + pub fn from_pb(pb: &PbKvSnapshot) -> Self { + Self { + bucket_id: pb.bucket_id, + snapshot_id: pb.snapshot_id, + log_offset: pb.log_offset, + } + } +} + +/// Result of `get_latest_kv_snapshots`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct LatestKvSnapshots { + pub table_id: TableId, + pub partition_id: Option, + pub latest_snapshots: Vec, +} + +impl LatestKvSnapshots { + pub fn from_pb(pb: &GetLatestKvSnapshotsResponse) -> Self { + Self { + table_id: pb.table_id, + partition_id: pb.partition_id, + latest_snapshots: pb + .latest_snapshots + .iter() + .map(KvSnapshot::from_pb) + .collect(), + } + } +} + +/// One file in a KV snapshot manifest: its remote path and the local filename +/// the server expects clients to materialize it as. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RemotePathAndLocalFile { + pub remote_path: String, + pub local_file_name: String, +} + +impl RemotePathAndLocalFile { + pub fn from_pb(pb: &PbRemotePathAndLocalFile) -> Self { + Self { + remote_path: pb.remote_path.clone(), + local_file_name: pb.local_file_name.clone(), + } + } +} + +/// Result of `get_kv_snapshot_metadata`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct KvSnapshotMetadata { + pub log_offset: i64, + pub snapshot_files: Vec, +} + +impl KvSnapshotMetadata { + pub fn from_pb(pb: &GetKvSnapshotMetadataResponse) -> Self { + Self { + log_offset: pb.log_offset, + snapshot_files: pb + .snapshot_files + .iter() + .map(RemotePathAndLocalFile::from_pb) + .collect(), + } + } +} + +/// Result of `acquire_kv_snapshot_lease` — any snapshots the server could not +/// lease (typically because they were evicted concurrently). +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct AcquireKvSnapshotLeaseResult { + pub unavailable_snapshots: Vec, +} + +impl AcquireKvSnapshotLeaseResult { + pub fn from_pb(pb: &AcquireKvSnapshotLeaseResponse) -> Self { + Self { + unavailable_snapshots: pb + .unavailable_snapshots + .iter() + .map(KvSnapshotLeaseForTable::from_pb) + .collect(), + } + } +} + +/// Result of `list_kv_snapshots`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ActiveKvSnapshots { + pub table_id: TableId, + pub partition_id: Option, + pub active_snapshots: Vec, +} + +impl ActiveKvSnapshots { + pub fn from_pb(pb: &ListKvSnapshotsResponse) -> Self { + Self { + table_id: pb.table_id, + partition_id: pb.partition_id, + active_snapshots: pb + .active_snapshots + .iter() + .map(KvSnapshot::from_pb) + .collect(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_kv_snapshot_from_pb() { + let pb = PbKvSnapshot { + bucket_id: 3, + snapshot_id: Some(7), + log_offset: Some(42), + }; + let s = KvSnapshot::from_pb(&pb); + assert_eq!(s.bucket_id, 3); + assert_eq!(s.snapshot_id, Some(7)); + assert_eq!(s.log_offset, Some(42)); + } + + #[test] + fn test_remote_path_and_local_file_from_pb() { + let pb = PbRemotePathAndLocalFile { + remote_path: "s3://bucket/snap/1.sst".to_string(), + local_file_name: "1.sst".to_string(), + }; + let f = RemotePathAndLocalFile::from_pb(&pb); + assert_eq!(f.remote_path, "s3://bucket/snap/1.sst"); + assert_eq!(f.local_file_name, "1.sst"); + } +} diff --git a/crates/fluss/src/metadata/kv_snapshot_lease.rs b/crates/fluss/src/metadata/kv_snapshot_lease.rs index 22679136..98638f5d 100644 --- a/crates/fluss/src/metadata/kv_snapshot_lease.rs +++ b/crates/fluss/src/metadata/kv_snapshot_lease.rs @@ -16,12 +16,13 @@ // under the License. use crate::proto::{PbKvSnapshotLeaseForBucket, PbKvSnapshotLeaseForTable}; +use crate::{BucketId, PartitionId, TableId}; /// One bucket's slot in a KV-snapshot lease request. #[derive(Debug, Clone, PartialEq, Eq)] pub struct KvSnapshotLeaseForBucket { - pub partition_id: Option, - pub bucket_id: i32, + pub partition_id: Option, + pub bucket_id: BucketId, pub snapshot_id: i64, } @@ -46,7 +47,7 @@ impl KvSnapshotLeaseForBucket { /// All the buckets of a single table that should be leased together. #[derive(Debug, Clone, PartialEq, Eq)] pub struct KvSnapshotLeaseForTable { - pub table_id: i64, + pub table_id: TableId, pub bucket_snapshots: Vec, } diff --git a/crates/fluss/src/metadata/lake_snapshot.rs b/crates/fluss/src/metadata/lake_snapshot.rs new file mode 100644 index 00000000..5ff50ad6 --- /dev/null +++ b/crates/fluss/src/metadata/lake_snapshot.rs @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::proto::{GetLakeSnapshotResponse, PbLakeSnapshotForBucket}; +use crate::{BucketId, PartitionId, TableId}; + +/// One bucket's slice of a lake snapshot. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct LakeBucketSnapshot { + pub partition_id: Option, + pub bucket_id: BucketId, + pub log_offset: Option, + pub partition_name: Option, +} + +impl LakeBucketSnapshot { + pub fn from_pb(pb: &PbLakeSnapshotForBucket) -> Self { + Self { + partition_id: pb.partition_id, + bucket_id: pb.bucket_id, + log_offset: pb.log_offset, + partition_name: pb.partition_name.clone(), + } + } +} + +/// Result of `get_lake_snapshot` — a specific snapshot's bucket layout. +/// (Distinct from [`LakeSnapshot`](super::LakeSnapshot), which represents the +/// "latest" snapshot summary returned by `get_latest_lake_snapshot`.) +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct LakeSnapshotInfo { + pub table_id: TableId, + pub snapshot_id: i64, + pub bucket_snapshots: Vec, +} + +impl LakeSnapshotInfo { + pub fn from_pb(pb: &GetLakeSnapshotResponse) -> Self { + Self { + table_id: pb.table_id, + snapshot_id: pb.snapshot_id, + bucket_snapshots: pb + .bucket_snapshots + .iter() + .map(LakeBucketSnapshot::from_pb) + .collect(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_lake_bucket_snapshot_from_pb() { + let pb = PbLakeSnapshotForBucket { + partition_id: Some(1), + bucket_id: 2, + log_offset: Some(3), + partition_name: Some("date=2024-01-01".to_string()), + }; + let s = LakeBucketSnapshot::from_pb(&pb); + assert_eq!(s.bucket_id, 2); + assert_eq!(s.partition_id, Some(1)); + assert_eq!(s.log_offset, Some(3)); + assert_eq!(s.partition_name.as_deref(), Some("date=2024-01-01")); + } +} diff --git a/crates/fluss/src/metadata/mod.rs b/crates/fluss/src/metadata/mod.rs index 0249fefe..d992273c 100644 --- a/crates/fluss/src/metadata/mod.rs +++ b/crates/fluss/src/metadata/mod.rs @@ -16,15 +16,21 @@ // under the License. mod acl; +mod cluster_health; mod config; mod data_lake_format; mod database; mod datatype; mod goal_type; mod json_serde; +mod kv_snapshot; mod kv_snapshot_lease; +mod lake_snapshot; mod partition; mod producer_offsets; +mod rebalance; +mod register_producer_result; +mod remote_log; mod schema_util; mod server_tag; mod table; @@ -32,15 +38,21 @@ mod table_change; mod table_stats; pub use acl::*; +pub use cluster_health::*; pub use config::*; pub use data_lake_format::*; pub use database::*; pub use datatype::*; pub use goal_type::*; pub use json_serde::*; +pub use kv_snapshot::*; pub use kv_snapshot_lease::*; +pub use lake_snapshot::*; pub use partition::*; pub use producer_offsets::*; +pub use rebalance::*; +pub use register_producer_result::*; +pub use remote_log::*; pub(crate) use schema_util::{UNEXIST_MAPPING, index_mapping}; pub use server_tag::*; pub use table::*; diff --git a/crates/fluss/src/metadata/producer_offsets.rs b/crates/fluss/src/metadata/producer_offsets.rs index f0daddbc..f367c390 100644 --- a/crates/fluss/src/metadata/producer_offsets.rs +++ b/crates/fluss/src/metadata/producer_offsets.rs @@ -15,13 +15,14 @@ // specific language governing permissions and limitations // under the License. -use crate::proto::{PbBucketOffset, PbProducerTableOffsets}; +use crate::proto::{GetProducerOffsetsResponse, PbBucketOffset, PbProducerTableOffsets}; +use crate::{BucketId, PartitionId, TableId}; /// Per-bucket producer log-end offset. #[derive(Debug, Clone, PartialEq, Eq)] pub struct BucketOffset { - pub partition_id: Option, - pub bucket_id: i32, + pub partition_id: Option, + pub bucket_id: BucketId, pub log_end_offset: Option, } @@ -46,7 +47,7 @@ impl BucketOffset { /// All bucket offsets of a single table belonging to one producer. #[derive(Debug, Clone, PartialEq, Eq)] pub struct ProducerTableOffsets { - pub table_id: i64, + pub table_id: TableId, pub bucket_offsets: Vec, } @@ -74,6 +75,28 @@ impl ProducerTableOffsets { } } +/// Result of `get_producer_offsets`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ProducerOffsets { + pub producer_id: Option, + pub expiration_time: Option, + pub table_offsets: Vec, +} + +impl ProducerOffsets { + pub fn from_pb(pb: &GetProducerOffsetsResponse) -> Self { + Self { + producer_id: pb.producer_id.clone(), + expiration_time: pb.expiration_time, + table_offsets: pb + .table_offsets + .iter() + .map(ProducerTableOffsets::from_pb) + .collect(), + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/fluss/src/metadata/rebalance.rs b/crates/fluss/src/metadata/rebalance.rs new file mode 100644 index 00000000..13826f9e --- /dev/null +++ b/crates/fluss/src/metadata/rebalance.rs @@ -0,0 +1,187 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::{Error, Result}; +use crate::proto::{ + ListRebalanceProgressResponse, PbRebalancePlanForBucket, PbRebalanceProgressForBucket, + PbRebalanceProgressForTable, +}; +use crate::{BucketId, PartitionId, TableId}; + +/// Mirrors Java `org.apache.fluss.cluster.rebalance.RebalanceStatus`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RebalanceStatus { + NotStarted, + Rebalancing, + Failed, + Completed, + Canceled, + Timeout, +} + +impl RebalanceStatus { + pub fn to_i32(self) -> i32 { + match self { + Self::NotStarted => 0, + Self::Rebalancing => 1, + Self::Failed => 2, + Self::Completed => 3, + Self::Canceled => 4, + Self::Timeout => 5, + } + } + + pub fn try_from_i32(value: i32) -> Result { + match value { + 0 => Ok(Self::NotStarted), + 1 => Ok(Self::Rebalancing), + 2 => Ok(Self::Failed), + 3 => Ok(Self::Completed), + 4 => Ok(Self::Canceled), + 5 => Ok(Self::Timeout), + _ => Err(Error::IllegalArgument { + message: format!("Unsupported RebalanceStatus: {value}"), + }), + } + } +} + +/// Per-bucket plan in a rebalance: who the leader was and who it will be, who +/// the replicas were and who they will be. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BucketRebalancePlan { + pub partition_id: Option, + pub bucket_id: BucketId, + pub original_leader: Option, + pub new_leader: Option, + pub original_replicas: Vec, + pub new_replicas: Vec, +} + +impl BucketRebalancePlan { + pub fn from_pb(pb: &PbRebalancePlanForBucket) -> Self { + Self { + partition_id: pb.partition_id, + bucket_id: pb.bucket_id, + original_leader: pb.original_leader, + new_leader: pb.new_leader, + original_replicas: pb.original_replicas.clone(), + new_replicas: pb.new_replicas.clone(), + } + } +} + +/// Per-bucket rebalance progress: the planned move and its current status code. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BucketRebalanceProgress { + pub rebalance_plan: BucketRebalancePlan, + pub rebalance_status: RebalanceStatus, +} + +impl BucketRebalanceProgress { + pub fn from_pb(pb: &PbRebalanceProgressForBucket) -> Result { + Ok(Self { + rebalance_plan: BucketRebalancePlan::from_pb(&pb.rebalance_plan), + rebalance_status: RebalanceStatus::try_from_i32(pb.rebalance_status)?, + }) + } +} + +/// All bucket progress for one table. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TableRebalanceProgress { + pub table_id: TableId, + pub buckets_progress: Vec, +} + +impl TableRebalanceProgress { + pub fn from_pb(pb: &PbRebalanceProgressForTable) -> Result { + Ok(Self { + table_id: pb.table_id, + buckets_progress: pb + .buckets_progress + .iter() + .map(BucketRebalanceProgress::from_pb) + .collect::>>()?, + }) + } +} + +/// Result of `list_rebalance_progress`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RebalanceProgress { + pub rebalance_id: Option, + pub rebalance_status: Option, + pub table_progress: Vec, +} + +impl RebalanceProgress { + pub fn from_pb(pb: &ListRebalanceProgressResponse) -> Result { + Ok(Self { + rebalance_id: pb.rebalance_id.clone(), + rebalance_status: pb + .rebalance_status + .map(RebalanceStatus::try_from_i32) + .transpose()?, + table_progress: pb + .table_progress + .iter() + .map(TableRebalanceProgress::from_pb) + .collect::>>()?, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_rebalance_status_roundtrip() { + for s in [ + RebalanceStatus::NotStarted, + RebalanceStatus::Rebalancing, + RebalanceStatus::Failed, + RebalanceStatus::Completed, + RebalanceStatus::Canceled, + RebalanceStatus::Timeout, + ] { + assert_eq!(RebalanceStatus::try_from_i32(s.to_i32()).unwrap(), s); + } + } + + #[test] + fn test_rebalance_status_unknown() { + assert!(RebalanceStatus::try_from_i32(99).is_err()); + } + + #[test] + fn test_bucket_rebalance_plan_from_pb() { + let pb = PbRebalancePlanForBucket { + partition_id: Some(1), + bucket_id: 2, + original_leader: Some(3), + new_leader: Some(4), + original_replicas: vec![3, 5, 6], + new_replicas: vec![4, 5, 6], + }; + let p = BucketRebalancePlan::from_pb(&pb); + assert_eq!(p.bucket_id, 2); + assert_eq!(p.new_leader, Some(4)); + assert_eq!(p.new_replicas, vec![4, 5, 6]); + } +} diff --git a/crates/fluss/src/metadata/register_producer_result.rs b/crates/fluss/src/metadata/register_producer_result.rs new file mode 100644 index 00000000..1c0c8e50 --- /dev/null +++ b/crates/fluss/src/metadata/register_producer_result.rs @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::{Error, Result}; + +/// Mirrors Java `org.apache.fluss.client.admin.RegisterResult`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RegisterProducerResult { + /// Snapshot was newly created (first-startup scenario; no undo recovery needed). + Created, + /// Snapshot already existed (failover scenario; caller should perform undo recovery). + AlreadyExists, +} + +impl RegisterProducerResult { + pub fn to_i32(self) -> i32 { + match self { + Self::Created => 0, + Self::AlreadyExists => 1, + } + } + + pub fn try_from_i32(value: i32) -> Result { + match value { + 0 => Ok(Self::Created), + 1 => Ok(Self::AlreadyExists), + _ => Err(Error::IllegalArgument { + message: format!("Unsupported RegisterProducerResult: {value}"), + }), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_register_producer_result_roundtrip() { + for r in [ + RegisterProducerResult::Created, + RegisterProducerResult::AlreadyExists, + ] { + assert_eq!(RegisterProducerResult::try_from_i32(r.to_i32()).unwrap(), r); + } + } + + #[test] + fn test_register_producer_result_unknown() { + assert!(RegisterProducerResult::try_from_i32(99).is_err()); + } +} diff --git a/crates/fluss/src/metadata/remote_log.rs b/crates/fluss/src/metadata/remote_log.rs new file mode 100644 index 00000000..d3fd3e33 --- /dev/null +++ b/crates/fluss/src/metadata/remote_log.rs @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::TableBucket; +use crate::proto::PbRemoteLogManifestEntry; + +/// One bucket's remote-log manifest pointer. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct RemoteLogManifestEntry { + pub table_bucket: TableBucket, + pub remote_log_manifest_path: String, + pub remote_log_end_offset: i64, +} + +impl RemoteLogManifestEntry { + pub fn from_pb(pb: &PbRemoteLogManifestEntry) -> Self { + Self { + table_bucket: TableBucket::from_pb(&pb.table_bucket), + remote_log_manifest_path: pb.remote_log_manifest_path.clone(), + remote_log_end_offset: pb.remote_log_end_offset, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::proto::PbTableBucket; + + #[test] + fn test_remote_log_manifest_entry_from_pb() { + let pb = PbRemoteLogManifestEntry { + table_bucket: PbTableBucket { + table_id: 1, + partition_id: None, + bucket_id: 2, + }, + remote_log_manifest_path: "s3://bucket/manifest.json".to_string(), + remote_log_end_offset: 999, + }; + let m = RemoteLogManifestEntry::from_pb(&pb); + assert_eq!(m.remote_log_end_offset, 999); + assert_eq!(m.table_bucket.bucket_id(), 2); + } +} diff --git a/crates/fluss/src/metadata/table_change.rs b/crates/fluss/src/metadata/table_change.rs index 2ecf6cfa..3e5866a2 100644 --- a/crates/fluss/src/metadata/table_change.rs +++ b/crates/fluss/src/metadata/table_change.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use super::AlterConfig; use crate::error::{Error, Result}; use crate::proto::{PbAddColumn, PbDropColumn, PbModifyColumn, PbRenameColumn}; @@ -121,6 +122,18 @@ impl RenameColumn { } } +/// Bundle of column-level changes for a single `alter_table` call. Empty `Vec`s +/// mean "no change of that kind"; pass `Default::default()` to send only +/// config changes. +#[derive(Debug, Default, Clone, PartialEq, Eq)] +pub struct AlterTableChanges { + pub config_changes: Vec, + pub add_columns: Vec, + pub drop_columns: Vec, + pub rename_columns: Vec, + pub modify_columns: Vec, +} + /// Modify a column's type/comment/position. Mirrors the `ModifyColumn` variant of /// Java `TableChange`. All fields except `column_name` are optional — only the /// non-`None` ones are applied. diff --git a/crates/fluss/src/metadata/table_stats.rs b/crates/fluss/src/metadata/table_stats.rs index ad2d1fc2..53c6f72f 100644 --- a/crates/fluss/src/metadata/table_stats.rs +++ b/crates/fluss/src/metadata/table_stats.rs @@ -15,18 +15,19 @@ // specific language governing permissions and limitations // under the License. -use crate::proto::PbTableStatsReqForBucket; +use crate::proto::{GetTableStatsResponse, PbTableStatsReqForBucket, PbTableStatsRespForBucket}; +use crate::{BucketId, PartitionId}; /// Per-bucket request item for `GetTableStats`. /// Mirrors the bucket-stats request shape used by the Java client. #[derive(Debug, Clone, PartialEq, Eq)] pub struct BucketStatsRequest { - pub partition_id: Option, - pub bucket_id: i32, + pub partition_id: Option, + pub bucket_id: BucketId, } impl BucketStatsRequest { - pub fn new(partition_id: Option, bucket_id: i32) -> Self { + pub fn new(partition_id: Option, bucket_id: BucketId) -> Self { Self { partition_id, bucket_id, @@ -48,6 +49,51 @@ impl BucketStatsRequest { } } +/// Per-bucket stats result returned by `GetTableStats`. `row_count` is `None` +/// when the server returned an error for the bucket; check `error` in that case. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BucketStats { + pub bucket_id: BucketId, + pub partition_id: Option, + pub row_count: Option, + pub error: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BucketStatsError { + pub code: i32, + pub message: Option, +} + +impl BucketStats { + pub fn from_pb(pb: &PbTableStatsRespForBucket) -> Self { + let error = pb.error_code.map(|code| BucketStatsError { + code, + message: pb.error_message.clone(), + }); + Self { + bucket_id: pb.bucket_id, + partition_id: pb.partition_id, + row_count: pb.row_count, + error, + } + } +} + +/// Full result of `GetTableStats` — one entry per requested bucket. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TableStats { + pub buckets: Vec, +} + +impl TableStats { + pub fn from_pb(pb: &GetTableStatsResponse) -> Self { + Self { + buckets: pb.buckets_resp.iter().map(BucketStats::from_pb).collect(), + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -62,4 +108,33 @@ mod tests { assert_eq!(BucketStatsRequest::from_pb(&pb), req); } } + + #[test] + fn test_bucket_stats_from_pb_ok() { + let pb = PbTableStatsRespForBucket { + error_code: None, + error_message: None, + partition_id: Some(1), + bucket_id: 7, + row_count: Some(123), + }; + let s = BucketStats::from_pb(&pb); + assert_eq!(s.bucket_id, 7); + assert_eq!(s.row_count, Some(123)); + assert!(s.error.is_none()); + } + + #[test] + fn test_bucket_stats_from_pb_err() { + let pb = PbTableStatsRespForBucket { + error_code: Some(7), + error_message: Some("nope".to_string()), + partition_id: None, + bucket_id: 2, + row_count: None, + }; + let s = BucketStats::from_pb(&pb); + assert_eq!(s.error.as_ref().unwrap().code, 7); + assert_eq!(s.error.as_ref().unwrap().message.as_deref(), Some("nope")); + } } diff --git a/crates/fluss/src/rpc/message/alter_database.rs b/crates/fluss/src/rpc/message/alter_database.rs index 3eb08163..2135e663 100644 --- a/crates/fluss/src/rpc/message/alter_database.rs +++ b/crates/fluss/src/rpc/message/alter_database.rs @@ -33,13 +33,14 @@ impl AlterDatabaseRequest { database_name: &str, ignore_if_not_exists: bool, config_changes: Vec, + comment: Option<&str>, ) -> Self { AlterDatabaseRequest { inner_request: proto::AlterDatabaseRequest { database_name: database_name.to_string(), ignore_if_not_exists, config_changes: config_changes.iter().map(AlterConfig::to_pb).collect(), - comment: None, + comment: comment.map(str::to_string), }, } }