diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index 7ced57bb0a8..47da42daaa1 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -25,8 +25,8 @@ use futures::stream::FuturesUnordered; use futures::{Future, StreamExt}; use itertools::Itertools; use quickwit_actors::{ - Actor, ActorContext, ActorExitStatus, ActorHandle, DeferableReplyHandler, Handler, Mailbox, - Supervisor, Universe, WeakMailbox, + Actor, ActorContext, ActorExitStatus, ActorHandle, DeferableReplyHandler, Handler, Healthz, + Mailbox, Supervisor, Universe, WeakMailbox, }; use quickwit_cluster::{ClusterChange, ClusterChangeStream, ClusterChangeStreamFactory}; use quickwit_common::pretty::PrettyDisplay; @@ -441,6 +441,19 @@ impl ControlPlane { } } +#[async_trait] +impl Handler for ControlPlane { + type Reply = bool; + + async fn handle( + &mut self, + _message: Healthz, + _ctx: &ActorContext, + ) -> Result { + Ok(true) + } +} + #[async_trait] impl Handler for ControlPlane { type Reply = (); diff --git a/quickwit/quickwit-serve/src/health_check_api/handler.rs b/quickwit/quickwit-serve/src/health_check_api/handler.rs index b2adf83befb..607ceb9f99a 100644 --- a/quickwit/quickwit-serve/src/health_check_api/handler.rs +++ b/quickwit/quickwit-serve/src/health_check_api/handler.rs @@ -12,17 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -use quickwit_actors::{Healthz, Mailbox}; -use quickwit_cluster::Cluster; -use quickwit_indexing::IndexingService; -use quickwit_janitor::JanitorService; +use std::sync::Arc; + +use quickwit_actors::Healthz; +use quickwit_ingest::try_get_ingester_status; +use quickwit_proto::ingest::ingester::{IngesterService, IngesterStatus}; +use quickwit_proto::metastore::MetastoreService; use tracing::error; use warp::hyper::StatusCode; use warp::reply::with_status; use warp::{Filter, Rejection}; use crate::rest::recover_fn; -use crate::with_arg; +use crate::{QuickwitServices, with_arg}; #[derive(utoipa::OpenApi)] #[openapi(paths(get_liveness, get_readiness))] @@ -30,31 +32,27 @@ pub struct HealthCheckApi; /// Health check handlers. pub(crate) fn health_check_handlers( - cluster: Cluster, - indexer_service_opt: Option>, - janitor_service_opt: Option>, + quickwit_services: Arc, ) -> impl Filter + Clone { - liveness_handler(indexer_service_opt, janitor_service_opt).or(readiness_handler(cluster)) + liveness_handler(quickwit_services.clone()).or(readiness_handler(quickwit_services)) } fn liveness_handler( - indexer_service_opt: Option>, - janitor_service_opt: Option>, + quickwit_services: Arc, ) -> impl Filter + Clone { warp::path!("health" / "livez") .and(warp::get()) - .and(with_arg(indexer_service_opt)) - .and(with_arg(janitor_service_opt)) + .and(with_arg(quickwit_services)) .then(get_liveness) .recover(recover_fn) } fn readiness_handler( - cluster: Cluster, + quickwit_services: Arc, ) -> impl Filter + Clone { warp::path!("health" / "readyz") .and(warp::get()) - .and(with_arg(cluster)) + .and(with_arg(quickwit_services)) .then(get_readiness) .recover(recover_fn) } @@ -69,24 +67,37 @@ fn readiness_handler( ), )] /// Get Node Liveliness -async fn get_liveness( - indexer_service_opt: Option>, - janitor_service_opt: Option>, -) -> impl warp::Reply { +async fn get_liveness(quickwit_services: Arc) -> impl warp::Reply { let mut is_live = true; - if let Some(indexer_service) = indexer_service_opt + if let Some(indexer_service) = &quickwit_services.indexing_service_opt && !indexer_service.ask(Healthz).await.unwrap_or(false) { error!("indexer service is unhealthy"); is_live = false; } - if let Some(janitor_service) = janitor_service_opt + if let Some(janitor_service) = &quickwit_services.janitor_service_opt && !janitor_service.ask(Healthz).await.unwrap_or(false) { error!("janitor service is unhealthy"); is_live = false; } + if let Some(control_plane_service) = &quickwit_services.control_plane_server_opt + && !control_plane_service.ask(Healthz).await.unwrap_or(false) + { + error!("control plane service is unhealthy"); + is_live = false; + } + if !ingester_is_live(&quickwit_services.ingester_opt).await { + error!("ingester service is unhealthy"); + is_live = false; + } + if let Some(metastore_server) = &quickwit_services.metastore_server_opt + && let Err(error) = metastore_server.check_connectivity().await + { + error!(%error, "metastore server is unhealthy"); + is_live = false; + } let status_code = if is_live { StatusCode::OK } else { @@ -95,6 +106,22 @@ async fn get_liveness( with_status(warp::reply::json(&is_live), status_code) } +/// Returns whether the ingester is live, or `true` if no ingester runs on this node. An ingester +/// is considered dead only when it reports a `Failed` status or its status cannot be retrieved. +async fn ingester_is_live(ingester_opt: &Option) -> bool { + let Some(ingester) = ingester_opt else { + return true; + }; + match try_get_ingester_status(ingester).await { + Ok(IngesterStatus::Failed) => false, + Ok(_) => true, + Err(error) => { + error!(%error, "failed to get ingester status"); + false + } + } +} + #[utoipa::path( get, tag = "Node Health", @@ -105,8 +132,8 @@ async fn get_liveness( ), )] /// Get Node Readiness -async fn get_readiness(cluster: Cluster) -> impl warp::Reply { - let is_ready = cluster.is_self_node_ready().await; +async fn get_readiness(quickwit_services: Arc) -> impl warp::Reply { + let is_ready = quickwit_services.cluster.is_self_node_ready().await; let status_code = if is_ready { StatusCode::OK } else { @@ -117,30 +144,189 @@ async fn get_readiness(cluster: Cluster) -> impl warp::Reply { #[cfg(test)] mod tests { + use std::sync::Arc; - use quickwit_cluster::{ChitchatTransport, create_cluster_for_test}; + use quickwit_actors::{Actor, Mailbox, Universe}; + use quickwit_cluster::{ChitchatTransport, Cluster, create_cluster_for_test}; + use quickwit_common::ServiceStream; + use quickwit_config::NodeConfig; + use quickwit_control_plane::control_plane::ControlPlane; + use quickwit_index_management::IndexService; + use quickwit_indexing::IndexingService; + use quickwit_ingest::{IngestApiService, IngestServiceClient}; + use quickwit_janitor::JanitorService; + use quickwit_proto::control_plane::ControlPlaneServiceClient; + use quickwit_proto::ingest::ingester::{ + IngesterServiceClient, IngesterStatus, MockIngesterService, ObservationMessage, + }; + use quickwit_proto::ingest::router::IngestRouterServiceClient; + use quickwit_proto::metastore::{MetastoreServiceClient, MockMetastoreService}; + use quickwit_search::MockSearchService; + use quickwit_storage::StorageResolver; - #[tokio::test] - async fn test_rest_search_api_health_checks() { + use super::{health_check_handlers, ingester_is_live}; + use crate::QuickwitServices; + + async fn test_cluster() -> Cluster { let transport = ChitchatTransport::default(); - let cluster = create_cluster_for_test(Vec::new(), &[], &transport, false) + create_cluster_for_test(Vec::new(), &[], &transport, false) .await - .unwrap(); - let health_check_handler = super::health_check_handlers(cluster.clone(), None, None); - let resp = warp::test::request() + .unwrap() + } + + /// Builds a `QuickwitServices` in which every service is healthy or absent. + fn healthy_services(cluster: Cluster) -> QuickwitServices { + let metastore_client = MetastoreServiceClient::mocked(); + let index_manager = + IndexService::new(metastore_client.clone(), StorageResolver::unconfigured()); + let universe = Universe::new(); + let (ingest_service_mailbox, _ingest_inbox) = + universe.create_test_mailbox::(); + QuickwitServices { + _report_splits_subscription_handle_opt: None, + _local_shards_update_listener_handle_opt: None, + cluster, + control_plane_server_opt: None, + control_plane_client: ControlPlaneServiceClient::mocked(), + indexing_service_opt: None, + index_manager, + ingest_service: IngestServiceClient::from_mailbox(ingest_service_mailbox), + ingest_router_opt: None, + ingest_router_service: IngestRouterServiceClient::mocked(), + ingester_opt: None, + janitor_service_opt: None, + otlp_logs_service_opt: None, + otlp_traces_service_opt: None, + metastore_client, + metastore_server_opt: None, + node_config: Arc::new(NodeConfig::for_test()), + search_service: Arc::new(MockSearchService::new()), + jaeger_service_opt: None, + env_filter_reload_fn: crate::do_nothing_env_filter_reload_fn(), + #[cfg(feature = "datafusion")] + datafusion_session_builder: None, + } + } + + /// A mailbox whose actor never runs and whose inbox is dropped, so `ask` always fails. + fn dead_mailbox() -> Mailbox { + let universe = Universe::new(); + let (mailbox, inbox) = universe.create_test_mailbox::(); + drop(inbox); + mailbox + } + + async fn livez_status(services: QuickwitServices) -> u16 { + let handler = health_check_handlers(Arc::new(services)); + warp::test::request() .path("/health/livez") - .reply(&health_check_handler) - .await; - assert_eq!(resp.status(), 200); + .reply(&handler) + .await + .status() + .as_u16() + } + + /// Builds an ingester whose observation stream reports a single `status` message. + fn ingester_reporting(status: IngesterStatus) -> IngesterServiceClient { + let mut mock_ingester = MockIngesterService::new(); + mock_ingester + .expect_open_observation_stream() + .returning(move |_| { + let (observation_tx, observation_stream) = ServiceStream::new_bounded(1); + let message = ObservationMessage { + node_id: "test-ingester".to_string(), + status: status as i32, + }; + observation_tx.try_send(Ok(message)).unwrap(); + Ok(observation_stream) + }); + IngesterServiceClient::from_mock(mock_ingester) + } + + #[tokio::test] + async fn livez_succeeds_when_all_services_healthy() { + let services = healthy_services(test_cluster().await); + assert_eq!(livez_status(services).await, 200); + } + + #[tokio::test] + async fn livez_fails_when_indexer_unhealthy() { + let mut services = healthy_services(test_cluster().await); + services.indexing_service_opt = Some(dead_mailbox::()); + assert_eq!(livez_status(services).await, 503); + } + + #[tokio::test] + async fn livez_fails_when_janitor_unhealthy() { + let mut services = healthy_services(test_cluster().await); + services.janitor_service_opt = Some(dead_mailbox::()); + assert_eq!(livez_status(services).await, 503); + } + + #[tokio::test] + async fn livez_fails_when_control_plane_unhealthy() { + let mut services = healthy_services(test_cluster().await); + services.control_plane_server_opt = Some(dead_mailbox::()); + assert_eq!(livez_status(services).await, 503); + } + + #[tokio::test] + async fn livez_fails_when_metastore_unavailable() { + let mut services = healthy_services(test_cluster().await); + let mut mock_metastore = MockMetastoreService::new(); + mock_metastore + .expect_check_connectivity() + .returning(|| Err(anyhow::anyhow!("metastore unreachable"))); + services.metastore_server_opt = Some(MetastoreServiceClient::from_mock(mock_metastore)); + assert_eq!(livez_status(services).await, 503); + } + + #[tokio::test] + async fn ingester_is_live_when_status_ready() { + let ingester = ingester_reporting(IngesterStatus::Ready); + assert!(ingester_is_live(&Some(ingester)).await); + } + + #[tokio::test] + async fn ingester_is_live_when_absent() { + let ingester_opt: Option = None; + assert!(ingester_is_live(&ingester_opt).await); + } + + #[tokio::test] + async fn ingester_is_not_live_when_status_failed() { + let ingester = ingester_reporting(IngesterStatus::Failed); + assert!(!ingester_is_live(&Some(ingester)).await); + } + + #[tokio::test] + async fn ingester_is_not_live_when_status_unavailable() { + let mut mock_ingester = MockIngesterService::new(); + mock_ingester + .expect_open_observation_stream() + .returning(|_| { + // dropping the sender ends the stream before any status is produced + let (_observation_tx, observation_stream) = ServiceStream::new_bounded(1); + Ok(observation_stream) + }); + let ingester = IngesterServiceClient::from_mock(mock_ingester); + assert!(!ingester_is_live(&Some(ingester)).await); + } + + #[tokio::test] + async fn readyz_reflects_cluster_readiness() { + let cluster = test_cluster().await; + let services = Arc::new(healthy_services(cluster.clone())); + let handler = health_check_handlers(services); let resp = warp::test::request() .path("/health/readyz") - .reply(&health_check_handler) + .reply(&handler) .await; assert_eq!(resp.status(), 503); cluster.set_self_node_readiness(true).await; let resp = warp::test::request() .path("/health/readyz") - .reply(&health_check_handler) + .reply(&handler) .await; assert_eq!(resp.status(), 200); } diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index 39bfb0e2580..7f59f1c963f 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -172,12 +172,7 @@ pub(crate) async fn start_rest_server( .boxed(); // `/health/*` routes. - let health_check_routes = health_check_handlers( - quickwit_services.cluster.clone(), - quickwit_services.indexing_service_opt.clone(), - quickwit_services.janitor_service_opt.clone(), - ) - .boxed(); + let health_check_routes = health_check_handlers(quickwit_services.clone()).boxed(); // `/metrics` route. let metrics_routes = warp::path("metrics") @@ -259,13 +254,9 @@ pub(crate) async fn start_health_check_server( readiness_trigger: BoxFutureInfaillible<()>, shutdown_signal: BoxFutureInfaillible<()>, ) -> anyhow::Result<()> { - let health_check_routes = health_check_handlers( - quickwit_services.cluster.clone(), - quickwit_services.indexing_service_opt.clone(), - quickwit_services.janitor_service_opt.clone(), - ) - .recover(recover_fn_final) - .boxed(); + let health_check_routes = health_check_handlers(quickwit_services.clone()) + .recover(recover_fn_final) + .boxed(); // No TLS: the whole point of this server is to offer a plaintext probe surface that bypasses // the mTLS configured on the main REST server. serve_warp_routes(