Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
407 changes: 402 additions & 5 deletions crates/fluss/src/client/admin.rs

Large diffs are not rendered by default.

74 changes: 73 additions & 1 deletion crates/fluss/src/metadata/acl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
// under the License.

use crate::error::{Error, Result};
use crate::proto::{PbAclFilter, PbAclInfo};
use crate::proto::{
PbAclFilter, PbAclInfo, PbCreateAclRespInfo, PbDropAclsFilterResult, PbDropAclsMatchingAcl,
};

/// Mirrors Java `org.apache.fluss.security.acl.ResourceType`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -197,6 +199,76 @@ impl AclFilter {
}
}

/// One per ACL submitted to `create_acls`: success or a server-side error.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CreateAclResult {
pub acl: AclInfo,
pub error: Option<AclError>,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AclError {
pub code: i32,
pub message: Option<String>,
}

impl CreateAclResult {
pub fn from_pb(pb: &PbCreateAclRespInfo) -> Result<Self> {
Ok(Self {
acl: AclInfo::from_pb(&pb.acl)?,
error: pb.error_code.map(|code| AclError {
code,
message: pb.error_message.clone(),
}),
})
}
}

/// One ACL matched by a filter in `drop_acls`. Reports the bound ACL and any
/// server-side error encountered while dropping it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DropAclMatchingAcl {
pub acl: AclInfo,
pub error: Option<AclError>,
}

impl DropAclMatchingAcl {
pub fn from_pb(pb: &PbDropAclsMatchingAcl) -> Result<Self> {
Ok(Self {
acl: AclInfo::from_pb(&pb.acl)?,
error: pb.error_code.map(|code| AclError {
code,
message: pb.error_message.clone(),
}),
})
}
}

/// One per filter submitted to `drop_acls`: the matching ACLs that were
/// targeted plus any filter-level error.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DropAclsFilterResult {
pub matching_acls: Vec<DropAclMatchingAcl>,
pub error: Option<AclError>,
}

impl DropAclsFilterResult {
pub fn from_pb(pb: &PbDropAclsFilterResult) -> Result<Self> {
let matching_acls = pb
.matching_acls
.iter()
.map(DropAclMatchingAcl::from_pb)
.collect::<Result<Vec<_>>>()?;
Ok(Self {
matching_acls,
error: pb.error_code.map(|code| AclError {
code,
message: pb.error_message.clone(),
}),
})
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
109 changes: 109 additions & 0 deletions crates/fluss/src/metadata/cluster_health.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::error::{Error, Result};
use crate::proto::GetClusterHealthResponse;

/// Mirrors Java `org.apache.fluss.client.admin.ClusterHealthStatus`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ClusterHealthStatus {
Green,
Yellow,
Red,
Unknown,
}

impl ClusterHealthStatus {
pub fn to_i32(self) -> i32 {
match self {
Self::Green => 0,
Self::Yellow => 1,
Self::Red => 2,
Self::Unknown => 3,
}
}

pub fn try_from_i32(value: i32) -> Result<Self> {
match value {
0 => Ok(Self::Green),
1 => Ok(Self::Yellow),
2 => Ok(Self::Red),
3 => Ok(Self::Unknown),
_ => Err(Error::IllegalArgument {
message: format!("Unsupported ClusterHealthStatus: {value}"),
}),
}
}
}

/// Result of `get_cluster_health`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ClusterHealth {
pub num_replicas: i32,
pub in_sync_replicas: i32,
pub num_leader_replicas: i32,
pub active_leader_replicas: i32,
pub status: ClusterHealthStatus,
}

impl ClusterHealth {
pub fn from_pb(pb: &GetClusterHealthResponse) -> Result<Self> {
Ok(Self {
num_replicas: pb.num_replicas,
in_sync_replicas: pb.in_sync_replicas,
num_leader_replicas: pb.num_leader_replicas,
active_leader_replicas: pb.active_leader_replicas,
status: ClusterHealthStatus::try_from_i32(pb.status)?,
})
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_cluster_health_status_roundtrip() {
for s in [
ClusterHealthStatus::Green,
ClusterHealthStatus::Yellow,
ClusterHealthStatus::Red,
ClusterHealthStatus::Unknown,
] {
assert_eq!(ClusterHealthStatus::try_from_i32(s.to_i32()).unwrap(), s);
}
}

#[test]
fn test_cluster_health_status_unknown_value() {
assert!(ClusterHealthStatus::try_from_i32(99).is_err());
}

#[test]
fn test_cluster_health_from_pb() {
let pb = GetClusterHealthResponse {
num_replicas: 5,
in_sync_replicas: 4,
num_leader_replicas: 3,
active_leader_replicas: 3,
status: 1,
};
let h = ClusterHealth::from_pb(&pb).unwrap();
assert_eq!(h.num_replicas, 5);
assert_eq!(h.status, ClusterHealthStatus::Yellow);
}
}
29 changes: 28 additions & 1 deletion crates/fluss/src/metadata/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

use crate::error::{Error, Result};
use crate::proto::PbAlterConfig;
use crate::proto::{PbAlterConfig, PbDescribeConfig};

/// Mirrors Java `org.apache.fluss.config.cluster.AlterConfigOpType`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -88,6 +88,33 @@ impl AlterConfig {
}
}

/// One entry in the response of `describe_cluster_configs`. Mirrors Java's
/// `org.apache.fluss.config.cluster.DescribeConfig`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DescribeConfig {
pub config_key: String,
pub config_value: Option<String>,
pub config_source: String,
}

impl DescribeConfig {
pub fn from_pb(pb: &PbDescribeConfig) -> Self {
Self {
config_key: pb.config_key.clone(),
config_value: pb.config_value.clone(),
config_source: pb.config_source.clone(),
}
}

pub fn to_pb(&self) -> PbDescribeConfig {
PbDescribeConfig {
config_key: self.config_key.clone(),
config_value: self.config_value.clone(),
config_source: self.config_source.clone(),
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
19 changes: 19 additions & 0 deletions crates/fluss/src/metadata/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use crate::error::Error::JsonSerdeError;
use crate::error::Result;
use crate::metadata::JsonSerde;
use crate::proto::PbDatabaseSummary;
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use std::collections::HashMap;
Expand Down Expand Up @@ -205,6 +206,24 @@ impl DatabaseDescriptor {
}
}

/// Lightweight summary of a database returned by `list_database_summaries`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DatabaseSummary {
pub database_name: String,
pub created_time: i64,
pub table_count: i32,
}

impl DatabaseSummary {
pub fn from_pb(pb: &PbDatabaseSummary) -> Self {
Self {
database_name: pb.database_name.clone(),
created_time: pb.created_time,
table_count: pb.table_count,
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading
Loading