diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 68d9ad60f96..61aa6959f3b 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -7150,6 +7150,8 @@ version = "0.8.0" dependencies = [ "anyhow", "async-trait", + "itertools 0.14.0", + "once_cell", "quickwit-actors", "quickwit-common", "quickwit-config", @@ -7494,6 +7496,7 @@ dependencies = [ "once_cell", "quickwit-actors", "quickwit-common", + "quickwit-compaction", "quickwit-config", "quickwit-doc-mapper", "quickwit-index-management", diff --git a/quickwit/quickwit-cli/src/main.rs b/quickwit/quickwit-cli/src/main.rs index 4a1f9ce036e..0d7d5f7c91a 100644 --- a/quickwit/quickwit-cli/src/main.rs +++ b/quickwit/quickwit-cli/src/main.rs @@ -150,7 +150,7 @@ mod tests { }; use quickwit_cli::split::{DescribeSplitArgs, SplitCliCommand}; use quickwit_cli::tool::{ - ExtractSplitArgs, GarbageCollectIndexArgs, LocalIngestDocsArgs, LocalSearchArgs, MergeArgs, + ExtractSplitArgs, GarbageCollectIndexArgs, LocalIngestDocsArgs, LocalSearchArgs, ToolCliCommand, }; use quickwit_common::uri::Uri; @@ -740,31 +740,6 @@ mod tests { Ok(()) } - #[test] - fn test_parse_merge_args() -> anyhow::Result<()> { - let app = build_cli().no_binary_name(true); - let matches = app.try_get_matches_from([ - "tool", - "merge", - "--index", - "wikipedia", - "--source", - "ingest-source", - "--config", - "/config.yaml", - ])?; - let command = CliCommand::parse_cli_args(matches)?; - assert!(matches!( - command, - CliCommand::Tool(ToolCliCommand::Merge(MergeArgs { - index_id, - source_id, - .. - })) if &index_id == "wikipedia" && source_id == "ingest-source" - )); - Ok(()) - } - #[test] fn test_parse_no_color() { // SAFETY: this test may not be entirely sound if not run with nextest or --test-threads=1 diff --git a/quickwit/quickwit-cli/src/tool.rs b/quickwit/quickwit-cli/src/tool.rs index 04843d7e801..557d61921e0 100644 --- a/quickwit/quickwit-cli/src/tool.rs +++ b/quickwit/quickwit-cli/src/tool.rs @@ -17,6 +17,7 @@ use std::io::{IsTerminal, Stdout, Write, stdout}; use std::num::NonZeroUsize; use std::path::PathBuf; use std::str::FromStr; +use std::sync::Arc; use std::time::{Duration, Instant}; use std::{env, fmt, io}; @@ -24,7 +25,7 @@ use anyhow::{Context, bail}; use clap::{ArgMatches, Command, arg}; use colored::{ColoredString, Colorize}; use humantime::format_duration; -use quickwit_actors::{ActorExitStatus, ActorHandle, Mailbox, Universe}; +use quickwit_actors::{ActorHandle, Universe}; use quickwit_cluster::{ ChannelTransport, Cluster, ClusterMember, FailureDetectorConfig, make_client_grpc_config, }; @@ -34,28 +35,26 @@ use quickwit_common::uri::Uri; use quickwit_config::service::QuickwitService; use quickwit_config::{ CLI_SOURCE_ID, IndexerConfig, NodeConfig, SourceConfig, SourceInputFormat, SourceParams, - TransformConfig, VecSourceParams, + TransformConfig, }; use quickwit_index_management::{IndexService, clear_cache_directory}; -use quickwit_indexing::IndexingPipeline; -use quickwit_indexing::actors::{IndexingService, MergePipeline, MergeSchedulerService}; -use quickwit_indexing::models::{ - DetachIndexingPipeline, DetachMergePipeline, IndexingStatistics, SpawnPipeline, -}; +use quickwit_indexing::actors::IndexingService; +use quickwit_indexing::models::{DetachIndexingPipeline, IndexingStatistics, SpawnPipeline}; +use quickwit_indexing::{IndexingPipeline, IndexingSplitCache}; use quickwit_ingest::IngesterPool; use quickwit_metastore::IndexMetadataResponseExt; use quickwit_proto::indexing::CpuCapacity; use quickwit_proto::ingest::ingester::IngesterStatus; 
use quickwit_proto::metastore::{IndexMetadataRequest, MetastoreService, MetastoreServiceClient}; use quickwit_proto::search::{CountHits, SearchResponse}; -use quickwit_proto::types::{IndexId, PipelineUid, SourceId, SplitId}; +use quickwit_proto::types::{IndexId, PipelineUid, SplitId}; use quickwit_search::{SearchResponseRest, single_node_search}; use quickwit_serve::{ BodyFormat, SearchRequestQueryString, SortBy, search_request_from_api_request, }; use quickwit_storage::{BundleStorage, Storage}; use thousands::Separable; -use tracing::{debug, info}; +use tracing::debug; use crate::checklist::{GREEN_COLOR, RED_COLOR}; use crate::{ @@ -201,13 +200,6 @@ pub struct GarbageCollectIndexArgs { pub dry_run: bool, } -#[derive(Debug, Eq, PartialEq)] -pub struct MergeArgs { - pub config_uri: Uri, - pub index_id: IndexId, - pub source_id: SourceId, -} - #[derive(Debug, Eq, PartialEq)] pub struct ExtractSplitArgs { pub config_uri: Uri, @@ -221,7 +213,6 @@ pub enum ToolCliCommand { GarbageCollect(GarbageCollectIndexArgs), LocalIngest(LocalIngestDocsArgs), LocalSearch(LocalSearchArgs), - Merge(MergeArgs), ExtractSplit(ExtractSplitArgs), } @@ -234,7 +225,6 @@ impl ToolCliCommand { "gc" => Self::parse_garbage_collect_args(submatches), "local-ingest" => Self::parse_local_ingest_args(submatches), "local-search" => Self::parse_local_search_args(submatches), - "merge" => Self::parse_merge_args(submatches), "extract-split" => Self::parse_extract_split_args(submatches), _ => bail!("unknown tool subcommand `{subcommand}`"), } @@ -324,24 +314,6 @@ impl ToolCliCommand { })) } - fn parse_merge_args(mut matches: ArgMatches) -> anyhow::Result { - let config_uri = matches - .remove_one::("config") - .map(|uri_str| Uri::from_str(&uri_str)) - .expect("`config` should be a required arg.")?; - let index_id = matches - .remove_one::("index") - .expect("'index-id' should be a required arg."); - let source_id = matches - .remove_one::("source") - .expect("'source-id' should be a required arg."); - Ok(Self::Merge(MergeArgs { - index_id, - source_id, - config_uri, - })) - } - fn parse_garbage_collect_args(mut matches: ArgMatches) -> anyhow::Result { let config_uri = matches .get_one("config") @@ -391,7 +363,6 @@ impl ToolCliCommand { Self::GarbageCollect(args) => garbage_collect_index_cli(args).await, Self::LocalIngest(args) => local_ingest_docs_cli(args).await, Self::LocalSearch(args) => local_search_cli(args).await, - Self::Merge(args) => merge_cli(args).await, Self::ExtractSplit(args) => extract_split_cli(args).await, } } @@ -447,7 +418,8 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result< &HashSet::from_iter([QuickwitService::Indexer]), )?; let universe = Universe::new(); - let merge_scheduler_service_mailbox = universe.get_or_spawn_one(); + let split_cache = + Arc::new(IndexingSplitCache::from_config(&indexer_config, &config.data_dir_path).await?); let indexing_server = IndexingService::new( config.node_id.clone(), config.data_dir_path.clone(), @@ -456,10 +428,10 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result< cluster, metastore, None, - Some(merge_scheduler_service_mailbox), IngesterPool::default(), storage_resolver, EventBroker::default(), + split_cache, ) .await?; let (indexing_server_mailbox, indexing_server_handle) = @@ -471,11 +443,6 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result< pipeline_uid: PipelineUid::random(), }) .await?; - let merge_pipeline_handle = indexing_server_mailbox - 
.ask_for_res(DetachMergePipeline { - pipeline_id: pipeline_id.merge_pipeline_id(), - }) - .await?; let indexing_pipeline_handle = indexing_server_mailbox .ask_for_res(DetachIndexingPipeline { pipeline_id }) .await?; @@ -493,11 +460,6 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result< let statistics = start_statistics_reporting_loop(indexing_pipeline_handle, args.input_path_opt.is_none()) .await?; - merge_pipeline_handle - .mailbox() - .ask(quickwit_indexing::FinishPendingMergesAndShutdownPipeline) - .await?; - merge_pipeline_handle.join().await; // Shutdown the indexing server. universe .send_exit_with_success(&indexing_server_mailbox) @@ -565,92 +527,6 @@ pub async fn local_search_cli(args: LocalSearchArgs) -> anyhow::Result<()> { Ok(()) } -pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> { - debug!(args=?args, "run-merge-operations"); - println!("❯ Merging splits locally..."); - let config = load_node_config(&args.config_uri).await?; - let (storage_resolver, metastore_resolver) = - get_resolvers(&config.storage_configs, &config.metastore_configs); - let mut metastore = metastore_resolver.resolve(&config.metastore_uri).await?; - run_index_checklist(&mut metastore, &storage_resolver, &args.index_id, None).await?; - // The indexing service needs to update its cluster chitchat state so that the control plane is - // aware of the running tasks. We thus create a fake cluster to instantiate the indexing service - // and avoid impacting potential control plane running on the cluster. - let cluster = create_empty_cluster(&config).await?; - let runtimes_config = RuntimesConfig::default(); - start_actor_runtimes( - runtimes_config, - &HashSet::from_iter([QuickwitService::Indexer]), - )?; - let indexer_config = IndexerConfig::default(); - let universe = Universe::new(); - let merge_scheduler_service: Mailbox = universe.get_or_spawn_one(); - let indexing_server = IndexingService::new( - config.node_id, - config.data_dir_path, - indexer_config, - runtimes_config.num_threads_blocking, - cluster, - metastore, - None, - Some(merge_scheduler_service), - IngesterPool::default(), - storage_resolver, - EventBroker::default(), - ) - .await?; - let (indexing_service_mailbox, indexing_service_handle) = - universe.spawn_builder().spawn(indexing_server); - let pipeline_id = indexing_service_mailbox - .ask_for_res(SpawnPipeline { - index_id: args.index_id, - source_config: SourceConfig { - source_id: args.source_id, - num_pipelines: NonZeroUsize::MIN, - enabled: true, - source_params: SourceParams::Vec(VecSourceParams::default()), - transform_config: None, - input_format: SourceInputFormat::Json, - }, - pipeline_uid: PipelineUid::random(), - }) - .await?; - let pipeline_handle: ActorHandle = indexing_service_mailbox - .ask_for_res(DetachMergePipeline { - pipeline_id: pipeline_id.merge_pipeline_id(), - }) - .await?; - - let mut check_interval = tokio::time::interval(Duration::from_secs(1)); - loop { - check_interval.tick().await; - - pipeline_handle.refresh_observe(); - let observation = pipeline_handle.last_observation(); - - if observation.num_ongoing_merges == 0 { - info!("merge pipeline has no more ongoing merges, exiting"); - break; - } - - if pipeline_handle.state().is_exit() { - info!("merge pipeline has exited, exiting"); - break; - } - } - - let (pipeline_exit_status, _pipeline_statistics) = pipeline_handle.quit().await; - indexing_service_handle.quit().await; - if !matches!( - pipeline_exit_status, - ActorExitStatus::Success | ActorExitStatus::Quit - ) { - 
bail!(pipeline_exit_status); - } - println!("{} Merge successful.", "✔".color(GREEN_COLOR)); - Ok(()) -} - pub async fn garbage_collect_index_cli(args: GarbageCollectIndexArgs) -> anyhow::Result<()> { debug!(args=?args, "garbage-collect-index"); println!("❯ Garbage collecting index..."); diff --git a/quickwit/quickwit-compaction/Cargo.toml b/quickwit/quickwit-compaction/Cargo.toml index c018327c8c4..556fe491a5b 100644 --- a/quickwit/quickwit-compaction/Cargo.toml +++ b/quickwit/quickwit-compaction/Cargo.toml @@ -26,6 +26,8 @@ time = { workspace = true } tracing = { workspace = true } tokio = { workspace = true } ulid = { workspace = true } +itertools = "0.14.0" +once_cell = "1.21.4" [dev-dependencies] quickwit-actors = { workspace = true, features = ["testsuite"] } diff --git a/quickwit/quickwit-compaction/src/compaction_pipeline.rs b/quickwit/quickwit-compaction/src/compaction_pipeline.rs index 021b3fa27c5..a35017384a0 100644 --- a/quickwit/quickwit-compaction/src/compaction_pipeline.rs +++ b/quickwit/quickwit-compaction/src/compaction_pipeline.rs @@ -13,8 +13,9 @@ // limitations under the License. use std::sync::Arc; +use std::time::Instant; -use quickwit_actors::{ActorHandle, Health, SpawnContext, Supervisable}; +use quickwit_actors::{ActorHandle, HEARTBEAT, Health, SpawnContext, Supervisable}; use quickwit_common::KillSwitch; use quickwit_common::io::{IoControls, Limiter}; use quickwit_common::pubsub::EventBroker; @@ -29,7 +30,10 @@ use quickwit_indexing::{IndexingSplitStore, PublisherType, SplitsUpdateMailbox}; use quickwit_proto::indexing::MergePipelineId; use quickwit_proto::metastore::MetastoreServiceClient; use quickwit_proto::types::{IndexUid, SourceId, SplitId}; -use tracing::{debug, error, info}; +use tracing::{error, info}; + +use crate::metrics::COMPACTOR_METRICS; +use crate::{TaskId, source_uid_metrics_label}; #[derive(Clone, Debug, PartialEq)] pub enum PipelineStatus { @@ -38,12 +42,20 @@ pub enum PipelineStatus { Failed { error: String }, } +impl PipelineStatus { + pub fn is_terminal(&self) -> bool { + matches!( + self, + PipelineStatus::Completed | PipelineStatus::Failed { .. } + ) + } +} + pub struct PipelineStatusUpdate { - pub task_id: String, + pub task_id: TaskId, pub index_uid: IndexUid, pub source_id: SourceId, pub split_ids: Vec, - pub merged_split_id: SplitId, pub status: PipelineStatus, } @@ -53,6 +65,22 @@ struct CompactionPipelineHandles { merge_packager: ActorHandle, merge_uploader: ActorHandle, merge_publisher: ActorHandle, + next_check_for_progress: Instant, +} + +impl CompactionPipelineHandles { + /// Returns true once per HEARTBEAT interval. Between heartbeats we only + /// check whether actors are alive, not whether they are making progress. + /// This gives CPU-intensive actors (like the packager) up to 30s to + /// complete before being declared unhealthy. + fn should_check_for_progress(&mut self) -> bool { + let now = Instant::now(); + let check_for_progress = now > self.next_check_for_progress; + if check_for_progress { + self.next_check_for_progress = now + *HEARTBEAT; + } + check_for_progress + } } /// A single-use compaction pipeline. Processes one merge task and terminates. @@ -61,7 +89,7 @@ struct CompactionPipelineHandles { /// `check_actor_health()` to collect status updates. The pipeline manages /// its own retry logic internally. 
pub struct CompactionPipeline { - task_id: String, + task_id: TaskId, merge_operation: MergeOperation, pipeline_id: MergePipelineId, status: PipelineStatus, @@ -76,12 +104,13 @@ pub struct CompactionPipeline { io_throughput_limiter: Option, max_concurrent_split_uploads: usize, event_broker: EventBroker, + pipeline_start: Option, } impl CompactionPipeline { #[allow(clippy::too_many_arguments)] pub fn new( - task_id: String, + task_id: TaskId, scratch_directory: TempDirectory, merge_operation: MergeOperation, pipeline_id: MergePipelineId, @@ -110,6 +139,7 @@ impl CompactionPipeline { io_throughput_limiter, max_concurrent_split_uploads, event_broker, + pipeline_start: None, } } @@ -149,16 +179,21 @@ impl CompactionPipeline { ) { return; } - // Pipeline is not initialized yet. - if self.handles.is_none() { + let Some(handles) = self.handles.as_mut() else { return; - } + }; + + // We check whether actors are alive on every tick (1s), but only + // check whether they are making progress once per HEARTBEAT (30s). + // This gives CPU-intensive actors like the packager time to finish + // without being falsely declared unhealthy. + let check_for_progress = handles.should_check_for_progress(); let mut has_healthy = false; let mut failure_actor_names: Vec = Vec::new(); for supervisable in self.supervisables() { - match supervisable.check_health(true) { + match supervisable.check_health(check_for_progress) { Health::Healthy => { has_healthy = true; } @@ -170,14 +205,55 @@ impl CompactionPipeline { } if !failure_actor_names.is_empty() { + self.record_pipeline_duration(); let error_msg = format!("failed actors: {:?}", failure_actor_names); error!(task_id=%self.task_id, "{error_msg}"); self.status = PipelineStatus::Failed { error: error_msg }; + self.record_terminal_metrics(false); return; } if !has_healthy { - debug!(task_id=%self.task_id, "all compaction pipeline actors completed"); + self.record_pipeline_duration(); + info!(task_id=%self.task_id, "all compaction pipeline actors completed"); self.status = PipelineStatus::Completed; + self.record_terminal_metrics(true); + } + } + + /// Fires once when the pipeline transitions from InProgress to a terminal state. + /// Pairs with the `compactions_in_progress.inc()` in `CompactorSupervisor::spawn_task`. 
+ fn record_terminal_metrics(&self, succeeded: bool) { + let merge_level = self.merge_operation.merge_level(); + let index_label = + source_uid_metrics_label(&self.pipeline_id.index_uid, &self.pipeline_id.source_id); + let label_values = [index_label.as_str(), &merge_level.to_string()]; + COMPACTOR_METRICS + .compactions_in_progress + .with_label_values(label_values) + .dec(); + if succeeded { + COMPACTOR_METRICS + .compactions_succeeded + .with_label_values(label_values) + .inc(); + } else { + COMPACTOR_METRICS + .compactions_failed + .with_label_values(label_values) + .inc(); + } + } + + fn record_pipeline_duration(&self) { + if let Some(pipeline_start_time) = self.pipeline_start { + let elapsed = pipeline_start_time.elapsed().as_secs_f64(); + let merge_level = self.merge_operation.merge_level(); + let index_label = + source_uid_metrics_label(&self.pipeline_id.index_uid, &self.pipeline_id.source_id); + COMPACTOR_METRICS + .compaction_duration + .with_label_values([index_label.as_str(), &merge_level.to_string()]) + .observe(elapsed); } } @@ -192,7 +268,6 @@ impl CompactionPipeline { .iter() .map(|split| split.split_id().to_string()) .collect(), - merged_split_id: self.merge_operation.merge_split_id.clone(), status: self.status.clone(), } } @@ -273,6 +348,8 @@ impl CompactionPipeline { .set_kill_switch(self.kill_switch.child()) .spawn(merge_split_downloader); + let now = Instant::now(); + self.pipeline_start = Some(now); // Kick off the pipeline. merge_split_downloader_mailbox .try_send_message(self.merge_operation.clone()) @@ -286,6 +363,7 @@ impl CompactionPipeline { merge_packager: merge_packager_handle, merge_uploader: merge_uploader_handle, merge_publisher: merge_publisher_handle, + next_check_for_progress: Instant::now() + *HEARTBEAT, }); Ok(()) diff --git a/quickwit/quickwit-compaction/src/compactor_supervisor.rs b/quickwit/quickwit-compaction/src/compactor_supervisor.rs index 292c861972c..52da22c4951 100644 --- a/quickwit/quickwit-compaction/src/compactor_supervisor.rs +++ b/quickwit/quickwit-compaction/src/compactor_supervisor.rs @@ -35,9 +35,11 @@ use quickwit_proto::indexing::MergePipelineId; use quickwit_proto::metastore::MetastoreServiceClient; use quickwit_proto::types::NodeId; use quickwit_storage::StorageResolver; -use tracing::{error, info, warn}; +use tracing::{error, info}; use crate::compaction_pipeline::{CompactionPipeline, PipelineStatus, PipelineStatusUpdate}; +use crate::metrics::COMPACTOR_METRICS; +use crate::source_uid_metrics_label; const CHECK_PIPELINE_STATUSES_INTERVAL: Duration = Duration::from_secs(1); @@ -55,9 +57,9 @@ pub struct CompactorSupervisor { // Shared resources distributed to pipelines when spawning actor chains. 
io_throughput_limiter: Option, - split_cache: Arc, metastore: MetastoreServiceClient, storage_resolver: StorageResolver, + split_cache: Arc, max_concurrent_split_uploads: usize, event_broker: EventBroker, @@ -72,9 +74,9 @@ impl CompactorSupervisor { planner_client: CompactionPlannerServiceClient, num_pipeline_slots: usize, io_throughput_limiter: Option, - split_cache: Arc, metastore: MetastoreServiceClient, storage_resolver: StorageResolver, + split_cache: Arc, max_concurrent_split_uploads: usize, event_broker: EventBroker, compaction_root_directory: TempDirectory, @@ -85,9 +87,9 @@ impl CompactorSupervisor { planner_client, pipelines, io_throughput_limiter, - split_cache, metastore, storage_resolver, + split_cache, max_concurrent_split_uploads, event_broker, compaction_root_directory, @@ -110,10 +112,16 @@ impl CompactorSupervisor { assignments: Vec, spawn_ctx: &SpawnContext, ) { + // The planner has acknowledged the statuses we just sent — drop any + // Completed/Failed pipelines so their scratch directories and actor + // handles are released. + for slot in &mut self.pipelines { + slot.take_if(|p| p.status().is_terminal()); + } for assignment in assignments { let task_id = assignment.task_id.clone(); - if let Err(err) = self.spawn_task(assignment, spawn_ctx).await { - error!(task_id=%task_id, error=%err, "failed to spawn compaction task"); + if let Err(error) = self.spawn_task(assignment, spawn_ctx).await { + error!(%task_id, %error, "failed to spawn compaction task"); } } } @@ -123,16 +131,17 @@ impl CompactorSupervisor { assignment: MergeTaskAssignment, spawn_ctx: &SpawnContext, ) -> anyhow::Result<()> { + let source_uid_label = source_uid_metrics_label( + assignment.index_uid.as_ref().unwrap(), + &assignment.source_id, + ); + let merge_level = assignment.merge_level.to_string(); + let label_values = [source_uid_label.as_str(), merge_level.as_str()]; + let slot_idx = self .pipelines .iter() - .position(|slot| match slot { - None => true, - Some(p) => matches!( - p.status(), - PipelineStatus::Completed | PipelineStatus::Failed { .. 
} - ), - }) + .position(Option::is_none) .ok_or_else(|| anyhow::anyhow!("no free pipeline slot"))?; let scratch_directory = self .compaction_root_directory @@ -142,6 +151,10 @@ impl CompactorSupervisor { .await?; pipeline.spawn_pipeline(spawn_ctx)?; self.pipelines[slot_idx] = Some(pipeline); + COMPACTOR_METRICS + .compactions_in_progress + .with_label_values(label_values) + .inc(); Ok(()) } @@ -207,7 +220,8 @@ impl CompactorSupervisor { .iter() .filter(|s| matches!(s.status, PipelineStatus::InProgress)) .count(); - let available_slots = (self.pipelines.len() - in_progress_count) as u32; + let available_slots = (self.pipelines.len() - in_progress_count) as i64; + COMPACTOR_METRICS.available_slots.set(available_slots); let mut in_progress = Vec::new(); let mut successes = Vec::new(); @@ -226,7 +240,6 @@ impl CompactorSupervisor { PipelineStatus::Completed => { successes.push(CompactionSuccess { task_id: update.task_id.clone(), - merged_split_id: update.merged_split_id.clone(), }); } PipelineStatus::Failed { error } => { @@ -240,7 +253,7 @@ impl CompactorSupervisor { ReportStatusRequest { node_id: self.node_id.to_string(), - available_slots, + available_slots: available_slots as u32, in_progress, successes, failures, @@ -279,13 +292,16 @@ impl Handler for CompactorSupervisor { ) -> Result<(), ActorExitStatus> { let statuses = self.check_pipeline_statuses(); let request = self.build_report_status_request(&statuses); - match self.planner_client.report_status(request).await { + match ctx + .protect_future(self.planner_client.report_status(request)) + .await + { Ok(response) => { - self.process_new_tasks(response.new_tasks, ctx.spawn_ctx()) + ctx.protect_future(self.process_new_tasks(response.new_tasks, ctx.spawn_ctx())) .await; } - Err(err) => { - warn!(error=%err, "failed to report status to compaction planner"); + Err(error) => { + error!(%error, "failed to report status to compaction planner"); } } ctx.schedule_self_msg(CHECK_PIPELINE_STATUSES_INTERVAL, CheckPipelineStatuses); @@ -295,8 +311,6 @@ impl Handler for CompactorSupervisor { #[cfg(test)] mod tests { - use std::sync::Arc; - use quickwit_actors::Universe; use quickwit_common::temp_dir::TempDirectory; use quickwit_proto::compaction::{ @@ -318,9 +332,9 @@ mod tests { compaction_client, num_slots, None, - Arc::new(IndexingSplitCache::no_caching()), metastore, StorageResolver::for_test(), + Arc::new(IndexingSplitCache::no_caching()), 2, EventBroker::default(), TempDirectory::for_test(), @@ -394,9 +408,9 @@ mod tests { let index_metadata = quickwit_metastore::IndexMetadata::for_test("test-index", "ram:///test-index"); let config = &index_metadata.index_config; - let splits: Vec = split_ids + let splits: Vec = split_ids .iter() - .map(|id| quickwit_metastore::SplitMetadata::for_test(id.to_string())) + .map(|id| SplitMetadata::for_test(id.to_string())) .collect(); MergeTaskAssignment { task_id: task_id.to_string(), @@ -411,6 +425,7 @@ mod tests { index_uid: Some(index_metadata.index_uid.clone()), source_id: "test-source".to_string(), index_storage_uri: config.index_uri.to_string(), + merge_level: 1, } } @@ -513,7 +528,7 @@ mod tests { let assignment = test_assignment("planner-task-1", &["s1", "s2"]); let assignments = vec![assignment]; let assignments_clone = assignments.clone(); - let mut mock = quickwit_proto::compaction::MockCompactionPlannerService::new(); + let mut mock = MockCompactionPlannerService::new(); mock.expect_report_status().times(1).returning(move |_req| { Ok(quickwit_proto::compaction::ReportStatusResponse { new_tasks: 
assignments_clone.clone(), @@ -527,9 +542,9 @@ mod tests { client, 3, None, - Arc::new(IndexingSplitCache::no_caching()), metastore, StorageResolver::for_test(), + Arc::new(IndexingSplitCache::no_caching()), 2, EventBroker::default(), TempDirectory::for_test(), @@ -569,7 +584,6 @@ mod tests { index_uid: quickwit_proto::types::IndexUid::for_test("test-index", 0), source_id: "src".to_string(), split_ids: vec!["s1".to_string(), "s2".to_string()], - merged_split_id: "merged-1".to_string(), status: PipelineStatus::InProgress, }, PipelineStatusUpdate { @@ -577,7 +591,6 @@ mod tests { index_uid: quickwit_proto::types::IndexUid::for_test("test-index", 0), source_id: "src".to_string(), split_ids: vec!["s3".to_string()], - merged_split_id: "merged-2".to_string(), status: PipelineStatus::Completed, }, PipelineStatusUpdate { @@ -585,7 +598,6 @@ mod tests { index_uid: quickwit_proto::types::IndexUid::for_test("test-index", 0), source_id: "src".to_string(), split_ids: vec!["s4".to_string()], - merged_split_id: "merged-3".to_string(), status: PipelineStatus::Failed { error: "boom".to_string(), }, @@ -600,7 +612,6 @@ mod tests { assert_eq!(request.in_progress[0].split_ids, vec!["s1", "s2"]); assert_eq!(request.successes.len(), 1); assert_eq!(request.successes[0].task_id, "task-2"); - assert_eq!(request.successes[0].merged_split_id, "merged-2"); assert_eq!(request.failures.len(), 1); assert_eq!(request.failures[0].task_id, "task-3"); assert_eq!(request.failures[0].error_message, "boom"); diff --git a/quickwit/quickwit-compaction/src/lib.rs b/quickwit/quickwit-compaction/src/lib.rs index 98e5fc9a3d1..5307f267932 100644 --- a/quickwit/quickwit-compaction/src/lib.rs +++ b/quickwit/quickwit-compaction/src/lib.rs @@ -14,12 +14,13 @@ #![deny(clippy::disallowed_methods)] -#[allow(dead_code)] mod compaction_pipeline; -#[allow(dead_code)] mod compactor_supervisor; +mod metrics; pub mod planner; +pub type TaskId = String; + use std::sync::Arc; pub use compactor_supervisor::CompactorSupervisor; @@ -31,33 +32,36 @@ use quickwit_config::CompactorConfig; use quickwit_indexing::IndexingSplitCache; use quickwit_proto::compaction::CompactionPlannerServiceClient; use quickwit_proto::metastore::MetastoreServiceClient; -use quickwit_proto::types::NodeId; +use quickwit_proto::types::{IndexUid, NodeId, SourceId}; use quickwit_storage::StorageResolver; use tracing::info; +pub fn source_uid_metrics_label(index_uid: &IndexUid, source_id: &SourceId) -> String { + format!("{}-{}", index_uid, source_id) +} + #[allow(clippy::too_many_arguments)] pub async fn start_compactor_service( universe: &Universe, node_id: NodeId, compaction_client: CompactionPlannerServiceClient, compactor_config: &CompactorConfig, - split_cache: Arc, metastore: MetastoreServiceClient, storage_resolver: StorageResolver, + split_cache: Arc, event_broker: EventBroker, compaction_root_directory: TempDirectory, ) -> anyhow::Result> { info!("starting compactor service"); - // TODO: configure this for real let io_throughput_limiter = compactor_config.max_merge_write_throughput.map(io::limiter); let supervisor = CompactorSupervisor::new( node_id, compaction_client, compactor_config.max_concurrent_pipelines.get(), io_throughput_limiter, - split_cache, metastore, storage_resolver, + split_cache, compactor_config.max_concurrent_split_uploads, event_broker, compaction_root_directory, diff --git a/quickwit/quickwit-compaction/src/metrics.rs b/quickwit/quickwit-compaction/src/metrics.rs new file mode 100644 index 00000000000..1d6bb234065 --- /dev/null +++ 
b/quickwit/quickwit-compaction/src/metrics.rs
@@ -0,0 +1,75 @@
+// Copyright 2021-Present Datadog, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use once_cell::sync::Lazy;
+use quickwit_common::metrics::{
+    HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, exponential_buckets, new_counter_vec,
+    new_gauge, new_gauge_vec, new_histogram_vec,
+};
+
+pub struct CompactorMetrics {
+    pub compactions_in_progress: IntGaugeVec<2>,
+    pub compactions_failed: IntCounterVec<2>,
+    pub compactions_succeeded: IntCounterVec<2>,
+    pub available_slots: IntGauge,
+    pub compaction_duration: HistogramVec<2>,
+}
+
+fn compaction_duration_buckets() -> Vec<f64> {
+    exponential_buckets(0.5, 2.0, 14).expect("compaction duration buckets should be valid")
+}
+
+impl Default for CompactorMetrics {
+    fn default() -> Self {
+        CompactorMetrics {
+            compactions_in_progress: new_gauge_vec(
+                "compactions_in_progress",
+                "number of compaction merge operations currently running on this compactor",
+                "compactor",
+                &[],
+                ["source_uid", "merge_level"],
+            ),
+            compactions_failed: new_counter_vec(
+                "compactions_failed",
+                "total number of compaction merge operations that have failed",
+                "compactor",
+                &[],
+                ["source_uid", "merge_level"],
+            ),
+            compactions_succeeded: new_counter_vec(
+                "compactions_succeeded",
+                "total number of compaction merge operations that have completed successfully",
+                "compactor",
+                &[],
+                ["source_uid", "merge_level"],
+            ),
+            available_slots: new_gauge(
+                "available_slots",
+                "number of compaction slots currently available on this compactor",
+                "compactor",
+                &[],
+            ),
+            compaction_duration: new_histogram_vec(
+                "compaction_duration_seconds",
+                "duration of compaction merge operations in seconds",
+                "compactor",
+                &[],
+                ["source_uid", "merge_level"],
+                compaction_duration_buckets(),
+            ),
+        }
+    }
+}
+
+pub static COMPACTOR_METRICS: Lazy<CompactorMetrics> = Lazy::new(CompactorMetrics::default);
diff --git a/quickwit/quickwit-compaction/src/planner/compaction_planner.rs b/quickwit/quickwit-compaction/src/planner/compaction_planner.rs
index 0fb6ccc6d50..ff46948e429 100644
--- a/quickwit/quickwit-compaction/src/planner/compaction_planner.rs
+++ b/quickwit/quickwit-compaction/src/planner/compaction_planner.rs
@@ -17,6 +17,7 @@ use std::time::Duration;
 
 use anyhow::Result;
 use async_trait::async_trait;
+use itertools::Itertools;
 use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler};
 use quickwit_indexing::merge_policy::MergeOperation;
 use quickwit_metastore::{
@@ -26,29 +27,27 @@ use quickwit_proto::compaction::{
     CompactionResult, MergeTaskAssignment, ReportStatusRequest, ReportStatusResponse,
 };
 use quickwit_proto::metastore::{ListSplitsRequest, MetastoreService, MetastoreServiceClient};
-use quickwit_proto::types::{IndexUid, NodeId, SourceId};
+use quickwit_proto::types::NodeId;
 use time::OffsetDateTime;
-use tracing::error;
+use tracing::{error, info};
 use ulid::Ulid;
 
 use super::compaction_state::CompactionState;
-use super::index_config_store::{IndexConfigStore, IndexEntry};
-
+use
super::index_config_metastore::{IndexConfigMetastore, IndexEntry}; +use crate::planner::metrics::COMPACTION_PLANNER_METRICS; + +/// Cap on splits fetched per tick. Every tick, the planner re-scans the immature published set, +/// sorted by `maturity_timestamp` ASC so the most-urgent splits are processed first when a backlog +/// exists. Splits beyond this cap aren't lost -- they bubble into range as the front of the queue +/// is merged off. +const SCAN_PAGE_SIZE: usize = 5_000; +#[derive(Debug)] pub struct CompactionPlanner { state: CompactionState, - index_config_store: IndexConfigStore, - cursor: i64, + index_config_metastore: IndexConfigMetastore, metastore: MetastoreServiceClient, } -impl Debug for CompactionPlanner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("CompactionPlanner") - .field("cursor", &self.cursor) - .finish() - } -} - const SCAN_AND_PLAN_INTERVAL: Duration = Duration::from_secs(5); /// On initialization, we want to wait for two intervals to allow any in-progress workers to report /// their progress, preventing us from frivolously rescheduling work. @@ -68,7 +67,7 @@ impl Actor for CompactionPlanner { fn observable_state(&self) -> Self::ObservableState {} async fn initialize(&mut self, ctx: &ActorContext) -> Result<(), ActorExitStatus> { - tracing::info!("compaction planner starting, scanning metastore for immature splits"); + info!("compaction planner starting, scanning metastore for immature splits"); ctx.schedule_self_msg(INITIAL_SCAN_AND_PLAN_INTERVAL, ScanAndPlan); Ok(()) } @@ -83,8 +82,8 @@ impl Handler for CompactionPlanner { _msg: ScanAndPlan, ctx: &ActorContext, ) -> Result<(), ActorExitStatus> { - if let Err(err) = self.scan_and_plan().await { - error!(error=%err, "error scanning metastore and planning merges"); + if let Err(error) = ctx.protect_future(self.scan_and_plan()).await { + error!(%error, "error scanning metastore and planning merges"); } self.state.check_heartbeat_timeouts(); ctx.schedule_self_msg(SCAN_AND_PLAN_INTERVAL, ScanAndPlan); @@ -110,26 +109,22 @@ impl Handler for CompactionPlanner { } } -const STARTUP_LOOKBACK: Duration = Duration::from_secs(24 * 60 * 60); - impl CompactionPlanner { pub fn new(metastore: MetastoreServiceClient) -> Self { - let cursor = OffsetDateTime::now_utc().unix_timestamp() - STARTUP_LOOKBACK.as_secs() as i64; CompactionPlanner { state: CompactionState::new(), - index_config_store: IndexConfigStore::new(metastore.clone()), - cursor, + index_config_metastore: IndexConfigMetastore::new(metastore.clone()), metastore, } } async fn ingest_splits(&mut self, splits: Vec) { for split in splits { - if self.state.is_split_known(&split.split_metadata.split_id) { + if self.state.is_split_tracked(&split.split_metadata.split_id) { continue; } let Ok(index_entry) = self - .index_config_store + .index_config_metastore .get_for_split(&split.split_metadata) .await else { @@ -139,7 +134,6 @@ impl CompactionPlanner { if index_entry.is_split_mature(&split.split_metadata) { continue; } - self.cursor = self.cursor.max(split.update_timestamp); self.state.track_split(split.split_metadata); } } @@ -148,14 +142,30 @@ impl CompactionPlanner { let query = ListSplitsQuery::for_all_indexes() .with_split_state(SplitState::Published) .retain_immature(OffsetDateTime::now_utc()) - .with_update_timestamp_gte(self.cursor); + .sort_by_maturity_timestamp() + .with_limit(SCAN_PAGE_SIZE); let request = ListSplitsRequest::try_from_list_splits_query(&query)?; let splits = self .metastore .list_splits(request) - 
.await? + .await + .inspect_err(|error| { + error!(%error, "[compaction-planner] error calling metastore list_splits"); + COMPACTION_PLANNER_METRICS + .metastore_errors + .with_label_values(["scan"]) + .inc(); + })? .collect_splits() - .await?; + .await + .inspect_err(|error| { + error!(%error, "[compaction-planner] error collecting metastore splits"); + COMPACTION_PLANNER_METRICS + .metastore_errors + .with_label_values(["collect_splits"]) + .inc(); + })?; + emit_metastore_scan_metrics(&splits); Ok(splits) } @@ -163,12 +173,13 @@ impl CompactionPlanner { let splits = self.scan_metastore().await?; self.ingest_splits(splits).await; self.run_merge_policies(); + self.state.emit_metrics(); Ok(()) } fn run_merge_policies(&mut self) { for partition_key in self.state.partition_keys() { - if let Some(index_entry) = self.index_config_store.get(&partition_key.index_uid) { + if let Some(index_entry) = self.index_config_metastore.get(&partition_key.index_uid) { self.state .plan_partition(&partition_key, index_entry.merge_policy()); } @@ -179,19 +190,13 @@ impl CompactionPlanner { let pending = self.state.pop_pending(available_slots as usize); let mut assignments = Vec::with_capacity(pending.len()); - for (partition_key, operation) in pending { + for operation in pending { let task_id = Ulid::new().to_string(); - let Some(index_entry) = self.index_config_store.get(&partition_key.index_uid) else { - error!(index_uid=%partition_key.index_uid, "index config not found for pending operation, skipping"); + let Some(index_entry) = self.index_config_metastore.get(&operation.index_uid) else { + error!(index_uid=%operation.index_uid, "index config not found for pending operation, skipping"); continue; }; - let assignment = build_task_assignment( - &task_id, - index_entry, - &operation, - &partition_key.index_uid, - &partition_key.source_id, - ); + let assignment = build_task_assignment(&task_id, index_entry, &operation); let split_ids = operation .splits_as_slice() @@ -207,12 +212,24 @@ impl CompactionPlanner { } } +fn emit_metastore_scan_metrics(new_splits: &[Split]) { + let size = new_splits.len(); + info!(%size, "[compaction planner] new splits scanned from metastore"); + let counts = new_splits + .iter() + .counts_by(|split| &split.split_metadata.index_uid); + for (&index_uid, &count) in counts.iter() { + COMPACTION_PLANNER_METRICS + .new_splits_scanned + .with_label_values([&index_uid.to_string()]) + .inc_by(count as u64); + } +} + fn build_task_assignment( task_id: &str, index_entry: &IndexEntry, operation: &MergeOperation, - index_uid: &IndexUid, - source_id: &SourceId, ) -> MergeTaskAssignment { MergeTaskAssignment { task_id: task_id.to_string(), @@ -227,14 +244,16 @@ fn build_task_assignment( search_settings_json: index_entry.search_settings_json(), indexing_settings_json: index_entry.indexing_settings_json(), retention_policy_json: index_entry.retention_policy_json(), - index_uid: Some(index_uid.clone()), - source_id: source_id.to_string(), + index_uid: Some(operation.index_uid.clone()), + source_id: operation.source_id.clone(), index_storage_uri: index_entry.index_storage_uri(), + merge_level: operation.merge_level() as u64, } } #[cfg(test)] mod tests { + use std::ops::Bound; use std::time::Duration; use quickwit_common::ServiceStream; @@ -243,14 +262,15 @@ mod tests { ConstWriteAmplificationMergePolicyConfig, MergePolicyConfig, }; use quickwit_metastore::{ - IndexMetadata, IndexMetadataResponseExt, ListSplitsResponseExt, Split, SplitMaturity, - SplitMetadata, SplitState, + IndexMetadata, 
IndexMetadataResponseExt, ListSplitsRequestExt, ListSplitsResponseExt, + SortBy, Split, SplitMaturity, SplitMetadata, SplitState, }; use quickwit_proto::compaction::CompactionSuccess; use quickwit_proto::metastore::{ IndexMetadataResponse, ListSplitsResponse, MetastoreError, MockMetastoreService, }; use quickwit_proto::types::IndexUid; + use time::OffsetDateTime; use super::*; @@ -299,17 +319,34 @@ mod tests { } #[tokio::test] - async fn test_scan_metastore() { + async fn test_scan_metastore_query_shape_and_passthrough() { let index_uid = IndexUid::for_test("test-index", 0); - let splits = vec![ - test_split("split-1", &index_uid, 1000), - test_split("split-2", &index_uid, 2000), + let returned_splits = vec![ + test_split("a", &index_uid, 1000), + test_split("b", &index_uid, 2000), ]; - let splits_clone = splits.clone(); + let returned_clone = returned_splits.clone(); + let scan_started_at = OffsetDateTime::now_utc().unix_timestamp(); let mut mock = MockMetastoreService::new(); - mock.expect_list_splits().returning(move |_| { - let response = ListSplitsResponse::try_from_splits(splits_clone.clone()).unwrap(); + mock.expect_list_splits().returning(move |req| { + let query = req.deserialize_list_splits_query().unwrap(); + + assert_eq!(query.split_states, vec![SplitState::Published]); + assert_eq!(query.limit, Some(SCAN_PAGE_SIZE)); + assert_eq!(query.sort_by, SortBy::MaturityTimestamp); + + let Bound::Excluded(mature_at) = query.mature else { + panic!("expected Excluded mature bound, got {:?}", query.mature); + }; + let now_secs = OffsetDateTime::now_utc().unix_timestamp(); + assert!(mature_at.unix_timestamp() >= scan_started_at); + assert!(mature_at.unix_timestamp() <= now_secs); + + assert_eq!(query.update_timestamp.start, Bound::Unbounded); + assert_eq!(query.update_timestamp.end, Bound::Unbounded); + + let response = ListSplitsResponse::try_from_splits(returned_clone.clone()).unwrap(); Ok(ServiceStream::from(vec![Ok(response)])) }); @@ -317,12 +354,12 @@ mod tests { let result = planner.scan_metastore().await.unwrap(); assert_eq!(result.len(), 2); - assert_eq!(result[0].split_metadata.split_id, "split-1"); - assert_eq!(result[1].split_metadata.split_id, "split-2"); + assert_eq!(result[0].split_metadata.split_id, "a"); + assert_eq!(result[1].split_metadata.split_id, "b"); } #[tokio::test] - async fn test_ingest_splits_dedup_maturity_and_cursor() { + async fn test_ingest_splits_dedups_and_skips_mature() { let index_metadata = test_index_metadata(); let response = test_index_metadata_response(&index_metadata); let index_uid = index_metadata.index_uid.clone(); @@ -332,7 +369,6 @@ mod tests { .returning(move |_| Ok(response.clone())); let mut planner = CompactionPlanner::new(MetastoreServiceClient::from_mock(mock)); - planner.cursor = 0; // Pre-populate: "in-flight" is already being compacted. 
planner.state.track_split(SplitMetadata { @@ -352,14 +388,13 @@ mod tests { planner.ingest_splits(splits).await; - assert!(planner.state.is_split_known("fresh")); - assert!(planner.state.is_split_known("in-flight")); - assert!(!planner.state.is_split_known("mature")); - assert_eq!(planner.cursor, 3000); + assert!(planner.state.is_split_tracked("fresh")); + assert!(planner.state.is_split_tracked("in-flight")); + assert!(!planner.state.is_split_tracked("mature")); } #[tokio::test] - async fn test_scan_and_plan_metastore_error() { + async fn test_scan_and_plan_propagates_metastore_error() { let mut mock = MockMetastoreService::new(); mock.expect_list_splits().returning(|_| { Err(MetastoreError::Internal { @@ -369,11 +404,7 @@ mod tests { }); let mut planner = CompactionPlanner::new(MetastoreServiceClient::from_mock(mock)); - let original_cursor = planner.cursor; - - let result = planner.scan_and_plan().await; - assert!(result.is_err()); - assert_eq!(planner.cursor, original_cursor); + assert!(planner.scan_and_plan().await.is_err()); } #[tokio::test] @@ -390,10 +421,9 @@ mod tests { }); let mut planner = CompactionPlanner::new(MetastoreServiceClient::from_mock(mock)); - planner.cursor = 0; planner.ingest_splits(splits).await; - assert!(!planner.state.is_split_known("orphan")); + assert!(!planner.state.is_split_tracked("orphan")); } #[tokio::test] @@ -417,12 +447,60 @@ mod tests { .returning(move |_| Ok(index_metadata_response.clone())); let mut planner = CompactionPlanner::new(MetastoreServiceClient::from_mock(mock)); - planner.cursor = 0; planner.scan_and_plan().await.unwrap(); - assert!(planner.state.is_split_known("s1")); - assert!(planner.state.is_split_known("s2")); - assert_eq!(planner.cursor, 6000); + assert!(planner.state.is_split_tracked("s1")); + assert!(planner.state.is_split_tracked("s2")); + } + + #[tokio::test] + async fn test_failed_task_is_retracked_on_next_scan() { + // After a worker reports failure (or times out), planner-local + // tracking is cleared. Because there is no cursor, the next scan + // rediscovers the still-Published, still-immature splits and + // re-tracks them. + let index_metadata = test_index_metadata_with_merge_factor_2(); + let index_metadata_response = test_index_metadata_response(&index_metadata); + let index_uid = index_metadata.index_uid.clone(); + + let splits = vec![ + test_split("s1", &index_uid, 1000), + test_split("s2", &index_uid, 2000), + ]; + let splits_clone = splits.clone(); + + let mut mock = MockMetastoreService::new(); + mock.expect_list_splits().returning(move |_| { + let response = ListSplitsResponse::try_from_splits(splits_clone.clone()).unwrap(); + Ok(ServiceStream::from(vec![Ok(response)])) + }); + mock.expect_index_metadata() + .returning(move |_| Ok(index_metadata_response.clone())); + + let mut planner = CompactionPlanner::new(MetastoreServiceClient::from_mock(mock)); + let node_id = NodeId::from("worker-1"); + + planner.scan_and_plan().await.unwrap(); + let assignments = planner.assign_tasks(&node_id, 10); + assert_eq!(assignments.len(), 1); + let task_id = assignments[0].task_id.clone(); + assert!(planner.state.is_split_tracked("s1")); + assert!(planner.state.is_split_tracked("s2")); + + // Worker reports failure; planner forgets the splits. 
+ planner + .state + .process_failures(&[quickwit_proto::compaction::CompactionFailure { + task_id, + error_message: "boom".to_string(), + }]); + assert!(!planner.state.is_split_tracked("s1")); + assert!(!planner.state.is_split_tracked("s2")); + + // Next scan rediscovers them and re-tracks them. + planner.scan_and_plan().await.unwrap(); + assert!(planner.state.is_split_tracked("s1")); + assert!(planner.state.is_split_tracked("s2")); } /// Helper: creates a planner with merge_factor=2, ingests the given splits, @@ -442,7 +520,6 @@ mod tests { }); let mut planner = CompactionPlanner::new(MetastoreServiceClient::from_mock(mock)); - planner.cursor = 0; let splits: Vec = split_ids .iter() @@ -501,10 +578,9 @@ mod tests { let task_id = assignments[0].task_id.clone(); // Report success for the task. - planner.state.process_successes(&[CompactionSuccess { - task_id, - merged_split_id: "merged-1".to_string(), - }]); + planner + .state + .process_successes(&[CompactionSuccess { task_id }]); // The original splits are no longer tracked. Re-ingesting them // (simulating the merged output being immature) creates new work. diff --git a/quickwit/quickwit-compaction/src/planner/compaction_state.rs b/quickwit/quickwit-compaction/src/planner/compaction_state.rs index 1d1ea07e91d..6f9dc2069c7 100644 --- a/quickwit/quickwit-compaction/src/planner/compaction_state.rs +++ b/quickwit/quickwit-compaction/src/planner/compaction_state.rs @@ -12,16 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::{HashMap, HashSet, VecDeque}; +use std::collections::{HashMap, HashSet}; +use std::fmt::Debug; use std::sync::Arc; use std::time::{Duration, Instant}; +use itertools::Itertools; use quickwit_indexing::merge_policy::{MergeOperation, MergePolicy}; use quickwit_metastore::SplitMetadata; use quickwit_proto::compaction::{CompactionFailure, CompactionInProgress, CompactionSuccess}; use quickwit_proto::types::{DocMappingUid, IndexUid, NodeId, SourceId, SplitId}; use tracing::{error, info, warn}; +use crate::planner::PendingOperations; +use crate::planner::metrics::COMPACTION_PLANNER_METRICS; +use crate::{TaskId, source_uid_metrics_label}; + const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(60); #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -43,8 +49,8 @@ impl CompactionPartitionKey { } } +#[derive(Debug)] struct InFlightCompaction { - task_id: String, split_ids: Vec, node_id: NodeId, last_heartbeat: Instant, @@ -53,14 +59,13 @@ struct InFlightCompaction { /// Tracks all split-level state for the compaction planner: /// which splits need compaction, which are in-flight, and which /// operations are pending assignment to workers. +#[derive(Debug)] pub struct CompactionState { needs_compaction: HashMap>, needs_compaction_split_ids: HashSet, - in_flight: HashMap, + in_flight: HashMap, in_flight_split_ids: HashSet, - /// TODO: add index_uid and source_id to MergeOperation so we don't need the partition key - /// here. - pending_operations: VecDeque<(CompactionPartitionKey, MergeOperation)>, + pending_operations: PendingOperations, } impl CompactionState { @@ -70,12 +75,12 @@ impl CompactionState { needs_compaction_split_ids: HashSet::new(), in_flight: HashMap::new(), in_flight_split_ids: HashSet::new(), - pending_operations: VecDeque::new(), + pending_operations: PendingOperations::new(), } } /// Returns true if the split is already tracked (either awaiting compaction or in-flight). 
-    pub fn is_split_known(&self, split_id: &str) -> bool {
+    pub fn is_split_tracked(&self, split_id: &str) -> bool {
         self.needs_compaction_split_ids.contains(split_id)
             || self.in_flight_split_ids.contains(split_id)
     }
@@ -102,14 +107,21 @@
         let Some(splits) = self.needs_compaction.get_mut(partition_key) else {
             return;
         };
-        for operation in merge_policy.operations(splits) {
-            for split in operation.splits_as_slice() {
-                self.needs_compaction_split_ids.remove(split.split_id());
-                self.in_flight_split_ids
-                    .insert(split.split_id().to_string());
+        // `MergePolicy::operations` emits at most one op per level per call, which under a backlog
+        // leaves the bulk of `splits` untouched per tick. Loop until no new operations are created.
+        loop {
+            let operations = merge_policy.operations(splits);
+            if operations.is_empty() {
+                break;
+            }
+            for operation in operations {
+                for split in operation.splits_as_slice() {
+                    self.needs_compaction_split_ids.remove(split.split_id());
+                    self.in_flight_split_ids
+                        .insert(split.split_id().to_string());
+                }
+                self.pending_operations.push(operation);
             }
-            self.pending_operations
-                .push_back((partition_key.clone(), operation));
         }
         if splits.is_empty() {
             self.needs_compaction.remove(partition_key);
@@ -132,6 +144,7 @@
             if let Some(inflight) = self.in_flight.remove(&failure.task_id) {
                 warn!(task_id=%failure.task_id, error=%failure.error_message, "compaction task failed");
                 for split_id in &inflight.split_ids {
+                    // these splits will be picked up again on the next metastore scan.
                     self.in_flight_split_ids.remove(split_id.as_str());
                 }
             }
@@ -153,7 +166,6 @@
         self.in_flight.insert(
             task.task_id.clone(),
             InFlightCompaction {
-                task_id: task.task_id.clone(),
                 split_ids: task.split_ids.clone(),
                 node_id: node_id.clone(),
                 last_heartbeat: Instant::now(),
@@ -165,7 +177,7 @@
 
     pub fn check_heartbeat_timeouts(&mut self) {
         let now = Instant::now();
-        let timed_out_task_ids: Vec<String> = self
+        let timed_out_task_ids: Vec<TaskId> = self
            .in_flight
            .iter()
            .filter(|(_, inflight)| now.duration_since(inflight.last_heartbeat) > HEARTBEAT_TIMEOUT)
@@ -174,36 +186,61 @@
 
         for task_id in timed_out_task_ids {
             if let Some(inflight) = self.in_flight.remove(&task_id) {
-                error!(task_id=%task_id, node_id=%inflight.node_id, "compaction task timed out");
+                error!(%task_id, node_id=%inflight.node_id, "compaction task timed out");
+                COMPACTION_PLANNER_METRICS.timed_out_operations.inc();
                 for split_id in &inflight.split_ids {
+                    // these splits will be picked up again on the next metastore scan.
                     self.in_flight_split_ids.remove(split_id.as_str());
                 }
             }
         }
     }
 
-    /// Pops up to `count` pending operations for assignment.
-    pub fn pop_pending(&mut self, count: usize) -> Vec<(CompactionPartitionKey, MergeOperation)> {
+    /// Pops up to `count` pending operations for assignment, highest-score first.
+    pub fn pop_pending(&mut self, count: usize) -> Vec<MergeOperation> {
         let count = count.min(self.pending_operations.len());
         let mut operations = Vec::with_capacity(count);
         for _ in 0..count {
-            operations.push(self.pending_operations.pop_front().unwrap());
+            operations.push(self.pending_operations.pop().unwrap());
         }
         operations
     }
 
     /// Records that an operation has been assigned to a worker.
- pub fn record_assignment(&mut self, task_id: String, split_ids: Vec, node_id: NodeId) { + pub fn record_assignment(&mut self, task_id: TaskId, split_ids: Vec, node_id: NodeId) { self.in_flight.insert( - task_id.clone(), + task_id, InFlightCompaction { - task_id, split_ids, node_id, last_heartbeat: Instant::now(), }, ); } + + pub fn emit_metrics(&self) { + // total number of splits that need to be merged by compaction partition key + self.needs_compaction + .iter() + .map(|(compaction_partition_key, splits)| { + ( + source_uid_metrics_label( + &compaction_partition_key.index_uid, + &compaction_partition_key.source_id, + ), + splits.len() as i64, + ) + }) + .into_grouping_map() + .sum() + .iter() + .for_each(|(partition_key, &total_splits)| { + COMPACTION_PLANNER_METRICS + .splits_needing_compaction + .with_label_values([partition_key.as_str()]) + .set(total_splits) + }); + } } #[cfg(test)] @@ -253,11 +290,11 @@ mod tests { let index_uid = IndexUid::for_test("test-index", 0); let mut state = CompactionState::new(); - assert!(!state.is_split_known("s1")); + assert!(!state.is_split_tracked("s1")); state.track_split(test_split("s1", &index_uid)); - assert!(state.is_split_known("s1")); - assert!(!state.is_split_known("s2")); + assert!(state.is_split_tracked("s1")); + assert!(!state.is_split_tracked("s2")); } #[test] @@ -287,7 +324,7 @@ mod tests { // Splits moved from needs_compaction to in_flight. assert!(!state.pending_operations.is_empty()); - for (_, op) in &state.pending_operations { + for op in state.pending_operations.iter() { for split in op.splits_as_slice() { assert!(!state.needs_compaction_split_ids.contains(split.split_id())); assert!(state.in_flight_split_ids.contains(split.split_id())); @@ -313,21 +350,20 @@ mod tests { state.in_flight_split_ids.insert("s3".to_string()); state.record_assignment("task-2".to_string(), vec!["s3".to_string()], node_id); - assert!(state.is_split_known("s1")); - assert!(state.is_split_known("s3")); + assert!(state.is_split_tracked("s1")); + assert!(state.is_split_tracked("s3")); state.process_successes(&[CompactionSuccess { task_id: "task-1".to_string(), - merged_split_id: "merged-1".to_string(), }]); - assert!(!state.is_split_known("s1")); - assert!(!state.is_split_known("s2")); + assert!(!state.is_split_tracked("s1")); + assert!(!state.is_split_tracked("s2")); state.process_failures(&[CompactionFailure { task_id: "task-2".to_string(), error_message: "boom".to_string(), }]); - assert!(!state.is_split_known("s3")); + assert!(!state.is_split_tracked("s3")); } #[test] @@ -371,7 +407,7 @@ mod tests { let pending = state.pop_pending(1); assert_eq!(pending.len(), 1); - let (_, operation) = &pending[0]; + let operation = &pending[0]; let split_ids: Vec = operation .splits_as_slice() .iter() @@ -382,7 +418,7 @@ mod tests { state.record_assignment("task-1".to_string(), split_ids.clone(), NodeId::from("w1")); for split_id in &split_ids { - assert!(state.is_split_known(split_id)); + assert!(state.is_split_tracked(split_id)); } } diff --git a/quickwit/quickwit-compaction/src/planner/index_config_store.rs b/quickwit/quickwit-compaction/src/planner/index_config_metastore.rs similarity index 88% rename from quickwit/quickwit-compaction/src/planner/index_config_store.rs rename to quickwit/quickwit-compaction/src/planner/index_config_metastore.rs index e47044d21e3..ba37b11c758 100644 --- a/quickwit/quickwit-compaction/src/planner/index_config_store.rs +++ b/quickwit/quickwit-compaction/src/planner/index_config_metastore.rs @@ -21,9 +21,12 @@ use 
quickwit_indexing::merge_policy::{MergePolicy, merge_policy_from_settings}; use quickwit_metastore::{IndexMetadataResponseExt, SplitMaturity, SplitMetadata}; use quickwit_proto::metastore::{IndexMetadataRequest, MetastoreService, MetastoreServiceClient}; use quickwit_proto::types::{DocMappingUid, IndexUid}; +use tracing::error; + +use crate::planner::metrics::COMPACTION_PLANNER_METRICS; /// Everything the planner needs to know about a single index. -#[derive(Clone)] +#[derive(Debug)] pub struct IndexEntry { config: IndexConfig, merge_policy: Arc, @@ -74,16 +77,17 @@ impl IndexEntry { /// Caches per-index configuration, merge policies, and doc mappers. /// Fetches from the metastore on demand. All accessors panic if called /// for an index that hasn't been loaded. -pub struct IndexConfigStore { +#[derive(Debug)] +pub struct IndexConfigMetastore { indexes: HashMap, - metastore: MetastoreServiceClient, + metastore_client: MetastoreServiceClient, } -impl IndexConfigStore { - pub fn new(metastore: MetastoreServiceClient) -> Self { - IndexConfigStore { +impl IndexConfigMetastore { + pub fn new(metastore_client: MetastoreServiceClient) -> Self { + IndexConfigMetastore { indexes: HashMap::new(), - metastore, + metastore_client, } } @@ -95,12 +99,20 @@ impl IndexConfigStore { doc_mapping_uid: &DocMappingUid, ) -> anyhow::Result<()> { let response = self - .metastore + .metastore_client .index_metadata(IndexMetadataRequest { index_uid: Some(index_uid.clone()), index_id: None, }) - .await?; + .await + .inspect_err(|error| { + error!(%error, "[compaction-planner] error getting index metadata from metastore"); + COMPACTION_PLANNER_METRICS + .metastore_errors + .with_label_values(["index_metadata"]) + .inc(); + })?; + let index_metadata = response.deserialize_index_metadata()?; let doc_mapper = build_doc_mapper( @@ -171,7 +183,7 @@ mod tests { .times(1) .returning(move |_| Ok(response.clone())); - let mut store = IndexConfigStore::new(MetastoreServiceClient::from_mock(mock)); + let mut store = IndexConfigMetastore::new(MetastoreServiceClient::from_mock(mock)); // First call fetches from metastore. 
let entry = store @@ -200,7 +212,7 @@ mod tests { }) }); - let mut store = IndexConfigStore::new(MetastoreServiceClient::from_mock(mock)); + let mut store = IndexConfigMetastore::new(MetastoreServiceClient::from_mock(mock)); let result = store .get_or_fetch(&IndexUid::for_test("missing", 0), &DocMappingUid::default()) .await; @@ -210,7 +222,7 @@ mod tests { #[tokio::test] async fn test_get_returns_none_before_fetch() { let mock = MockMetastoreService::new(); - let store = IndexConfigStore::new(MetastoreServiceClient::from_mock(mock)); + let store = IndexConfigMetastore::new(MetastoreServiceClient::from_mock(mock)); assert!(store.get(&IndexUid::for_test("test-index", 0)).is_none()); } @@ -224,7 +236,7 @@ mod tests { mock.expect_index_metadata() .returning(move |_| Ok(response.clone())); - let mut store = IndexConfigStore::new(MetastoreServiceClient::from_mock(mock)); + let mut store = IndexConfigMetastore::new(MetastoreServiceClient::from_mock(mock)); store .get_or_fetch(&index_uid, &DocMappingUid::default()) .await @@ -243,7 +255,7 @@ mod tests { mock.expect_index_metadata() .returning(move |_| Ok(response.clone())); - let mut store = IndexConfigStore::new(MetastoreServiceClient::from_mock(mock)); + let mut store = IndexConfigMetastore::new(MetastoreServiceClient::from_mock(mock)); let split = SplitMetadata { split_id: "split-1".to_string(), index_uid: index_uid.clone(), @@ -264,7 +276,7 @@ mod tests { mock.expect_index_metadata() .returning(move |_| Ok(response.clone())); - let mut store = IndexConfigStore::new(MetastoreServiceClient::from_mock(mock)); + let mut store = IndexConfigMetastore::new(MetastoreServiceClient::from_mock(mock)); let entry = store .get_or_fetch(&index_uid, &DocMappingUid::default()) .await @@ -295,7 +307,7 @@ mod tests { mock.expect_index_metadata() .returning(move |_| Ok(response.clone())); - let mut store = IndexConfigStore::new(MetastoreServiceClient::from_mock(mock)); + let mut store = IndexConfigMetastore::new(MetastoreServiceClient::from_mock(mock)); let entry = store .get_or_fetch(&index_uid, &DocMappingUid::default()) .await diff --git a/quickwit/quickwit-compaction/src/planner/metrics.rs b/quickwit/quickwit-compaction/src/planner/metrics.rs new file mode 100644 index 00000000000..4d8312d02c7 --- /dev/null +++ b/quickwit/quickwit-compaction/src/planner/metrics.rs @@ -0,0 +1,71 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use once_cell::sync::Lazy; +use quickwit_common::metrics::{ + IntCounter, IntCounterVec, IntGaugeVec, new_counter, new_counter_vec, new_gauge_vec, +}; + +pub struct CompactionPlannerMetrics { + pub new_splits_scanned: IntCounterVec<1>, + pub splits_needing_compaction: IntGaugeVec<1>, + pub pending_merge_operations: IntGaugeVec<2>, + pub timed_out_operations: IntCounter, + pub metastore_errors: IntCounterVec<1>, +} + +impl Default for CompactionPlannerMetrics { + fn default() -> Self { + CompactionPlannerMetrics { + new_splits_scanned: new_counter_vec( + "new_splits_scanned", + "cumulative number of immature splits scanned from the metastore", + "compaction_planner", + &[], + ["source_uid"], + ), + splits_needing_compaction: new_gauge_vec( + "splits_needing_compaction", + "number of splits currently tracked as needing compaction", + "compaction_planner", + &[], + ["source_uid"], + ), + pending_merge_operations: new_gauge_vec( + "pending_merge_operations", + "number of pending merge operations awaiting assignment", + "compaction_planner", + &[], + ["source_uid", "merge_level"], + ), + timed_out_operations: new_counter( + "timed_out_operations", + "cumulative number of merge operations that timed out waiting for a worker \ + heartbeat", + "compaction_planner", + &[], + ), + metastore_errors: new_counter_vec( + "metastore_errors", + "cumulative number of metastore errors encountered by the compaction planner", + "compaction_planner", + &[], + ["operation"], + ), + } + } +} + +pub static COMPACTION_PLANNER_METRICS: Lazy = + Lazy::new(CompactionPlannerMetrics::default); diff --git a/quickwit/quickwit-compaction/src/planner/mod.rs b/quickwit/quickwit-compaction/src/planner/mod.rs index b89d67ee8a2..18b408580d3 100644 --- a/quickwit/quickwit-compaction/src/planner/mod.rs +++ b/quickwit/quickwit-compaction/src/planner/mod.rs @@ -13,9 +13,65 @@ // limitations under the License. mod compaction_planner; -#[allow(dead_code)] mod compaction_state; -#[allow(dead_code)] -mod index_config_store; +mod index_config_metastore; +pub(crate) mod metrics; + +use std::collections::BinaryHeap; pub use compaction_planner::CompactionPlanner; +use quickwit_indexing::merge_policy::MergeOperation; + +use crate::planner::metrics::COMPACTION_PLANNER_METRICS; +use crate::source_uid_metrics_label; + +/// Max-heap of merge operations awaiting assignment, ordered by +/// `MergeOperation`'s score-based `Ord`. The `pending_merge_operations` gauge +/// is maintained inline; push/pop are the only mutation paths so the metric +/// stays consistent with `len()`. 
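// A minimal usage sketch of the invariant described above, assuming the
// `PendingOperations` wrapper and `MergeOperation` defined below, plus a
// hypothetical `operation: MergeOperation` value already in scope:
//
//     let mut pending = PendingOperations::new();
//     pending.push(operation); // gauge += 1 for this (source_uid, merge_level) pair
//     assert_eq!(pending.len(), 1);
//     let popped = pending.pop(); // gauge -= 1, highest-score operation first
//     assert!(popped.is_some());
//     assert_eq!(pending.len(), 0);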
+#[derive(Debug)] +struct PendingOperations { + inner: BinaryHeap, +} + +impl PendingOperations { + fn new() -> Self { + Self { + inner: BinaryHeap::new(), + } + } + + fn push(&mut self, operation: MergeOperation) { + Self::adjust_gauge(&operation, 1); + self.inner.push(operation); + } + + fn pop(&mut self) -> Option { + let operation = self.inner.pop()?; + Self::adjust_gauge(&operation, -1); + Some(operation) + } + + fn len(&self) -> usize { + self.inner.len() + } + + #[cfg(test)] + fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + #[cfg(test)] + fn iter(&self) -> impl Iterator { + self.inner.iter() + } + + fn adjust_gauge(operation: &MergeOperation, delta: i64) { + let source_uid_label = source_uid_metrics_label(&operation.index_uid, &operation.source_id); + let merge_level = operation.merge_level().to_string(); + COMPACTION_PLANNER_METRICS + .pending_merge_operations + .with_label_values([source_uid_label.as_str(), merge_level.as_str()]) + .add(delta); + } +} diff --git a/quickwit/quickwit-config/resources/tests/node_config/quickwit.json b/quickwit/quickwit-config/resources/tests/node_config/quickwit.json index 7269b37ae22..996d682b6e8 100644 --- a/quickwit/quickwit-config/resources/tests/node_config/quickwit.json +++ b/quickwit/quickwit-config/resources/tests/node_config/quickwit.json @@ -52,9 +52,7 @@ "enable_otlp_endpoint": true, "split_store_max_num_bytes": "1T", "split_store_max_num_splits": 10000, - "max_concurrent_split_uploads": 8, - "max_merge_write_throughput": "100mb", - "merge_concurrency": 2 + "max_concurrent_split_uploads": 8 }, "ingest_api": { "replication_factor": 2 diff --git a/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml b/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml index ea715dcffe0..6a63ff31883 100644 --- a/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml +++ b/quickwit/quickwit-config/resources/tests/node_config/quickwit.toml @@ -43,8 +43,6 @@ enable_otlp_endpoint = true split_store_max_num_bytes = "1T" split_store_max_num_splits = 10_000 max_concurrent_split_uploads = 8 -max_merge_write_throughput = "100mb" -merge_concurrency = 2 [ingest_api] replication_factor = 2 diff --git a/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml b/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml index face0852972..49e418ed924 100644 --- a/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml +++ b/quickwit/quickwit-config/resources/tests/node_config/quickwit.yaml @@ -47,8 +47,6 @@ indexer: split_store_max_num_bytes: 1T split_store_max_num_splits: 10000 max_concurrent_split_uploads: 8 - max_merge_write_throughput: 100mb - merge_concurrency: 2 ingest_api: replication_factor: 2 diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index ff981fcf31c..71eaa16c1d7 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -146,15 +146,6 @@ pub struct IndexerConfig { pub split_store_max_num_splits: usize, #[serde(default = "IndexerConfig::default_max_concurrent_split_uploads")] pub max_concurrent_split_uploads: usize, - /// Limits the IO throughput of the `SplitDownloader` and the `MergeExecutor`. - /// On hardware where IO is constrained, it makes sure that Merges (a batch operation) - /// does not starve indexing itself (as it is a latency sensitive operation). 
- #[serde(default)] - pub max_merge_write_throughput: Option, - /// Maximum number of merge or delete operation that can be executed concurrently. - /// (defaults to num_cpu / 2). - #[serde(default = "IndexerConfig::default_merge_concurrency")] - pub merge_concurrency: NonZeroUsize, /// Enables the OpenTelemetry exporter endpoint to ingest logs and traces via the OpenTelemetry /// Protocol (OTLP). #[serde(default = "IndexerConfig::default_enable_otlp_endpoint")] @@ -202,10 +193,6 @@ impl IndexerConfig { 1_000 } - pub fn default_merge_concurrency() -> NonZeroUsize { - NonZeroUsize::new(quickwit_common::num_cpus() * 2 / 3).unwrap_or(NonZeroUsize::MIN) - } - fn default_cpu_capacity() -> CpuCapacity { CpuCapacity::one_cpu_thread() * (quickwit_common::num_cpus() as u32) } @@ -220,8 +207,6 @@ impl IndexerConfig { split_store_max_num_splits: 3, max_concurrent_split_uploads: 4, cpu_capacity: PIPELINE_FULL_CAPACITY * 4u32, - max_merge_write_throughput: None, - merge_concurrency: NonZeroUsize::new(3).unwrap(), enable_standalone_compactors: false, }; Ok(indexer_config) @@ -237,8 +222,6 @@ impl Default for IndexerConfig { split_store_max_num_splits: Self::default_split_store_max_num_splits(), max_concurrent_split_uploads: Self::default_max_concurrent_split_uploads(), cpu_capacity: Self::default_cpu_capacity(), - merge_concurrency: Self::default_merge_concurrency(), - max_merge_write_throughput: None, enable_standalone_compactors: Self::default_enable_standalone_compactors(), } } @@ -247,7 +230,7 @@ impl Default for IndexerConfig { #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] #[serde(deny_unknown_fields)] pub struct CompactorConfig { - /// Maximum number of concurrent merge pipelines. Defaults to 2/3 of CPU count. + /// Maximum number of concurrent merge pipelines. Defaults to CPU count. #[serde(default = "CompactorConfig::default_max_concurrent_pipelines")] pub max_concurrent_pipelines: NonZeroUsize, /// Maximum number of concurrent split uploads across all pipelines. @@ -256,39 +239,23 @@ pub struct CompactorConfig { /// Limits the IO throughput of the split downloader and the merge executor. #[serde(default)] pub max_merge_write_throughput: Option, - /// Maximum size of the local split store cache in bytes. - #[serde(default = "CompactorConfig::default_split_store_max_num_bytes")] - pub split_store_max_num_bytes: ByteSize, - /// Maximum number of splits in the local split store cache. 
- #[serde(default = "CompactorConfig::default_split_store_max_num_splits")] - pub split_store_max_num_splits: usize, } impl CompactorConfig { fn default_max_concurrent_pipelines() -> NonZeroUsize { - NonZeroUsize::new(quickwit_common::num_cpus() * 2 / 3).unwrap_or(NonZeroUsize::MIN) + NonZeroUsize::new(quickwit_common::num_cpus()).unwrap_or(NonZeroUsize::MIN) } fn default_max_concurrent_split_uploads() -> usize { 12 } - pub fn default_split_store_max_num_bytes() -> ByteSize { - ByteSize::gib(100) - } - - pub fn default_split_store_max_num_splits() -> usize { - 1_000 - } - #[cfg(any(test, feature = "testsuite"))] pub fn for_test() -> Self { CompactorConfig { max_concurrent_pipelines: NonZeroUsize::new(2).unwrap(), max_concurrent_split_uploads: 4, max_merge_write_throughput: None, - split_store_max_num_bytes: ByteSize::mb(1), - split_store_max_num_splits: 3, } } } @@ -299,8 +266,6 @@ impl Default for CompactorConfig { max_concurrent_pipelines: Self::default_max_concurrent_pipelines(), max_concurrent_split_uploads: Self::default_max_concurrent_split_uploads(), max_merge_write_throughput: None, - split_store_max_num_bytes: Self::default_split_store_max_num_bytes(), - split_store_max_num_splits: Self::default_split_store_max_num_splits(), } } } @@ -949,23 +914,6 @@ mod tests { "1500m" ); } - { - let indexer_config: IndexerConfig = - serde_yaml::from_str(r#"merge_concurrency: 5"#).unwrap(); - assert_eq!( - indexer_config.merge_concurrency, - NonZeroUsize::new(5).unwrap() - ); - let indexer_config_json = serde_json::to_value(&indexer_config).unwrap(); - assert_eq!( - indexer_config_json - .get("merge_concurrency") - .unwrap() - .as_u64() - .unwrap(), - 5 - ); - } { let indexer_config: IndexerConfig = serde_yaml::from_str(r#"cpu_capacity: 1500m"#).unwrap(); diff --git a/quickwit/quickwit-config/src/node_config/serialize.rs b/quickwit/quickwit-config/src/node_config/serialize.rs index 0bf765df123..1f9dc30d6d3 100644 --- a/quickwit/quickwit-config/src/node_config/serialize.rs +++ b/quickwit/quickwit-config/src/node_config/serialize.rs @@ -673,10 +673,8 @@ mod tests { split_store_max_num_bytes: ByteSize::tb(1), split_store_max_num_splits: 10_000, max_concurrent_split_uploads: 8, - merge_concurrency: NonZeroUsize::new(2).unwrap(), cpu_capacity: IndexerConfig::default_cpu_capacity(), enable_cooperative_indexing: false, - max_merge_write_throughput: Some(ByteSize::mb(100)), enable_standalone_compactors: false, } ); diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index 04a9ab004b6..0acd20706cb 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -27,7 +27,7 @@ use quickwit_common::metrics::OwnedGaugeGuard; use quickwit_common::pubsub::EventBroker; use quickwit_common::temp_dir::TempDirectory; use quickwit_common::{KillSwitch, is_metrics_index}; -use quickwit_config::{IndexingSettings, RetentionPolicy, SourceConfig}; +use quickwit_config::{IndexingSettings, SourceConfig}; use quickwit_doc_mapper::DocMapper; use quickwit_ingest::IngesterPool; use quickwit_proto::indexing::IndexingPipelineId; @@ -37,7 +37,6 @@ use quickwit_storage::{Storage, StorageResolver}; use tokio::sync::Semaphore; use tracing::{debug, error, info, instrument}; -use super::MergePlanner; use crate::SplitsUpdateMailbox; use crate::actors::doc_processor::DocProcessor; use crate::actors::index_serializer::IndexSerializer; @@ -431,7 +430,7 @@ impl IndexingPipeline { let 
publisher = Publisher::new( PublisherType::MainPublisher, self.params.metastore.clone(), - self.params.merge_planner_mailbox_opt.clone(), + None, Some(source_mailbox.clone()), ); let (publisher_mailbox, publisher_handle) = ctx @@ -460,10 +459,10 @@ impl IndexingPipeline { UploaderType::IndexUploader, self.params.metastore.clone(), self.params.merge_policy.clone(), - self.params.retention_policy.clone(), + None, self.params.split_store.clone(), SplitsUpdateMailbox::Sequencer(sequencer_mailbox), - self.params.max_concurrent_split_uploads_index, + self.params.max_concurrent_split_uploads, self.params.event_broker.clone(), ); let (uploader_mailbox, uploader_handle) = ctx @@ -629,7 +628,7 @@ impl IndexingPipeline { self.params.metastore.clone(), self.params.storage.clone(), SplitsUpdateMailbox::Sequencer(parquet_sequencer_mailbox), - self.params.max_concurrent_split_uploads_index, + self.params.max_concurrent_split_uploads, ); let (parquet_uploader_mailbox, parquet_uploader_handle) = ctx .spawn_actor() @@ -848,14 +847,11 @@ pub struct IndexingPipelineParams { pub indexing_directory: TempDirectory, pub indexing_settings: IndexingSettings, pub split_store: IndexingSplitStore, - pub max_concurrent_split_uploads_index: usize, + pub max_concurrent_split_uploads: usize, pub cooperative_indexing_permits: Option>, // Merge-related parameters pub merge_policy: Arc, - pub retention_policy: Option, - pub merge_planner_mailbox_opt: Option>, - pub max_concurrent_split_uploads_merge: usize, // Source-related parameters pub source_config: SourceConfig, @@ -873,8 +869,7 @@ mod tests { use std::path::PathBuf; use std::sync::Arc; - use quickwit_actors::{Command, Universe}; - use quickwit_common::ServiceStream; + use quickwit_actors::Universe; use quickwit_config::{IndexingSettings, SourceInputFormat, SourceParams}; use quickwit_doc_mapper::{DocMapper, default_doc_mapper_for_test}; use quickwit_metastore::checkpoint::IndexCheckpointDelta; @@ -887,7 +882,6 @@ mod tests { use quickwit_storage::RamStorage; use super::{IndexingPipeline, *}; - use crate::actors::merge_pipeline::{MergePipeline, MergePipelineParams}; use crate::merge_policy::default_merge_policy; #[test] @@ -971,7 +965,6 @@ mod tests { .returning(|_| Ok(EmptyResponse {})); let universe = Universe::new(); - let (merge_planner_mailbox, _) = universe.create_test_mailbox(); let storage = Arc::new(RamStorage::default()); let split_store = IndexingSplitStore::create_without_local_store_for_test(storage.clone()); let pipeline_params = IndexingPipelineParams { @@ -986,12 +979,9 @@ mod tests { storage, split_store, merge_policy: default_merge_policy(), - retention_policy: None, queues_dir_path: PathBuf::from("./queues"), - max_concurrent_split_uploads_index: 4, - max_concurrent_split_uploads_merge: 5, + max_concurrent_split_uploads: 4, cooperative_indexing_permits: None, - merge_planner_mailbox_opt: Some(merge_planner_mailbox), event_broker: EventBroker::default(), params_fingerprint: 42u64, }; @@ -1097,7 +1087,6 @@ mod tests { let universe = Universe::new(); let storage = Arc::new(RamStorage::default()); let split_store = IndexingSplitStore::create_without_local_store_for_test(storage.clone()); - let (merge_planner_mailbox, _) = universe.create_test_mailbox(); let pipeline_params = IndexingPipelineParams { pipeline_id, doc_mapper: Arc::new(default_doc_mapper_for_test()), @@ -1111,11 +1100,8 @@ mod tests { storage, split_store, merge_policy: default_merge_policy(), - retention_policy: None, - max_concurrent_split_uploads_index: 4, - 
max_concurrent_split_uploads_merge: 5, + max_concurrent_split_uploads: 4, cooperative_indexing_permits: None, - merge_planner_mailbox_opt: Some(merge_planner_mailbox), event_broker: Default::default(), params_fingerprint: 42u64, }; @@ -1140,112 +1126,6 @@ mod tests { indexing_pipeline_simple("data/test_corpus.json.gz").await } - #[tokio::test] - async fn test_merge_pipeline_does_not_stop_on_indexing_pipeline_failure() { - let node_id = NodeId::from("test-node"); - let pipeline_id = IndexingPipelineId { - node_id, - index_uid: IndexUid::new_with_random_ulid("test-index"), - source_id: "test-source".to_string(), - pipeline_uid: PipelineUid::for_test(0u128), - }; - let source_config = SourceConfig { - source_id: "test-source".to_string(), - num_pipelines: NonZeroUsize::MIN, - enabled: true, - source_params: SourceParams::void(), - transform_config: None, - input_format: SourceInputFormat::Json, - }; - let source_config_clone = source_config.clone(); - - let mut mock_metastore = MockMetastoreService::new(); - mock_metastore - .expect_index_metadata() - .withf(|index_metadata_request| { - index_metadata_request.index_uid.as_ref().unwrap() == &("test-index", 2) - }) - .returning(move |_| { - let mut index_metadata = - IndexMetadata::for_test("test-index", "ram:///indexes/test-index"); - index_metadata - .add_source(source_config_clone.clone()) - .unwrap(); - Ok(IndexMetadataResponse::try_from_index_metadata(&index_metadata).unwrap()) - }); - mock_metastore - .expect_list_splits() - .returning(|_| Ok(ServiceStream::empty())); - let metastore = MetastoreServiceClient::from_mock(mock_metastore); - - let universe = Universe::with_accelerated_time(); - let doc_mapper = Arc::new(default_doc_mapper_for_test()); - let storage = Arc::new(RamStorage::default()); - let split_store = IndexingSplitStore::create_without_local_store_for_test(storage.clone()); - let merge_pipeline_params = MergePipelineParams { - pipeline_id: pipeline_id.merge_pipeline_id(), - doc_mapper: doc_mapper.clone(), - indexing_directory: TempDirectory::for_test(), - metastore: metastore.clone(), - split_store: split_store.clone(), - merge_policy: default_merge_policy(), - retention_policy: None, - max_concurrent_split_uploads: 2, - merge_io_throughput_limiter_opt: None, - merge_scheduler_service: universe.get_or_spawn_one(), - event_broker: Default::default(), - }; - let merge_pipeline = MergePipeline::new(merge_pipeline_params, None, universe.spawn_ctx()); - let merge_planner_mailbox = merge_pipeline.merge_planner_mailbox().clone(); - let (_merge_pipeline_mailbox, merge_pipeline_handler) = - universe.spawn_builder().spawn(merge_pipeline); - let indexing_pipeline_params = IndexingPipelineParams { - pipeline_id, - doc_mapper, - source_config, - source_storage_resolver: StorageResolver::for_test(), - indexing_directory: TempDirectory::for_test(), - indexing_settings: IndexingSettings::for_test(), - ingester_pool: IngesterPool::default(), - metastore, - queues_dir_path: PathBuf::from("./queues"), - storage, - split_store, - merge_policy: default_merge_policy(), - retention_policy: None, - max_concurrent_split_uploads_index: 4, - max_concurrent_split_uploads_merge: 5, - cooperative_indexing_permits: None, - merge_planner_mailbox_opt: Some(merge_planner_mailbox.clone()), - event_broker: Default::default(), - params_fingerprint: 42u64, - }; - let indexing_pipeline = IndexingPipeline::new(indexing_pipeline_params); - let (_indexing_pipeline_mailbox, indexing_pipeline_handler) = - universe.spawn_builder().spawn(indexing_pipeline); - let obs = 
indexing_pipeline_handler - .process_pending_and_observe() - .await; - assert_eq!(obs.generation, 1); - // Let's shutdown the indexer, this will trigger the indexing pipeline failure and the - // restart. - let indexer = universe.get::().into_iter().next().unwrap(); - let _ = indexer.ask(Command::Quit).await; - for _ in 0..10 { - universe.sleep(*quickwit_actors::HEARTBEAT).await; - // Check indexing pipeline has restarted. - let obs = indexing_pipeline_handler - .process_pending_and_observe() - .await; - if obs.generation == 2 { - assert_eq!(merge_pipeline_handler.check_health(true), Health::Healthy); - universe.quit().await; - return; - } - } - panic!("Pipeline was apparently not restarted."); - } - async fn indexing_pipeline_all_failures_handling(test_file: &str) -> anyhow::Result<()> { let node_id = NodeId::from("test-node"); let index_uid: IndexUid = IndexUid::for_test("test-index", 2); @@ -1308,7 +1188,6 @@ mod tests { let universe = Universe::new(); let storage = Arc::new(RamStorage::default()); let split_store = IndexingSplitStore::create_without_local_store_for_test(storage.clone()); - let (merge_planner_mailbox, _) = universe.create_test_mailbox(); // Create a minimal mapper with wrong date format to ensure that all documents will fail let broken_mapper = serde_json::from_str::( r#" @@ -1340,11 +1219,8 @@ mod tests { storage, split_store, merge_policy: default_merge_policy(), - retention_policy: None, - max_concurrent_split_uploads_index: 4, - max_concurrent_split_uploads_merge: 5, + max_concurrent_split_uploads: 4, cooperative_indexing_permits: None, - merge_planner_mailbox_opt: Some(merge_planner_mailbox), params_fingerprint: 42u64, event_broker: Default::default(), }; diff --git a/quickwit/quickwit-indexing/src/actors/indexing_service.rs b/quickwit/quickwit-indexing/src/actors/indexing_service.rs index 5442a98589e..7ae957937aa 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_service.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_service.rs @@ -19,17 +19,14 @@ use std::sync::Arc; use anyhow::Context; use async_trait::async_trait; -use futures::TryStreamExt; use itertools::Itertools; use quickwit_actors::{ Actor, ActorContext, ActorExitStatus, ActorHandle, ActorState, Handler, Healthz, Mailbox, Observation, }; use quickwit_cluster::Cluster; -use quickwit_common::fs::get_cache_directory_path; -use quickwit_common::io::Limiter; use quickwit_common::pubsub::EventBroker; -use quickwit_common::{io, temp_dir}; +use quickwit_common::temp_dir; use quickwit_config::{ INGEST_API_SOURCE_ID, IndexConfig, IndexerConfig, SourceConfig, build_doc_mapper, indexing_pipeline_params_fingerprint, @@ -40,31 +37,25 @@ use quickwit_ingest::{ }; use quickwit_metastore::{ IndexMetadata, IndexMetadataResponseExt, IndexesMetadataResponseExt, - ListIndexesMetadataResponseExt, ListSplitsQuery, ListSplitsRequestExt, ListSplitsResponseExt, - SplitMetadata, SplitState, + ListIndexesMetadataResponseExt, }; use quickwit_proto::indexing::{ ApplyIndexingPlanRequest, ApplyIndexingPlanResponse, IndexingError, IndexingPipelineId, - IndexingTask, MergePipelineId, PipelineMetrics, + IndexingTask, PipelineMetrics, }; use quickwit_proto::metastore::{ IndexMetadataRequest, IndexMetadataSubrequest, IndexesMetadataRequest, - ListIndexesMetadataRequest, ListSplitsRequest, MetastoreResult, MetastoreService, - MetastoreServiceClient, + ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient, }; use quickwit_proto::types::{IndexId, IndexUid, NodeId, PipelineUid, ShardId}; use 
quickwit_storage::StorageResolver; use serde::{Deserialize, Serialize}; -use time::OffsetDateTime; use tokio::sync::Semaphore; use tracing::{debug, error, info, warn}; -use super::merge_pipeline::{MergePipeline, MergePipelineParams}; -use super::{MergePlanner, MergeSchedulerService}; -use crate::actors::merge_pipeline::FinishPendingMergesAndShutdownPipeline; -use crate::models::{DetachIndexingPipeline, DetachMergePipeline, ObservePipeline, SpawnPipeline}; +use crate::models::{DetachIndexingPipeline, ObservePipeline, SpawnPipeline}; use crate::source::{AssignShards, Assignment}; -use crate::split_store::{IndexingSplitCache, SplitStoreQuota}; +use crate::split_store::IndexingSplitCache; use crate::{IndexingPipeline, IndexingPipelineParams, IndexingSplitStore, IndexingStatistics}; /// Name of the indexing directory, usually located at `/indexing`. @@ -75,16 +66,10 @@ pub struct IndexingServiceCounters { pub num_running_pipelines: usize, pub num_successful_pipelines: usize, pub num_failed_pipelines: usize, - pub num_running_merge_pipelines: usize, pub num_deleted_queues: usize, pub num_delete_queue_failures: usize, } -struct MergePipelineHandle { - mailbox: Mailbox, - handle: ActorHandle, -} - struct PipelineHandle { mailbox: Mailbox, handle: ActorHandle, @@ -104,16 +89,13 @@ pub struct IndexingService { cluster: Cluster, metastore: MetastoreServiceClient, ingest_api_service_opt: Option>, - merge_scheduler_service_opt: Option>, ingester_pool: IngesterPool, storage_resolver: StorageResolver, indexing_pipelines: HashMap, counters: IndexingServiceCounters, - local_split_store: Arc, + split_cache: Arc, max_concurrent_split_uploads: usize, - merge_pipeline_handles: HashMap, cooperative_indexing_permits: Option>, - merge_io_throughput_limiter_opt: Option, event_broker: EventBroker, } @@ -138,20 +120,11 @@ impl IndexingService { cluster: Cluster, metastore: MetastoreServiceClient, ingest_api_service_opt: Option>, - merge_scheduler_service_opt: Option>, ingester_pool: IngesterPool, storage_resolver: StorageResolver, event_broker: EventBroker, + split_cache: Arc, ) -> anyhow::Result { - let split_store_space_quota = SplitStoreQuota::try_new( - indexer_config.split_store_max_num_splits, - indexer_config.split_store_max_num_bytes, - )?; - let merge_io_throughput_limiter_opt = - indexer_config.max_merge_write_throughput.map(io::limiter); - let split_cache_dir_path = get_cache_directory_path(&data_dir_path); - let local_split_store = - IndexingSplitCache::open(split_cache_dir_path, split_store_space_quota).await?; let indexing_root_directory = temp_dir::create_or_purge_directory(&data_dir_path.join(INDEXING_DIR_NAME)).await?; let queue_dir_path = data_dir_path.join(QUEUES_DIR_NAME); @@ -167,15 +140,12 @@ impl IndexingService { cluster, metastore, ingest_api_service_opt, - merge_scheduler_service_opt, ingester_pool, storage_resolver, - local_split_store: Arc::new(local_split_store), + split_cache, indexing_pipelines: Default::default(), counters: Default::default(), max_concurrent_split_uploads: indexer_config.max_concurrent_split_uploads, - merge_pipeline_handles: HashMap::new(), - merge_io_throughput_limiter_opt, cooperative_indexing_permits, event_broker, }) @@ -196,21 +166,6 @@ impl IndexingService { Ok(pipeline_handle.handle) } - async fn detach_merge_pipeline( - &mut self, - pipeline_id: &MergePipelineId, - ) -> Result, IndexingError> { - let pipeline_handle = self - .merge_pipeline_handles - .remove(pipeline_id) - .ok_or_else(|| { - let message = format!("could not find merge pipeline 
`{pipeline_id}`"); - IndexingError::Internal(message) - })?; - self.counters.num_running_merge_pipelines -= 1; - Ok(pipeline_handle.handle) - } - async fn observe_pipeline( &mut self, pipeline_uid: &PipelineUid, @@ -242,15 +197,8 @@ impl IndexingService { pipeline_uid, }; let index_config = index_metadata.into_index_config(); - self.spawn_pipeline_inner( - ctx, - pipeline_id.clone(), - index_config, - source_config, - None, - None, - ) - .await?; + self.spawn_pipeline_inner(ctx, pipeline_id.clone(), index_config, source_config, None) + .await?; Ok(pipeline_id) } @@ -260,7 +208,6 @@ impl IndexingService { indexing_pipeline_id: IndexingPipelineId, index_config: IndexConfig, source_config: SourceConfig, - immature_splits_opt: Option>, expected_params_fingerprint: Option, ) -> Result<(), IndexingError> { if self @@ -291,44 +238,11 @@ impl IndexingService { })?; let merge_policy = crate::merge_policy::merge_policy_from_settings(&index_config.indexing_settings); - let retention_policy = index_config.retention_policy_opt.clone(); - let split_store = IndexingSplitStore::new(storage.clone(), self.local_split_store.clone()); + let split_store = IndexingSplitStore::new(storage.clone(), self.split_cache.clone()); let doc_mapper = build_doc_mapper(&index_config.doc_mapping, &index_config.search_settings) .map_err(|error| IndexingError::Internal(error.to_string()))?; - let merge_planner_mailbox_opt = if let Some(merge_scheduler_service) = - self.merge_scheduler_service_opt.clone() - { - let merge_pipeline_id = indexing_pipeline_id.merge_pipeline_id(); - let merge_pipeline_params = MergePipelineParams { - pipeline_id: merge_pipeline_id, - doc_mapper: doc_mapper.clone(), - indexing_directory: indexing_directory.clone(), - metastore: self.metastore.clone(), - split_store: split_store.clone(), - merge_scheduler_service, - merge_policy: merge_policy.clone(), - retention_policy: retention_policy.clone(), - merge_io_throughput_limiter_opt: self.merge_io_throughput_limiter_opt.clone(), - max_concurrent_split_uploads: self.max_concurrent_split_uploads, - event_broker: self.event_broker.clone(), - }; - Some(self.get_or_create_merge_pipeline(merge_pipeline_params, immature_splits_opt, ctx)) - } else { - None - }; - // The concurrent uploads budget is split in 2: 1/2 for the indexing pipeline, 1/2 for the - // merge pipeline. When there is no local merge pipeline, the indexing pipeline gets the - // full budget. 
- let max_concurrent_split_uploads_index = if self.merge_scheduler_service_opt.is_some() { - (self.max_concurrent_split_uploads / 2).max(1) - } else { - self.max_concurrent_split_uploads - }; - let max_concurrent_split_uploads_merge = - self.max_concurrent_split_uploads - max_concurrent_split_uploads_index; - let params_fingerprint = indexing_pipeline_params_fingerprint(&index_config, &source_config); if let Some(expected_params_fingerprint) = expected_params_fingerprint { @@ -357,14 +271,10 @@ impl IndexingService { indexing_directory, indexing_settings: index_config.indexing_settings.clone(), split_store, - max_concurrent_split_uploads_index, + max_concurrent_split_uploads: self.max_concurrent_split_uploads, cooperative_indexing_permits: self.cooperative_indexing_permits.clone(), - - // Merge-related parameters + // The merge policy is needed in the uploader for determining split maturity merge_policy, - retention_policy, - max_concurrent_split_uploads_merge, - merge_planner_mailbox_opt, // Source-related parameters source_config, @@ -431,70 +341,6 @@ impl IndexingService { Ok(indexes_metadata) } - /// Fetches the immature splits candidates for merge for all the indexing pipelines for which a - /// merge pipeline is not running. - async fn fetch_immature_splits_for_new_merge_pipelines( - &mut self, - indexing_pipeline_ids: &[IndexingPipelineId], - ctx: &ActorContext, - ) -> MetastoreResult>> { - if self.merge_scheduler_service_opt.is_none() { - return Ok(Default::default()); - } - let mut index_uids = Vec::new(); - - for indexing_pipeline_id in indexing_pipeline_ids { - let merge_pipeline_id = indexing_pipeline_id.merge_pipeline_id(); - - if !self.merge_pipeline_handles.contains_key(&merge_pipeline_id) { - index_uids.push(merge_pipeline_id.index_uid); - } - } - if index_uids.is_empty() { - return Ok(Default::default()); - } - index_uids.sort_unstable(); - index_uids.dedup(); - - let list_splits_query = ListSplitsQuery::try_from_index_uids(index_uids) - .expect("`index_uids` should not be empty") - .with_node_id(self.node_id.clone()) - .with_split_state(SplitState::Published) - .retain_immature(OffsetDateTime::now_utc()); - let list_splits_request = - ListSplitsRequest::try_from_list_splits_query(&list_splits_query)?; - - let mut immature_splits_stream = ctx - .protect_future(self.metastore.list_splits(list_splits_request)) - .await?; - - let mut per_merge_pipeline_immature_splits: HashMap> = - indexing_pipeline_ids - .iter() - .map(|indexing_pipeline_id| (indexing_pipeline_id.merge_pipeline_id(), Vec::new())) - .collect(); - - let mut num_immature_splits = 0usize; - - while let Some(list_splits_response) = immature_splits_stream.try_next().await? { - for split_metadata in list_splits_response.deserialize_splits_metadata().await? 
{ - num_immature_splits += 1; - - let merge_pipeline_id = MergePipelineId { - node_id: self.node_id.clone(), - index_uid: split_metadata.index_uid.clone(), - source_id: split_metadata.source_id.clone(), - }; - per_merge_pipeline_immature_splits - .entry(merge_pipeline_id) - .or_default() - .push(split_metadata); - } - } - info!("fetched {num_immature_splits} splits candidates for merge"); - Ok(per_merge_pipeline_immature_splits) - } - async fn handle_supervise(&mut self) -> Result<(), ActorExitStatus> { self.indexing_pipelines .retain(|pipeline_uid, pipeline_handle| { @@ -522,48 +368,6 @@ impl IndexingService { } } }); - let merge_pipelines_to_retain: HashSet = self - .indexing_pipelines - .values() - .map(|pipeline_handle| pipeline_handle.indexing_pipeline_id.merge_pipeline_id()) - .collect(); - - let merge_pipelines_to_shutdown: Vec = self - .merge_pipeline_handles - .keys() - .filter(|running_merge_pipeline_id| { - !merge_pipelines_to_retain.contains(running_merge_pipeline_id) - }) - .cloned() - .collect(); - - for merge_pipeline_to_shutdown in merge_pipelines_to_shutdown { - if let Some((_, merge_pipeline_handle)) = self - .merge_pipeline_handles - .remove_entry(&merge_pipeline_to_shutdown) - { - // We gracefully shutdown the merge pipeline, so we can complete the in-flight - // merges. - info!( - index_uid=%merge_pipeline_to_shutdown.index_uid, - source_id=%merge_pipeline_to_shutdown.source_id, - "shutting down orphan merge pipeline" - ); - // The queue capacity of the merge pipeline is unbounded, so `.send_message(...)` - // should not block. - // We avoid using `.quit()` here because it waits for the actor to exit. - merge_pipeline_handle - .handle - .mailbox() - .send_message(FinishPendingMergesAndShutdownPipeline) - .await - .expect("merge pipeline mailbox should not be full"); - } - } - // Finally, we remove the completed or failed merge pipelines. - self.merge_pipeline_handles - .retain(|_, merge_pipeline_handle| merge_pipeline_handle.handle.state().is_running()); - self.counters.num_running_merge_pipelines = self.merge_pipeline_handles.len(); self.update_chitchat_running_plan().await; let pipeline_metrics: HashMap<&IndexingPipelineId, PipelineMetrics> = self @@ -581,33 +385,6 @@ impl IndexingService { Ok(()) } - fn get_or_create_merge_pipeline( - &mut self, - merge_pipeline_params: MergePipelineParams, - immature_splits_opt: Option>, - ctx: &ActorContext, - ) -> Mailbox { - if let Some(merge_pipeline_handle) = self - .merge_pipeline_handles - .get(&merge_pipeline_params.pipeline_id) - { - return merge_pipeline_handle.mailbox.clone(); - } - let merge_pipeline_id = merge_pipeline_params.pipeline_id.clone(); - let merge_pipeline = - MergePipeline::new(merge_pipeline_params, immature_splits_opt, ctx.spawn_ctx()); - let merge_planner_mailbox = merge_pipeline.merge_planner_mailbox().clone(); - let (_pipeline_mailbox, pipeline_handle) = ctx.spawn_actor().spawn(merge_pipeline); - let merge_pipeline_handle = MergePipelineHandle { - mailbox: merge_planner_mailbox.clone(), - handle: pipeline_handle, - }; - self.merge_pipeline_handles - .insert(merge_pipeline_id, merge_pipeline_handle); - self.counters.num_running_merge_pipelines += 1; - merge_planner_mailbox - } - /// For all Ingest V2 pipelines, assigns the set of shards they should be working on. /// This is done regardless of whether there has been a change in their shard list /// or not. 
@@ -714,10 +491,6 @@ impl IndexingService { .map(|index_metadata| (index_metadata.index_uid.clone(), index_metadata)) .collect(); - let mut per_merge_pipeline_immature_splits: HashMap> = - self.fetch_immature_splits_for_new_merge_pipelines(&pipelines_to_spawn_ids, ctx) - .await?; - let mut spawn_pipeline_failures: Vec = Vec::new(); for (task_to_spawn, id_to_spawn) in pipelines_to_spawn.iter().zip(pipelines_to_spawn_ids) { @@ -725,17 +498,12 @@ impl IndexingService { per_index_uid_indexes_metadata.get(task_to_spawn.index_uid()) { if let Some(source_config) = index_metadata.sources.get(&task_to_spawn.source_id) { - let merge_pipeline_id = id_to_spawn.merge_pipeline_id(); - let immature_splits_opt = - per_merge_pipeline_immature_splits.remove(&merge_pipeline_id); - if let Err(error) = self .spawn_pipeline_inner( ctx, id_to_spawn.clone(), index_metadata.index_config.clone(), source_config.clone(), - immature_splits_opt, Some(task_to_spawn.params_fingerprint), ) .await @@ -914,19 +682,6 @@ impl Handler for IndexingService { } } -#[async_trait] -impl Handler for IndexingService { - type Reply = Result, IndexingError>; - - async fn handle( - &mut self, - msg: DetachMergePipeline, - _ctx: &ActorContext, - ) -> Result { - Ok(self.detach_merge_pipeline(&msg.pipeline_id).await) - } -} - #[derive(Debug)] struct SuperviseLoop; @@ -1030,18 +785,16 @@ mod tests { }; use quickwit_ingest::{CreateQueueIfNotExistsRequest, init_ingest_api}; use quickwit_metastore::{ - AddSourceRequestExt, CreateIndexRequestExt, ListIndexesMetadataResponseExt, Split, + AddSourceRequestExt, CreateIndexRequestExt, ListIndexesMetadataResponseExt, metastore_for_test, }; use quickwit_proto::indexing::IndexingTask; use quickwit_proto::metastore::{ AddSourceRequest, CreateIndexRequest, DeleteIndexRequest, IndexMetadataResponse, - IndexesMetadataResponse, ListIndexesMetadataResponse, ListSplitsResponse, - MockMetastoreService, + IndexesMetadataResponse, ListIndexesMetadataResponse, MockMetastoreService, }; use super::*; - use crate::actors::merge_pipeline::SUPERVISE_LOOP_INTERVAL; async fn spawn_indexing_service_for_test( data_dir_path: &Path, @@ -1057,7 +810,6 @@ mod tests { init_ingest_api(universe, &queues_dir_path, &IngestApiConfig::default()) .await .unwrap(); - let merge_scheduler_mailbox: Mailbox = universe.get_or_spawn_one(); let indexing_server = IndexingService::new( NodeId::from("test-node"), data_dir_path.to_path_buf(), @@ -1066,10 +818,10 @@ mod tests { cluster, metastore, Some(ingest_api_service), - Some(merge_scheduler_mailbox), IngesterPool::default(), storage_resolver.clone(), EventBroker::default(), + Arc::new(IndexingSplitCache::no_caching()), ) .await .unwrap(); @@ -1166,15 +918,8 @@ mod tests { .await .unwrap(); pipeline_handle.kill().await; - let _merge_pipeline = indexing_service - .ask_for_res(DetachMergePipeline { - pipeline_id: pipeline_id.merge_pipeline_id(), - }) - .await - .unwrap(); let observation = indexing_service_handle.process_pending_and_observe().await; assert_eq!(observation.num_running_pipelines, 0); - assert_eq!(observation.num_running_merge_pipelines, 0); universe.assert_quit().await; } @@ -1525,263 +1270,6 @@ mod tests { universe.assert_quit().await; } - #[tokio::test] - async fn test_indexing_service_shutdown_merge_pipeline_when_no_indexing_pipeline() { - quickwit_common::setup_logging_for_tests(); - let transport = ChannelTransport::default(); - let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) - .await - .unwrap(); - let metastore = metastore_for_test(); - - 
let index_id = append_random_suffix("test-indexing-service"); - let index_uri = format!("ram:///indexes/{index_id}"); - let index_config = IndexConfig::for_test(&index_id, &index_uri); - - let source_config = SourceConfig { - source_id: "test-indexing-service--source".to_string(), - num_pipelines: NonZeroUsize::MIN, - enabled: true, - source_params: SourceParams::void(), - transform_config: None, - input_format: SourceInputFormat::Json, - }; - let create_index_request = - CreateIndexRequest::try_from_index_config(&index_config).unwrap(); - let index_uid: IndexUid = metastore - .create_index(create_index_request) - .await - .unwrap() - .index_uid() - .clone(); - let add_source_request = - AddSourceRequest::try_from_source_config(index_uid.clone(), &source_config).unwrap(); - metastore.add_source(add_source_request).await.unwrap(); - - // Test `IndexingService::new`. - let temp_dir = tempfile::tempdir().unwrap(); - let data_dir_path = temp_dir.path().to_path_buf(); - let indexer_config = IndexerConfig::for_test().unwrap(); - let num_blocking_threads = 1; - let storage_resolver = StorageResolver::unconfigured(); - let universe = Universe::with_accelerated_time(); - let queues_dir_path = data_dir_path.join(QUEUES_DIR_NAME); - let ingest_api_service = - init_ingest_api(&universe, &queues_dir_path, &IngestApiConfig::default()) - .await - .unwrap(); - let merge_scheduler_service = universe.get_or_spawn_one(); - let indexing_server = IndexingService::new( - NodeId::from("test-node"), - data_dir_path, - indexer_config, - num_blocking_threads, - cluster.clone(), - metastore.clone(), - Some(ingest_api_service), - Some(merge_scheduler_service), - IngesterPool::default(), - storage_resolver.clone(), - EventBroker::default(), - ) - .await - .unwrap(); - let (indexing_server_mailbox, indexing_server_handle) = - universe.spawn_builder().spawn(indexing_server); - let pipeline_id = indexing_server_mailbox - .ask_for_res(SpawnPipeline { - index_id: index_id.clone(), - source_config, - pipeline_uid: PipelineUid::default(), - }) - .await - .unwrap(); - let observation = indexing_server_handle.observe().await; - assert_eq!(observation.num_running_pipelines, 1); - assert_eq!(observation.num_failed_pipelines, 0); - assert_eq!(observation.num_successful_pipelines, 0); - assert_eq!(observation.num_running_merge_pipelines, 1); - - // Test `shutdown_pipeline` - let pipeline = indexing_server_mailbox - .ask_for_res(DetachIndexingPipeline { pipeline_id }) - .await - .unwrap(); - pipeline.quit().await; - - // Let the service cleanup the merge pipelines. - universe.sleep(*HEARTBEAT).await; - - let observation = indexing_server_handle.process_pending_and_observe().await; - assert_eq!(observation.num_running_pipelines, 0); - assert_eq!(observation.num_running_merge_pipelines, 0); - universe.sleep(SUPERVISE_LOOP_INTERVAL).await; - // Check that the merge pipeline is also shut down as they are no more indexing pipeilne on - // the index. 
- assert!(universe.get_one::().is_none()); - // It may or may not panic - universe.quit().await; - } - - #[tokio::test] - async fn test_indexing_service_no_merge_pipeline_when_no_merge_scheduler() { - quickwit_common::setup_logging_for_tests(); - let transport = ChannelTransport::default(); - let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) - .await - .unwrap(); - let metastore = metastore_for_test(); - - let index_id = append_random_suffix("test-indexing-service-no-merge"); - let index_uri = format!("ram:///indexes/{index_id}"); - let index_config = IndexConfig::for_test(&index_id, &index_uri); - - let source_config = SourceConfig { - source_id: "test-source".to_string(), - num_pipelines: NonZeroUsize::MIN, - enabled: true, - source_params: SourceParams::void(), - transform_config: None, - input_format: SourceInputFormat::Json, - }; - let create_index_request = - CreateIndexRequest::try_from_index_config(&index_config).unwrap(); - let index_uid: IndexUid = metastore - .create_index(create_index_request) - .await - .unwrap() - .index_uid() - .clone(); - let add_source_request = - AddSourceRequest::try_from_source_config(index_uid.clone(), &source_config).unwrap(); - metastore.add_source(add_source_request).await.unwrap(); - - let temp_dir = tempfile::tempdir().unwrap(); - let data_dir_path = temp_dir.path().to_path_buf(); - let indexer_config = IndexerConfig::for_test().unwrap(); - let num_blocking_threads = 1; - let storage_resolver = StorageResolver::unconfigured(); - let universe = Universe::with_accelerated_time(); - let queues_dir_path = data_dir_path.join(QUEUES_DIR_NAME); - let ingest_api_service = - init_ingest_api(&universe, &queues_dir_path, &IngestApiConfig::default()) - .await - .unwrap(); - let indexing_server = IndexingService::new( - NodeId::from("test-node"), - data_dir_path, - indexer_config, - num_blocking_threads, - cluster.clone(), - metastore.clone(), - Some(ingest_api_service), - None, // No merge scheduler — external merge service handles compaction. 
- IngesterPool::default(), - storage_resolver.clone(), - EventBroker::default(), - ) - .await - .unwrap(); - let (indexing_server_mailbox, indexing_server_handle) = - universe.spawn_builder().spawn(indexing_server); - - indexing_server_mailbox - .ask_for_res(SpawnPipeline { - index_id: index_id.clone(), - source_config, - pipeline_uid: PipelineUid::default(), - }) - .await - .unwrap(); - - let observation = indexing_server_handle.observe().await; - assert_eq!(observation.num_running_pipelines, 1); - assert_eq!(observation.num_running_merge_pipelines, 0); - assert!(universe.get_one::().is_none()); - - universe.quit().await; - } - - #[tokio::test] - async fn test_indexing_service_spawns_merge_pipeline_with_merge_scheduler() { - quickwit_common::setup_logging_for_tests(); - let transport = ChannelTransport::default(); - let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) - .await - .unwrap(); - let metastore = metastore_for_test(); - - let index_id = append_random_suffix("test-indexing-service-with-merge"); - let index_uri = format!("ram:///indexes/{index_id}"); - let index_config = IndexConfig::for_test(&index_id, &index_uri); - - let source_config = SourceConfig { - source_id: "test-source".to_string(), - num_pipelines: NonZeroUsize::MIN, - enabled: true, - source_params: SourceParams::void(), - transform_config: None, - input_format: SourceInputFormat::Json, - }; - let create_index_request = - CreateIndexRequest::try_from_index_config(&index_config).unwrap(); - let index_uid: IndexUid = metastore - .create_index(create_index_request) - .await - .unwrap() - .index_uid() - .clone(); - let add_source_request = - AddSourceRequest::try_from_source_config(index_uid.clone(), &source_config).unwrap(); - metastore.add_source(add_source_request).await.unwrap(); - - let temp_dir = tempfile::tempdir().unwrap(); - let data_dir_path = temp_dir.path().to_path_buf(); - let indexer_config = IndexerConfig::for_test().unwrap(); - let num_blocking_threads = 1; - let storage_resolver = StorageResolver::unconfigured(); - let universe = Universe::with_accelerated_time(); - let queues_dir_path = data_dir_path.join(QUEUES_DIR_NAME); - let ingest_api_service = - init_ingest_api(&universe, &queues_dir_path, &IngestApiConfig::default()) - .await - .unwrap(); - let merge_scheduler_mailbox: Mailbox = universe.get_or_spawn_one(); - let indexing_server = IndexingService::new( - NodeId::from("test-node"), - data_dir_path, - indexer_config, - num_blocking_threads, - cluster.clone(), - metastore.clone(), - Some(ingest_api_service), - Some(merge_scheduler_mailbox), - IngesterPool::default(), - storage_resolver.clone(), - EventBroker::default(), - ) - .await - .unwrap(); - let (indexing_server_mailbox, indexing_server_handle) = - universe.spawn_builder().spawn(indexing_server); - - indexing_server_mailbox - .ask_for_res(SpawnPipeline { - index_id: index_id.clone(), - source_config, - pipeline_uid: PipelineUid::default(), - }) - .await - .unwrap(); - - let observation = indexing_server_handle.observe().await; - assert_eq!(observation.num_running_pipelines, 1); - assert_eq!(observation.num_running_merge_pipelines, 1); - assert!(universe.get_one::().is_some()); - - universe.quit().await; - } - #[derive(Debug)] struct FreezePipeline; #[async_trait] @@ -1886,7 +1374,6 @@ mod tests { let observation = indexing_service_handle.observe().await; assert_eq!(observation.num_running_pipelines, 1); assert_eq!(observation.num_failed_pipelines, 0); - assert_eq!(observation.num_running_merge_pipelines, 1); // 
Might generate panics universe.quit().await; } @@ -1931,7 +1418,6 @@ mod tests { let indexer_config = IndexerConfig::for_test().unwrap(); let num_blocking_threads = 1; let storage_resolver = StorageResolver::unconfigured(); - let merge_scheduler_service: Mailbox = universe.get_or_spawn_one(); let mut indexing_server = IndexingService::new( NodeId::from("test-ingest-api-gc-node"), data_dir_path, @@ -1940,10 +1426,10 @@ mod tests { cluster.clone(), metastore.clone(), Some(ingest_api_service.clone()), - Some(merge_scheduler_service), IngesterPool::default(), storage_resolver.clone(), EventBroker::default(), + Arc::new(IndexingSplitCache::no_caching()), ) .await .unwrap(); @@ -2008,30 +1494,6 @@ mod tests { let response = IndexesMetadataResponse::for_test(indexes_metadata, failures); Ok(response) }); - mock_metastore - .expect_list_splits() - .withf(|request| { - let list_splits_query = request.deserialize_list_splits_query().unwrap(); - list_splits_query.index_uids.unwrap() == [("test-index-0", 0)] - }) - .return_once(|_request| Ok(ServiceStream::empty())); - mock_metastore - .expect_list_splits() - .withf(|request| { - let list_splits_query = request.deserialize_list_splits_query().unwrap(); - list_splits_query.index_uids.unwrap() == [("test-index-1", 0), ("test-index-2", 0)] - }) - .return_once(|_request| { - let splits = vec![Split { - split_metadata: SplitMetadata::for_test("test-split".to_string()), - split_state: SplitState::Published, - update_timestamp: 0, - publish_timestamp: Some(0), - }]; - let list_splits_response = ListSplitsResponse::try_from_splits(splits).unwrap(); - let response = ServiceStream::from(vec![Ok(list_splits_response)]); - Ok(response) - }); let transport = ChannelTransport::default(); let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true) diff --git a/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs b/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs index bbe5267d514..fcdbad2c815 100644 --- a/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs +++ b/quickwit/quickwit-indexing/src/actors/merge_scheduler_service.rs @@ -209,30 +209,12 @@ struct ScheduleMerge { split_downloader_mailbox: Mailbox, } -/// The higher, the sooner we will execute the merge operation. -/// A good merge operation -/// - strongly reduces the number splits -/// - is light. -fn score_merge_operation(merge_operation: &MergeOperation) -> u64 { - let total_num_bytes: u64 = merge_operation.total_num_bytes(); - if total_num_bytes == 0 { - // Silly corner case that should never happen. - return u64::MAX; - } - // We will remove splits.len() and add 1 merge splits. - let delta_num_splits = (merge_operation.splits.len() - 1) as u64; - // We use integer arithmetic to avoid `f64 are not ordered` silliness. 
- (delta_num_splits << 48) - .checked_div(total_num_bytes) - .unwrap_or(1u64) -} - impl ScheduleMerge { pub fn new( merge_operation: TrackedObject, split_downloader_mailbox: Mailbox, ) -> ScheduleMerge { - let score = score_merge_operation(&merge_operation); + let score = merge_operation.score; ScheduleMerge { score, merge_operation, @@ -315,24 +297,6 @@ mod tests { MergeOperation::new_merge_operation(splits) } - #[test] - fn test_score_merge_operation() { - let score_merge_operation_aux = |num_splits, num_bytes_per_split| { - let merge_operation = build_merge_operation(num_splits, num_bytes_per_split); - score_merge_operation(&merge_operation) - }; - assert!(score_merge_operation_aux(10, 10_000_000) < score_merge_operation_aux(10, 999_999)); - assert!( - score_merge_operation_aux(10, 10_000_000) > score_merge_operation_aux(9, 10_000_000) - ); - assert_eq!( - // 9 - 1 = 8 splits removed. - score_merge_operation_aux(9, 10_000_000), - // 5 - 1 = 4 splits removed. - score_merge_operation_aux(5, 10_000_000 * 9 / 10) - ); - } - #[tokio::test] async fn test_merge_schedule_service_prioritize() { let universe = Universe::new(); diff --git a/quickwit/quickwit-indexing/src/lib.rs b/quickwit/quickwit-indexing/src/lib.rs index 66c50d4d150..379f038484f 100644 --- a/quickwit/quickwit-indexing/src/lib.rs +++ b/quickwit/quickwit-indexing/src/lib.rs @@ -14,6 +14,8 @@ #![deny(clippy::disallowed_methods)] +use std::sync::Arc; + use quickwit_actors::{Mailbox, Universe}; use quickwit_cluster::Cluster; use quickwit_common::pubsub::EventBroker; @@ -24,7 +26,6 @@ use quickwit_proto::metastore::MetastoreServiceClient; use quickwit_storage::StorageResolver; use tracing::info; -use crate::actors::MergeSchedulerService; pub use crate::actors::{ FinishPendingMergesAndShutdownPipeline, IndexingError, IndexingPipeline, IndexingPipelineParams, IndexingService, PublisherType, Sequencer, SplitsUpdateMailbox, @@ -32,7 +33,8 @@ pub use crate::actors::{ pub use crate::controlled_directory::ControlledDirectory; use crate::models::IndexingStatistics; pub use crate::split_store::{ - IndexingSplitCache, IndexingSplitStore, get_tantivy_directory_from_split_bundle, + IndexingSplitCache, IndexingSplitStore, SplitStoreQuota, + get_tantivy_directory_from_split_bundle, }; pub mod actors; @@ -71,7 +73,7 @@ pub async fn start_indexing_service( ingester_pool: IngesterPool, storage_resolver: StorageResolver, event_broker: EventBroker, - merge_scheduler_mailbox: Option>, + indexing_split_cache: Arc, ) -> anyhow::Result> { info!("starting indexer service"); let ingest_api_service_mailbox = universe.get_one::(); @@ -83,10 +85,10 @@ pub async fn start_indexing_service( cluster, metastore.clone(), ingest_api_service_mailbox, - merge_scheduler_mailbox, ingester_pool, storage_resolver, event_broker, + indexing_split_cache, ) .await?; let (indexing_service, _) = universe.spawn_builder().spawn(indexing_service); diff --git a/quickwit/quickwit-indexing/src/merge_policy/mod.rs b/quickwit/quickwit-indexing/src/merge_policy/mod.rs index 9319f8d8498..f950c30635c 100644 --- a/quickwit/quickwit-indexing/src/merge_policy/mod.rs +++ b/quickwit/quickwit-indexing/src/merge_policy/mod.rs @@ -26,7 +26,7 @@ pub use nop_merge_policy::NopMergePolicy; use quickwit_config::IndexingSettings; use quickwit_config::merge_policy_config::MergePolicyConfig; use quickwit_metastore::{SplitMaturity, SplitMetadata}; -use quickwit_proto::types::SplitId; +use quickwit_proto::types::{IndexUid, SourceId, SplitId}; use serde::Serialize; pub(crate) use 
stable_log_merge_policy::StableLogMergePolicy; use tantivy::TrackedObject; @@ -83,20 +83,35 @@ pub struct MergeOperation { #[serde(skip_serializing)] pub merge_parent_span: Span, pub merge_split_id: SplitId, + pub index_uid: IndexUid, + pub source_id: SourceId, pub splits: Vec, pub operation_type: MergeOperationType, + /// Priority score, computed once at construction. Higher = run sooner. + /// `Ord`/`Eq` for `MergeOperation` are defined purely on this field so a + /// `BinaryHeap` is a max-heap by score. + pub score: u64, } impl MergeOperation { + /// All splits must belong to the same `(index_uid, source_id)` — a precondition + /// merge policies already satisfy because they partition before merging. pub fn new_merge_operation(splits: Vec) -> Self { + let first_split = splits.first().expect("merge operation must have splits"); + let index_uid = first_split.index_uid.clone(); + let source_id = first_split.source_id.clone(); let merge_split_id = new_split_id(); let split_ids = splits.iter().map(|split| split.split_id()).collect_vec(); let merge_parent_span = info_span!("merge", merge_split_id=%merge_split_id, split_ids=?split_ids, typ=%MergeOperationType::Merge); + let score = compute_score(&splits); Self { merge_parent_span, merge_split_id, + index_uid, + source_id, splits, operation_type: MergeOperationType::Merge, + score, } } @@ -108,19 +123,77 @@ impl MergeOperation { } pub fn new_delete_and_merge_operation(split: SplitMetadata) -> Self { + let index_uid = split.index_uid.clone(); + let source_id = split.source_id.clone(); let merge_split_id = new_split_id(); let merge_parent_span = info_span!("delete", merge_split_id=%merge_split_id, split_ids=?split.split_id(), typ=%MergeOperationType::DeleteAndMerge); + let splits = vec![split]; + let score = compute_score(&splits); Self { merge_parent_span, merge_split_id, - splits: vec![split], + index_uid, + source_id, + splits, operation_type: MergeOperationType::DeleteAndMerge, + score, } } pub fn splits_as_slice(&self) -> &[SplitMetadata] { self.splits.as_slice() } + + pub fn merge_level(&self) -> usize { + self.splits + .iter() + .map(|s| s.num_merge_ops) + .max() + .unwrap_or(0) + } +} + +/// The higher, the sooner we will execute the merge operation. +/// A good merge operation: +/// - strongly reduces the number of splits +/// - is light. +fn compute_score(splits: &[SplitMetadata]) -> u64 { + let total_num_bytes: u64 = splits.iter().map(|split| split.footer_offsets.end).sum(); + if total_num_bytes == 0 { + // Silly corner case that should never happen. + return u64::MAX; + } + // We will remove splits.len() and add 1 merge split. + let delta_num_splits = (splits.len() - 1) as u64; + // We use integer arithmetic to avoid `f64 are not ordered` silliness. + (delta_num_splits << 48) + .checked_div(total_num_bytes) + .unwrap_or(1u64) +} + +impl PartialEq for MergeOperation { + fn eq(&self, other: &Self) -> bool { + self.merge_split_id == other.merge_split_id + } +} + +impl Eq for MergeOperation {} + +impl PartialOrd for MergeOperation { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for MergeOperation { + /// The way we reason about ordering merge operations is that a highest score should take + /// precedence over a lower score; by that logic, score already serves the Ord property; we + /// formalize that here. 
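// A small sketch of what this ordering buys, assuming the
// `op(num_splits, num_bytes_per_split)` helper from `test_score` further below:
// because `Ord` compares `score` first (tie-broken by `merge_split_id`), a
// `BinaryHeap<MergeOperation>` is a max-heap by score, which is what the
// planner's `PendingOperations` relies on.
//
//     let mut heap = std::collections::BinaryHeap::new();
//     heap.push(op(9, 10_000_000));  // removes 8 splits over 90 MB total
//     heap.push(op(10, 10_000_000)); // removes 9 splits over 100 MB total: higher score
//     assert_eq!(heap.pop().unwrap().score, op(10, 10_000_000).score);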
+ fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.score + .cmp(&other.score) + .then_with(|| self.merge_split_id.cmp(&other.merge_split_id)) + } } impl fmt::Debug for MergeOperation { @@ -233,6 +306,30 @@ pub mod tests { }; use crate::models::{NewSplits, create_split_metadata}; + #[test] + fn test_score() { + fn op(num_splits: usize, num_bytes_per_split: u64) -> MergeOperation { + let splits: Vec = (0..num_splits) + .map(|_| SplitMetadata { + footer_offsets: 0..num_bytes_per_split, + ..Default::default() + }) + .collect(); + MergeOperation::new_merge_operation(splits) + } + // Lighter merge (smaller total bytes) at the same split count scores higher. + assert!(op(10, 10_000_000).score < op(10, 999_999).score); + // More splits removed at the same total bytes scores higher. + assert!(op(10, 10_000_000).score > op(9, 10_000_000).score); + // Score is `(delta_splits << 48) / total_bytes` — equal ratios yield equal scores. + assert_eq!( + // 9 splits, 90M bytes → delta=8. + op(9, 10_000_000).score, + // 5 splits, 45M bytes → delta=4 (same 8/90M ratio). + op(5, 10_000_000 * 9 / 10).score, + ); + } + fn pow_of_10(n: usize) -> usize { 10usize.pow(n as u32) } diff --git a/quickwit/quickwit-indexing/src/models/indexing_service_message.rs b/quickwit/quickwit-indexing/src/models/indexing_service_message.rs index 67548b9c09d..868f8029be7 100644 --- a/quickwit/quickwit-indexing/src/models/indexing_service_message.rs +++ b/quickwit/quickwit-indexing/src/models/indexing_service_message.rs @@ -13,7 +13,7 @@ // limitations under the License. use quickwit_config::SourceConfig; -use quickwit_proto::indexing::{IndexingPipelineId, MergePipelineId}; +use quickwit_proto::indexing::IndexingPipelineId; use quickwit_proto::types::{IndexId, PipelineUid}; #[derive(Clone, Debug)] @@ -31,14 +31,6 @@ pub struct DetachIndexingPipeline { pub pipeline_id: IndexingPipelineId, } -/// Detaches a merge pipeline from the indexing service. The pipeline is no longer managed by the -/// server. This is mostly useful for preventing the server killing an existing merge pipeline -/// if a indexing pipeline is detached. 
-#[derive(Debug)] -pub struct DetachMergePipeline { - pub pipeline_id: MergePipelineId, -} - #[derive(Debug)] pub struct ObservePipeline { pub pipeline_id: IndexingPipelineId, diff --git a/quickwit/quickwit-indexing/src/models/mod.rs b/quickwit/quickwit-indexing/src/models/mod.rs index fd3188e2104..7f84bbab0af 100644 --- a/quickwit/quickwit-indexing/src/models/mod.rs +++ b/quickwit/quickwit-indexing/src/models/mod.rs @@ -34,9 +34,7 @@ pub use indexed_split::{ CommitTrigger, EmptySplit, IndexedSplit, IndexedSplitBatch, IndexedSplitBatchBuilder, IndexedSplitBuilder, }; -pub use indexing_service_message::{ - DetachIndexingPipeline, DetachMergePipeline, ObservePipeline, SpawnPipeline, -}; +pub use indexing_service_message::{DetachIndexingPipeline, ObservePipeline, SpawnPipeline}; pub use indexing_statistics::IndexingStatistics; pub use merge_planner_message::NewSplits; pub use merge_scratch::MergeScratch; diff --git a/quickwit/quickwit-indexing/src/split_store/indexing_split_cache.rs b/quickwit/quickwit-indexing/src/split_store/indexing_split_cache.rs index 70f19410ecf..66ef9dd6005 100644 --- a/quickwit/quickwit-indexing/src/split_store/indexing_split_cache.rs +++ b/quickwit/quickwit-indexing/src/split_store/indexing_split_cache.rs @@ -21,11 +21,13 @@ use std::time::{Duration, SystemTime}; use anyhow::Context; use bytesize::ByteSize; +use quickwit_common::fs::get_cache_directory_path; use quickwit_common::split_file; +use quickwit_config::IndexerConfig; use quickwit_directories::BundleDirectory; use quickwit_storage::StorageResult; use tantivy::Directory; -use tantivy::directory::MmapDirectory; +use tantivy::directory::{Advice, MmapDirectory}; use tokio::sync::Mutex; use tracing::{debug, error, warn}; use ulid::Ulid; @@ -38,12 +40,15 @@ const SPLIT_MAX_AGE: Duration = Duration::from_secs(2 * 24 * 3_600); // 2 days pub fn get_tantivy_directory_from_split_bundle( split_file: &Path, ) -> StorageResult> { - let mmap_directory = MmapDirectory::open(split_file.parent().ok_or_else(|| { - io::Error::new( - io::ErrorKind::NotFound, - format!("couldn't find parent for {}", split_file.display()), - ) - })?)?; + let mmap_directory = MmapDirectory::open_with_madvice( + split_file.parent().ok_or_else(|| { + io::Error::new( + io::ErrorKind::NotFound, + format!("couldn't find parent for {}", split_file.display()), + ) + })?, + Advice::Sequential, + )?; let split_fileslice = mmap_directory.open_read(Path::new(&split_file))?; Ok(Box::new(BundleDirectory::open_split(split_fileslice)?)) } @@ -361,6 +366,31 @@ impl IndexingSplitCache { IndexingSplitCache { inner } } + /// Builds an [`IndexingSplitCache`] from an [`IndexerConfig`]. + /// + /// A zero quota for either dimension produces a [`IndexingSplitCache::no_caching`] + /// instance — useful when compaction runs on dedicated nodes and indexers no + /// longer benefit from caching freshly produced splits. Otherwise, opens the + /// cache rooted at `/indexer-split-cache/splits`. 
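+    ///
+    /// A minimal sketch of the intended call (not a doctest; `IndexerConfig::for_test()`
+    /// is the testsuite helper also used in this module's tests, and the data directory
+    /// path below is an arbitrary example):
+    /// ```ignore
+    /// let mut indexer_config = IndexerConfig::for_test()?;
+    /// // A zero quota in either dimension disables split caching entirely.
+    /// indexer_config.split_store_max_num_splits = 0;
+    /// let split_cache = IndexingSplitCache::from_config(
+    ///     &indexer_config,
+    ///     std::path::Path::new("/var/lib/quickwit/qwdata"),
+    /// )
+    /// .await?;
+    /// ```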
+ pub async fn from_config( + indexer_config: &IndexerConfig, + data_dir_path: &Path, + ) -> anyhow::Result { + if indexer_config.split_store_max_num_bytes.as_u64() == 0 + || indexer_config.split_store_max_num_splits == 0 + { + return Ok(IndexingSplitCache::no_caching()); + } + let cache_path = get_cache_directory_path(data_dir_path); + let quota = SplitStoreQuota::try_new( + indexer_config.split_store_max_num_splits, + indexer_config.split_store_max_num_bytes, + )?; + IndexingSplitCache::open(cache_path, quota) + .await + .context("failed to open indexing split cache") + } + /// Try to open an existing local split store directory. /// /// If the directory does not exists, it will be created. @@ -508,6 +538,7 @@ mod tests { use std::time::Duration; use bytesize::ByteSize; + use quickwit_config::IndexerConfig; use quickwit_directories::BundleDirectory; use quickwit_storage::{PutPayload, SplitPayloadBuilder}; use tantivy::Directory; @@ -530,6 +561,54 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_from_config() { + // A zero quota in either dimension yields a no-caching cache that does + // not touch the filesystem; a positive quota opens (and creates) the + // cache directory at `/indexer-split-cache/splits`. + let zero_bytes = { + let mut config = IndexerConfig::for_test().unwrap(); + config.split_store_max_num_bytes = ByteSize(0); + config + }; + let zero_splits = { + let mut config = IndexerConfig::for_test().unwrap(); + config.split_store_max_num_splits = 0; + config + }; + let both_zero = { + let mut config = IndexerConfig::for_test().unwrap(); + config.split_store_max_num_bytes = ByteSize(0); + config.split_store_max_num_splits = 0; + config + }; + for config in [zero_bytes, zero_splits, both_zero] { + let data_dir = tempdir().unwrap(); + let _cache = IndexingSplitCache::from_config(&config, data_dir.path()) + .await + .unwrap(); + assert!( + !data_dir + .path() + .join("indexer-split-cache") + .try_exists() + .unwrap(), + "no-caching variant must not create the cache directory", + ); + } + + let data_dir = tempdir().unwrap(); + let config = IndexerConfig::for_test().unwrap(); + let _cache = IndexingSplitCache::from_config(&config, data_dir.path()) + .await + .unwrap(); + let cache_dir = data_dir.path().join("indexer-split-cache").join("splits"); + assert!( + cache_dir.is_dir(), + "positive quota must open (and create) the cache directory", + ); + } + #[tokio::test] async fn test_local_split_store_load_existing_splits() -> anyhow::Result<()> { let temp_dir = tempfile::tempdir()?; diff --git a/quickwit/quickwit-indexing/src/test_utils.rs b/quickwit/quickwit-indexing/src/test_utils.rs index 550e40aa16f..62634d136cc 100644 --- a/quickwit/quickwit-indexing/src/test_utils.rs +++ b/quickwit/quickwit-indexing/src/test_utils.rs @@ -40,6 +40,7 @@ use serde_json::Value as JsonValue; use crate::actors::IndexingService; use crate::models::{DetachIndexingPipeline, IndexingStatistics, SpawnPipeline}; +use crate::split_store::IndexingSplitCache; /// Creates a Test environment. 
/// @@ -111,7 +112,6 @@ impl TestSandbox { let num_blocking_threads = 1; let storage = storage_resolver.resolve(&index_uri).await?; let universe = Universe::with_accelerated_time(); - let merge_scheduler_mailbox = universe.get_or_spawn_one(); let queues_dir_path = temp_dir.path().join(QUEUES_DIR_NAME); let ingest_api_service = init_ingest_api(&universe, &queues_dir_path, &IngestApiConfig::default()).await?; @@ -123,10 +123,10 @@ impl TestSandbox { cluster, metastore.clone(), Some(ingest_api_service), - Some(merge_scheduler_mailbox), IngesterPool::default(), storage_resolver.clone(), EventBroker::default(), + Arc::new(IndexingSplitCache::no_caching()), ) .await?; let (indexing_service, _indexing_service_handle) = diff --git a/quickwit/quickwit-janitor/Cargo.toml b/quickwit/quickwit-janitor/Cargo.toml index 8d4cad3beb9..16a9a540806 100644 --- a/quickwit/quickwit-janitor/Cargo.toml +++ b/quickwit/quickwit-janitor/Cargo.toml @@ -27,6 +27,7 @@ utoipa = { workspace = true } quickwit-actors = { workspace = true } quickwit-common = { workspace = true } +quickwit-compaction = { workspace = true } quickwit-config = { workspace = true } quickwit-doc-mapper = { workspace = true } quickwit-index-management = { workspace = true } diff --git a/quickwit/quickwit-janitor/src/janitor_service.rs b/quickwit/quickwit-janitor/src/janitor_service.rs index b8bfe5e54b0..712458ad805 100644 --- a/quickwit/quickwit-janitor/src/janitor_service.rs +++ b/quickwit/quickwit-janitor/src/janitor_service.rs @@ -16,6 +16,7 @@ use async_trait::async_trait; use quickwit_actors::{ Actor, ActorContext, ActorExitStatus, ActorHandle, ActorState, Handler, Healthz, }; +use quickwit_compaction::planner::CompactionPlanner; use serde_json::{Value as JsonValue, json}; use crate::actors::{DeleteTaskService, GarbageCollector, RetentionPolicyExecutor}; @@ -24,6 +25,7 @@ pub struct JanitorService { delete_task_service_handle: Option>, garbage_collector_handle: ActorHandle, retention_policy_executor_handle: ActorHandle, + compaction_planner_handle: ActorHandle, } impl JanitorService { @@ -31,11 +33,13 @@ impl JanitorService { delete_task_service_handle: Option>, garbage_collector_handle: ActorHandle, retention_policy_executor_handle: ActorHandle, + compaction_planner_handle: ActorHandle, ) -> Self { Self { delete_task_service_handle, garbage_collector_handle, retention_policy_executor_handle, + compaction_planner_handle, } } @@ -49,6 +53,7 @@ impl JanitorService { delete_task_is_not_failure && self.garbage_collector_handle.state() != ActorState::Failure && self.retention_policy_executor_handle.state() != ActorState::Failure + && self.compaction_planner_handle.state() != ActorState::Failure } } diff --git a/quickwit/quickwit-janitor/src/lib.rs b/quickwit/quickwit-janitor/src/lib.rs index bef73160377..8bac8c46c21 100644 --- a/quickwit/quickwit-janitor/src/lib.rs +++ b/quickwit/quickwit-janitor/src/lib.rs @@ -14,8 +14,9 @@ #![deny(clippy::disallowed_methods)] -use quickwit_actors::{Mailbox, Universe}; +use quickwit_actors::{ActorHandle, Mailbox, Universe}; use quickwit_common::pubsub::EventBroker; +use quickwit_compaction::planner::CompactionPlanner; use quickwit_config::NodeConfig; use quickwit_indexing::actors::MergeSchedulerService; use quickwit_metastore::SplitInfo; @@ -39,6 +40,7 @@ use crate::actors::{DeleteTaskService, GarbageCollector, RetentionPolicyExecutor /// Schema used for the OpenAPI generation which are apart of this crate. 
pub struct JanitorApiSchemas; +#[allow(clippy::too_many_arguments)] pub async fn start_janitor_service( universe: &Universe, config: &NodeConfig, @@ -47,6 +49,7 @@ pub async fn start_janitor_service( storage_resolver: StorageResolver, event_broker: EventBroker, run_delete_task_service: bool, + compaction_planner_handle: ActorHandle, ) -> anyhow::Result> { info!("starting janitor service"); let garbage_collector = GarbageCollector::new(metastore.clone(), storage_resolver.clone()); @@ -77,6 +80,7 @@ pub async fn start_janitor_service( delete_task_service_handle, garbage_collector_handle, retention_policy_executor_handle, + compaction_planner_handle, ); let (janitor_service_mailbox, _janitor_service_handle) = universe.spawn_builder().spawn(janitor_service); diff --git a/quickwit/quickwit-metastore/migrations/postgresql/27_create-index-splits-published-maturity-timestamp.down.sql b/quickwit/quickwit-metastore/migrations/postgresql/27_create-index-splits-published-maturity-timestamp.down.sql new file mode 100644 index 00000000000..5aae02487bd --- /dev/null +++ b/quickwit/quickwit-metastore/migrations/postgresql/27_create-index-splits-published-maturity-timestamp.down.sql @@ -0,0 +1 @@ +DROP INDEX IF EXISTS splits_published_maturity_timestamp_idx; \ No newline at end of file diff --git a/quickwit/quickwit-metastore/migrations/postgresql/27_create-index-splits-published-maturity-timestamp.up.sql b/quickwit/quickwit-metastore/migrations/postgresql/27_create-index-splits-published-maturity-timestamp.up.sql new file mode 100644 index 00000000000..ec26bb98298 --- /dev/null +++ b/quickwit/quickwit-metastore/migrations/postgresql/27_create-index-splits-published-maturity-timestamp.up.sql @@ -0,0 +1,19 @@ +-- Index for the compaction planner's scan, which on every tick reads up to +-- LIMIT splits matching: +-- split_state = 'Published' +-- maturity_timestamp > now() +-- ordered by maturity_timestamp ascending. The planner keeps a local set of +-- already-tracked splits and dedups against it, so re-reading the immature +-- set every tick is intentional -- it's how the planner recovers splits +-- whose merge timed out or failed. +-- +-- The btree on (maturity_timestamp, split_id) lets postgres seek to the live +-- "still immature" range in index order, satisfying both the filter and the +-- ORDER BY without an extra sort. The split_id column is included as a +-- tiebreaker so postgres returns deterministic pages under LIMIT. +-- +-- The partial predicate is restricted to split_state = 'Published' because +-- partial-index predicates must be IMMUTABLE; "now()" cannot appear here. 
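+--
+-- For illustration, the query shape this index is meant to serve (a paraphrase of
+-- the planner's scan described above, not its literal SQL; the LIMIT value is an
+-- arbitrary example):
+--
+--   SELECT * FROM splits
+--   WHERE split_state = 'Published' AND maturity_timestamp > now()
+--   ORDER BY maturity_timestamp ASC, split_id ASC
+--   LIMIT 1000;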
+CREATE INDEX IF NOT EXISTS splits_published_maturity_timestamp_idx + ON splits (maturity_timestamp, split_id) + WHERE split_state = 'Published'; \ No newline at end of file diff --git a/quickwit/quickwit-metastore/src/lib.rs b/quickwit/quickwit-metastore/src/lib.rs index e8fffeb7962..83c492e1ba0 100644 --- a/quickwit/quickwit-metastore/src/lib.rs +++ b/quickwit/quickwit-metastore/src/lib.rs @@ -48,7 +48,7 @@ pub use metastore::{ ListMetricsSplitsQuery, ListMetricsSplitsRequestExt, ListMetricsSplitsResponseExt, ListSplitsQuery, ListSplitsRequestExt, ListSplitsResponseExt, MetastoreServiceExt, MetastoreServiceStreamSplitsExt, PublishMetricsSplitsRequestExt, PublishSplitsRequestExt, - StageMetricsSplitsRequestExt, StageSplitsRequestExt, UpdateIndexRequestExt, + SortBy, StageMetricsSplitsRequestExt, StageSplitsRequestExt, UpdateIndexRequestExt, UpdateSourceRequestExt, file_backed, }; pub use metastore_factory::{MetastoreFactory, UnsupportedMetastore}; diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index ddfee25afab..caaae7316a8 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -875,10 +875,21 @@ pub struct ListSplitsQuery { } #[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] +/// Ordering applied to the result of a [`ListSplitsQuery`]. pub enum SortBy { + /// No ordering — the metastore may return splits in any order. None, + /// Order by `(delete_opstamp ASC, publish_timestamp ASC)`. Used by the + /// delete pipeline to process the splits with the most pending delete + /// work first. Staleness, + /// Order by `(index_uid ASC, split_id ASC)`, matching the splits-table + /// primary key. Used for stable pagination across all indexes. IndexUid, + /// Order by `(maturity_timestamp ASC, split_id ASC)`. Used by the + /// compaction planner so that under a backlog the splits closest to + /// becoming mature are processed first. + MaturityTimestamp, } impl SortBy { @@ -904,6 +915,16 @@ impl SortBy { .split_id .cmp(&right_split.split_metadata.split_id) }), + SortBy::MaturityTimestamp => left_split + .split_metadata + .maturity_unix_timestamp() + .cmp(&right_split.split_metadata.maturity_unix_timestamp()) + .then_with(|| { + left_split + .split_metadata + .split_id + .cmp(&right_split.split_metadata.split_id) + }), } } } @@ -1154,6 +1175,12 @@ impl ListSplitsQuery { self } + /// Sorts the splits by maturity_timestamp ascending, with split_id as a tiebreaker. + pub fn sort_by_maturity_timestamp(mut self) -> Self { + self.sort_by = SortBy::MaturityTimestamp; + self + } + /// Only return splits whose (index_uid, split_id) are lexicographically after this split. /// This is only useful if results are sorted by index_uid and split_id. 
pub fn after_split(mut self, split_meta: &SplitMetadata) -> Self { diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs index 28519b8a294..29c26e41fb2 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs @@ -211,6 +211,10 @@ pub(super) fn append_query_filters_and_order_by( sql.order_by(Splits::IndexUid, Order::Asc) .order_by(Splits::SplitId, Order::Asc); } + SortBy::MaturityTimestamp => { + sql.order_by(Splits::MaturityTimestamp, Order::Asc) + .order_by(Splits::SplitId, Order::Asc); + } SortBy::None => (), } diff --git a/quickwit/quickwit-metastore/src/split_metadata.rs b/quickwit/quickwit-metastore/src/split_metadata.rs index fe88fe379d3..95066e87838 100644 --- a/quickwit/quickwit-metastore/src/split_metadata.rs +++ b/quickwit/quickwit-metastore/src/split_metadata.rs @@ -218,6 +218,18 @@ impl SplitMetadata { } } + /// Returns the unix timestamp at which the split becomes mature, or 0 if + /// the split is already mature (matching the metastore's stored + /// `maturity_timestamp` column). + pub fn maturity_unix_timestamp(&self) -> i64 { + match self.maturity { + SplitMaturity::Mature => 0, + SplitMaturity::Immature { maturation_period } => { + self.create_timestamp + maturation_period.as_secs() as i64 + } + } + } + #[cfg(any(test, feature = "testsuite"))] /// Returns an instance of `SplitMetadata` for testing. pub fn for_test(split_id: SplitId) -> SplitMetadata { diff --git a/quickwit/quickwit-proto/protos/quickwit/compaction.proto b/quickwit/quickwit-proto/protos/quickwit/compaction.proto index 7313d85db91..3c82d6b34c7 100644 --- a/quickwit/quickwit-proto/protos/quickwit/compaction.proto +++ b/quickwit/quickwit-proto/protos/quickwit/compaction.proto @@ -39,7 +39,6 @@ message CompactionInProgress { message CompactionSuccess { string task_id = 1; - string merged_split_id = 2; } message CompactionFailure { @@ -61,4 +60,5 @@ message MergeTaskAssignment { quickwit.common.IndexUid index_uid = 7; string source_id = 8; string index_storage_uri = 9; + uint64 merge_level = 10; } \ No newline at end of file diff --git a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.compaction.rs b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.compaction.rs index f96bb33ed20..1404816f276 100644 --- a/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.compaction.rs +++ b/quickwit/quickwit-proto/src/codegen/quickwit/quickwit.compaction.rs @@ -30,8 +30,6 @@ pub struct CompactionInProgress { pub struct CompactionSuccess { #[prost(string, tag = "1")] pub task_id: ::prost::alloc::string::String, - #[prost(string, tag = "2")] - pub merged_split_id: ::prost::alloc::string::String, } #[derive(serde::Serialize, serde::Deserialize, utoipa::ToSchema)] #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] @@ -68,6 +66,8 @@ pub struct MergeTaskAssignment { pub source_id: ::prost::alloc::string::String, #[prost(string, tag = "9")] pub index_storage_uri: ::prost::alloc::string::String, + #[prost(uint64, tag = "10")] + pub merge_level: u64, } /// BEGIN quickwit-codegen #[allow(unused_imports)] diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 4c2e8d8499c..1fe1b69069c 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -57,7 +57,7 @@ pub use format::BodyFormat; use futures::StreamExt; use itertools::Itertools; use once_cell::sync::Lazy; -use 
quickwit_actors::{ActorExitStatus, Mailbox, SpawnContext, Universe}; +use quickwit_actors::{ActorExitStatus, ActorHandle, Mailbox, SpawnContext, Universe}; use quickwit_cluster::{ Cluster, ClusterChange, ClusterChangeStream, ClusterNode, ListenerHandle, start_cluster_service, }; @@ -79,9 +79,9 @@ use quickwit_config::{ClusterConfig, IngestApiConfig, NodeConfig}; use quickwit_control_plane::control_plane::{ControlPlane, ControlPlaneEventSubscriber}; use quickwit_control_plane::{IndexerNodeInfo, IndexerPool}; use quickwit_index_management::{IndexService as IndexManager, IndexServiceError}; -use quickwit_indexing::actors::{IndexingService, MergeSchedulerService}; +use quickwit_indexing::actors::IndexingService; use quickwit_indexing::models::ShardPositionsService; -use quickwit_indexing::start_indexing_service; +use quickwit_indexing::{IndexingSplitCache, start_indexing_service}; use quickwit_ingest::{ GetMemoryCapacity, IngestRequest, IngestRouter, IngestServiceClient, Ingester, IngesterPool, IngesterPoolEntry, LocalShardsUpdate, get_idle_shard_timeout, @@ -275,22 +275,31 @@ async fn balance_channel_for_service( /// /// On janitor nodes, spawns a `CompactionPlanner` actor and builds the client from /// its mailbox. On compactor-only nodes, connects to a remote janitor via gRPC. +/// +/// The second tuple element is the local planner's `ActorHandle`, returned only +/// on janitor nodes so the caller can attach it to the janitor liveness probe. async fn get_compaction_planner_client_if_needed( node_config: &NodeConfig, cluster: &Cluster, universe: &Universe, metastore_client: &MetastoreServiceClient, -) -> anyhow::Result> { +) -> anyhow::Result<( + Option, + Option>, +)> { let is_janitor = node_config.is_service_enabled(QuickwitService::Janitor); let is_compactor = node_config.is_service_enabled(QuickwitService::Compactor); if !is_janitor && !is_compactor { - return Ok(None); + return Ok((None, None)); } if is_janitor { let planner = CompactionPlanner::new(metastore_client.clone()); - let (mailbox, _handle) = universe.spawn_builder().spawn(planner); + let (mailbox, handle) = universe.spawn_builder().spawn(planner); info!("compaction planner actor started on janitor node"); - return Ok(Some(CompactionPlannerServiceClient::from_mailbox(mailbox))); + return Ok(( + Some(CompactionPlannerServiceClient::from_mailbox(mailbox)), + Some(handle), + )); } // Compactor-only node: connect to the planner on a remote janitor. 
let balance_channel = balance_channel_for_service(cluster, QuickwitService::Janitor).await; @@ -303,21 +312,14 @@ async fn get_compaction_planner_client_if_needed( bail!("compactor is enabled but no janitor node was found in the cluster") } info!("remote compaction planner detected on janitor node"); - Ok(Some(CompactionPlannerServiceClient::from_balance_channel( - balance_channel, - node_config.grpc_config.max_message_size, + Ok(( + Some(CompactionPlannerServiceClient::from_balance_channel( + balance_channel, + node_config.grpc_config.max_message_size, + None, + )), None, - ))) -} - -fn spawn_merge_scheduler_service( - universe: &Universe, - node_config: &NodeConfig, -) -> Mailbox { - let (mailbox, _) = universe.spawn_builder().spawn(MergeSchedulerService::new( - node_config.indexer_config.merge_concurrency.get(), - )); - mailbox + )) } async fn start_ingest_client_if_needed( @@ -598,22 +600,37 @@ pub async fn serve_quickwit( .await .context("failed to start ingest v1 service")?; - let compaction_service_client_opt = get_compaction_planner_client_if_needed( - &node_config, - &cluster, - &universe, - &metastore_client, - ) - .await - .context("failed to initialize compaction service client")?; + let (compaction_service_client_opt, compaction_planner_handle_opt) = + get_compaction_planner_client_if_needed( + &node_config, + &cluster, + &universe, + &metastore_client, + ) + .await + .context("failed to initialize compaction service client")?; + + // Build the indexing split cache once and share it between the indexing + // service and the compactor supervisor (when both run on the same node). + // A zero quota in `IndexerConfig` produces a no-op cache. + let indexing_split_cache_opt: Option> = if node_config + .is_service_enabled(QuickwitService::Indexer) + || node_config.is_service_enabled(QuickwitService::Compactor) + { + let cache = IndexingSplitCache::from_config( + &node_config.indexer_config, + &node_config.data_dir_path, + ) + .await?; + Some(Arc::new(cache)) + } else { + None + }; let indexing_service_opt = if node_config.is_service_enabled(QuickwitService::Indexer) { - let merge_scheduler_mailbox_opt = - if !node_config.indexer_config.enable_standalone_compactors { - Some(spawn_merge_scheduler_service(&universe, &node_config)) - } else { - None - }; + let split_cache = indexing_split_cache_opt + .clone() + .expect("indexing split cache must exist on indexer nodes"); let indexing_service = start_indexing_service( &universe, &node_config, @@ -623,7 +640,7 @@ pub async fn serve_quickwit( ingester_pool.clone(), storage_resolver.clone(), event_broker.clone(), - merge_scheduler_mailbox_opt, + split_cache, ) .await .context("failed to start indexing service")?; @@ -773,6 +790,8 @@ pub async fn serve_quickwit( }; let janitor_service_opt = if node_config.is_service_enabled(QuickwitService::Janitor) { + let compaction_planner_handle = compaction_planner_handle_opt + .expect("compaction planner handle must exist on janitor nodes"); let janitor_service = start_janitor_service( &universe, &node_config, @@ -781,6 +800,7 @@ pub async fn serve_quickwit( storage_resolver.clone(), event_broker.clone(), !get_bool_from_env(DISABLE_DELETE_TASK_SERVICE_ENV_KEY, false), + compaction_planner_handle, ) .await .context("failed to start janitor service")?; @@ -795,18 +815,20 @@ pub async fn serve_quickwit( let compaction_root_directory = quickwit_common::temp_dir::Builder::default() .tempdir_in(&compaction_dir) .context("failed to create compaction temp directory")?; - let split_cache = 
Arc::new(quickwit_indexing::IndexingSplitCache::no_caching()); let compaction_client = compaction_service_client_opt .clone() .expect("compactor service enabled but no compaction client available"); + let split_cache = indexing_split_cache_opt + .clone() + .expect("indexing split cache must exist on compactor nodes"); let compactor_mailbox = start_compactor_service( &universe, cluster.self_node_id().into(), compaction_client, &node_config.compactor_config, - split_cache, metastore_client.clone(), storage_resolver.clone(), + split_cache, event_broker.clone(), compaction_root_directory, ) @@ -1930,11 +1952,12 @@ mod tests { let mut node_config = NodeConfig::for_test(); node_config.enabled_services = HashSet::from([QuickwitService::Janitor, QuickwitService::Indexer]); - let result = + let (client_opt, handle_opt) = get_compaction_planner_client_if_needed(&node_config, &cluster, &universe, &metastore) .await .unwrap(); - assert!(result.is_some()); + assert!(client_opt.is_some()); + assert!(handle_opt.is_some()); // With compactor + janitor enabled, planner client is also returned. node_config.enabled_services = HashSet::from([ @@ -1942,19 +1965,21 @@ mod tests { QuickwitService::Indexer, QuickwitService::Compactor, ]); - let result = + let (client_opt, handle_opt) = get_compaction_planner_client_if_needed(&node_config, &cluster, &universe, &metastore) .await .unwrap(); - assert!(result.is_some()); + assert!(client_opt.is_some()); + assert!(handle_opt.is_some()); - // Neither janitor nor compactor: no client. + // Neither janitor nor compactor: no client, no handle. node_config.enabled_services = HashSet::from([QuickwitService::Indexer]); - let result = + let (client_opt, handle_opt) = get_compaction_planner_client_if_needed(&node_config, &cluster, &universe, &metastore) .await .unwrap(); - assert!(result.is_none()); + assert!(client_opt.is_none()); + assert!(handle_opt.is_none()); universe.assert_quit().await; }