Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions crates/fluss/src/metadata/goal_type.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::error::{Error, Result};

/// Mirrors Java `org.apache.fluss.cluster.rebalance.GoalType`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GoalType {
ReplicaDistribution,
LeaderDistribution,
RackAware,
}

impl GoalType {
pub fn to_i32(self) -> i32 {
match self {
Self::ReplicaDistribution => 0,
Self::LeaderDistribution => 1,
Self::RackAware => 2,
}
}

pub fn try_from_i32(value: i32) -> Result<Self> {
match value {
0 => Ok(Self::ReplicaDistribution),
1 => Ok(Self::LeaderDistribution),
2 => Ok(Self::RackAware),
_ => Err(Error::IllegalArgument {
message: format!("Unsupported GoalType: {value}"),
}),
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_goal_type_roundtrip() {
for goal in [
GoalType::ReplicaDistribution,
GoalType::LeaderDistribution,
GoalType::RackAware,
] {
assert_eq!(GoalType::try_from_i32(goal.to_i32()).unwrap(), goal);
}
}

#[test]
fn test_goal_type_unknown() {
assert!(GoalType::try_from_i32(99).is_err());
}
}
118 changes: 118 additions & 0 deletions crates/fluss/src/metadata/kv_snapshot_lease.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::proto::{PbKvSnapshotLeaseForBucket, PbKvSnapshotLeaseForTable};

/// One bucket's slot in a KV-snapshot lease request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KvSnapshotLeaseForBucket {
pub partition_id: Option<i64>,
pub bucket_id: i32,
pub snapshot_id: i64,
}

impl KvSnapshotLeaseForBucket {
pub fn to_pb(&self) -> PbKvSnapshotLeaseForBucket {
PbKvSnapshotLeaseForBucket {
partition_id: self.partition_id,
bucket_id: self.bucket_id,
snapshot_id: self.snapshot_id,
}
}

pub fn from_pb(pb: &PbKvSnapshotLeaseForBucket) -> Self {
Self {
partition_id: pb.partition_id,
bucket_id: pb.bucket_id,
snapshot_id: pb.snapshot_id,
}
}
}

/// All the buckets of a single table that should be leased together.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KvSnapshotLeaseForTable {
pub table_id: i64,
pub bucket_snapshots: Vec<KvSnapshotLeaseForBucket>,
}

impl KvSnapshotLeaseForTable {
pub fn to_pb(&self) -> PbKvSnapshotLeaseForTable {
PbKvSnapshotLeaseForTable {
table_id: self.table_id,
bucket_snapshots: self
.bucket_snapshots
.iter()
.map(KvSnapshotLeaseForBucket::to_pb)
.collect(),
}
}

pub fn from_pb(pb: &PbKvSnapshotLeaseForTable) -> Self {
Self {
table_id: pb.table_id,
bucket_snapshots: pb
.bucket_snapshots
.iter()
.map(KvSnapshotLeaseForBucket::from_pb)
.collect(),
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_kv_snapshot_lease_for_bucket_roundtrip() {
for b in [
KvSnapshotLeaseForBucket {
partition_id: None,
bucket_id: 0,
snapshot_id: 10,
},
KvSnapshotLeaseForBucket {
partition_id: Some(42),
bucket_id: 3,
snapshot_id: 99,
},
] {
assert_eq!(KvSnapshotLeaseForBucket::from_pb(&b.to_pb()), b);
}
}

#[test]
fn test_kv_snapshot_lease_for_table_roundtrip() {
let t = KvSnapshotLeaseForTable {
table_id: 7,
bucket_snapshots: vec![
KvSnapshotLeaseForBucket {
partition_id: None,
bucket_id: 0,
snapshot_id: 10,
},
KvSnapshotLeaseForBucket {
partition_id: Some(42),
bucket_id: 1,
snapshot_id: 11,
},
],
};
assert_eq!(KvSnapshotLeaseForTable::from_pb(&t.to_pb()), t);
}
}
8 changes: 8 additions & 0 deletions crates/fluss/src/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@ mod config;
mod data_lake_format;
mod database;
mod datatype;
mod goal_type;
mod json_serde;
mod kv_snapshot_lease;
mod partition;
mod producer_offsets;
mod schema_util;
mod server_tag;
mod table;
mod table_change;
mod table_stats;
Expand All @@ -32,9 +36,13 @@ pub use config::*;
pub use data_lake_format::*;
pub use database::*;
pub use datatype::*;
pub use goal_type::*;
pub use json_serde::*;
pub use kv_snapshot_lease::*;
pub use partition::*;
pub use producer_offsets::*;
pub(crate) use schema_util::{UNEXIST_MAPPING, index_mapping};
pub use server_tag::*;
pub use table::*;
pub use table_change::*;
pub use table_stats::*;
118 changes: 118 additions & 0 deletions crates/fluss/src/metadata/producer_offsets.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::proto::{PbBucketOffset, PbProducerTableOffsets};

/// Per-bucket producer log-end offset.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BucketOffset {
pub partition_id: Option<i64>,
pub bucket_id: i32,
pub log_end_offset: Option<i64>,
}

impl BucketOffset {
pub fn to_pb(&self) -> PbBucketOffset {
PbBucketOffset {
partition_id: self.partition_id,
bucket_id: self.bucket_id,
log_end_offset: self.log_end_offset,
}
}

pub fn from_pb(pb: &PbBucketOffset) -> Self {
Self {
partition_id: pb.partition_id,
bucket_id: pb.bucket_id,
log_end_offset: pb.log_end_offset,
}
}
}

/// All bucket offsets of a single table belonging to one producer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProducerTableOffsets {
pub table_id: i64,
pub bucket_offsets: Vec<BucketOffset>,
}

impl ProducerTableOffsets {
pub fn to_pb(&self) -> PbProducerTableOffsets {
PbProducerTableOffsets {
table_id: self.table_id,
bucket_offsets: self
.bucket_offsets
.iter()
.map(BucketOffset::to_pb)
.collect(),
}
}

pub fn from_pb(pb: &PbProducerTableOffsets) -> Self {
Self {
table_id: pb.table_id,
bucket_offsets: pb
.bucket_offsets
.iter()
.map(BucketOffset::from_pb)
.collect(),
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_bucket_offset_roundtrip() {
for b in [
BucketOffset {
partition_id: None,
bucket_id: 0,
log_end_offset: None,
},
BucketOffset {
partition_id: Some(42),
bucket_id: 3,
log_end_offset: Some(1234),
},
] {
assert_eq!(BucketOffset::from_pb(&b.to_pb()), b);
}
}

#[test]
fn test_producer_table_offsets_roundtrip() {
let t = ProducerTableOffsets {
table_id: 5,
bucket_offsets: vec![
BucketOffset {
partition_id: None,
bucket_id: 0,
log_end_offset: Some(100),
},
BucketOffset {
partition_id: Some(7),
bucket_id: 1,
log_end_offset: None,
},
],
};
assert_eq!(ProducerTableOffsets::from_pb(&t.to_pb()), t);
}
}
61 changes: 61 additions & 0 deletions crates/fluss/src/metadata/server_tag.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::error::{Error, Result};

/// Mirrors Java `org.apache.fluss.cluster.rebalance.ServerTag`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerTag {
PermanentOffline,
TemporaryOffline,
}

impl ServerTag {
pub fn to_i32(self) -> i32 {
match self {
Self::PermanentOffline => 0,
Self::TemporaryOffline => 1,
}
}

pub fn try_from_i32(value: i32) -> Result<Self> {
match value {
0 => Ok(Self::PermanentOffline),
1 => Ok(Self::TemporaryOffline),
_ => Err(Error::IllegalArgument {
message: format!("Unsupported ServerTag: {value}"),
}),
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_server_tag_roundtrip() {
for tag in [ServerTag::PermanentOffline, ServerTag::TemporaryOffline] {
assert_eq!(ServerTag::try_from_i32(tag.to_i32()).unwrap(), tag);
}
}

#[test]
fn test_server_tag_unknown() {
assert!(ServerTag::try_from_i32(99).is_err());
}
}
Loading
Loading