From 04c3da80d071d97481c4cfc47c485ffbffbdba5c Mon Sep 17 00:00:00 2001 From: slfan1989 Date: Sat, 20 Jun 2026 15:06:01 +0800 Subject: [PATCH] fix: support comma-separated bootstrap servers. --- crates/fluss/src/client/metadata.rs | 118 ++++++++++++++++++++++------ 1 file changed, 95 insertions(+), 23 deletions(-) diff --git a/crates/fluss/src/client/metadata.rs b/crates/fluss/src/client/metadata.rs index 20a8a159..566c6519 100644 --- a/crates/fluss/src/client/metadata.rs +++ b/crates/fluss/src/client/metadata.rs @@ -57,12 +57,12 @@ impl Metadata { .send_modify(|v| *v = v.wrapping_add(1)); } - fn parse_bootstrap(boot_strap: &str) -> Result { + fn parse_bootstrap(bootstrap: &str) -> Result { // Resolve all socket addresses and deterministically choose one. - let addrs = boot_strap + let addrs = bootstrap .to_socket_addrs() .map_err(|e| Error::IllegalArgument { - message: format!("Invalid bootstrap address '{boot_strap}': {e}"), + message: format!("Invalid bootstrap address '{bootstrap}': {e}"), })?; // Prefer IPv4 addresses; if none are available, fall back to the first IPv6. @@ -77,20 +77,67 @@ impl Metadata { } let addr = ipv6_candidate.ok_or_else(|| Error::IllegalArgument { - message: format!("Unable to resolve bootstrap address '{boot_strap}'"), + message: format!("Unable to resolve bootstrap address '{bootstrap}'"), })?; Ok(addr) } - async fn init_cluster(boot_strap: &str, connections: Arc) -> Result { - let socket_address = Self::parse_bootstrap(boot_strap)?; - let server_node = ServerNode::new( - -1, - socket_address.ip().to_string(), - socket_address.port() as u32, - ServerType::Unknown, - ); - let con = connections.get_connection(&server_node).await?; + fn parse_bootstrap_servers(bootstrap_servers: &str) -> Result> { + let mut addrs = Vec::new(); + + for bootstrap in bootstrap_servers.split(',') { + let bootstrap = bootstrap.trim(); + if bootstrap.is_empty() { + return Err(Error::IllegalArgument { + message: format!( + "Invalid bootstrap servers '{bootstrap_servers}': empty bootstrap server" + ), + }); + } + addrs.push(Self::parse_bootstrap(bootstrap)?); + } + + if addrs.is_empty() { + return Err(Error::IllegalArgument { + message: "No bootstrap servers configured".to_string(), + }); + } + + Ok(addrs) + } + + async fn init_cluster(bootstrap_servers: &str, connections: Arc) -> Result { + let socket_addresses = Self::parse_bootstrap_servers(bootstrap_servers)?; + let mut errors = Vec::new(); + + for socket_address in socket_addresses { + let server_node = ServerNode::new( + -1, + socket_address.ip().to_string(), + socket_address.port() as u32, + ServerType::Unknown, + ); + + match Self::fetch_cluster_from_bootstrap(&server_node, connections.clone()).await { + Ok(cluster) => return Ok(cluster), + Err(err) => errors.push(format!("{socket_address}: {err}")), + } + } + + Err(Error::UnexpectedError { + message: format!( + "Unable to initialize cluster from bootstrap servers '{bootstrap_servers}': {}", + errors.join("; ") + ), + source: None, + }) + } + + async fn fetch_cluster_from_bootstrap( + server_node: &ServerNode, + connections: Arc, + ) -> Result { + let con = connections.get_connection(server_node).await?; let response = con .request(UpdateMetadataRequest::new( @@ -344,27 +391,52 @@ mod tests { #[test] fn parse_bootstrap_variants() { // valid IP - let addr = Metadata::parse_bootstrap("127.0.0.1:8080").unwrap(); - assert_eq!(addr.port(), 8080); + let addrs = Metadata::parse_bootstrap_servers("127.0.0.1:8080").unwrap(); + assert_eq!(addrs.len(), 1); + assert_eq!(addrs[0].port(), 8080); // valid hostname - let addr = Metadata::parse_bootstrap("localhost:9090").unwrap(); - assert_eq!(addr.port(), 9090); + let addrs = Metadata::parse_bootstrap_servers("localhost:9090").unwrap(); + assert_eq!(addrs.len(), 1); + assert_eq!(addrs[0].port(), 9090); // valid IPv6 address - let addr = Metadata::parse_bootstrap("[::1]:8080").unwrap(); - assert_eq!(addr.port(), 8080); + let addrs = Metadata::parse_bootstrap_servers("[::1]:8080").unwrap(); + assert_eq!(addrs.len(), 1); + assert_eq!(addrs[0].port(), 8080); // invalid input: missing port - assert!(Metadata::parse_bootstrap("localhost").is_err()); + assert!(Metadata::parse_bootstrap_servers("localhost").is_err()); // invalid input: out-of-range port - assert!(Metadata::parse_bootstrap("localhost:99999").is_err()); + assert!(Metadata::parse_bootstrap_servers("localhost:99999").is_err()); // invalid input: empty string - assert!(Metadata::parse_bootstrap("").is_err()); + assert!(Metadata::parse_bootstrap_servers("").is_err()); // invalid input: nonsensical address - assert!(Metadata::parse_bootstrap("invalid_address").is_err()); + assert!(Metadata::parse_bootstrap_servers("invalid_address").is_err()); + } + + #[test] + fn parse_bootstrap_accepts_comma_separated_servers() { + let addrs = Metadata::parse_bootstrap_servers("127.0.0.1:8080, localhost:9090, [::1]:7070") + .unwrap(); + + assert_eq!(addrs.len(), 3); + assert_eq!(addrs[0].port(), 8080); + assert_eq!(addrs[1].port(), 9090); + assert_eq!(addrs[2].port(), 7070); + } + + #[test] + fn parse_bootstrap_rejects_empty_comma_separated_entries() { + let err = Metadata::parse_bootstrap_servers("127.0.0.1:8080, , localhost:9090") + .expect_err("empty bootstrap entries should be rejected"); + + assert!( + err.to_string().contains("empty bootstrap server"), + "unexpected error: {err}" + ); } }