Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions quickwit/quickwit-ingest/src/ingest_v2/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -672,12 +672,17 @@ mod tests {

{
let mut state_guard = router.state.lock().await;
state_guard.routing_table.apply_capacity_update(
"test-ingester-0".into(),
state_guard.routing_table.merge_from_shards(
IndexUid::for_test("test-index-0", 0),
"test-source".to_string(),
8,
1,
vec![Shard {
index_uid: Some(IndexUid::for_test("test-index-0", 0)),
source_id: "test-source".to_string(),
shard_id: Some(ShardId::from(1u64)),
shard_state: ShardState::Open as i32,
leader_id: "test-ingester-0".to_string(),
..Default::default()
}],
);
}

Expand Down
80 changes: 66 additions & 14 deletions quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ pub(super) struct IngesterNode {
#[derive(Debug, Default)]
pub(super) struct RoutingEntry {
pub nodes: HashMap<NodeId, IngesterNode>,
/// Whether this entry has been seeded from a control plane response. During a rolling
/// deployment, Chitchat broadcasts from already-upgraded nodes may populate the table
/// before the router ever asks the CP, causing it to miss old-version nodes. This flag
/// ensures the router asks the CP at least once per (index, source) pair.
seeded_from_cp: bool,
}

/// Given a slice of candidates, picks the better of two random choices.
Expand Down Expand Up @@ -180,6 +185,10 @@ impl RoutingTable {
let Some(entry) = self.table.get(&key) else {
return false;
};
// Routers must sync with the control plane at least once per (index, source).
if !entry.seeded_from_cp {
return false;
}
entry.nodes.values().any(|node| {
node.capacity_score > 0
&& node.open_shard_count > 0
Expand Down Expand Up @@ -250,6 +259,7 @@ impl RoutingTable {
open_shard_count,
});
}
entry.seeded_from_cp = true;
}
}

Expand Down Expand Up @@ -327,18 +337,33 @@ mod tests {
fn test_has_open_nodes() {
let mut table = RoutingTable::default();
let pool = IngesterPool::default();
let index_uid = IndexUid::for_test("test-index", 0);

// Empty table.
assert!(!table.has_open_nodes("test-index", "test-source", &pool, &HashSet::new()));

// Seed from CP so has_open_nodes can return true.
let shards = vec![
Shard {
index_uid: Some(index_uid.clone()),
source_id: "test-source".to_string(),
shard_id: Some(ShardId::from(1u64)),
shard_state: ShardState::Open as i32,
leader_id: "node-1".to_string(),
..Default::default()
},
Shard {
index_uid: Some(index_uid.clone()),
source_id: "test-source".to_string(),
shard_id: Some(ShardId::from(2u64)),
shard_state: ShardState::Open as i32,
leader_id: "node-2".to_string(),
..Default::default()
},
];
table.merge_from_shards(index_uid.clone(), "test-source".into(), shards);

// Node exists but is not in pool.
table.apply_capacity_update(
"node-1".into(),
IndexUid::for_test("test-index", 0),
"test-source".into(),
8,
3,
);
assert!(!table.has_open_nodes("test-index", "test-source", &pool, &HashSet::new()));

// Node is in pool → true.
Expand All @@ -350,13 +375,6 @@ mod tests {
assert!(!table.has_open_nodes("test-index", "test-source", &pool, &unavailable));

// Second node available → true despite first being unavailable.
table.apply_capacity_update(
"node-2".into(),
IndexUid::for_test("test-index", 0),
"test-source".into(),
6,
2,
);
pool.insert("node-2".into(), mocked_ingester(None));
assert!(table.has_open_nodes("test-index", "test-source", &pool, &unavailable));

Expand All @@ -371,6 +389,40 @@ mod tests {
assert!(!table.has_open_nodes("test-index", "test-source", &pool, &unavailable));
}

#[test]
fn test_has_open_nodes_requires_cp_seed() {
let mut table = RoutingTable::default();
let pool = IngesterPool::default();
pool.insert("node-1".into(), mocked_ingester(None));

// Chitchat broadcast populates the entry, but has_open_nodes still returns false
// because the entry hasn't been seeded from the control plane yet.
table.apply_capacity_update(
"node-1".into(),
IndexUid::for_test("test-index", 0),
"test-source".into(),
8,
3,
);
assert!(!table.has_open_nodes("test-index", "test-source", &pool, &HashSet::new()));

// After merge_from_shards (CP response), has_open_nodes returns true.
let shards = vec![Shard {
index_uid: Some(IndexUid::for_test("test-index", 0)),
source_id: "test-source".to_string(),
shard_id: Some(ShardId::from(1u64)),
shard_state: ShardState::Open as i32,
leader_id: "node-1".to_string(),
..Default::default()
}];
table.merge_from_shards(
IndexUid::for_test("test-index", 0),
"test-source".into(),
shards,
);
assert!(table.has_open_nodes("test-index", "test-source", &pool, &HashSet::new()));
}

#[test]
fn test_pick_node_prefers_same_az() {
let mut table = RoutingTable::new(Some("az-1".to_string()));
Expand Down
Loading