From c93a5cfa953ef833dae2ec408a206b1b2d6225b3 Mon Sep 17 00:00:00 2001 From: warmbupt Date: Thu, 18 Jun 2026 11:52:51 +0800 Subject: [PATCH 1/3] [rust] Add RPC message wrappers for extended operations Add message wrappers for the remaining 1.x RPC APIs: - KV snapshot lifecycle: acquire/release/drop lease, list, metadata, latest snapshots, lake snapshot - Server management: add/remove server tag, rebalance + progress + cancel, get cluster health, list remote log manifests - Producer offsets: register/get/delete - ScanKv (API 1061): full KV-table bucket scan request/response --- .../fluss/src/metadata/kv_snapshot_lease.rs | 118 ++++++++++++++++++ crates/fluss/src/metadata/mod.rs | 4 + crates/fluss/src/metadata/producer_offsets.rs | 118 ++++++++++++++++++ crates/fluss/src/metadata/table.rs | 16 +++ .../rpc/message/acquire_kv_snapshot_lease.rs | 56 +++++++++ .../fluss/src/rpc/message/add_server_tag.rs | 47 +++++++ .../fluss/src/rpc/message/cancel_rebalance.rs | 46 +++++++ .../rpc/message/delete_producer_offsets.rs | 46 +++++++ .../src/rpc/message/drop_kv_snapshot_lease.rs | 46 +++++++ .../src/rpc/message/get_cluster_health.rs | 44 +++++++ .../rpc/message/get_kv_snapshot_metadata.rs | 49 ++++++++ .../src/rpc/message/get_lake_snapshot.rs | 50 ++++++++ .../rpc/message/get_latest_kv_snapshots.rs | 49 ++++++++ .../src/rpc/message/get_producer_offsets.rs | 46 +++++++ .../src/rpc/message/list_kv_snapshots.rs | 47 +++++++ .../rpc/message/list_rebalance_progress.rs | 46 +++++++ .../rpc/message/list_remote_log_manifests.rs | 47 +++++++ crates/fluss/src/rpc/message/mod.rs | 36 ++++++ crates/fluss/src/rpc/message/rebalance.rs | 44 +++++++ .../rpc/message/register_producer_offsets.rs | 56 +++++++++ .../rpc/message/release_kv_snapshot_lease.rs | 48 +++++++ .../src/rpc/message/remove_server_tag.rs | 47 +++++++ crates/fluss/src/rpc/message/scan_kv.rs | 57 +++++++++ 23 files changed, 1163 insertions(+) create mode 100644 crates/fluss/src/metadata/kv_snapshot_lease.rs create mode 100644 crates/fluss/src/metadata/producer_offsets.rs create mode 100644 crates/fluss/src/rpc/message/acquire_kv_snapshot_lease.rs create mode 100644 crates/fluss/src/rpc/message/add_server_tag.rs create mode 100644 crates/fluss/src/rpc/message/cancel_rebalance.rs create mode 100644 crates/fluss/src/rpc/message/delete_producer_offsets.rs create mode 100644 crates/fluss/src/rpc/message/drop_kv_snapshot_lease.rs create mode 100644 crates/fluss/src/rpc/message/get_cluster_health.rs create mode 100644 crates/fluss/src/rpc/message/get_kv_snapshot_metadata.rs create mode 100644 crates/fluss/src/rpc/message/get_lake_snapshot.rs create mode 100644 crates/fluss/src/rpc/message/get_latest_kv_snapshots.rs create mode 100644 crates/fluss/src/rpc/message/get_producer_offsets.rs create mode 100644 crates/fluss/src/rpc/message/list_kv_snapshots.rs create mode 100644 crates/fluss/src/rpc/message/list_rebalance_progress.rs create mode 100644 crates/fluss/src/rpc/message/list_remote_log_manifests.rs create mode 100644 crates/fluss/src/rpc/message/rebalance.rs create mode 100644 crates/fluss/src/rpc/message/register_producer_offsets.rs create mode 100644 crates/fluss/src/rpc/message/release_kv_snapshot_lease.rs create mode 100644 crates/fluss/src/rpc/message/remove_server_tag.rs create mode 100644 crates/fluss/src/rpc/message/scan_kv.rs diff --git a/crates/fluss/src/metadata/kv_snapshot_lease.rs b/crates/fluss/src/metadata/kv_snapshot_lease.rs new file mode 100644 index 00000000..22679136 --- /dev/null +++ b/crates/fluss/src/metadata/kv_snapshot_lease.rs @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::proto::{PbKvSnapshotLeaseForBucket, PbKvSnapshotLeaseForTable}; + +/// One bucket's slot in a KV-snapshot lease request. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct KvSnapshotLeaseForBucket { + pub partition_id: Option, + pub bucket_id: i32, + pub snapshot_id: i64, +} + +impl KvSnapshotLeaseForBucket { + pub fn to_pb(&self) -> PbKvSnapshotLeaseForBucket { + PbKvSnapshotLeaseForBucket { + partition_id: self.partition_id, + bucket_id: self.bucket_id, + snapshot_id: self.snapshot_id, + } + } + + pub fn from_pb(pb: &PbKvSnapshotLeaseForBucket) -> Self { + Self { + partition_id: pb.partition_id, + bucket_id: pb.bucket_id, + snapshot_id: pb.snapshot_id, + } + } +} + +/// All the buckets of a single table that should be leased together. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct KvSnapshotLeaseForTable { + pub table_id: i64, + pub bucket_snapshots: Vec, +} + +impl KvSnapshotLeaseForTable { + pub fn to_pb(&self) -> PbKvSnapshotLeaseForTable { + PbKvSnapshotLeaseForTable { + table_id: self.table_id, + bucket_snapshots: self + .bucket_snapshots + .iter() + .map(KvSnapshotLeaseForBucket::to_pb) + .collect(), + } + } + + pub fn from_pb(pb: &PbKvSnapshotLeaseForTable) -> Self { + Self { + table_id: pb.table_id, + bucket_snapshots: pb + .bucket_snapshots + .iter() + .map(KvSnapshotLeaseForBucket::from_pb) + .collect(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_kv_snapshot_lease_for_bucket_roundtrip() { + for b in [ + KvSnapshotLeaseForBucket { + partition_id: None, + bucket_id: 0, + snapshot_id: 10, + }, + KvSnapshotLeaseForBucket { + partition_id: Some(42), + bucket_id: 3, + snapshot_id: 99, + }, + ] { + assert_eq!(KvSnapshotLeaseForBucket::from_pb(&b.to_pb()), b); + } + } + + #[test] + fn test_kv_snapshot_lease_for_table_roundtrip() { + let t = KvSnapshotLeaseForTable { + table_id: 7, + bucket_snapshots: vec![ + KvSnapshotLeaseForBucket { + partition_id: None, + bucket_id: 0, + snapshot_id: 10, + }, + KvSnapshotLeaseForBucket { + partition_id: Some(42), + bucket_id: 1, + snapshot_id: 11, + }, + ], + }; + assert_eq!(KvSnapshotLeaseForTable::from_pb(&t.to_pb()), t); + } +} diff --git a/crates/fluss/src/metadata/mod.rs b/crates/fluss/src/metadata/mod.rs index 28cda723..ee5e0c41 100644 --- a/crates/fluss/src/metadata/mod.rs +++ b/crates/fluss/src/metadata/mod.rs @@ -21,7 +21,9 @@ mod data_lake_format; mod database; mod datatype; mod json_serde; +mod kv_snapshot_lease; mod partition; +mod producer_offsets; mod schema_util; mod table; mod table_change; @@ -33,7 +35,9 @@ pub use data_lake_format::*; pub use database::*; pub use datatype::*; pub use json_serde::*; +pub use kv_snapshot_lease::*; pub use partition::*; +pub use producer_offsets::*; pub(crate) use schema_util::{UNEXIST_MAPPING, index_mapping}; pub use table::*; pub use table_change::*; diff --git a/crates/fluss/src/metadata/producer_offsets.rs b/crates/fluss/src/metadata/producer_offsets.rs new file mode 100644 index 00000000..f0daddbc --- /dev/null +++ b/crates/fluss/src/metadata/producer_offsets.rs @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::proto::{PbBucketOffset, PbProducerTableOffsets}; + +/// Per-bucket producer log-end offset. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BucketOffset { + pub partition_id: Option, + pub bucket_id: i32, + pub log_end_offset: Option, +} + +impl BucketOffset { + pub fn to_pb(&self) -> PbBucketOffset { + PbBucketOffset { + partition_id: self.partition_id, + bucket_id: self.bucket_id, + log_end_offset: self.log_end_offset, + } + } + + pub fn from_pb(pb: &PbBucketOffset) -> Self { + Self { + partition_id: pb.partition_id, + bucket_id: pb.bucket_id, + log_end_offset: pb.log_end_offset, + } + } +} + +/// All bucket offsets of a single table belonging to one producer. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ProducerTableOffsets { + pub table_id: i64, + pub bucket_offsets: Vec, +} + +impl ProducerTableOffsets { + pub fn to_pb(&self) -> PbProducerTableOffsets { + PbProducerTableOffsets { + table_id: self.table_id, + bucket_offsets: self + .bucket_offsets + .iter() + .map(BucketOffset::to_pb) + .collect(), + } + } + + pub fn from_pb(pb: &PbProducerTableOffsets) -> Self { + Self { + table_id: pb.table_id, + bucket_offsets: pb + .bucket_offsets + .iter() + .map(BucketOffset::from_pb) + .collect(), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_bucket_offset_roundtrip() { + for b in [ + BucketOffset { + partition_id: None, + bucket_id: 0, + log_end_offset: None, + }, + BucketOffset { + partition_id: Some(42), + bucket_id: 3, + log_end_offset: Some(1234), + }, + ] { + assert_eq!(BucketOffset::from_pb(&b.to_pb()), b); + } + } + + #[test] + fn test_producer_table_offsets_roundtrip() { + let t = ProducerTableOffsets { + table_id: 5, + bucket_offsets: vec![ + BucketOffset { + partition_id: None, + bucket_id: 0, + log_end_offset: Some(100), + }, + BucketOffset { + partition_id: Some(7), + bucket_id: 1, + log_end_offset: None, + }, + ], + }; + assert_eq!(ProducerTableOffsets::from_pb(&t.to_pb()), t); + } +} diff --git a/crates/fluss/src/metadata/table.rs b/crates/fluss/src/metadata/table.rs index 35d251d7..a796ed98 100644 --- a/crates/fluss/src/metadata/table.rs +++ b/crates/fluss/src/metadata/table.rs @@ -1463,6 +1463,22 @@ impl TableBucket { pub fn partition_id(&self) -> Option { self.partition_id } + + pub fn to_pb(&self) -> crate::proto::PbTableBucket { + crate::proto::PbTableBucket { + table_id: self.table_id, + partition_id: self.partition_id, + bucket_id: self.bucket, + } + } + + pub fn from_pb(pb: &crate::proto::PbTableBucket) -> Self { + Self { + table_id: pb.table_id, + partition_id: pb.partition_id, + bucket: pb.bucket_id, + } + } } impl Display for TableBucket { diff --git a/crates/fluss/src/rpc/message/acquire_kv_snapshot_lease.rs b/crates/fluss/src/rpc/message/acquire_kv_snapshot_lease.rs new file mode 100644 index 00000000..9d0bafee --- /dev/null +++ b/crates/fluss/src/rpc/message/acquire_kv_snapshot_lease.rs @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::KvSnapshotLeaseForTable; +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct AcquireKvSnapshotLeaseRequest { + pub(crate) inner_request: proto::AcquireKvSnapshotLeaseRequest, +} + +impl AcquireKvSnapshotLeaseRequest { + pub fn new( + lease_id: &str, + lease_duration_ms: i64, + snapshots_to_lease: Vec, + ) -> Self { + AcquireKvSnapshotLeaseRequest { + inner_request: proto::AcquireKvSnapshotLeaseRequest { + lease_id: lease_id.to_string(), + lease_duration_ms, + snapshots_to_lease: snapshots_to_lease + .iter() + .map(KvSnapshotLeaseForTable::to_pb) + .collect(), + }, + } + } +} + +impl RequestBody for AcquireKvSnapshotLeaseRequest { + type ResponseBody = proto::AcquireKvSnapshotLeaseResponse; + const API_KEY: ApiKey = ApiKey::AcquireKvSnapshotLease; +} + +impl_write_type!(AcquireKvSnapshotLeaseRequest); +impl_read_type!(proto::AcquireKvSnapshotLeaseResponse); diff --git a/crates/fluss/src/rpc/message/add_server_tag.rs b/crates/fluss/src/rpc/message/add_server_tag.rs new file mode 100644 index 00000000..eb168777 --- /dev/null +++ b/crates/fluss/src/rpc/message/add_server_tag.rs @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct AddServerTagRequest { + pub(crate) inner_request: proto::AddServerTagRequest, +} + +impl AddServerTagRequest { + pub fn new(server_ids: Vec, server_tag: i32) -> Self { + AddServerTagRequest { + inner_request: proto::AddServerTagRequest { + server_ids, + server_tag, + }, + } + } +} + +impl RequestBody for AddServerTagRequest { + type ResponseBody = proto::AddServerTagResponse; + const API_KEY: ApiKey = ApiKey::AddServerTag; +} + +impl_write_type!(AddServerTagRequest); +impl_read_type!(proto::AddServerTagResponse); diff --git a/crates/fluss/src/rpc/message/cancel_rebalance.rs b/crates/fluss/src/rpc/message/cancel_rebalance.rs new file mode 100644 index 00000000..89248729 --- /dev/null +++ b/crates/fluss/src/rpc/message/cancel_rebalance.rs @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct CancelRebalanceRequest { + pub(crate) inner_request: proto::CancelRebalanceRequest, +} + +impl CancelRebalanceRequest { + pub fn new(rebalance_id: Option<&str>) -> Self { + CancelRebalanceRequest { + inner_request: proto::CancelRebalanceRequest { + rebalance_id: rebalance_id.map(|s| s.to_string()), + }, + } + } +} + +impl RequestBody for CancelRebalanceRequest { + type ResponseBody = proto::CancelRebalanceResponse; + const API_KEY: ApiKey = ApiKey::CancelRebalance; +} + +impl_write_type!(CancelRebalanceRequest); +impl_read_type!(proto::CancelRebalanceResponse); diff --git a/crates/fluss/src/rpc/message/delete_producer_offsets.rs b/crates/fluss/src/rpc/message/delete_producer_offsets.rs new file mode 100644 index 00000000..5dd80036 --- /dev/null +++ b/crates/fluss/src/rpc/message/delete_producer_offsets.rs @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct DeleteProducerOffsetsRequest { + pub(crate) inner_request: proto::DeleteProducerOffsetsRequest, +} + +impl DeleteProducerOffsetsRequest { + pub fn new(producer_id: &str) -> Self { + DeleteProducerOffsetsRequest { + inner_request: proto::DeleteProducerOffsetsRequest { + producer_id: producer_id.to_string(), + }, + } + } +} + +impl RequestBody for DeleteProducerOffsetsRequest { + type ResponseBody = proto::DeleteProducerOffsetsResponse; + const API_KEY: ApiKey = ApiKey::DeleteProducerOffsets; +} + +impl_write_type!(DeleteProducerOffsetsRequest); +impl_read_type!(proto::DeleteProducerOffsetsResponse); diff --git a/crates/fluss/src/rpc/message/drop_kv_snapshot_lease.rs b/crates/fluss/src/rpc/message/drop_kv_snapshot_lease.rs new file mode 100644 index 00000000..174e75ff --- /dev/null +++ b/crates/fluss/src/rpc/message/drop_kv_snapshot_lease.rs @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct DropKvSnapshotLeaseRequest { + pub(crate) inner_request: proto::DropKvSnapshotLeaseRequest, +} + +impl DropKvSnapshotLeaseRequest { + pub fn new(lease_id: &str) -> Self { + DropKvSnapshotLeaseRequest { + inner_request: proto::DropKvSnapshotLeaseRequest { + lease_id: lease_id.to_string(), + }, + } + } +} + +impl RequestBody for DropKvSnapshotLeaseRequest { + type ResponseBody = proto::DropKvSnapshotLeaseResponse; + const API_KEY: ApiKey = ApiKey::DropKvSnapshotLease; +} + +impl_write_type!(DropKvSnapshotLeaseRequest); +impl_read_type!(proto::DropKvSnapshotLeaseResponse); diff --git a/crates/fluss/src/rpc/message/get_cluster_health.rs b/crates/fluss/src/rpc/message/get_cluster_health.rs new file mode 100644 index 00000000..22606208 --- /dev/null +++ b/crates/fluss/src/rpc/message/get_cluster_health.rs @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug, Default)] +pub struct GetClusterHealthRequest { + pub(crate) inner_request: proto::GetClusterHealthRequest, +} + +impl GetClusterHealthRequest { + pub fn new() -> Self { + GetClusterHealthRequest { + inner_request: proto::GetClusterHealthRequest {}, + } + } +} + +impl RequestBody for GetClusterHealthRequest { + type ResponseBody = proto::GetClusterHealthResponse; + const API_KEY: ApiKey = ApiKey::GetClusterHealth; +} + +impl_write_type!(GetClusterHealthRequest); +impl_read_type!(proto::GetClusterHealthResponse); diff --git a/crates/fluss/src/rpc/message/get_kv_snapshot_metadata.rs b/crates/fluss/src/rpc/message/get_kv_snapshot_metadata.rs new file mode 100644 index 00000000..b6dfd426 --- /dev/null +++ b/crates/fluss/src/rpc/message/get_kv_snapshot_metadata.rs @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct GetKvSnapshotMetadataRequest { + pub(crate) inner_request: proto::GetKvSnapshotMetadataRequest, +} + +impl GetKvSnapshotMetadataRequest { + pub fn new(table_id: i64, partition_id: Option, bucket_id: i32, snapshot_id: i64) -> Self { + GetKvSnapshotMetadataRequest { + inner_request: proto::GetKvSnapshotMetadataRequest { + table_id, + partition_id, + bucket_id, + snapshot_id, + }, + } + } +} + +impl RequestBody for GetKvSnapshotMetadataRequest { + type ResponseBody = proto::GetKvSnapshotMetadataResponse; + const API_KEY: ApiKey = ApiKey::GetKvSnapshotMetadata; +} + +impl_write_type!(GetKvSnapshotMetadataRequest); +impl_read_type!(proto::GetKvSnapshotMetadataResponse); diff --git a/crates/fluss/src/rpc/message/get_lake_snapshot.rs b/crates/fluss/src/rpc/message/get_lake_snapshot.rs new file mode 100644 index 00000000..6273f2f3 --- /dev/null +++ b/crates/fluss/src/rpc/message/get_lake_snapshot.rs @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::TablePath; +use crate::rpc::api_key::ApiKey; +use crate::rpc::convert::to_table_path; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct GetLakeSnapshotRequest { + pub(crate) inner_request: proto::GetLakeSnapshotRequest, +} + +impl GetLakeSnapshotRequest { + pub fn new(table_path: &TablePath, snapshot_id: Option, readable: Option) -> Self { + GetLakeSnapshotRequest { + inner_request: proto::GetLakeSnapshotRequest { + table_path: to_table_path(table_path), + snapshot_id, + readable, + }, + } + } +} + +impl RequestBody for GetLakeSnapshotRequest { + type ResponseBody = proto::GetLakeSnapshotResponse; + const API_KEY: ApiKey = ApiKey::GetLakeSnapshot; +} + +impl_write_type!(GetLakeSnapshotRequest); +impl_read_type!(proto::GetLakeSnapshotResponse); diff --git a/crates/fluss/src/rpc/message/get_latest_kv_snapshots.rs b/crates/fluss/src/rpc/message/get_latest_kv_snapshots.rs new file mode 100644 index 00000000..b632d54b --- /dev/null +++ b/crates/fluss/src/rpc/message/get_latest_kv_snapshots.rs @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::TablePath; +use crate::rpc::api_key::ApiKey; +use crate::rpc::convert::to_table_path; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct GetLatestKvSnapshotsRequest { + pub(crate) inner_request: proto::GetLatestKvSnapshotsRequest, +} + +impl GetLatestKvSnapshotsRequest { + pub fn new(table_path: &TablePath, partition_name: Option<&str>) -> Self { + GetLatestKvSnapshotsRequest { + inner_request: proto::GetLatestKvSnapshotsRequest { + table_path: to_table_path(table_path), + partition_name: partition_name.map(|s| s.to_string()), + }, + } + } +} + +impl RequestBody for GetLatestKvSnapshotsRequest { + type ResponseBody = proto::GetLatestKvSnapshotsResponse; + const API_KEY: ApiKey = ApiKey::GetLatestKvSnapshots; +} + +impl_write_type!(GetLatestKvSnapshotsRequest); +impl_read_type!(proto::GetLatestKvSnapshotsResponse); diff --git a/crates/fluss/src/rpc/message/get_producer_offsets.rs b/crates/fluss/src/rpc/message/get_producer_offsets.rs new file mode 100644 index 00000000..68b666ca --- /dev/null +++ b/crates/fluss/src/rpc/message/get_producer_offsets.rs @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct GetProducerOffsetsRequest { + pub(crate) inner_request: proto::GetProducerOffsetsRequest, +} + +impl GetProducerOffsetsRequest { + pub fn new(producer_id: &str) -> Self { + GetProducerOffsetsRequest { + inner_request: proto::GetProducerOffsetsRequest { + producer_id: producer_id.to_string(), + }, + } + } +} + +impl RequestBody for GetProducerOffsetsRequest { + type ResponseBody = proto::GetProducerOffsetsResponse; + const API_KEY: ApiKey = ApiKey::GetProducerOffsets; +} + +impl_write_type!(GetProducerOffsetsRequest); +impl_read_type!(proto::GetProducerOffsetsResponse); diff --git a/crates/fluss/src/rpc/message/list_kv_snapshots.rs b/crates/fluss/src/rpc/message/list_kv_snapshots.rs new file mode 100644 index 00000000..2b9b10ce --- /dev/null +++ b/crates/fluss/src/rpc/message/list_kv_snapshots.rs @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct ListKvSnapshotsRequest { + pub(crate) inner_request: proto::ListKvSnapshotsRequest, +} + +impl ListKvSnapshotsRequest { + pub fn new(table_id: i64, partition_id: Option) -> Self { + ListKvSnapshotsRequest { + inner_request: proto::ListKvSnapshotsRequest { + table_id, + partition_id, + }, + } + } +} + +impl RequestBody for ListKvSnapshotsRequest { + type ResponseBody = proto::ListKvSnapshotsResponse; + const API_KEY: ApiKey = ApiKey::ListKvSnapshots; +} + +impl_write_type!(ListKvSnapshotsRequest); +impl_read_type!(proto::ListKvSnapshotsResponse); diff --git a/crates/fluss/src/rpc/message/list_rebalance_progress.rs b/crates/fluss/src/rpc/message/list_rebalance_progress.rs new file mode 100644 index 00000000..4b191a4b --- /dev/null +++ b/crates/fluss/src/rpc/message/list_rebalance_progress.rs @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct ListRebalanceProgressRequest { + pub(crate) inner_request: proto::ListRebalanceProgressRequest, +} + +impl ListRebalanceProgressRequest { + pub fn new(rebalance_id: Option<&str>) -> Self { + ListRebalanceProgressRequest { + inner_request: proto::ListRebalanceProgressRequest { + rebalance_id: rebalance_id.map(|s| s.to_string()), + }, + } + } +} + +impl RequestBody for ListRebalanceProgressRequest { + type ResponseBody = proto::ListRebalanceProgressResponse; + const API_KEY: ApiKey = ApiKey::ListRebalanceProgress; +} + +impl_write_type!(ListRebalanceProgressRequest); +impl_read_type!(proto::ListRebalanceProgressResponse); diff --git a/crates/fluss/src/rpc/message/list_remote_log_manifests.rs b/crates/fluss/src/rpc/message/list_remote_log_manifests.rs new file mode 100644 index 00000000..ad0946f7 --- /dev/null +++ b/crates/fluss/src/rpc/message/list_remote_log_manifests.rs @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct ListRemoteLogManifestsRequest { + pub(crate) inner_request: proto::ListRemoteLogManifestsRequest, +} + +impl ListRemoteLogManifestsRequest { + pub fn new(table_id: i64, partition_id: Option) -> Self { + ListRemoteLogManifestsRequest { + inner_request: proto::ListRemoteLogManifestsRequest { + table_id, + partition_id, + }, + } + } +} + +impl RequestBody for ListRemoteLogManifestsRequest { + type ResponseBody = proto::ListRemoteLogManifestsResponse; + const API_KEY: ApiKey = ApiKey::ListRemoteLogManifests; +} + +impl_write_type!(ListRemoteLogManifestsRequest); +impl_read_type!(proto::ListRemoteLogManifestsResponse); diff --git a/crates/fluss/src/rpc/message/mod.rs b/crates/fluss/src/rpc/message/mod.rs index f126d214..aba690a5 100644 --- a/crates/fluss/src/rpc/message/mod.rs +++ b/crates/fluss/src/rpc/message/mod.rs @@ -19,24 +19,34 @@ use crate::rpc::api_key::ApiKey; use crate::rpc::frame::{ReadError, WriteError}; use bytes::{Buf, BufMut}; +mod acquire_kv_snapshot_lease; +mod add_server_tag; mod alter_cluster_configs; mod alter_database; mod alter_table; mod api_versions; mod authenticate; +mod cancel_rebalance; mod create_acls; mod create_database; mod create_partition; mod create_table; mod database_exists; +mod delete_producer_offsets; mod describe_cluster_configs; mod drop_acls; mod drop_database; +mod drop_kv_snapshot_lease; mod drop_partition; mod drop_table; mod fetch; +mod get_cluster_health; mod get_database_info; +mod get_kv_snapshot_metadata; +mod get_lake_snapshot; +mod get_latest_kv_snapshots; mod get_latest_lake_snapshot; +mod get_producer_offsets; mod get_security_token; mod get_table; mod get_table_schema; @@ -47,35 +57,53 @@ mod limit_scan; mod list_acls; mod list_database_summaries; mod list_databases; +mod list_kv_snapshots; mod list_offsets; mod list_partition_infos; +mod list_rebalance_progress; +mod list_remote_log_manifests; mod list_tables; mod lookup; mod prefix_lookup; mod produce_log; mod put_kv; +mod rebalance; +mod register_producer_offsets; +mod release_kv_snapshot_lease; +mod remove_server_tag; +mod scan_kv; mod table_exists; mod update_metadata; pub use crate::rpc::RpcError; +pub use acquire_kv_snapshot_lease::*; +pub use add_server_tag::*; pub use alter_cluster_configs::*; pub use alter_database::*; pub use alter_table::*; pub use api_versions::*; pub use authenticate::*; +pub use cancel_rebalance::*; pub use create_acls::*; pub use create_database::*; pub use create_partition::*; pub use create_table::*; pub use database_exists::*; +pub use delete_producer_offsets::*; pub use describe_cluster_configs::*; pub use drop_acls::*; pub use drop_database::*; +pub use drop_kv_snapshot_lease::*; pub use drop_partition::*; pub use drop_table::*; pub use fetch::*; +pub use get_cluster_health::*; pub use get_database_info::*; +pub use get_kv_snapshot_metadata::*; +pub use get_lake_snapshot::*; +pub use get_latest_kv_snapshots::*; pub use get_latest_lake_snapshot::*; +pub use get_producer_offsets::*; pub use get_security_token::*; pub use get_table::*; pub use get_table_schema::*; @@ -86,13 +114,21 @@ pub use limit_scan::*; pub use list_acls::*; pub use list_database_summaries::*; pub use list_databases::*; +pub use list_kv_snapshots::*; pub use list_offsets::*; pub use list_partition_infos::*; +pub use list_rebalance_progress::*; +pub use list_remote_log_manifests::*; pub use list_tables::*; pub use lookup::*; pub use prefix_lookup::*; pub use produce_log::*; pub use put_kv::*; +pub use rebalance::*; +pub use register_producer_offsets::*; +pub use release_kv_snapshot_lease::*; +pub use remove_server_tag::*; +pub use scan_kv::*; pub use table_exists::*; pub use update_metadata::*; diff --git a/crates/fluss/src/rpc/message/rebalance.rs b/crates/fluss/src/rpc/message/rebalance.rs new file mode 100644 index 00000000..f1e47483 --- /dev/null +++ b/crates/fluss/src/rpc/message/rebalance.rs @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct RebalanceRequest { + pub(crate) inner_request: proto::RebalanceRequest, +} + +impl RebalanceRequest { + pub fn new(goals: Vec) -> Self { + RebalanceRequest { + inner_request: proto::RebalanceRequest { goals }, + } + } +} + +impl RequestBody for RebalanceRequest { + type ResponseBody = proto::RebalanceResponse; + const API_KEY: ApiKey = ApiKey::Rebalance; +} + +impl_write_type!(RebalanceRequest); +impl_read_type!(proto::RebalanceResponse); diff --git a/crates/fluss/src/rpc/message/register_producer_offsets.rs b/crates/fluss/src/rpc/message/register_producer_offsets.rs new file mode 100644 index 00000000..5f4ba2a6 --- /dev/null +++ b/crates/fluss/src/rpc/message/register_producer_offsets.rs @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::ProducerTableOffsets; +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct RegisterProducerOffsetsRequest { + pub(crate) inner_request: proto::RegisterProducerOffsetsRequest, +} + +impl RegisterProducerOffsetsRequest { + pub fn new( + producer_id: &str, + table_offsets: Vec, + ttl_ms: Option, + ) -> Self { + RegisterProducerOffsetsRequest { + inner_request: proto::RegisterProducerOffsetsRequest { + producer_id: producer_id.to_string(), + table_offsets: table_offsets + .iter() + .map(ProducerTableOffsets::to_pb) + .collect(), + ttl_ms, + }, + } + } +} + +impl RequestBody for RegisterProducerOffsetsRequest { + type ResponseBody = proto::RegisterProducerOffsetsResponse; + const API_KEY: ApiKey = ApiKey::RegisterProducerOffsets; +} + +impl_write_type!(RegisterProducerOffsetsRequest); +impl_read_type!(proto::RegisterProducerOffsetsResponse); diff --git a/crates/fluss/src/rpc/message/release_kv_snapshot_lease.rs b/crates/fluss/src/rpc/message/release_kv_snapshot_lease.rs new file mode 100644 index 00000000..00c1a957 --- /dev/null +++ b/crates/fluss/src/rpc/message/release_kv_snapshot_lease.rs @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::metadata::TableBucket; +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct ReleaseKvSnapshotLeaseRequest { + pub(crate) inner_request: proto::ReleaseKvSnapshotLeaseRequest, +} + +impl ReleaseKvSnapshotLeaseRequest { + pub fn new(lease_id: &str, buckets_to_release: Vec) -> Self { + ReleaseKvSnapshotLeaseRequest { + inner_request: proto::ReleaseKvSnapshotLeaseRequest { + lease_id: lease_id.to_string(), + buckets_to_release: buckets_to_release.iter().map(TableBucket::to_pb).collect(), + }, + } + } +} + +impl RequestBody for ReleaseKvSnapshotLeaseRequest { + type ResponseBody = proto::ReleaseKvSnapshotLeaseResponse; + const API_KEY: ApiKey = ApiKey::ReleaseKvSnapshotLease; +} + +impl_write_type!(ReleaseKvSnapshotLeaseRequest); +impl_read_type!(proto::ReleaseKvSnapshotLeaseResponse); diff --git a/crates/fluss/src/rpc/message/remove_server_tag.rs b/crates/fluss/src/rpc/message/remove_server_tag.rs new file mode 100644 index 00000000..65dd30fb --- /dev/null +++ b/crates/fluss/src/rpc/message/remove_server_tag.rs @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct RemoveServerTagRequest { + pub(crate) inner_request: proto::RemoveServerTagRequest, +} + +impl RemoveServerTagRequest { + pub fn new(server_ids: Vec, server_tag: i32) -> Self { + RemoveServerTagRequest { + inner_request: proto::RemoveServerTagRequest { + server_ids, + server_tag, + }, + } + } +} + +impl RequestBody for RemoveServerTagRequest { + type ResponseBody = proto::RemoveServerTagResponse; + const API_KEY: ApiKey = ApiKey::RemoveServerTag; +} + +impl_write_type!(RemoveServerTagRequest); +impl_read_type!(proto::RemoveServerTagResponse); diff --git a/crates/fluss/src/rpc/message/scan_kv.rs b/crates/fluss/src/rpc/message/scan_kv.rs new file mode 100644 index 00000000..081b4916 --- /dev/null +++ b/crates/fluss/src/rpc/message/scan_kv.rs @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::rpc::api_key::ApiKey; +use crate::rpc::frame::{ReadError, WriteError}; +use crate::rpc::message::{ReadType, RequestBody, WriteType}; +use crate::{impl_read_type, impl_write_type, proto}; +use bytes::{Buf, BufMut}; +use prost::Message; + +#[derive(Debug)] +pub struct ScanKvRequest { + pub(crate) inner_request: proto::ScanKvRequest, +} + +impl ScanKvRequest { + #[allow(dead_code)] + pub(crate) fn new( + scanner_id: Option>, + bucket_scan_req: Option, + call_seq_id: Option, + batch_size_bytes: Option, + close_scanner: Option, + ) -> Self { + ScanKvRequest { + inner_request: proto::ScanKvRequest { + scanner_id, + bucket_scan_req, + call_seq_id, + batch_size_bytes, + close_scanner, + }, + } + } +} + +impl RequestBody for ScanKvRequest { + type ResponseBody = proto::ScanKvResponse; + const API_KEY: ApiKey = ApiKey::ScanKv; +} + +impl_write_type!(ScanKvRequest); +impl_read_type!(proto::ScanKvResponse); From 95ffee9fe4e2ad5210b7c0a0705cbddc8c9e3a35 Mon Sep 17 00:00:00 2001 From: warmbupt Date: Mon, 22 Jun 2026 02:54:57 +0800 Subject: [PATCH 2/3] Add GoalType and ServerTag domain enums for RPC wrappers Replace raw i32 with proper Rust enums matching the Java Fluss definitions: GoalType (ReplicaDistribution/LeaderDistribution/RackAware) and ServerTag (PermanentOffline/TemporaryOffline). Addresses reviewer feedback on PR #630. Co-Authored-By: Claude Opus 4.6 --- crates/fluss/src/metadata/goal_type.rs | 68 +++++++++++++++++++ crates/fluss/src/metadata/mod.rs | 4 ++ crates/fluss/src/metadata/server_tag.rs | 61 +++++++++++++++++ .../fluss/src/rpc/message/add_server_tag.rs | 5 +- crates/fluss/src/rpc/message/rebalance.rs | 7 +- .../src/rpc/message/remove_server_tag.rs | 5 +- 6 files changed, 144 insertions(+), 6 deletions(-) create mode 100644 crates/fluss/src/metadata/goal_type.rs create mode 100644 crates/fluss/src/metadata/server_tag.rs diff --git a/crates/fluss/src/metadata/goal_type.rs b/crates/fluss/src/metadata/goal_type.rs new file mode 100644 index 00000000..3880faae --- /dev/null +++ b/crates/fluss/src/metadata/goal_type.rs @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::{Error, Result}; + +/// Mirrors Java `org.apache.fluss.cluster.rebalance.GoalType`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum GoalType { + ReplicaDistribution, + LeaderDistribution, + RackAware, +} + +impl GoalType { + pub fn to_i32(self) -> i32 { + match self { + Self::ReplicaDistribution => 0, + Self::LeaderDistribution => 1, + Self::RackAware => 2, + } + } + + pub fn try_from_i32(value: i32) -> Result { + match value { + 0 => Ok(Self::ReplicaDistribution), + 1 => Ok(Self::LeaderDistribution), + 2 => Ok(Self::RackAware), + _ => Err(Error::IllegalArgument { + message: format!("Unsupported GoalType: {value}"), + }), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_goal_type_roundtrip() { + for goal in [ + GoalType::ReplicaDistribution, + GoalType::LeaderDistribution, + GoalType::RackAware, + ] { + assert_eq!(GoalType::try_from_i32(goal.to_i32()).unwrap(), goal); + } + } + + #[test] + fn test_goal_type_unknown() { + assert!(GoalType::try_from_i32(99).is_err()); + } +} diff --git a/crates/fluss/src/metadata/mod.rs b/crates/fluss/src/metadata/mod.rs index ee5e0c41..0249fefe 100644 --- a/crates/fluss/src/metadata/mod.rs +++ b/crates/fluss/src/metadata/mod.rs @@ -20,11 +20,13 @@ mod config; mod data_lake_format; mod database; mod datatype; +mod goal_type; mod json_serde; mod kv_snapshot_lease; mod partition; mod producer_offsets; mod schema_util; +mod server_tag; mod table; mod table_change; mod table_stats; @@ -34,11 +36,13 @@ pub use config::*; pub use data_lake_format::*; pub use database::*; pub use datatype::*; +pub use goal_type::*; pub use json_serde::*; pub use kv_snapshot_lease::*; pub use partition::*; pub use producer_offsets::*; pub(crate) use schema_util::{UNEXIST_MAPPING, index_mapping}; +pub use server_tag::*; pub use table::*; pub use table_change::*; pub use table_stats::*; diff --git a/crates/fluss/src/metadata/server_tag.rs b/crates/fluss/src/metadata/server_tag.rs new file mode 100644 index 00000000..642bbd6a --- /dev/null +++ b/crates/fluss/src/metadata/server_tag.rs @@ -0,0 +1,61 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::error::{Error, Result}; + +/// Mirrors Java `org.apache.fluss.cluster.rebalance.ServerTag`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ServerTag { + PermanentOffline, + TemporaryOffline, +} + +impl ServerTag { + pub fn to_i32(self) -> i32 { + match self { + Self::PermanentOffline => 0, + Self::TemporaryOffline => 1, + } + } + + pub fn try_from_i32(value: i32) -> Result { + match value { + 0 => Ok(Self::PermanentOffline), + 1 => Ok(Self::TemporaryOffline), + _ => Err(Error::IllegalArgument { + message: format!("Unsupported ServerTag: {value}"), + }), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_server_tag_roundtrip() { + for tag in [ServerTag::PermanentOffline, ServerTag::TemporaryOffline] { + assert_eq!(ServerTag::try_from_i32(tag.to_i32()).unwrap(), tag); + } + } + + #[test] + fn test_server_tag_unknown() { + assert!(ServerTag::try_from_i32(99).is_err()); + } +} diff --git a/crates/fluss/src/rpc/message/add_server_tag.rs b/crates/fluss/src/rpc/message/add_server_tag.rs index eb168777..fdbbd4d0 100644 --- a/crates/fluss/src/rpc/message/add_server_tag.rs +++ b/crates/fluss/src/rpc/message/add_server_tag.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::metadata::ServerTag; use crate::rpc::api_key::ApiKey; use crate::rpc::frame::{ReadError, WriteError}; use crate::rpc::message::{ReadType, RequestBody, WriteType}; @@ -28,11 +29,11 @@ pub struct AddServerTagRequest { } impl AddServerTagRequest { - pub fn new(server_ids: Vec, server_tag: i32) -> Self { + pub fn new(server_ids: Vec, server_tag: ServerTag) -> Self { AddServerTagRequest { inner_request: proto::AddServerTagRequest { server_ids, - server_tag, + server_tag: server_tag.to_i32(), }, } } diff --git a/crates/fluss/src/rpc/message/rebalance.rs b/crates/fluss/src/rpc/message/rebalance.rs index f1e47483..bc278510 100644 --- a/crates/fluss/src/rpc/message/rebalance.rs +++ b/crates/fluss/src/rpc/message/rebalance.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::metadata::GoalType; use crate::rpc::api_key::ApiKey; use crate::rpc::frame::{ReadError, WriteError}; use crate::rpc::message::{ReadType, RequestBody, WriteType}; @@ -28,9 +29,11 @@ pub struct RebalanceRequest { } impl RebalanceRequest { - pub fn new(goals: Vec) -> Self { + pub fn new(goals: Vec) -> Self { RebalanceRequest { - inner_request: proto::RebalanceRequest { goals }, + inner_request: proto::RebalanceRequest { + goals: goals.iter().map(|g| g.to_i32()).collect(), + }, } } } diff --git a/crates/fluss/src/rpc/message/remove_server_tag.rs b/crates/fluss/src/rpc/message/remove_server_tag.rs index 65dd30fb..f8a5258a 100644 --- a/crates/fluss/src/rpc/message/remove_server_tag.rs +++ b/crates/fluss/src/rpc/message/remove_server_tag.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::metadata::ServerTag; use crate::rpc::api_key::ApiKey; use crate::rpc::frame::{ReadError, WriteError}; use crate::rpc::message::{ReadType, RequestBody, WriteType}; @@ -28,11 +29,11 @@ pub struct RemoveServerTagRequest { } impl RemoveServerTagRequest { - pub fn new(server_ids: Vec, server_tag: i32) -> Self { + pub fn new(server_ids: Vec, server_tag: ServerTag) -> Self { RemoveServerTagRequest { inner_request: proto::RemoveServerTagRequest { server_ids, - server_tag, + server_tag: server_tag.to_i32(), }, } } From a404a8c6e3f87d119fe875ec994c56f91c4850cd Mon Sep 17 00:00:00 2001 From: warmbupt Date: Wed, 24 Jun 2026 09:46:08 +0800 Subject: [PATCH 3/3] Apply TableId/PartitionId/BucketId type aliases to RPC messages Use the i64/i32 type aliases from lib.rs instead of raw primitives in the new RPC message wrappers introduced by this PR. Co-Authored-By: Claude Opus 4.7 --- crates/fluss/src/rpc/message/get_kv_snapshot_metadata.rs | 9 +++++++-- crates/fluss/src/rpc/message/get_table_stats.rs | 4 ++-- crates/fluss/src/rpc/message/limit_scan.rs | 9 +++++++-- crates/fluss/src/rpc/message/list_kv_snapshots.rs | 4 ++-- .../fluss/src/rpc/message/list_remote_log_manifests.rs | 4 ++-- 5 files changed, 20 insertions(+), 10 deletions(-) diff --git a/crates/fluss/src/rpc/message/get_kv_snapshot_metadata.rs b/crates/fluss/src/rpc/message/get_kv_snapshot_metadata.rs index b6dfd426..cf7f9a2c 100644 --- a/crates/fluss/src/rpc/message/get_kv_snapshot_metadata.rs +++ b/crates/fluss/src/rpc/message/get_kv_snapshot_metadata.rs @@ -18,7 +18,7 @@ use crate::rpc::api_key::ApiKey; use crate::rpc::frame::{ReadError, WriteError}; use crate::rpc::message::{ReadType, RequestBody, WriteType}; -use crate::{impl_read_type, impl_write_type, proto}; +use crate::{BucketId, PartitionId, TableId, impl_read_type, impl_write_type, proto}; use bytes::{Buf, BufMut}; use prost::Message; @@ -28,7 +28,12 @@ pub struct GetKvSnapshotMetadataRequest { } impl GetKvSnapshotMetadataRequest { - pub fn new(table_id: i64, partition_id: Option, bucket_id: i32, snapshot_id: i64) -> Self { + pub fn new( + table_id: TableId, + partition_id: Option, + bucket_id: BucketId, + snapshot_id: i64, + ) -> Self { GetKvSnapshotMetadataRequest { inner_request: proto::GetKvSnapshotMetadataRequest { table_id, diff --git a/crates/fluss/src/rpc/message/get_table_stats.rs b/crates/fluss/src/rpc/message/get_table_stats.rs index c5ef610a..c1c67614 100644 --- a/crates/fluss/src/rpc/message/get_table_stats.rs +++ b/crates/fluss/src/rpc/message/get_table_stats.rs @@ -19,7 +19,7 @@ use crate::metadata::BucketStatsRequest; use crate::rpc::api_key::ApiKey; use crate::rpc::frame::{ReadError, WriteError}; use crate::rpc::message::{ReadType, RequestBody, WriteType}; -use crate::{impl_read_type, impl_write_type, proto}; +use crate::{TableId, impl_read_type, impl_write_type, proto}; use bytes::{Buf, BufMut}; use prost::Message; @@ -30,7 +30,7 @@ pub struct GetTableStatsRequest { impl GetTableStatsRequest { pub fn new( - table_id: i64, + table_id: TableId, buckets_req: Vec, target_columns: Vec, ) -> Self { diff --git a/crates/fluss/src/rpc/message/limit_scan.rs b/crates/fluss/src/rpc/message/limit_scan.rs index cf9a0363..9dfa408e 100644 --- a/crates/fluss/src/rpc/message/limit_scan.rs +++ b/crates/fluss/src/rpc/message/limit_scan.rs @@ -21,7 +21,7 @@ use crate::rpc::frame::ReadError; use crate::rpc::api_key::ApiKey; use crate::rpc::frame::WriteError; use crate::rpc::message::{ReadType, RequestBody, WriteType}; -use crate::{impl_read_type, impl_write_type, proto}; +use crate::{BucketId, PartitionId, TableId, impl_read_type, impl_write_type, proto}; use prost::Message; use bytes::{Buf, BufMut}; @@ -31,7 +31,12 @@ pub struct LimitScanRequest { } impl LimitScanRequest { - pub fn new(table_id: i64, partition_id: Option, bucket_id: i32, limit: i32) -> Self { + pub fn new( + table_id: TableId, + partition_id: Option, + bucket_id: BucketId, + limit: i32, + ) -> Self { let request = proto::LimitScanRequest { table_id, partition_id, diff --git a/crates/fluss/src/rpc/message/list_kv_snapshots.rs b/crates/fluss/src/rpc/message/list_kv_snapshots.rs index 2b9b10ce..63bce05e 100644 --- a/crates/fluss/src/rpc/message/list_kv_snapshots.rs +++ b/crates/fluss/src/rpc/message/list_kv_snapshots.rs @@ -18,7 +18,7 @@ use crate::rpc::api_key::ApiKey; use crate::rpc::frame::{ReadError, WriteError}; use crate::rpc::message::{ReadType, RequestBody, WriteType}; -use crate::{impl_read_type, impl_write_type, proto}; +use crate::{PartitionId, TableId, impl_read_type, impl_write_type, proto}; use bytes::{Buf, BufMut}; use prost::Message; @@ -28,7 +28,7 @@ pub struct ListKvSnapshotsRequest { } impl ListKvSnapshotsRequest { - pub fn new(table_id: i64, partition_id: Option) -> Self { + pub fn new(table_id: TableId, partition_id: Option) -> Self { ListKvSnapshotsRequest { inner_request: proto::ListKvSnapshotsRequest { table_id, diff --git a/crates/fluss/src/rpc/message/list_remote_log_manifests.rs b/crates/fluss/src/rpc/message/list_remote_log_manifests.rs index ad0946f7..3ea00367 100644 --- a/crates/fluss/src/rpc/message/list_remote_log_manifests.rs +++ b/crates/fluss/src/rpc/message/list_remote_log_manifests.rs @@ -18,7 +18,7 @@ use crate::rpc::api_key::ApiKey; use crate::rpc::frame::{ReadError, WriteError}; use crate::rpc::message::{ReadType, RequestBody, WriteType}; -use crate::{impl_read_type, impl_write_type, proto}; +use crate::{PartitionId, TableId, impl_read_type, impl_write_type, proto}; use bytes::{Buf, BufMut}; use prost::Message; @@ -28,7 +28,7 @@ pub struct ListRemoteLogManifestsRequest { } impl ListRemoteLogManifestsRequest { - pub fn new(table_id: i64, partition_id: Option) -> Self { + pub fn new(table_id: TableId, partition_id: Option) -> Self { ListRemoteLogManifestsRequest { inner_request: proto::ListRemoteLogManifestsRequest { table_id,