Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 95 additions & 23 deletions crates/fluss/src/client/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ impl Metadata {
.send_modify(|v| *v = v.wrapping_add(1));
}

fn parse_bootstrap(boot_strap: &str) -> Result<SocketAddr> {
fn parse_bootstrap(bootstrap: &str) -> Result<SocketAddr> {
// Resolve all socket addresses and deterministically choose one.
let addrs = boot_strap
let addrs = bootstrap
.to_socket_addrs()
.map_err(|e| Error::IllegalArgument {
message: format!("Invalid bootstrap address '{boot_strap}': {e}"),
message: format!("Invalid bootstrap address '{bootstrap}': {e}"),
})?;

// Prefer IPv4 addresses; if none are available, fall back to the first IPv6.
Expand All @@ -77,20 +77,67 @@ impl Metadata {
}

let addr = ipv6_candidate.ok_or_else(|| Error::IllegalArgument {
message: format!("Unable to resolve bootstrap address '{boot_strap}'"),
message: format!("Unable to resolve bootstrap address '{bootstrap}'"),
})?;
Ok(addr)
}

async fn init_cluster(boot_strap: &str, connections: Arc<RpcClient>) -> Result<Cluster> {
let socket_address = Self::parse_bootstrap(boot_strap)?;
let server_node = ServerNode::new(
-1,
socket_address.ip().to_string(),
socket_address.port() as u32,
ServerType::Unknown,
);
let con = connections.get_connection(&server_node).await?;
fn parse_bootstrap_servers(bootstrap_servers: &str) -> Result<Vec<SocketAddr>> {
let mut addrs = Vec::new();

for bootstrap in bootstrap_servers.split(',') {
let bootstrap = bootstrap.trim();
if bootstrap.is_empty() {
return Err(Error::IllegalArgument {
message: format!(
"Invalid bootstrap servers '{bootstrap_servers}': empty bootstrap server"
),
});
}
addrs.push(Self::parse_bootstrap(bootstrap)?);
}

if addrs.is_empty() {
return Err(Error::IllegalArgument {
message: "No bootstrap servers configured".to_string(),
});
}

Ok(addrs)
}
Comment on lines +85 to +107

async fn init_cluster(bootstrap_servers: &str, connections: Arc<RpcClient>) -> Result<Cluster> {
let socket_addresses = Self::parse_bootstrap_servers(bootstrap_servers)?;
let mut errors = Vec::new();

for socket_address in socket_addresses {
let server_node = ServerNode::new(
-1,
socket_address.ip().to_string(),
socket_address.port() as u32,
ServerType::Unknown,
);

match Self::fetch_cluster_from_bootstrap(&server_node, connections.clone()).await {
Ok(cluster) => return Ok(cluster),
Err(err) => errors.push(format!("{socket_address}: {err}")),
}
}

Err(Error::UnexpectedError {
message: format!(
"Unable to initialize cluster from bootstrap servers '{bootstrap_servers}': {}",
errors.join("; ")
),
source: None,
})
Comment on lines +110 to +133
}

async fn fetch_cluster_from_bootstrap(
server_node: &ServerNode,
connections: Arc<RpcClient>,
) -> Result<Cluster> {
let con = connections.get_connection(server_node).await?;

let response = con
.request(UpdateMetadataRequest::new(
Expand Down Expand Up @@ -344,27 +391,52 @@ mod tests {
#[test]
fn parse_bootstrap_variants() {
// valid IP
let addr = Metadata::parse_bootstrap("127.0.0.1:8080").unwrap();
assert_eq!(addr.port(), 8080);
let addrs = Metadata::parse_bootstrap_servers("127.0.0.1:8080").unwrap();
assert_eq!(addrs.len(), 1);
assert_eq!(addrs[0].port(), 8080);

// valid hostname
Comment on lines +394 to 398
let addr = Metadata::parse_bootstrap("localhost:9090").unwrap();
assert_eq!(addr.port(), 9090);
let addrs = Metadata::parse_bootstrap_servers("localhost:9090").unwrap();
assert_eq!(addrs.len(), 1);
assert_eq!(addrs[0].port(), 9090);

// valid IPv6 address
let addr = Metadata::parse_bootstrap("[::1]:8080").unwrap();
assert_eq!(addr.port(), 8080);
let addrs = Metadata::parse_bootstrap_servers("[::1]:8080").unwrap();
assert_eq!(addrs.len(), 1);
assert_eq!(addrs[0].port(), 8080);

// invalid input: missing port
assert!(Metadata::parse_bootstrap("localhost").is_err());
assert!(Metadata::parse_bootstrap_servers("localhost").is_err());

// invalid input: out-of-range port
assert!(Metadata::parse_bootstrap("localhost:99999").is_err());
assert!(Metadata::parse_bootstrap_servers("localhost:99999").is_err());

// invalid input: empty string
assert!(Metadata::parse_bootstrap("").is_err());
assert!(Metadata::parse_bootstrap_servers("").is_err());

// invalid input: nonsensical address
assert!(Metadata::parse_bootstrap("invalid_address").is_err());
assert!(Metadata::parse_bootstrap_servers("invalid_address").is_err());
}

#[test]
fn parse_bootstrap_accepts_comma_separated_servers() {
let addrs = Metadata::parse_bootstrap_servers("127.0.0.1:8080, localhost:9090, [::1]:7070")
.unwrap();

assert_eq!(addrs.len(), 3);
assert_eq!(addrs[0].port(), 8080);
assert_eq!(addrs[1].port(), 9090);
assert_eq!(addrs[2].port(), 7070);
Comment on lines +423 to +429
Comment on lines +427 to +429

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should assertion be done on the host/ip address value as well? Also applies on the previous test case.

}

#[test]
fn parse_bootstrap_rejects_empty_comma_separated_entries() {
let err = Metadata::parse_bootstrap_servers("127.0.0.1:8080, , localhost:9090")
.expect_err("empty bootstrap entries should be rejected");

assert!(
err.to_string().contains("empty bootstrap server"),
"unexpected error: {err}"
);
}
}
Loading