diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index 581b648ae71..d27c5ebfb77 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -672,12 +672,17 @@ mod tests { { let mut state_guard = router.state.lock().await; - state_guard.routing_table.apply_capacity_update( - "test-ingester-0".into(), + state_guard.routing_table.merge_from_shards( IndexUid::for_test("test-index-0", 0), "test-source".to_string(), - 8, - 1, + vec![Shard { + index_uid: Some(IndexUid::for_test("test-index-0", 0)), + source_id: "test-source".to_string(), + shard_id: Some(ShardId::from(1u64)), + shard_state: ShardState::Open as i32, + leader_id: "test-ingester-0".to_string(), + ..Default::default() + }], ); } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs index f5475387287..529922d657a 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs @@ -41,6 +41,11 @@ pub(super) struct IngesterNode { #[derive(Debug, Default)] pub(super) struct RoutingEntry { pub nodes: HashMap, + /// Whether this entry has been seeded from a control plane response. During a rolling + /// deployment, Chitchat broadcasts from already-upgraded nodes may populate the table + /// before the router ever asks the CP, causing it to miss old-version nodes. This flag + /// ensures the router asks the CP at least once per (index, source) pair. + seeded_from_cp: bool, } /// Given a slice of candidates, picks the better of two random choices. @@ -180,6 +185,10 @@ impl RoutingTable { let Some(entry) = self.table.get(&key) else { return false; }; + // Routers must sync with the control plane at least once per (index, source). + if !entry.seeded_from_cp { + return false; + } entry.nodes.values().any(|node| { node.capacity_score > 0 && node.open_shard_count > 0 @@ -250,6 +259,7 @@ impl RoutingTable { open_shard_count, }); } + entry.seeded_from_cp = true; } } @@ -327,18 +337,33 @@ mod tests { fn test_has_open_nodes() { let mut table = RoutingTable::default(); let pool = IngesterPool::default(); + let index_uid = IndexUid::for_test("test-index", 0); // Empty table. assert!(!table.has_open_nodes("test-index", "test-source", &pool, &HashSet::new())); + // Seed from CP so has_open_nodes can return true. + let shards = vec![ + Shard { + index_uid: Some(index_uid.clone()), + source_id: "test-source".to_string(), + shard_id: Some(ShardId::from(1u64)), + shard_state: ShardState::Open as i32, + leader_id: "node-1".to_string(), + ..Default::default() + }, + Shard { + index_uid: Some(index_uid.clone()), + source_id: "test-source".to_string(), + shard_id: Some(ShardId::from(2u64)), + shard_state: ShardState::Open as i32, + leader_id: "node-2".to_string(), + ..Default::default() + }, + ]; + table.merge_from_shards(index_uid.clone(), "test-source".into(), shards); + // Node exists but is not in pool. - table.apply_capacity_update( - "node-1".into(), - IndexUid::for_test("test-index", 0), - "test-source".into(), - 8, - 3, - ); assert!(!table.has_open_nodes("test-index", "test-source", &pool, &HashSet::new())); // Node is in pool → true. @@ -350,13 +375,6 @@ mod tests { assert!(!table.has_open_nodes("test-index", "test-source", &pool, &unavailable)); // Second node available → true despite first being unavailable. - table.apply_capacity_update( - "node-2".into(), - IndexUid::for_test("test-index", 0), - "test-source".into(), - 6, - 2, - ); pool.insert("node-2".into(), mocked_ingester(None)); assert!(table.has_open_nodes("test-index", "test-source", &pool, &unavailable)); @@ -371,6 +389,40 @@ mod tests { assert!(!table.has_open_nodes("test-index", "test-source", &pool, &unavailable)); } + #[test] + fn test_has_open_nodes_requires_cp_seed() { + let mut table = RoutingTable::default(); + let pool = IngesterPool::default(); + pool.insert("node-1".into(), mocked_ingester(None)); + + // Chitchat broadcast populates the entry, but has_open_nodes still returns false + // because the entry hasn't been seeded from the control plane yet. + table.apply_capacity_update( + "node-1".into(), + IndexUid::for_test("test-index", 0), + "test-source".into(), + 8, + 3, + ); + assert!(!table.has_open_nodes("test-index", "test-source", &pool, &HashSet::new())); + + // After merge_from_shards (CP response), has_open_nodes returns true. + let shards = vec![Shard { + index_uid: Some(IndexUid::for_test("test-index", 0)), + source_id: "test-source".to_string(), + shard_id: Some(ShardId::from(1u64)), + shard_state: ShardState::Open as i32, + leader_id: "node-1".to_string(), + ..Default::default() + }]; + table.merge_from_shards( + IndexUid::for_test("test-index", 0), + "test-source".into(), + shards, + ); + assert!(table.has_open_nodes("test-index", "test-source", &pool, &HashSet::new())); + } + #[test] fn test_pick_node_prefers_same_az() { let mut table = RoutingTable::new(Some("az-1".to_string()));