diff --git a/src/cli.rs b/src/cli.rs index 969c3a619..07ae2e843 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -314,6 +314,50 @@ pub struct Options { )] pub hot_tier_storage_path: Option, + #[arg( + long = "hot-tier-download-chunk-size", + env = "P_HOT_TIER_DOWNLOAD_CHUNK_SIZE", + value_parser = clap::value_parser!(u64).range(5242880..), + default_value = "8388608", + help = "Chunk size in bytes for parallel hot tier downloads (default 8 MiB)" + )] + pub hot_tier_download_chunk_size: u64, + + #[arg( + long = "hot-tier-download-concurrency", + env = "P_HOT_TIER_DOWNLOAD_CONCURRENCY", + value_parser = clap::value_parser!(u64).range(1..), + default_value = "16", + help = "Number of concurrent range requests per hot tier download" + )] + pub hot_tier_download_concurrency: u64, + + #[arg( + long = "hot-tier-files-per-stream-concurrency", + env = "P_HOT_TIER_FILES_PER_STREAM_CONCURRENCY", + default_value = "4", + help = "Number of concurrent parquet file downloads per stream during hot tier sync" + )] + pub hot_tier_files_per_stream_concurrency: usize, + + #[arg( + long = "hot-tier-latest-minutes", + env = "P_HOT_TIER_LATEST_MINUTES", + value_parser = clap::value_parser!(u64).range(1..), + default_value = "10", + help = "Files whose timestamp is within the last N minutes are 'latest'; rest are 'historic'." + )] + pub hot_tier_latest_minutes: u64, + + #[arg( + long = "hot-tier-historic-sync-minutes", + env = "P_HOT_TIER_HISTORIC_SYNC_MINUTES", + value_parser = clap::value_parser!(u32).range(1..), + default_value = "5", + help = "Interval (minutes) at which the historic hot-tier sync runs." + )] + pub hot_tier_historic_sync_minutes: u32, + //TODO: remove this when smart cache is implemented #[arg( long = "index-storage-path", diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 86f83329c..4c38ffee2 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -413,6 +413,11 @@ pub async fn get_stream_info( Ok((web::Json(stream_info), StatusCode::OK)) } +#[tracing::instrument( + name = "http.put_stream_hot_tier", + skip(req, logstream, hottier), + fields(stream = tracing::field::Empty, tenant = tracing::field::Empty, size = hottier.size) +)] pub async fn put_stream_hot_tier( req: HttpRequest, logstream: Path, @@ -420,6 +425,9 @@ pub async fn put_stream_hot_tier( ) -> Result { let stream_name = logstream.into_inner(); let tenant_id = get_tenant_id_from_request(&req); + tracing::Span::current() + .record("stream", tracing::field::display(&stream_name)) + .record("tenant", tracing::field::debug(&tenant_id)); // For query mode, if the stream not found in memory map, //check if it exists in the storage //create stream and schema from storage @@ -469,19 +477,29 @@ pub async fn put_stream_hot_tier( .metastore .put_stream_json(&stream_metadata, &stream_name, &tenant_id) .await?; - + hot_tier_manager + .spawn_stream_tasks(stream_name.clone(), tenant_id.clone()) + .await; Ok(( format!("hot tier set for stream {stream_name}"), StatusCode::OK, )) } +#[tracing::instrument( + name = "http.get_stream_hot_tier", + skip(req, logstream), + fields(stream = tracing::field::Empty, tenant = tracing::field::Empty) +)] pub async fn get_stream_hot_tier( req: HttpRequest, logstream: Path, ) -> Result { let stream_name = logstream.into_inner(); let tenant_id = get_tenant_id_from_request(&req); + tracing::Span::current() + .record("stream", tracing::field::display(&stream_name)) + .record("tenant", tracing::field::debug(&tenant_id)); // For query mode, if the stream not found 
in memory map, //check if it exists in the storage //create stream and schema from storage @@ -502,12 +520,20 @@ pub async fn get_stream_hot_tier( Ok((web::Json(meta), StatusCode::OK)) } +#[tracing::instrument( + name = "http.delete_stream_hot_tier", + skip(req, logstream), + fields(stream = tracing::field::Empty, tenant = tracing::field::Empty) +)] pub async fn delete_stream_hot_tier( req: HttpRequest, logstream: Path, ) -> Result { let stream_name = logstream.into_inner(); let tenant_id = get_tenant_id_from_request(&req); + tracing::Span::current() + .record("stream", tracing::field::display(&stream_name)) + .record("tenant", tracing::field::debug(&tenant_id)); // For query mode, if the stream not found in memory map, //check if it exists in the storage //create stream and schema from storage diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index 77622aca5..d5cac5049 100644 --- a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -160,6 +160,11 @@ pub trait ParseableServer { // Shutdown resource monitor let _ = resource_shutdown_tx.send(()); + // Shutdown hottier + if let Some(ht_global) = HotTierManager::global() { + ht_global.abort_all().await; + } + // Initiate graceful shutdown info!("Graceful shutdown of HTTP server triggered"); srv_handle.stop(true).await; @@ -626,6 +631,7 @@ pub type PrismMetadata = NodeMetadata; /// Initialize hot tier metadata files for streams that have hot tier configuration /// in their stream metadata but don't have local hot tier metadata files yet. /// This function is called once during query server startup. +#[tracing::instrument(name = "hottier.init_metadata_startup", skip(hot_tier_manager))] pub async fn initialize_hot_tier_metadata_on_startup( hot_tier_manager: &HotTierManager, ) -> anyhow::Result<()> { diff --git a/src/hottier.rs b/src/hottier.rs index c472f91c5..8eba302fa 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -16,11 +16,14 @@ * */ +use datafusion::common::HashSet; use std::{ - collections::BTreeMap, + collections::HashMap, io, path::{Path, PathBuf}, + sync::Arc, }; +use tokio::sync::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; use crate::{ catalog::manifest::{File, Manifest}, @@ -31,8 +34,7 @@ use crate::{ utils::{extract_datetime, human_size::bytes_to_human_size}, validator::error::HotTierValidationError, }; -use chrono::NaiveDate; -use clokwerk::{AsyncScheduler, Interval, Job}; +use chrono::{DateTime, NaiveDate, Timelike, Utc}; use futures::{StreamExt, TryStreamExt, stream::FuturesUnordered}; use futures_util::TryFutureExt; use object_store::{ObjectStoreExt, local::LocalFileSystem}; @@ -42,13 +44,20 @@ use relative_path::RelativePathBuf; use std::time::Duration; use sysinfo::Disks; use tokio::fs::{self, DirEntry}; -use tokio::io::AsyncWriteExt; use tokio_stream::wrappers::ReadDirStream; -use tracing::{error, warn}; +use tracing::{Instrument, error, info}; + +/// Floor a timestamp to the start of its minute (seconds + sub-second zeroed). +/// Used to produce a stable per-tick anchor so all spans within one tick share +/// the same cutoff value. 
+fn floor_to_minute(ts: DateTime<Utc>) -> DateTime<Utc> { + ts.with_second(0) + .and_then(|t| t.with_nanosecond(0)) + .unwrap_or(ts) +} pub const STREAM_HOT_TIER_FILENAME: &str = ".hot_tier.json"; pub const MIN_STREAM_HOT_TIER_SIZE_BYTES: u64 = 10737418240; // 10 GiB -const HOT_TIER_SYNC_DURATION: Interval = clokwerk::Interval::Minutes(1); pub const INTERNAL_STREAM_HOT_TIER_SIZE_BYTES: u64 = 10485760; //10 MiB pub const CURRENT_HOT_TIER_VERSION: &str = "v2"; @@ -65,9 +74,33 @@ pub struct StreamHotTier { pub oldest_date_time_entry: Option<String>, } +/// Per-stream in-memory bookkeeping. Mutex protects concurrent reservation, +/// commit, and per-date manifest writes. Downloads run outside the lock. +struct StreamSyncState { + sht: AsyncMutex<StreamHotTier>, +} + +/// Hot-tier sync runs in two phases. Latest pulls files newer than +/// `hot_tier_latest_minutes` ago and may evict historic to make room. +/// Historic pulls older files, runs less often, never triggers eviction. +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +enum SyncPhase { + Latest, + Historic, +} + +type StreamKey = (Option<String>, String); + +struct StreamTasks { + latest: tokio::task::JoinHandle<()>, + historic: tokio::task::JoinHandle<()>, +} pub struct HotTierManager { filesystem: LocalFileSystem, hot_tier_path: &'static Path, + state_cache: AsyncRwLock<HashMap<StreamKey, Arc<StreamSyncState>>>, + tasks: AsyncRwLock<HashMap<StreamKey, StreamTasks>>, } impl HotTierManager { @@ -76,7 +109,287 @@ impl HotTierManager { HotTierManager { filesystem: LocalFileSystem::new(), hot_tier_path, + state_cache: AsyncRwLock::new(HashMap::new()), + tasks: AsyncRwLock::new(HashMap::new()), + } + } + + /// Lazy-load and cache the `StreamHotTier` for a (tenant, stream) pair. + /// All sync-path mutations should acquire `state.sht.lock()`. + async fn get_or_load_state( + &self, + stream: &str, + tenant_id: &Option<String>, + ) -> Result<Arc<StreamSyncState>, HotTierError> { + let key: StreamKey = (tenant_id.clone(), stream.to_owned()); + if let Some(state) = self.state_cache.read().await.get(&key).cloned() { + return Ok(state); + } + // key not present, reconcile + let sht = self.reconcile_stream(stream, tenant_id).await?; + let state = Arc::new(StreamSyncState { + sht: AsyncMutex::new(sht), + }); + + let mut cache = self.state_cache.write().await; + if cache.insert(key, state.clone()).is_some() { + tracing::warn!( + "Key- {:?} was absent during read lock but already exists after reconcile!", + (tenant_id, stream), + ); + }; + Ok(state) + } + + /// Drop cached state for a stream (used after delete). + pub async fn invalidate_state(&self, stream: &str, tenant_id: &Option<String>) { + let key: StreamKey = (tenant_id.clone(), stream.to_owned()); + self.state_cache.write().await.remove(&key); + } + + /// Walk the on-disk hot-tier directory for a stream and bring it into + /// agreement with `hottier.manifest.json` files. Removes `.partial` + /// orphans, drops manifest entries whose files are missing or wrong size, + /// deletes parquet files that exist but are not in their date manifest, + /// then recomputes `used_size` / `available_size` from the cleaned + /// manifests and persists the updated `StreamHotTier`.
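A quick, self-contained check of the minute-flooring behaviour described above (`floor_to_minute` is copied from this diff; the timestamps are arbitrary):

```rust
use chrono::{DateTime, TimeZone, Timelike, Utc};

/// Same helper as in the diff: zero out seconds and sub-second precision.
fn floor_to_minute(ts: DateTime<Utc>) -> DateTime<Utc> {
    ts.with_second(0)
        .and_then(|t| t.with_nanosecond(0))
        .unwrap_or(ts)
}

fn main() {
    let ts = Utc.with_ymd_and_hms(2026, 5, 12, 10, 37, 42).unwrap();
    let anchor = floor_to_minute(ts);
    assert_eq!(anchor, Utc.with_ymd_and_hms(2026, 5, 12, 10, 37, 0).unwrap());
    // Every tick that fires within 10:37:00..10:38:00 shares this anchor,
    // so all spans recorded during one tick log the same cutoff value.
    println!("{ts} -> {anchor}");
}
```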
+ #[tracing::instrument( + name = "hottier.reconcile_stream", + skip(self), + fields(stream = %stream, tenant = ?tenant_id), + err + )] + async fn reconcile_stream( + &self, + stream: &str, + tenant_id: &Option, + ) -> Result { + info!(stream = %stream, tenant = ?tenant_id, "reconcile starting"); + let mut sht = self.get_hot_tier(stream, tenant_id).await?; + let dates = self.fetch_hot_tier_dates(stream, tenant_id).await?; + let mut total_used: u64 = 0; + let mut partials_removed = 0usize; + let mut entries_dropped = 0usize; + let mut orphans_removed = 0usize; + + for date in dates { + let date_dir = self.get_stream_path_for_date(stream, &date, tenant_id); + if !date_dir.exists() { + continue; + } + + let mut on_disk: HashSet = HashSet::new(); + + // Pass 1: collect on-disk parquet files (drop .partial orphans). + self.drop_partials( + &mut on_disk, + &date_dir, + &mut partials_removed, + stream, + tenant_id, + ) + .await?; + + // Pass 2: clean manifest of stale entries. + let mut keep_names: HashSet = HashSet::new(); + self.clean_manifest( + &mut keep_names, + &date_dir, + &mut total_used, + &mut entries_dropped, + stream, + tenant_id, + ) + .await?; + + // Pass 3: delete on-disk parquet files not referenced by the cleaned manifest. + for name in on_disk.difference(&keep_names) { + let p = date_dir.join(name); + let _ = fs::remove_file(&p).await; + orphans_removed += 1; + info!( + stream = %stream, + tenant = ?tenant_id, + file = %p.display(), + "reconcile: deleted orphan parquet not in manifest" + ); + } + } + + sht.used_size = total_used; + sht.available_size = sht.size.saturating_sub(total_used); + self.put_hot_tier(stream, &mut sht, tenant_id).await?; + info!( + stream = %stream, + tenant = ?tenant_id, + partials_removed, + entries_dropped, + orphans_removed, + used = sht.used_size, + available = sht.available_size, + "reconcile done" + ); + Ok(sht) + } + + #[tracing::instrument( + name = "hottier.drop_partials", + skip(self, on_disk, partials_removed), + fields(stream = %stream, tenant = ?tenant_id, date_dir = %date_dir.display()) + )] + async fn drop_partials( + &self, + on_disk: &mut HashSet, + date_dir: &PathBuf, + partials_removed: &mut usize, + stream: &str, + tenant_id: &Option, + ) -> Result<(), HotTierError> { + let mut stack: Vec = vec![date_dir.clone()]; + while let Some(dir) = stack.pop() { + let mut entries = fs::read_dir(&dir).await.map_err(|e| { + error!( + stream = %stream, + tenant = ?tenant_id, + dir = ?dir, + error = ?e + ); + e + })?; + while let Some(entry) = entries.next_entry().await.map_err(|e| { + error!( + stream = %stream, + tenant = ?tenant_id, + error = ?e + ); + e + })? 
{ + let p = entry.path(); + let ft = entry.file_type().await.map_err(|e| { + error!( + stream = %stream, + tenant = ?tenant_id, + entry = ?entry, + error = ?e + ); + e + })?; + if ft.is_dir() { + stack.push(p); + continue; + } + let Some(name_os) = p.file_name() else { + continue; + }; + let name = name_os.to_string_lossy(); + if name.ends_with(".partial") { + let _ = fs::remove_file(&p).await; + *partials_removed += 1; + info!( + stream = %stream, + tenant = ?tenant_id, + path = %p.display(), + "reconcile: deleted partial orphan" + ); + continue; + } + if name.ends_with(".manifest.json") { + continue; + } + if !ft.is_file() { + continue; + } + if let Ok(rel) = p.strip_prefix(date_dir) { + on_disk.insert(rel.to_string_lossy().into_owned()); + } + } } + Ok(()) + } + + #[tracing::instrument( + name = "hottier.clean_manifest", + skip(self, keep_names, total_used, entries_dropped), + fields(stream = %stream, tenant = ?tenant_id, date_dir = %date_dir.display()) + )] + async fn clean_manifest( + &self, + keep_names: &mut HashSet<String>, + date_dir: &PathBuf, + total_used: &mut u64, + entries_dropped: &mut usize, + stream: &str, + tenant_id: &Option<String>, + ) -> Result<(), HotTierError> { + let manifest_path = date_dir.join("hottier.manifest.json"); + let mut manifest: Manifest = if manifest_path.exists() { + let bytes = fs::read(&manifest_path).await.map_err(|e| { + error!( + stream = %stream, + tenant = ?tenant_id, + manifest_path = ?manifest_path, + error = ?e + ); + e + })?; + serde_json::from_slice(&bytes).unwrap_or_default() + } else { + Manifest::default() + }; + + let mut kept = Vec::with_capacity(manifest.files.len()); + for f in manifest.files.drain(..) { + let local = self.hot_tier_path.join(&f.file_path); + let ok = match fs::metadata(&local).await { + Ok(m) => m.len() == f.file_size, + Err(_) => false, + }; + if ok { + if let Ok(rel) = local.strip_prefix(date_dir) { + keep_names.insert(rel.to_string_lossy().into_owned()); + } + *total_used += f.file_size; + kept.push(f); + } else { + let _ = fs::remove_file(&local).await; + *entries_dropped += 1; + info!( + stream = %stream, + tenant = ?tenant_id, + file = %f.file_path, + "reconcile: dropped manifest entry (file missing or wrong size)" + ); + } + } + kept.sort_by_key(|f| f.file_path.clone()); + manifest.files = kept; + + if manifest_path.exists() || !manifest.files.is_empty() { + fs::create_dir_all(&date_dir).await?; + fs::write( + &manifest_path, + serde_json::to_vec(&manifest).map_err(|e| { + error!( + stream = %stream, + tenant = ?tenant_id, + manifest_path = ?manifest_path, + error = ?e + ); + e + })?, + ) + .await + .map_err(|e| { + error!( + stream = %stream, + tenant = ?tenant_id, + manifest_path = ?manifest_path, + error = ?e + ); + e + })?; + } + Ok(()) } /// Get a global @@ -91,6 +404,12 @@ impl HotTierManager { } /// get the total hot tier size for all streams + #[tracing::instrument( + name = "hottier.get_hot_tiers_size", + skip(self), + fields(current_stream = %current_stream, current_tenant = ?current_tenant_id), + err + )] pub async fn get_hot_tiers_size( &self, current_stream: &str, @@ -122,6 +441,12 @@ /// check disk usage and hot tier size of all other streams /// check if total hot tier size of all streams is less than max disk usage /// delete all the files from hot tier once validation is successful and hot tier is ready to be updated + #[tracing::instrument( + name = "hottier.validate_size", + skip(self), + fields(stream = %stream, tenant = ?tenant_id, size = stream_hot_tier_size), + err + )] pub async
fn validate_hot_tier_size( &self, stream: &str, @@ -184,6 +509,12 @@ impl HotTierManager { } /// get the hot tier metadata file for the stream + #[tracing::instrument( + name = "hottier.get_hot_tier", + skip(self), + fields(stream = %stream, tenant = ?tenant_id), + err + )] pub async fn get_hot_tier( &self, stream: &str, @@ -206,6 +537,12 @@ impl HotTierManager { Ok(stream_hot_tier) } + #[tracing::instrument( + name = "hottier.delete_hot_tier", + skip(self), + fields(stream = %stream, tenant = ?tenant_id), + err + )] pub async fn delete_hot_tier( &self, stream: &str, @@ -214,18 +551,35 @@ impl HotTierManager { if !self.check_stream_hot_tier_exists(stream, tenant_id) { return Err(HotTierValidationError::NotFound(stream.to_owned()).into()); } + // Stop loops before tearing down the directory so no in-flight tick + // re-creates files mid-delete. + self.abort_stream_tasks(stream, tenant_id).await; let path = if let Some(tenant_id) = tenant_id.as_ref() { self.hot_tier_path.join(tenant_id).join(stream) } else { self.hot_tier_path.join(stream) }; - fs::remove_dir_all(path).await?; + fs::remove_dir_all(path).await.map_err(|e| { + error!( + stream = %stream, + tenant = ?tenant_id, + error = ?e + ); + e + })?; + self.invalidate_state(stream, tenant_id).await; Ok(()) } /// put the hot tier metadata file for the stream /// set the updated_date_range in the hot tier metadata file + #[tracing::instrument( + name = "hottier.put_hot_tier", + skip(self, hot_tier), + fields(stream = %stream, tenant = ?tenant_id, size = hot_tier.size), + err + )] pub async fn put_hot_tier( &self, stream: &str, @@ -259,216 +613,656 @@ impl HotTierManager { Ok(path) } - /// schedule the download of the hot tier files from S3 every minute + #[tracing::instrument(name = "hottier.abort", skip(self))] + pub async fn abort_all(&self) { + let guard = self.tasks.write().await; + for (streamkey, task) in guard.iter() { + task.latest.abort(); + task.historic.abort(); + info!("aborted hot tier tasks for- {streamkey:?}"); + } + } + + /// Discover hot-tier-enabled streams at boot and spawn a per-stream pair + /// of (Latest, Historic) loops for each. New streams added later acquire + /// their own loops via `spawn_stream_tasks` from the PUT hot-tier handler. + #[tracing::instrument(name = "hottier.startup", skip(self), err)] pub fn download_from_s3<'a>(&'a self) -> Result<(), HotTierError> where 'a: 'static, { - let mut scheduler = AsyncScheduler::new(); - scheduler - .every(HOT_TIER_SYNC_DURATION) - .plus(Interval::Seconds(5)) - .run(move || async { - if let Err(err) = self.sync_hot_tier().await { - error!("Error in hot tier scheduler: {:?}", err); + let latest_min = PARSEABLE.options.hot_tier_latest_minutes; + let historic_min = PARSEABLE.options.hot_tier_historic_sync_minutes; + info!( + latest_minutes = latest_min, + historic_sync_minutes = historic_min, + "hot tier scheduler starting" + ); + + let this: &'static HotTierManager = self; + let startup_span = tracing::info_span!("hottier.startup.bootstrap"); + let span = startup_span.clone(); + tokio::spawn( + async move { + // pstats hot tier may need to be created on boot before any tasks + // can pick it up. 
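`download_from_s3` above discovers streams and delegates to `spawn_stream_tasks`; underneath, the lifecycle is always the same: a pair of interval loops per stream whose `JoinHandle`s are retained so that delete and shutdown paths can abort them. A minimal sketch of that lifecycle, assuming nothing from this codebase (`TaskPair` and `spawn_pair` are illustrative names):

```rust
use std::collections::HashMap;
use std::time::Duration;
use tokio::{sync::RwLock, task::JoinHandle};

struct TaskPair {
    latest: JoinHandle<()>,
    historic: JoinHandle<()>,
}

/// Spawn one loop per phase; re-inserting for the same key aborts the old pair.
async fn spawn_pair(tasks: &RwLock<HashMap<String, TaskPair>>, key: String) {
    let spawn_loop = |label: &'static str, period: Duration| {
        tokio::spawn(async move {
            loop {
                println!("{label} tick"); // stand-in for process_stream(..)
                tokio::time::sleep(period).await;
            }
        })
    };
    let pair = TaskPair {
        latest: spawn_loop("latest", Duration::from_secs(60)),
        historic: spawn_loop("historic", Duration::from_secs(300)),
    };
    if let Some(old) = tasks.write().await.insert(key, pair) {
        old.latest.abort();
        old.historic.abort();
    }
}

#[tokio::main]
async fn main() {
    let tasks = RwLock::new(HashMap::new());
    spawn_pair(&tasks, "tenant-a/stream-1".into()).await;
    tokio::time::sleep(Duration::from_millis(10)).await;
    // Shutdown: abort everything, mirroring abort_all().
    for (_, t) in tasks.write().await.drain() {
        t.latest.abort();
        t.historic.abort();
    }
}
```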
+ if let Err(e) = this.create_pstats_hot_tier().await { + tracing::error!("Skipping pstats hot tier creation because of error: {e}"); + } + let tenants = if let Some(tenants) = PARSEABLE.list_tenants() { + tenants.into_iter().map(Some).collect::>() + } else { + vec![None] + }; + for tenant_id in tenants { + for stream in PARSEABLE.streams.list(&tenant_id) { + if this.check_stream_hot_tier_exists(&stream, &tenant_id) { + let tenant_id = tenant_id.clone(); + + tokio::spawn(async move { + this.spawn_stream_tasks(stream, tenant_id).await; + }.instrument(span.clone())); + tokio::time::sleep(Duration::from_secs(8)).await; + } else { + // check for potential orphan directory on disk + let path = if let Some(tenant_id) = tenant_id.as_ref() { + self.hot_tier_path.join(tenant_id).join(stream) + } else { + self.hot_tier_path.join(stream) + }; + if path.exists() { + // delete this entire folder as stream meta says no hottier for stream + if let Err(e) = fs::remove_dir_all(&path).await { + tracing::error!( + "Unable to remove orphaned hottier dir- `{path:?}` with error- {e}" + ); + }; + } + } + } } - }); - - tokio::spawn(async move { - loop { - scheduler.run_pending().await; - tokio::time::sleep(Duration::from_secs(10)).await; } - }); + .instrument(startup_span.clone()), + ); Ok(()) } - /// sync the hot tier files from S3 to the hot tier directory for all streams - async fn sync_hot_tier(&self) -> Result<(), HotTierError> { - // Before syncing, check if pstats stream was created and needs hot tier - if let Err(e) = self.create_pstats_hot_tier().await { - tracing::trace!("Skipping pstats hot tier creation because of error: {e}"); + /// Spawn (Latest, Historic) loops for a single stream. Idempotent: + /// if tasks already exist for this (tenant, stream), no-op. + #[tracing::instrument( + name = "hottier.spawn_stream_tasks", + skip(self), + fields(stream = %stream, tenant = ?tenant_id) + )] + pub async fn spawn_stream_tasks(&'static self, stream: String, tenant_id: Option) { + let key: StreamKey = (tenant_id.clone(), stream.clone()); + { + let tasks = self.tasks.read().await; + if let Some(existing) = tasks.get(&key) + && !existing.latest.is_finished() + && !existing.historic.is_finished() + { + return; + } } - let mut sync_hot_tier_tasks = FuturesUnordered::new(); - let tenants = if let Some(tenants) = PARSEABLE.list_tenants() { - tenants.into_iter().map(Some).collect() - } else { - vec![None] - }; - for tenant_id in tenants { - for stream in PARSEABLE.streams.list(&tenant_id) { - if self.check_stream_hot_tier_exists(&stream, &tenant_id) { - sync_hot_tier_tasks.push(self.process_stream(stream, tenant_id.to_owned())); + let latest_interval = Duration::from_secs(60); + let historic_interval = + Duration::from_secs(PARSEABLE.options.hot_tier_historic_sync_minutes as u64 * 60); + + info!(stream = %stream, tenant = ?tenant_id, "spawning per-stream hot tier tasks"); + + let s = stream.clone(); + let t = tenant_id.clone(); + let latest = tokio::spawn(async move { + loop { + let anchor = floor_to_minute(Utc::now()); + let tick_span = tracing::info_span!( + "hottier.tick", + stream = %s, + tenant = ?t, + phase = "latest", + anchor = %anchor + ); + async { + info!("stream tick fired"); + if let Err(err) = self + .process_stream(s.clone(), t.clone(), SyncPhase::Latest, anchor) + .await + { + error!("latest sync error: {err:?}"); + } } + .instrument(tick_span) + .await; + tokio::time::sleep(latest_interval).await; } - } + }); - while let Some(res) = sync_hot_tier_tasks.next().await { - if let Err(err) = res { - 
error!("Failed to run hot tier sync task {err:?}"); - return Err(err); + let s = stream.clone(); + let t = tenant_id.clone(); + let historic = tokio::spawn(async move { + loop { + let anchor = floor_to_minute(Utc::now()); + let tick_span = tracing::info_span!( + "hottier.tick", + stream = %s, + tenant = ?t, + phase = "historic", + anchor = %anchor + ); + async { + info!("stream tick fired"); + if let Err(err) = self + .process_stream(s.clone(), t.clone(), SyncPhase::Historic, anchor) + .await + { + error!("historic sync error: {err:?}"); + } + } + .instrument(tick_span) + .await; + tokio::time::sleep(historic_interval).await; } + }); + + let mut tasks = self.tasks.write().await; + if let Some(old) = tasks.insert(key, StreamTasks { latest, historic }) { + old.latest.abort(); + old.historic.abort(); + } + } + + /// Abort and remove per-stream tasks. Caller must ensure no further work + /// will be enqueued for the stream after this returns. + async fn abort_stream_tasks(&self, stream: &str, tenant_id: &Option) { + let key: StreamKey = (tenant_id.clone(), stream.to_owned()); + if let Some(t) = self.tasks.write().await.remove(&key) { + t.latest.abort(); + t.historic.abort(); + info!(stream = %stream, tenant = ?tenant_id, "aborted per-stream hot tier tasks"); } - Ok(()) } /// process the hot tier files for the stream /// delete the files from the hot tier directory if the available date range is outside the hot tier range + #[tracing::instrument( + name = "hottier.process_stream", + skip(self), + fields(stream = %stream, tenant = ?tenant_id, phase = ?phase, anchor = %anchor), + err + )] async fn process_stream( &self, stream: String, tenant_id: Option, + phase: SyncPhase, + anchor: DateTime, ) -> Result<(), HotTierError> { - let stream_hot_tier = self.get_hot_tier(&stream, &tenant_id).await?; - let mut parquet_file_size = stream_hot_tier.used_size; - - let mut s3_manifest_file_list = PARSEABLE - .metastore - .get_all_manifest_files(&stream, &tenant_id) + let stream_start = std::time::Instant::now(); + self.process_manifest(&stream, &tenant_id, phase, anchor) .await .map_err(|e| { - HotTierError::ObjectStorageError(ObjectStorageError::MetastoreError(Box::new( - e.to_detail(), - ))) + error!( + stream = %stream, + tenant = ?tenant_id, + phase = ?phase, + error = ?e + ); + e })?; - self.process_manifest( - &stream, - &mut s3_manifest_file_list, - &mut parquet_file_size, - &tenant_id, - ) - .await?; - + info!( + stream = %stream, + tenant = ?tenant_id, + phase = ?phase, + elapsed_ms = stream_start.elapsed().as_millis() as u64, + "stream sync done" + ); Ok(()) } - /// process the hot tier files for the date for the stream - /// collect all manifests from metastore for the date, sort the parquet file list - /// in order to download the latest files first - /// download the parquet files if not present in hot tier directory + /// process the hot tier files for the stream + /// Determine the candidate dates for the current phase, fetch only those + /// manifests from the metastore, build a work list sorted newest-first by + /// file timestamp, then download via the existing reserve/commit flow. 
+ #[tracing::instrument( + name = "hottier.process_manifest", + skip(self), + fields( + stream = %stream, + tenant = ?tenant_id, + phase = ?phase, + anchor = %anchor, + candidate_dates = tracing::field::Empty, + work_count = tracing::field::Empty, + total_bytes = tracing::field::Empty, + ), + err + )] async fn process_manifest( &self, stream: &str, - manifest_files_to_download: &mut BTreeMap<String, Vec<Manifest>>, - parquet_file_size: &mut u64, tenant_id: &Option<String>, + phase: SyncPhase, + anchor: DateTime<Utc>, ) -> Result<(), HotTierError> { - if manifest_files_to_download.is_empty() { - return Ok(()); - } - for (str_date, manifest_files) in manifest_files_to_download.iter().rev() { - let mut storage_combined_manifest = Manifest::default(); + let latest_minutes = PARSEABLE.options.hot_tier_latest_minutes; + let latest_window = chrono::Duration::minutes(latest_minutes.try_into().unwrap()); + let historic_cutoff = anchor - latest_window; + let historic_cutoff_naive = historic_cutoff.naive_utc(); + let today_date_key = format!("date={}", anchor.date_naive()); + + // Determine which date keys to fetch from the metastore this tick. + let candidate_dates: Vec<String> = match phase { + SyncPhase::Latest => { + // Dates covered by [historic_cutoff, anchor]. Usually just today, + // or today + yesterday if window crosses midnight. + let start = historic_cutoff.date_naive(); + let end = anchor.date_naive(); + let mut out = Vec::new(); + let mut d = start; + while d <= end { + out.push(format!("date={d}")); + d = d.succ_opt().unwrap_or(d); + if out.len() > 365 { + break; + } + } + out + } + SyncPhase::Historic => { + let local = self + .fetch_hot_tier_dates(stream, tenant_id) + .await + .unwrap_or_default() + .into_iter() + .map(|d| format!("date={d}")) + .collect::<Vec<_>>(); + let s3 = PARSEABLE + .storage() + .get_object_store() + .list_dates(stream, tenant_id) + .await + .unwrap_or_default(); - for storage_manifest in manifest_files { - storage_combined_manifest - .files - .extend(storage_manifest.files.clone()); + let mut union: std::collections::BTreeSet<String> = local.into_iter().collect(); + union.extend(s3); + + let mut out = Vec::new(); + for date_key in union { + // drop today and anything >= today (Latest handles those) + if date_key.as_str() >= today_date_key.as_str() { + continue; + } + if self.is_locally_complete(stream, &date_key, tenant_id).await { + continue; + } + out.push(date_key); + } + // Newest-first: discover newest missing past date first. + out.sort(); + out.reverse(); + out } + }; + tracing::Span::current().record("candidate_dates", candidate_dates.len()); + + if candidate_dates.is_empty() { + info!( + stream = %stream, + tenant = ?tenant_id, + phase = ?phase, + "no candidate dates this tick" + ); + return Ok(()); + } - storage_combined_manifest - .files - .sort_by_key(|file| file.file_path.clone()); + let s3_manifests = PARSEABLE + .metastore + .get_manifest_files_for_dates(stream, tenant_id, &candidate_dates) + .await + .map_err(|e| { + error!( + stream = %stream, + tenant = ?tenant_id, + phase = ?phase, + error = ?e, + "manifest fetch failed" + ); + HotTierError::ObjectStorageError(ObjectStorageError::MetastoreError(Box::new( + e.to_detail(), + ))) + })?; - while let Some(parquet_file) = storage_combined_manifest.files.pop() { - let parquet_file_path = &parquet_file.file_path; - let parquet_path = self.hot_tier_path.join(parquet_file_path); + // Build flat work list across all candidate dates: keep only files + // matching this phase's cutoff and not already on disk.
+ let mut work: Vec<(NaiveDate, chrono::NaiveDateTime, File, PathBuf)> = Vec::new(); + for (str_date, manifest_files) in s3_manifests.iter() { + let date = + match NaiveDate::parse_from_str(str_date.trim_start_matches("date="), "%Y-%m-%d") { + Ok(d) => d, + Err(_) => { + info!("Invalid date format: {}", str_date); + continue; + } + }; - if !parquet_path.exists() { - if let Ok(date) = - NaiveDate::parse_from_str(str_date.trim_start_matches("date="), "%Y-%m-%d") - { - if !self - .process_parquet_file( - stream, - &parquet_file, - parquet_file_size, - parquet_path, - date, - tenant_id, - ) - .await? - { - break; - } - } else { - warn!("Invalid date format: {}", str_date); + for storage_manifest in manifest_files { + for parquet_file in &storage_manifest.files { + let parquet_path = self.hot_tier_path.join(&parquet_file.file_path); + if parquet_path.exists() { + continue; + } + let dt = match extract_datetime(&parquet_file.file_path) { + Some(d) => d, + None => continue, + }; + let is_latest = dt >= historic_cutoff_naive; + let keep = match phase { + SyncPhase::Latest => is_latest, + SyncPhase::Historic => !is_latest, + }; + if keep { + work.push((date, dt, parquet_file.clone(), parquet_path)); } } } } + // Newest first by file timestamp. + work.sort_by_key(|b| std::cmp::Reverse(b.1)); + + let work_count = work.len(); + let total_bytes: u64 = work.iter().map(|(_, _, f, _)| f.file_size).sum(); + tracing::Span::current() + .record("work_count", work_count) + .record("total_bytes", total_bytes); + if work.is_empty() { + info!( + stream = %stream, + tenant = ?tenant_id, + phase = ?phase, + "no files to download this tick" + ); + return Ok(()); + } + info!( + stream = %stream, + tenant = ?tenant_id, + phase = ?phase, + work_count, + total_bytes, + "work list built" + ); + + let state = self.get_or_load_state(stream, tenant_id).await?; + let concurrency = PARSEABLE.options.hot_tier_files_per_stream_concurrency; + + // Reservation failure (out of disk + nothing to evict) is sticky: + // once one file can't be placed, no subsequent file will fit either. 
+ let stop = Arc::new(std::sync::atomic::AtomicBool::new(false)); + + let stream_owned = stream.to_owned(); + let tenant_owned = tenant_id.clone(); + + let results: Vec> = futures::stream::iter(work) + .map(|(date, _dt, file, parquet_path)| { + let state = state.clone(); + let stream = stream_owned.clone(); + let tenant_id = tenant_owned.clone(); + let stop = stop.clone(); + async move { + if stop.load(std::sync::atomic::Ordering::Relaxed) { + return Ok(()); + } + let processed = self + .process_parquet_file_concurrent( + &stream, + &file, + parquet_path, + date, + &tenant_id, + &state, + phase, + ) + .await?; + if !processed && !stop.swap(true, std::sync::atomic::Ordering::Relaxed) { + info!( + stream = %stream, + tenant = ?tenant_id, + phase = ?phase, + "sticky stop: halting further reservations this tick" + ); + } + Ok(()) + } + }) + .buffer_unordered(concurrency) + .collect() + .await; + + for r in results { + r?; + } + Ok(()) } - /// process the parquet file for the stream - /// check if the disk is available to download the parquet file - /// if not available, delete the oldest entry from the hot tier directory - /// download the parquet file from S3 to the hot tier directory - /// update the used and available size in the hot tier metadata - /// return true if the parquet file is processed successfully - async fn process_parquet_file( + /// Reserve disk budget under the per-stream lock, download outside the lock, + /// then commit usage + per-date manifest under the lock again. + /// Returns false when no budget is available (caller should stop scheduling + /// further work for this stream). + #[allow(clippy::too_many_arguments)] + #[tracing::instrument( + name = "hottier.process_parquet_file", + skip(self, parquet_file, parquet_path, state), + fields( + stream = %stream, + tenant = ?tenant_id, + phase = ?phase, + date = %date, + file = %parquet_file.file_path, + file_size = parquet_file.file_size + ), + err + )] + async fn process_parquet_file_concurrent( &self, stream: &str, parquet_file: &File, - parquet_file_size: &mut u64, parquet_path: PathBuf, date: NaiveDate, tenant_id: &Option, + state: &Arc, + phase: SyncPhase, ) -> Result { - let mut file_processed = false; - let mut stream_hot_tier = self.get_hot_tier(stream, tenant_id).await?; - if !self.is_disk_available(parquet_file.file_size).await? - || stream_hot_tier.available_size <= parquet_file.file_size + // RESERVE { - if !self - .cleanup_hot_tier_old_data( - stream, - &mut stream_hot_tier, - &parquet_path, - parquet_file.file_size, - tenant_id, - ) - .await? + let mut sht = state.sht.lock().await; + info!( + stream = %stream, + tenant = ?tenant_id, + phase = ?phase, + file = %parquet_file.file_path, + file_size = parquet_file.file_size, + available = sht.available_size, + used = sht.used_size, + "reserving" + ); + if !self.is_disk_available(parquet_file.file_size).await? + || sht.available_size < parquet_file.file_size { - return Ok(file_processed); + match phase { + SyncPhase::Latest => { + info!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + file_size = parquet_file.file_size, + available = sht.available_size, + "tight on space; triggering eviction" + ); + if !self + .cleanup_hot_tier_old_data( + stream, + &mut sht, + &parquet_path, + parquet_file.file_size, + tenant_id, + ) + .await? 
+ { + info!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + file_size = parquet_file.file_size, + "eviction freed nothing, skipping file" + ); + return Ok(false); + } + } + SyncPhase::Historic => { + info!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + file_size = parquet_file.file_size, + available = sht.available_size, + "historic phase: full, skipping file" + ); + return Ok(false); + } + } + } + if sht.available_size < parquet_file.file_size { + info!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + file_size = parquet_file.file_size, + available = sht.available_size, + "still no space after eviction, skipping" + ); + return Ok(false); } - *parquet_file_size = stream_hot_tier.used_size; + sht.available_size = + if let Some(val) = sht.available_size.checked_sub(parquet_file.file_size) { + val + } else { + tracing::error!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + file_size = parquet_file.file_size, + available = sht.available_size, + "file_size > sht.available_size, setting available_size to 0 and moving on" + ); + 0 + }; + self.put_hot_tier(stream, &mut sht, tenant_id).await?; + info!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + deducted = parquet_file.file_size, + new_available = sht.available_size, + "reserved" + ); } + + // DOWNLOAD (no lock held) let parquet_file_path = RelativePathBuf::from(parquet_file.file_path.clone()); fs::create_dir_all(parquet_path.parent().unwrap()).await?; - let mut file = fs::File::create(parquet_path.clone()).await?; - let parquet_data = PARSEABLE + info!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + file_size = parquet_file.file_size, + "download starting" + ); + let dl_start = std::time::Instant::now(); + let download_result = PARSEABLE .storage .get_object_store() - .get_object(&parquet_file_path, tenant_id) - .await?; - file.write_all(&parquet_data).await?; - *parquet_file_size += parquet_file.file_size; - stream_hot_tier.used_size = *parquet_file_size; + .parallel_chunked_download(&parquet_file_path, tenant_id, parquet_path.clone()) + .await; + let dl_elapsed = dl_start.elapsed(); + + if let Err(e) = download_result { + info!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + elapsed_ms = dl_elapsed.as_millis() as u64, + err = %e, + "download failed, refunding reservation" + ); + // refund reservation + let mut sht = state.sht.lock().await; + sht.available_size += parquet_file.file_size; + if let Err(put_err) = self.put_hot_tier(stream, &mut sht, tenant_id).await { + error!("failed to persist refund after download failure: {put_err:?}"); + } + // backend already cleaned up its `.partial` file; final path was never created. 
+ return Err(e.into()); + } + let elapsed_ms = dl_elapsed.as_millis() as u64; + let mbps = if dl_elapsed.as_secs_f64() > 0.0 { + (parquet_file.file_size as f64 * 8.0) / dl_elapsed.as_secs_f64() / 1_000_000.0 + } else { + 0.0 + }; + info!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + file_size = parquet_file.file_size, + elapsed_ms, + mbps = format!("{mbps:.1}"), + "download finished, committing" + ); + + // COMMIT + { + let mut sht = state.sht.lock().await; + sht.used_size += parquet_file.file_size; + self.put_hot_tier(stream, &mut sht, tenant_id).await?; - stream_hot_tier.available_size -= parquet_file.file_size; - self.put_hot_tier(stream, &mut stream_hot_tier, tenant_id) - .await?; - file_processed = true; - let path = self.get_stream_path_for_date(stream, &date, tenant_id); - let mut hot_tier_manifest = HotTierManager::get_hot_tier_manifest_from_path(path).await?; - hot_tier_manifest.files.push(parquet_file.clone()); - hot_tier_manifest - .files - .sort_by_key(|file| file.file_path.clone()); - // write the manifest file to the hot tier directory - let manifest_path = self - .get_stream_path_for_date(stream, &date, tenant_id) - .join("hottier.manifest.json"); - fs::create_dir_all(manifest_path.parent().unwrap()).await?; - fs::write(manifest_path, serde_json::to_vec(&hot_tier_manifest)?).await?; - - Ok(file_processed) + let path = self.get_stream_path_for_date(stream, &date, tenant_id); + let mut hot_tier_manifest = + HotTierManager::get_hot_tier_manifest_from_path(path).await?; + hot_tier_manifest.files.push(parquet_file.clone()); + hot_tier_manifest + .files + .sort_by_key(|file| file.file_path.clone()); + // write the manifest file to the hot tier directory + let manifest_path = self + .get_stream_path_for_date(stream, &date, tenant_id) + .join("hottier.manifest.json"); + fs::create_dir_all(manifest_path.parent().unwrap()).await?; + fs::write(manifest_path, serde_json::to_vec(&hot_tier_manifest)?).await?; + info!( + stream = %stream, + tenant = ?tenant_id, + file = %parquet_file.file_path, + used = sht.used_size, + available = sht.available_size, + "committed" + ); + } + + Ok(true) } ///fetch the list of dates available in the hot tier directory for the stream and sort them + #[tracing::instrument( + name = "hottier.fetch_dates", + skip(self), + fields(stream = %stream, tenant = ?tenant_id), + err + )] pub async fn fetch_hot_tier_dates( &self, stream: &str, @@ -551,6 +1345,34 @@ impl HotTierManager { } } + /// A past date is treated as fully synced when its on-disk hottier + /// manifest exists and lists at least one parquet. Used by the Historic + /// phase to skip the metastore fetch for dates we already have data for. + /// `date_key` is the partition directory name (e.g. "date=2026-05-12"). 
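That rule is easy to exercise in isolation. A sketch with a pared-down manifest type (the real `Manifest` lives in `catalog::manifest`; `ManifestLite` below is a stand-in):

```rust
use serde::{Deserialize, Serialize};

#[derive(Default, Serialize, Deserialize)]
struct ManifestLite {
    files: Vec<String>, // stand-in for the real per-file entries
}

/// Mirrors is_locally_complete: a readable manifest with at least one entry.
fn locally_complete(manifest_bytes: Option<&[u8]>) -> bool {
    match manifest_bytes {
        None => false, // no hottier.manifest.json on disk
        Some(bytes) => serde_json::from_slice::<ManifestLite>(bytes)
            .map(|m| !m.files.is_empty())
            .unwrap_or(false),
    }
}

fn main() {
    assert!(!locally_complete(None));
    assert!(!locally_complete(Some(br#"{"files":[]}"#.as_slice())));
    assert!(!locally_complete(Some(b"not json".as_slice())));
    assert!(locally_complete(Some(br#"{"files":["x.parquet"]}"#.as_slice())));
}
```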
+ async fn is_locally_complete( + &self, + stream: &str, + date_key: &str, + tenant_id: &Option, + ) -> bool { + let date_dir = if let Some(tenant) = tenant_id.as_ref() { + self.hot_tier_path.join(tenant).join(stream).join(date_key) + } else { + self.hot_tier_path.join(stream).join(date_key) + }; + let manifest_path = date_dir.join("hottier.manifest.json"); + if !manifest_path.exists() { + return false; + } + match fs::read(&manifest_path).await { + Ok(bytes) => match serde_json::from_slice::(&bytes) { + Ok(m) => !m.files.is_empty(), + Err(_) => false, + }, + Err(_) => false, + } + } + /// Returns the list of manifest files present in hot tier directory for the stream pub async fn get_hot_tier_manifest_files( &self, @@ -582,6 +1404,12 @@ impl HotTierManager { } ///get the list of parquet files from the hot tier directory for the stream + #[tracing::instrument( + name = "hottier.get_parquet_files", + skip(self), + fields(stream = %stream, tenant = ?tenant_id), + err + )] pub async fn get_hot_tier_parquet_files( &self, stream: &str, @@ -628,12 +1456,18 @@ impl HotTierManager { path.exists() } - ///delete the parquet file from the hot tier directory for the stream + /// delete entire parquet file minute from the hot tier directory for the stream /// loop through all manifests in the hot tier directory for the stream /// loop through all parquet files in the manifest /// check for the oldest entry to delete if the path exists in hot tier /// update the used and available size in the hot tier metadata /// loop if available size is still less than the parquet file size + #[tracing::instrument( + name = "hottier.cleanup_old_data", + skip(self, stream_hot_tier, download_file_path), + fields(stream = %stream, tenant = ?tenant_id, target_size = parquet_file_size), + err + )] pub async fn cleanup_hot_tier_old_data( &self, stream: &str, @@ -642,11 +1476,33 @@ impl HotTierManager { parquet_file_size: u64, tenant_id: &Option, ) -> Result { + info!( + stream = %stream, + tenant = ?tenant_id, + target_size = parquet_file_size, + available = stream_hot_tier.available_size, + "eviction starting" + ); let mut delete_successful = false; + let mut freed_total: u64 = 0; let dates = self.fetch_hot_tier_dates(stream, tenant_id).await?; + if dates.is_empty() { + info!( + stream = %stream, + tenant = ?tenant_id, + "eviction: no date dirs found, nothing to evict" + ); + } 'loop_dates: for date in dates { let path = self.get_stream_path_for_date(stream, &date, tenant_id); if !path.exists() { + info!( + stream = %stream, + tenant = ?tenant_id, + date = %date, + path = %path.display(), + "eviction: date path missing, skipping" + ); continue; } @@ -658,57 +1514,133 @@ impl HotTierManager { .to_string_lossy() .ends_with(".manifest.json") }); + if manifest_files.is_empty() { + info!( + stream = %stream, + tenant = ?tenant_id, + date = %date, + path = %path.display(), + "eviction: no .manifest.json files in date dir" + ); + continue; + } for manifest_file in manifest_files { let file = fs::read(manifest_file.path()).await?; let mut manifest: Manifest = serde_json::from_slice(&file)?; - manifest.files.sort_by_key(|file| file.file_path.clone()); - manifest.files.reverse(); - - 'loop_files: while let Some(file_to_delete) = manifest.files.pop() { - let file_size = file_to_delete.file_size; - let path_to_delete = self.hot_tier_path.join(&file_to_delete.file_path); - - if path_to_delete.exists() { - if let (Some(download_date_time), Some(delete_date_time)) = ( - extract_datetime(download_file_path.to_str().unwrap()), - 
extract_datetime(path_to_delete.to_str().unwrap()), - ) && download_date_time <= delete_date_time - { - delete_successful = false; - break 'loop_files; - } - - fs::write(manifest_file.path(), serde_json::to_vec(&manifest)?).await?; + if manifest.files.is_empty() { + info!( + stream = %stream, + tenant = ?tenant_id, + manifest = %manifest_file.path().display(), + "eviction: manifest has zero file entries" + ); + continue; + } - fs::remove_dir_all(path_to_delete.parent().unwrap()).await?; - delete_empty_directory_hot_tier( - path_to_delete.parent().unwrap().to_path_buf(), - ) - .await?; + // sort in an ascending manner + // idx0: minute=00 + // idx59: minute=59 + manifest.files.sort_by_key(|file| file.file_path.clone()); - stream_hot_tier.used_size -= file_size; - stream_hot_tier.available_size += file_size; - self.put_hot_tier(stream, stream_hot_tier, tenant_id) - .await?; - delete_successful = true; + // get first file's parent (/hottier/stream/date=d/hour=h/minute=m) + let first_file = manifest.files.first().unwrap(); + let first_file_path = self.hot_tier_path.join(&first_file.file_path); + let minute_to_delete = first_file_path.parent().unwrap(); + + if !minute_to_delete.exists() { + info!( + stream = %stream, + tenant = ?tenant_id, + manifest = %manifest_file.path().display(), + first_file = %first_file.file_path, + minute = %minute_to_delete.display(), + "eviction: minute dir referenced by manifest does not exist on disk" + ); + continue; + } + { + if let (Some(download_date_time), Some(delete_date_time)) = ( + extract_datetime(download_file_path.to_str().unwrap()), + extract_datetime(first_file_path.to_str().unwrap()), + ) && download_date_time <= delete_date_time + { + info!( + stream = %stream, + tenant = ?tenant_id, + candidate = %minute_to_delete.display(), + target = %download_file_path.display(), + "skip evict: candidate newer than target" + ); + continue; + } - if stream_hot_tier.available_size <= parquet_file_size { - continue 'loop_files; + let minute_to_delete_owned = minute_to_delete.to_path_buf(); + let mut minute_freed: u64 = 0; + manifest.files.retain(|file| { + let file_path = self.hot_tier_path.join(&file.file_path); + let file_minute = file_path.parent().unwrap(); + if file_minute == minute_to_delete_owned { + minute_freed = minute_freed.saturating_add(file.file_size); + false } else { - break 'loop_dates; + true } + }); + + stream_hot_tier.used_size = stream_hot_tier + .used_size + .checked_sub(minute_freed) + .unwrap_or_else(|| { + tracing::error!( + stream = %stream, + tenant = ?tenant_id, + minute = %minute_to_delete_owned.display(), + minute_freed, + used_size = stream_hot_tier.used_size, + "minute_freed > used_size, clamping used_size to 0" + ); + 0 + }); + stream_hot_tier.available_size = + stream_hot_tier.available_size.saturating_add(minute_freed); + freed_total = freed_total.saturating_add(minute_freed); + + fs::write(manifest_file.path(), serde_json::to_vec(&manifest)?).await?; + fs::remove_dir_all(&minute_to_delete_owned).await?; + delete_empty_directory_hot_tier(minute_to_delete_owned.clone()).await?; + self.put_hot_tier(stream, stream_hot_tier, tenant_id) + .await?; + delete_successful = true; + info!( + stream = %stream, + tenant = ?tenant_id, + evicted_minute = %minute_to_delete_owned.display(), + evicted_size = minute_freed, + freed_total, + new_available = stream_hot_tier.available_size, + "evicted" + ); + if stream_hot_tier.available_size < parquet_file_size { + continue; } else { - fs::write(manifest_file.path(), 
serde_json::to_vec(&manifest)?).await?; + break 'loop_dates; } } } } + info!( + stream = %stream, + tenant = ?tenant_id, + freed_total, + success = delete_successful, + "eviction complete" + ); Ok(delete_successful) } - ///check if the disk is available to download the parquet file + /// check if the disk is available to download the parquet file /// check if the disk usage is above the threshold pub async fn is_disk_available(&self, size_to_download: u64) -> Result { if let Some(DiskUtil { @@ -780,6 +1712,7 @@ impl HotTierManager { Ok(None) } + #[tracing::instrument(name = "hottier.put_internal_stream", skip(self), err)] pub async fn put_internal_stream_hot_tier(&self) -> Result<(), HotTierError> { let tenants = if let Some(tenants) = PARSEABLE.list_tenants() { tenants.into_iter().map(Some).collect() @@ -811,6 +1744,7 @@ impl HotTierManager { } /// Creates hot tier for pstats internal stream if the stream exists in storage + #[tracing::instrument(name = "hottier.create_pstats", skip(self), err)] async fn create_pstats_hot_tier(&self) -> Result<(), HotTierError> { let tenants = if let Some(tenants) = PARSEABLE.list_tenants() { tenants.into_iter().map(Some).collect() diff --git a/src/metastore/metastore_traits.rs b/src/metastore/metastore_traits.rs index 79745ecf8..0df0e8a5d 100644 --- a/src/metastore/metastore_traits.rs +++ b/src/metastore/metastore_traits.rs @@ -248,11 +248,15 @@ pub trait Metastore: std::fmt::Debug + Send + Sync { tenant_id: &Option, ) -> Result, MetastoreError>; - /// manifest - async fn get_all_manifest_files( + /// Fetch manifests only for the explicitly requested date keys + /// (e.g. `["date=2026-05-12"]`). Skips the top-level LIST to discover + /// dates — callers must already know which dates to query (e.g. via + /// the local hot-tier dir + a single object-store LIST when needed). 
+ async fn get_manifest_files_for_dates( &self, stream_name: &str, tenant_id: &Option<String>, + dates: &[String], ) -> Result<BTreeMap<String, Vec<Manifest>>, MetastoreError>; async fn get_manifest( &self, diff --git a/src/metastore/metastores/object_store_metastore.rs b/src/metastore/metastores/object_store_metastore.rs index 904bcca24..5adf93cef 100644 --- a/src/metastore/metastores/object_store_metastore.rs +++ b/src/metastore/metastores/object_store_metastore.rs @@ -929,34 +929,24 @@ impl Metastore for ObjectStoreMetastore { } /// Fetch all `Manifest` files - async fn get_all_manifest_files( + async fn get_manifest_files_for_dates( &self, stream_name: &str, tenant_id: &Option<String>, + dates: &[String], ) -> Result<BTreeMap<String, Vec<Manifest>>, MetastoreError> { let mut result_file_list: BTreeMap<String, Vec<Manifest>> = BTreeMap::new(); - let root = if let Some(tenant) = tenant_id { - format!("{tenant}/{stream_name}") - } else { - stream_name.into() - }; - let resp = self.storage.list_with_delimiter(Some(root.into())).await?; - - let dates = resp - .common_prefixes - .iter() - .flat_map(|path| path.parts()) - .filter(|name| name.as_ref() != stream_name && name.as_ref() != STREAM_ROOT_DIRECTORY) - .map(|name| name.as_ref().to_string()) - .collect::<Vec<_>>(); - for date in dates { let date_path = if let Some(tenant) = tenant_id { - object_store::path::Path::from(format!("{}/{}/{}", tenant, stream_name, &date)) + object_store::path::Path::from(format!("{}/{}/{}", tenant, stream_name, date)) } else { - object_store::path::Path::from(format!("{}/{}", stream_name, &date)) + object_store::path::Path::from(format!("{}/{}", stream_name, date)) + }; + let resp = match self.storage.list_with_delimiter(Some(date_path)).await { + Ok(r) => r, + Err(ObjectStorageError::NoSuchKey(_)) => continue, + Err(e) => return Err(e.into()), }; - let resp = self.storage.list_with_delimiter(Some(date_path)).await?; let manifest_paths: Vec<_> = resp .objects .iter() @@ -970,7 +960,6 @@ impl Metastore for ObjectStoreMetastore { .storage .get_object(&RelativePathBuf::from(path), tenant_id) .await?; - result_file_list .entry(date.clone()) .or_default() diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs index 9c9ff86fa..f00c8627a 100644 --- a/src/metrics/mod.rs +++ b/src/metrics/mod.rs @@ -306,6 +306,19 @@ pub static TOTAL_FILES_SCANNED_IN_OBJECT_STORE_CALLS_BY_DATE: Lazy<IntCounterVec> = +pub static PARTIAL_FILE_SCANS_IN_OBJECT_STORE_CALLS_BY_DATE: Lazy<IntCounterVec> = + Lazy::new(|| { + IntCounterVec::new( + Opts::new( + "partial_file_scans_in_object_store_calls_by_date", + "Partial file scans in object store calls by date", + ) + .namespace(METRICS_NAMESPACE), + &["method", "date", "tenant_id"], + ) + .expect("metric can be created") + }); + pub static TOTAL_BYTES_SCANNED_IN_OBJECT_STORE_CALLS_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| { IntCounterVec::new( @@ -376,6 +389,18 @@ pub static STORAGE_REQUEST_RESPONSE_TIME: Lazy<HistogramVec> = Lazy::new(|| { .expect("metric can be created") }); +pub static STORAGE_REQUESTS_INFLIGHT: Lazy<IntGaugeVec> = Lazy::new(|| { + IntGaugeVec::new( + Opts::new( + "storage_requests_inflight", + "Number of in-flight object store requests", + ) + .namespace(METRICS_NAMESPACE), + &["provider", "method"], + ) + .expect("metric can be created") +}); + pub static TOTAL_METRICS_COLLECTED_BY_DATE: Lazy<IntCounterVec> = Lazy::new(|| { IntCounterVec::new( Opts::new( @@ -527,6 +552,11 @@ fn custom_metrics(registry: &Registry) { TOTAL_FILES_SCANNED_IN_OBJECT_STORE_CALLS_BY_DATE.clone(), )) .expect("metric can be registered"); + registry + .register(Box::new( + PARTIAL_FILE_SCANS_IN_OBJECT_STORE_CALLS_BY_DATE.clone(), + )) + .expect("metric can be registered"); registry .register(Box::new( TOTAL_BYTES_SCANNED_IN_OBJECT_STORE_CALLS_BY_DATE.clone(), @@
-547,6 +577,9 @@ fn custom_metrics(registry: &Registry) { registry .register(Box::new(STORAGE_REQUEST_RESPONSE_TIME.clone())) .expect("metric can be registered"); + registry + .register(Box::new(STORAGE_REQUESTS_INFLIGHT.clone())) + .expect("metric can be registered"); registry .register(Box::new(TOTAL_METRICS_COLLECTED_BY_DATE.clone())) .expect("metric can be registered"); @@ -695,6 +728,17 @@ pub fn increment_object_store_calls_by_date(method: &str, date: &str, tenant_id: .inc(); } +pub fn increment_partial_file_scans_in_object_store_calls_by_date( + method: &str, + count: u64, + date: &str, + tenant_id: &str, +) { + PARTIAL_FILE_SCANS_IN_OBJECT_STORE_CALLS_BY_DATE + .with_label_values(&[method, date, tenant_id]) + .inc_by(count); +} + pub fn increment_files_scanned_in_object_store_calls_by_date( method: &str, count: u64, diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs index 62abad0db..1661d16fc 100644 --- a/src/storage/azure_blob.rs +++ b/src/storage/azure_blob.rs @@ -16,7 +16,13 @@ * */ -use std::{collections::HashSet, path::Path, sync::Arc, time::Duration}; +use std::{ + collections::HashSet, + ops::Range, + path::{Path, PathBuf}, + sync::Arc, + time::Duration, +}; use async_trait::async_trait; use bytes::Bytes; @@ -47,6 +53,7 @@ use crate::{ increment_bytes_scanned_in_object_store_calls_by_date, increment_files_scanned_in_object_store_calls_by_date, increment_object_store_calls_by_date, + increment_partial_file_scans_in_object_store_calls_by_date, }, parseable::{DEFAULT_TENANT, LogStream, PARSEABLE}, }; @@ -54,7 +61,8 @@ use crate::{ use super::{ CONNECT_TIMEOUT_SECS, ObjectStorage, ObjectStorageError, ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, STREAM_METADATA_FILE_NAME, - metrics_layer::MetricLayer, object_storage::parseable_json_path, to_object_store_path, + metrics_layer::MetricLayer, object_storage::parseable_json_path, partial_path, + to_object_store_path, }; #[derive(Debug, Clone, clap::Args)] @@ -180,8 +188,9 @@ impl ObjectStorageProvider for AzureBlobConfig { let azure = self.get_default_builder().build().unwrap(); // limit objectstore to a concurrent request limit let azure = LimitStore::new(azure, super::MAX_OBJECT_STORE_REQUESTS); + let azure = MetricLayer::new(azure, "azure"); Arc::new(BlobStore { - client: azure, + client: Arc::new(azure), account: self.account.clone(), container: self.container.clone(), root: StorePath::from(""), @@ -197,13 +206,125 @@ impl ObjectStorageProvider for AzureBlobConfig { // object store such as S3 and Azure Blob #[derive(Debug)] pub struct BlobStore { - client: LimitStore, + client: Arc>>, account: String, container: String, root: StorePath, } impl BlobStore { + async fn _parallel_download( + &self, + path: &RelativePath, + tenant_id: &Option, + write_path: PathBuf, + ) -> Result<(), ObjectStorageError> { + let partial = partial_path(&write_path)?; + match self + ._parallel_download_inner(path, tenant_id, partial.clone()) + .await + { + Ok(()) => { + if let Err(e) = tokio::fs::rename(&partial, &write_path).await { + let _ = tokio::fs::remove_file(&partial).await; + return Err(e.into()); + } + Ok(()) + } + Err(e) => { + let _ = tokio::fs::remove_file(&partial).await; + Err(e) + } + } + } + + #[tracing::instrument( + name = "azure.parallel_download", + skip(self, partial_path), + fields(path = %path, tenant = ?tenant_id, total_bytes = tracing::field::Empty, chunks = tracing::field::Empty), + err + )] + async fn _parallel_download_inner( + &self, + path: &RelativePath, + tenant_id: &Option, + 
partial_path: PathBuf, + ) -> Result<(), ObjectStorageError> { + let tenant_str = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); + let date = Utc::now().date_naive().to_string(); + let src = to_object_store_path(path); + + let meta = self.client.head(&src).await?; + increment_object_store_calls_by_date("HEAD", &date, tenant_str); + let total = meta.size; + + if let Some(parent) = partial_path.parent() { + tokio::fs::create_dir_all(parent).await?; + } + let file = tokio::fs::File::create(&partial_path).await?; + file.set_len(total).await?; + let std_file = Arc::new(file.into_std().await); + + let chunk = PARSEABLE.options.hot_tier_download_chunk_size; + let concurrency = PARSEABLE.options.hot_tier_download_concurrency; + let ranges: Vec> = (0..total) + .step_by(chunk as usize) + .map(|s| s..(s + chunk).min(total)) + .collect(); + let chunk_count = ranges.len() as u64; + tracing::Span::current() + .record("total_bytes", total) + .record("chunks", chunk_count); + let client = self.client.clone(); + let semaphore = Arc::new(tokio::sync::Semaphore::new(concurrency as usize)); + + futures::stream::iter(ranges) + .map(|r| { + let client = client.clone(); + let src = src.clone(); + let std_file = std_file.clone(); + let semaphore = semaphore.clone(); + async move { + let _permit = semaphore.acquire_owned().await.map_err(|e| { + ObjectStorageError::Custom(format!("semaphore closed: {e}")) + })?; + let bytes = client.get_range(&src, r.clone()).await?; + let offset = r.start; + tokio::task::spawn_blocking(move || -> std::io::Result<()> { + crate::storage::write_all_at(&std_file, &bytes, offset) + }) + .await + .map_err(|e| ObjectStorageError::Custom(format!("join: {e}")))??; + Ok::<_, ObjectStorageError>(()) + } + }) + .buffer_unordered(concurrency as usize) + .try_collect::>() + .await?; + + let std_file_sync = std_file.clone(); + tokio::task::spawn_blocking(move || std_file_sync.sync_all()) + .await + .map_err(|e| ObjectStorageError::Custom(format!("join: {e}")))??; + + increment_object_store_calls_by_date("GET", &date, tenant_str); + increment_files_scanned_in_object_store_calls_by_date("GET", 1, &date, tenant_str); + increment_bytes_scanned_in_object_store_calls_by_date("GET", total, &date, tenant_str); + increment_partial_file_scans_in_object_store_calls_by_date( + "GET", + chunk_count, + &date, + tenant_str, + ); + Ok(()) + } + + #[tracing::instrument( + name = "azure.get_object", + skip(self), + fields(path = %path, tenant = ?tenant_id, bytes = tracing::field::Empty), + err + )] async fn _get_object( &self, path: &RelativePath, @@ -216,6 +337,7 @@ impl BlobStore { match resp { Ok(resp) => { let body: Bytes = resp.bytes().await?; + tracing::Span::current().record("bytes", body.len()); increment_files_scanned_in_object_store_calls_by_date( "GET", 1, @@ -234,6 +356,11 @@ impl BlobStore { } } + #[tracing::instrument( + name = "azure.put_object", + skip(self, resource), + fields(path = %path, tenant = ?tenant_id, bytes = resource.content_length()) + )] async fn _put_object( &self, path: &RelativePath, @@ -257,6 +384,12 @@ impl BlobStore { } } + #[tracing::instrument( + name = "azure.delete_prefix", + skip(self), + fields(prefix = %key, tenant = ?tenant_id, deleted = tracing::field::Empty, failed = tracing::field::Empty), + err + )] async fn _delete_prefix( &self, key: &str, @@ -287,6 +420,9 @@ impl BlobStore { } let total_files = files_deleted + failed_deletes; + tracing::Span::current() + .record("deleted", files_deleted) + .record("failed", failed_deletes); 
increment_files_scanned_in_object_store_calls_by_date( "LIST", total_files, @@ -309,6 +445,12 @@ impl BlobStore { Ok(()) } + #[tracing::instrument( + name = "azure.list_dates", + skip(self), + fields(stream = %stream, tenant = ?tenant_id, dates = tracing::field::Empty), + err + )] async fn _list_dates( &self, stream: &str, @@ -343,10 +485,16 @@ impl BlobStore { .filter_map(|path| path.as_ref().strip_prefix(&format!("{stream}/"))) .map(String::from) .collect(); + tracing::Span::current().record("dates", dates.len()); Ok(dates) } + #[tracing::instrument( + name = "azure.upload_file", + skip(self), + fields(key = %key, path = %path.display(), tenant = ?tenant_id) + )] async fn _upload_file( &self, key: &str, @@ -371,6 +519,11 @@ impl BlobStore { } } + #[tracing::instrument( + name = "azure.upload_multipart", + skip(self), + fields(key = %key, path = %path.display(), tenant = ?tenant_id, total_bytes = tracing::field::Empty) + )] async fn _upload_multipart( &self, key: &RelativePath, @@ -383,6 +536,7 @@ impl BlobStore { let meta = file.metadata().await?; let total_size = meta.len() as usize; + tracing::Span::current().record("total_bytes", total_size); let min_multipart_size = PARSEABLE.options.min_multipart_size as usize; if total_size < min_multipart_size || !PARSEABLE.options.enable_multipart { let mut data = Vec::new(); @@ -466,6 +620,14 @@ impl BlobStore { #[async_trait] impl ObjectStorage for BlobStore { + async fn parallel_chunked_download( + &self, + path: &RelativePath, + tenant_id: &Option, + write_path: PathBuf, + ) -> Result<(), ObjectStorageError> { + self._parallel_download(path, tenant_id, write_path).await + } async fn get_buffered_reader( &self, _path: &RelativePath, @@ -488,6 +650,12 @@ impl ObjectStorage for BlobStore { self._upload_multipart(key, path, tenant_id).await } + #[tracing::instrument( + name = "azure.head", + skip(self), + fields(path = %path, tenant = ?tenant_id), + err + )] async fn head( &self, path: &RelativePath, @@ -628,6 +796,12 @@ impl ObjectStorage for BlobStore { Ok(()) } + #[tracing::instrument( + name = "azure.delete_object", + skip(self), + fields(path = %path, tenant = ?tenant_id), + err + )] async fn delete_object( &self, path: &RelativePath, @@ -652,11 +826,18 @@ impl ObjectStorage for BlobStore { Ok(result?) 
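// The instrument attributes in this file follow one pattern worth spelling
// out: `err` makes tracing emit an error-level event whenever the function
// returns Err, and fields declared as tracing::field::Empty are reserved at
// span creation so they can be filled in later via
// tracing::Span::current().record(...), as with "deleted"/"failed" earlier.
// Recording a field that was never declared in `fields(...)` is a silent
// no-op, which is why the Empty placeholders are declared up front.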
} + #[tracing::instrument( + name = "azure.check", + skip(self), + fields(tenant = ?tenant_id, ok = tracing::field::Empty), + err + )] async fn check(&self, tenant_id: &Option) -> Result<(), ObjectStorageError> { let result = self .client .head(&to_object_store_path(&parseable_json_path())) .await; + tracing::Span::current().record("ok", result.is_ok()); let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); increment_object_store_calls_by_date("HEAD", &Utc::now().date_naive().to_string(), tenant); if result.is_ok() { diff --git a/src/storage/gcs.rs b/src/storage/gcs.rs index 019802f24..56bcc71a9 100644 --- a/src/storage/gcs.rs +++ b/src/storage/gcs.rs @@ -16,13 +16,20 @@ * */ -use std::{collections::HashSet, path::Path, sync::Arc, time::Duration}; +use std::{ + collections::HashSet, + ops::Range, + path::{Path, PathBuf}, + sync::Arc, + time::Duration, +}; use crate::{ metrics::{ increment_bytes_scanned_in_object_store_calls_by_date, increment_files_scanned_in_object_store_calls_by_date, increment_object_store_calls_by_date, + increment_partial_file_scans_in_object_store_calls_by_date, }, parseable::{DEFAULT_TENANT, LogStream, PARSEABLE}, }; @@ -47,12 +54,14 @@ use object_store::{ }; use relative_path::{RelativePath, RelativePathBuf}; use tokio::{fs::OpenOptions, io::AsyncReadExt}; + use tracing::error; use super::{ CONNECT_TIMEOUT_SECS, ObjectStorage, ObjectStorageError, ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, STREAM_METADATA_FILE_NAME, - metrics_layer::MetricLayer, object_storage::parseable_json_path, to_object_store_path, + metrics_layer::MetricLayer, object_storage::parseable_json_path, partial_path, + to_object_store_path, }; #[derive(Debug, Clone, clap::Args)] @@ -141,7 +150,9 @@ impl ObjectStorageProvider for GcsConfig { fn construct_client(&self) -> Arc { let gcs = self.get_default_builder().build().unwrap(); - + // limit objectstore to a concurrent request limit + let gcs = LimitStore::new(gcs, super::MAX_OBJECT_STORE_REQUESTS); + let gcs = MetricLayer::new(gcs, "gcs"); Arc::new(Gcs { client: Arc::new(gcs), bucket: self.bucket_name.clone(), @@ -163,12 +174,124 @@ impl ObjectStorageProvider for GcsConfig { #[derive(Debug)] pub struct Gcs { - client: Arc, + client: Arc>>, bucket: String, root: StorePath, } impl Gcs { + async fn _parallel_download( + &self, + path: &RelativePath, + tenant_id: &Option, + write_path: PathBuf, + ) -> Result<(), ObjectStorageError> { + let partial = partial_path(&write_path)?; + match self + ._parallel_download_inner(path, tenant_id, partial.clone()) + .await + { + Ok(()) => { + if let Err(e) = tokio::fs::rename(&partial, &write_path).await { + let _ = tokio::fs::remove_file(&partial).await; + return Err(e.into()); + } + Ok(()) + } + Err(e) => { + let _ = tokio::fs::remove_file(&partial).await; + Err(e) + } + } + } + + #[tracing::instrument( + name = "gcs.parallel_download", + skip(self, partial_path), + fields(path = %path, tenant = ?tenant_id, total_bytes = tracing::field::Empty, chunks = tracing::field::Empty), + err + )] + async fn _parallel_download_inner( + &self, + path: &RelativePath, + tenant_id: &Option, + partial_path: PathBuf, + ) -> Result<(), ObjectStorageError> { + let tenant_str = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); + let date = Utc::now().date_naive().to_string(); + let src = to_object_store_path(path); + + let meta = self.client.head(&src).await?; + increment_object_store_calls_by_date("HEAD", &date, tenant_str); + let total = meta.size; + + if let Some(parent) = partial_path.parent() { + 
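// Parent directories are created first because the .partial file can sit
// under a date/hour/minute hierarchy that does not exist locally yet. The
// File::create + set_len(total) pair below preallocates the full length up
// front (a sparse file on most filesystems), so the concurrent positional
// writes issued later can land at any offset without racing to extend the
// file.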
tokio::fs::create_dir_all(parent).await?; + } + let file = tokio::fs::File::create(&partial_path).await?; + file.set_len(total).await?; + let std_file = Arc::new(file.into_std().await); + + let chunk = PARSEABLE.options.hot_tier_download_chunk_size; + let concurrency = PARSEABLE.options.hot_tier_download_concurrency; + let ranges: Vec> = (0..total) + .step_by(chunk as usize) + .map(|s| s..(s + chunk).min(total)) + .collect(); + let chunk_count = ranges.len() as u64; + tracing::Span::current() + .record("total_bytes", total) + .record("chunks", chunk_count); + let client = self.client.clone(); + let semaphore = Arc::new(tokio::sync::Semaphore::new(concurrency as usize)); + + futures::stream::iter(ranges) + .map(|r| { + let client = client.clone(); + let src = src.clone(); + let std_file = std_file.clone(); + let semaphore = semaphore.clone(); + async move { + let _permit = semaphore.acquire_owned().await.map_err(|e| { + ObjectStorageError::Custom(format!("semaphore closed: {e}")) + })?; + let bytes = client.get_range(&src, r.clone()).await?; + let offset = r.start; + tokio::task::spawn_blocking(move || -> std::io::Result<()> { + crate::storage::write_all_at(&std_file, &bytes, offset) + }) + .await + .map_err(|e| ObjectStorageError::Custom(format!("join: {e}")))??; + Ok::<_, ObjectStorageError>(()) + } + }) + .buffer_unordered(concurrency as usize) + .try_collect::>() + .await?; + + let std_file_sync = std_file.clone(); + tokio::task::spawn_blocking(move || std_file_sync.sync_all()) + .await + .map_err(|e| ObjectStorageError::Custom(format!("join: {e}")))??; + + increment_object_store_calls_by_date("GET", &date, tenant_str); + increment_files_scanned_in_object_store_calls_by_date("GET", 1, &date, tenant_str); + increment_bytes_scanned_in_object_store_calls_by_date("GET", total, &date, tenant_str); + increment_partial_file_scans_in_object_store_calls_by_date( + "GET", + chunk_count, + &date, + tenant_str, + ); + Ok(()) + } + + #[tracing::instrument( + name = "gcs.get_object", + skip(self), + fields(path = %path, tenant = ?tenant_id, bytes = tracing::field::Empty), + err + )] async fn _get_object( &self, path: &RelativePath, @@ -180,6 +303,7 @@ impl Gcs { match resp { Ok(resp) => { let body: Bytes = resp.bytes().await?; + tracing::Span::current().record("bytes", body.len()); increment_files_scanned_in_object_store_calls_by_date( "GET", 1, @@ -198,6 +322,11 @@ impl Gcs { } } + #[tracing::instrument( + name = "gcs.put_object", + skip(self, resource), + fields(path = %path, tenant = ?tenant_id, bytes = resource.content_length()) + )] async fn _put_object( &self, path: &RelativePath, @@ -221,6 +350,12 @@ impl Gcs { } } + #[tracing::instrument( + name = "gcs.delete_prefix", + skip(self), + fields(prefix = %key, tenant = ?tenant_id, deleted = tracing::field::Empty, failed = tracing::field::Empty), + err + )] async fn _delete_prefix( &self, key: &str, @@ -251,6 +386,9 @@ impl Gcs { } let total_files = files_deleted + failed_deletes; + tracing::Span::current() + .record("deleted", files_deleted) + .record("failed", failed_deletes); increment_files_scanned_in_object_store_calls_by_date( "LIST", total_files, @@ -273,6 +411,12 @@ impl Gcs { Ok(()) } + #[tracing::instrument( + name = "gcs.list_dates", + skip(self), + fields(stream = %stream, tenant = ?tenant_id, dates = tracing::field::Empty), + err + )] async fn _list_dates( &self, stream: &str, @@ -307,10 +451,16 @@ impl Gcs { .filter_map(|path| path.as_ref().strip_prefix(&format!("{stream}/"))) .map(String::from) .collect(); + 
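// The collected entries are the immediate children of the stream prefix
// with "{stream}/" stripped; for the usual layout they look like
// "date=2025-01-01" (an illustrative value, not from this diff). Only the
// count is recorded on the span below, which keeps span payloads small for
// streams with long histories.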
tracing::Span::current().record("dates", dates.len()); Ok(dates) } + #[tracing::instrument( + name = "gcs.upload_file", + skip(self), + fields(key = %key, path = %path.display(), tenant = ?tenant_id) + )] async fn _upload_file( &self, key: &str, @@ -335,6 +485,11 @@ impl Gcs { } } + #[tracing::instrument( + name = "gcs.upload_multipart", + skip(self), + fields(key = %key, path = %path.display(), tenant = ?tenant_id, total_bytes = tracing::field::Empty) + )] async fn _upload_multipart( &self, key: &RelativePath, @@ -346,6 +501,7 @@ impl Gcs { let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); let meta = file.metadata().await?; let total_size = meta.len() as usize; + tracing::Span::current().record("total_bytes", total_size); let min_multipart_size = PARSEABLE.options.min_multipart_size as usize; if total_size < min_multipart_size || !PARSEABLE.options.enable_multipart { let mut data = Vec::new(); @@ -431,6 +587,14 @@ impl Gcs { #[async_trait] impl ObjectStorage for Gcs { + async fn parallel_chunked_download( + &self, + path: &RelativePath, + tenant_id: &Option, + write_path: PathBuf, + ) -> Result<(), ObjectStorageError> { + self._parallel_download(path, tenant_id, write_path).await + } async fn get_buffered_reader( &self, path: &RelativePath, @@ -469,6 +633,12 @@ impl ObjectStorage for Gcs { self._upload_multipart(key, path, tenant_id).await } + #[tracing::instrument( + name = "gcs.head", + skip(self), + fields(path = %path, tenant = ?tenant_id), + err + )] async fn head( &self, path: &RelativePath, @@ -609,6 +779,12 @@ impl ObjectStorage for Gcs { Ok(()) } + #[tracing::instrument( + name = "gcs.delete_object", + skip(self), + fields(path = %path, tenant = ?tenant_id), + err + )] async fn delete_object( &self, path: &RelativePath, @@ -633,11 +809,18 @@ impl ObjectStorage for Gcs { Ok(result?) 
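// The check() instrumentation added just below follows the same probe
// pattern as the other stores: HEAD the parseable .json marker, then record
// the boolean span field "ok" from result.is_ok(). That gives a cheap
// per-tenant readiness signal, while the `err` attribute still captures the
// full failure on the error path.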
} + #[tracing::instrument( + name = "gcs.check", + skip(self), + fields(tenant = ?tenant_id, ok = tracing::field::Empty), + err + )] async fn check(&self, tenant_id: &Option) -> Result<(), ObjectStorageError> { let result = self .client .head(&to_object_store_path(&parseable_json_path())) .await; + tracing::Span::current().record("ok", result.is_ok()); let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); increment_object_store_calls_by_date("HEAD", &Utc::now().date_naive().to_string(), tenant); diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs index 6f981981c..97d909eb6 100644 --- a/src/storage/localfs.rs +++ b/src/storage/localfs.rs @@ -106,6 +106,28 @@ impl LocalFS { #[async_trait] impl ObjectStorage for LocalFS { + async fn parallel_chunked_download( + &self, + path: &RelativePath, + _tenant_id: &Option, + write_path: PathBuf, + ) -> Result<(), ObjectStorageError> { + let src = self.path_in_root(path); + let partial = super::partial_path(&write_path)?; + if let Some(parent) = partial.parent() { + fs::create_dir_all(parent).await?; + } + match fs::copy(&src, &partial).await { + Ok(_) => { + fs::rename(&partial, &write_path).await?; + Ok(()) + } + Err(e) => { + let _ = fs::remove_file(&partial).await; + Err(e.into()) + } + } + } async fn upload_multipart( &self, key: &RelativePath, diff --git a/src/storage/metrics_layer.rs b/src/storage/metrics_layer.rs index ac82e43d6..297e65eec 100644 --- a/src/storage/metrics_layer.rs +++ b/src/storage/metrics_layer.rs @@ -31,7 +31,34 @@ use object_store::{ Result as ObjectStoreResult, path::Path, }; -use crate::metrics::STORAGE_REQUEST_RESPONSE_TIME; +use crate::metrics::{STORAGE_REQUEST_RESPONSE_TIME, STORAGE_REQUESTS_INFLIGHT}; + +/// RAII guard that increments the in-flight gauge on construction and +/// decrements on drop. Handles early returns, panics, and dropped futures. 
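/// A minimal usage sketch (the surrounding method is hypothetical, shown
/// only to illustrate the guard's lifetime):
///
///     async fn get(&self) -> ObjectStoreResult<GetResult> {
///         let _guard = InflightGuard::new(&self.provider, "GET");
///         self.inner.get().await // gauge decremented when _guard drops
///     }
///
/// Binding to `_guard` rather than `_` is significant: `let _ = ...` drops
/// the guard immediately, and the request would never appear in the gauge.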
+struct InflightGuard { + provider: String, + method: &'static str, +} + +impl InflightGuard { + fn new(provider: &str, method: &'static str) -> Self { + STORAGE_REQUESTS_INFLIGHT + .with_label_values(&[provider, method]) + .inc(); + Self { + provider: provider.to_string(), + method, + } + } +} + +impl Drop for InflightGuard { + fn drop(&mut self) { + STORAGE_REQUESTS_INFLIGHT + .with_label_values(&[&self.provider, self.method]) + .dec(); + } +} // Public helper function to map object_store errors to HTTP status codes pub fn error_to_status_code(err: &object_store::Error) -> &'static str { @@ -91,6 +118,7 @@ impl ObjectStore for MetricLayer { payload: PutPayload, opts: PutOptions, ) -> ObjectStoreResult { + let _guard = InflightGuard::new(&self.provider, "PUT"); let time = time::Instant::now(); let put_result = self.inner.put_opts(location, payload, opts).await; let elapsed = time.elapsed().as_secs_f64(); @@ -111,6 +139,7 @@ impl ObjectStore for MetricLayer { location: &Path, opts: PutMultipartOptions, ) -> ObjectStoreResult> { + let _guard = InflightGuard::new(&self.provider, "PUT_MULTIPART"); let time = time::Instant::now(); let result = self.inner.put_multipart_opts(location, opts).await; let elapsed = time.elapsed().as_secs_f64(); @@ -127,6 +156,7 @@ impl ObjectStore for MetricLayer { } async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult { + let _guard = InflightGuard::new(&self.provider, "GET"); let time = time::Instant::now(); let result = self.inner.get_opts(location, options).await; let elapsed = time.elapsed().as_secs_f64(); @@ -147,6 +177,7 @@ impl ObjectStore for MetricLayer { location: &Path, ranges: &[Range], ) -> ObjectStoreResult> { + let _guard = InflightGuard::new(&self.provider, "GET_RANGES"); let time = time::Instant::now(); let result = self.inner.get_ranges(location, ranges).await; let elapsed = time.elapsed().as_secs_f64(); @@ -170,6 +201,7 @@ impl ObjectStore for MetricLayer { } fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, ObjectStoreResult> { + let _guard = InflightGuard::new(&self.provider, "LIST"); let time = time::Instant::now(); let inner = self.inner.list(prefix); let res = StreamMetricWrapper { @@ -177,6 +209,7 @@ impl ObjectStore for MetricLayer { provider: self.provider.clone(), method: "LIST", status: "200", + _guard, inner, }; Box::pin(res) @@ -187,6 +220,7 @@ impl ObjectStore for MetricLayer { prefix: Option<&Path>, offset: &Path, ) -> BoxStream<'static, ObjectStoreResult> { + let _guard = InflightGuard::new(&self.provider, "LIST_OFFSET"); let time = time::Instant::now(); let inner = self.inner.list_with_offset(prefix, offset); let res = StreamMetricWrapper { @@ -194,6 +228,7 @@ impl ObjectStore for MetricLayer { provider: self.provider.clone(), method: "LIST_OFFSET", status: "200", + _guard, inner, }; @@ -201,6 +236,7 @@ impl ObjectStore for MetricLayer { } async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult { + let _guard = InflightGuard::new(&self.provider, "LIST_DELIM"); let time = time::Instant::now(); let result = self.inner.list_with_delimiter(prefix).await; let elapsed = time.elapsed().as_secs_f64(); @@ -222,6 +258,7 @@ impl ObjectStore for MetricLayer { to: &Path, options: CopyOptions, ) -> ObjectStoreResult<()> { + let _guard = InflightGuard::new(&self.provider, "COPY"); let time = time::Instant::now(); let result = self.inner.copy_opts(from, to, options).await; let elapsed = time.elapsed().as_secs_f64(); @@ -243,6 +280,7 @@ impl ObjectStore for MetricLayer { to: &Path, 
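// For the streaming APIs the guard cannot simply live in the method body:
// list() returns immediately, and the request is only finished when the
// returned stream is exhausted or dropped. That is why StreamMetricWrapper
// below gains a `_guard` field: moving the InflightGuard into the wrapper
// ties the gauge decrement to the stream's lifetime instead of the method
// call's.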
options: RenameOptions, ) -> ObjectStoreResult<()> { + let _guard = InflightGuard::new(&self.provider, "RENAME"); let time = time::Instant::now(); let result = self.inner.rename_opts(from, to, options).await; let elapsed = time.elapsed().as_secs_f64(); @@ -264,6 +302,7 @@ struct StreamMetricWrapper<'a, T> { provider: String, method: &'static str, status: &'static str, + _guard: InflightGuard, inner: BoxStream<'a, T>, } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 2ca7a10de..df120cacb 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -48,6 +48,44 @@ pub mod retention; mod s3; pub mod store_metadata; +/// Cross-platform positional write: pwrite(2) on Unix, seek_write+loop on Windows. +/// Both APIs accept `&File`, so concurrent ranged downloads can share an Arc. +#[inline(always)] +pub(crate) fn write_all_at(file: &std::fs::File, buf: &[u8], offset: u64) -> std::io::Result<()> { + #[cfg(unix)] + { + use std::os::unix::fs::FileExt; + file.write_all_at(buf, offset) + } + #[cfg(windows)] + { + use std::os::windows::fs::FileExt; + let mut buf = buf; + let mut offset = offset; + while !buf.is_empty() { + match file.seek_write(buf, offset) { + Ok(0) => { + return Err(std::io::Error::new( + std::io::ErrorKind::WriteZero, + "failed to write whole buffer", + )); + } + Ok(n) => { + buf = &buf[n..]; + offset += n as u64; + } + Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} + Err(e) => return Err(e), + } + } + Ok(()) + } + #[cfg(not(any(unix, windows)))] + { + compile_error!("write_all_at: unsupported platform"); + } +} + use self::retention::Retention; pub use azure_blob::AzureBlobConfig; pub use gcs::GcsConfig; @@ -344,3 +382,18 @@ pub enum ObjectStorageError { pub fn to_object_store_path(path: &RelativePath) -> Path { Path::from(path.as_str()) } + +/// Append `.partial` to the file name of a local path. Used by hot-tier +/// downloaders to write to a sibling path and atomically rename on success. 
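/// Illustrative example (hypothetical paths): a download targeting
/// `/hot/stream1/date=2025-01-01/file.parquet` is first written to
/// `/hot/stream1/date=2025-01-01/file.parquet.partial`; only a fully
/// written and fsynced file is renamed into place, so readers never see a
/// half-downloaded parquet file at the final path.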
+pub fn partial_path( + write_path: &std::path::Path, +) -> Result { + let name = write_path + .file_name() + .ok_or_else(|| ObjectStorageError::Custom("download write_path has no file name".into()))?; + let mut next = std::ffi::OsString::from(name); + next.push(".partial"); + let mut buf = write_path.to_path_buf(); + buf.set_file_name(next); + Ok(buf) +} diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 2e58a0df5..b67cf18f9 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -98,6 +98,11 @@ pub(crate) struct UploadResult { } /// Handles the upload of a single parquet file +#[tracing::instrument( + name = "object_store.upload_single_parquet", + skip(store, schema), + fields(stream = %stream_name, tenant = ?tenant_id, path = %path.display(), key = %stream_relative_path) +)] async fn upload_single_parquet_file( store: Arc, path: std::path::PathBuf, @@ -240,6 +245,11 @@ async fn calculate_stats_if_enabled( } /// Validates that a parquet file uploaded to object storage matches the staging file size +#[tracing::instrument( + name = "object_store.validate_uploaded_parquet", + skip(store), + fields(stream = %stream_name, tenant = ?tenant_id, key = %stream_relative_path, expected_size) +)] async fn validate_uploaded_parquet_file( store: &Arc, stream_relative_path: &str, @@ -302,6 +312,12 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { path: &RelativePath, tenant_id: &Option, ) -> Result; + async fn parallel_chunked_download( + &self, + path: &RelativePath, + tenant_id: &Option, + write_path: PathBuf, + ) -> Result<(), ObjectStorageError>; async fn get_object( &self, path: &RelativePath, @@ -998,6 +1014,12 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { // pick a better name fn get_bucket_name(&self) -> String; + #[tracing::instrument( + name = "object_store.upload_files_from_staging", + skip(self), + fields(stream = %stream_name, tenant = ?tenant_id), + err + )] async fn upload_files_from_staging( &self, stream_name: &str, @@ -1027,6 +1049,18 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { } /// Processes parquet files concurrently and returns stats status and manifest files +#[tracing::instrument( + name = "object_store.process_parquet_files", + skip(upload_context), + fields( + stream = %stream_name, + tenant = ?tenant_id, + parquet_count = tracing::field::Empty, + total_size_bytes = tracing::field::Empty, + smallest_date = tracing::field::Empty, + largest_date = tracing::field::Empty, + ) +)] async fn process_parquet_files( upload_context: &UploadContext, stream_name: &str, @@ -1053,6 +1087,27 @@ async fn process_parquet_files( ret }; + let mut total_size: u64 = 0; + let mut min_dt: Option> = None; + let mut max_dt: Option> = None; + for path in &parquet_paths { + if let Ok(meta) = path.metadata() { + total_size = total_size.saturating_add(meta.len()); + } + if let Ok(dt) = extract_datetime_from_parquet_path_regex(path) { + min_dt = Some(min_dt.map_or(dt, |cur| cur.min(dt))); + max_dt = Some(max_dt.map_or(dt, |cur| cur.max(dt))); + } + } + let span = tracing::Span::current(); + span.record("parquet_count", parquet_paths.len()); + span.record("total_size_bytes", total_size); + if let Some(dt) = min_dt { + span.record("smallest_date", tracing::field::display(dt)); + } + if let Some(dt) = max_dt { + span.record("largest_date", tracing::field::display(dt)); + } // Spawn upload tasks for each parquet file for path in parquet_paths { spawn_parquet_upload_task( diff --git a/src/storage/s3.rs 
b/src/storage/s3.rs index 169d3c508..dd3013952 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -17,7 +17,13 @@ */ use std::{ - collections::HashSet, fmt::Display, path::Path, str::FromStr, sync::Arc, time::Duration, + collections::HashSet, + fmt::Display, + ops::Range, + path::{Path, PathBuf}, + str::FromStr, + sync::Arc, + time::Duration, }; use async_trait::async_trait; @@ -48,6 +54,7 @@ use crate::{ increment_bytes_scanned_in_object_store_calls_by_date, increment_files_scanned_in_object_store_calls_by_date, increment_object_store_calls_by_date, + increment_partial_file_scans_in_object_store_calls_by_date, }, parseable::{DEFAULT_TENANT, LogStream, PARSEABLE}, }; @@ -55,7 +62,8 @@ use crate::{ use super::{ CONNECT_TIMEOUT_SECS, ObjectStorage, ObjectStorageError, ObjectStorageProvider, PARSEABLE_ROOT_DIRECTORY, REQUEST_TIMEOUT_SECS, STREAM_METADATA_FILE_NAME, - metrics_layer::MetricLayer, object_storage::parseable_json_path, to_object_store_path, + metrics_layer::MetricLayer, object_storage::parseable_json_path, partial_path, + to_object_store_path, }; // in bytes @@ -310,9 +318,11 @@ impl ObjectStorageProvider for S3Config { fn construct_client(&self) -> Arc { let s3 = self.get_default_builder().build().unwrap(); - + // limit objectstore to a concurrent request limit + let s3 = LimitStore::new(s3, super::MAX_OBJECT_STORE_REQUESTS); + let s3 = MetricLayer::new(s3, "s3"); Arc::new(S3 { - client: s3, + client: Arc::new(s3), bucket: self.bucket_name.clone(), root: StorePath::from(""), }) @@ -325,12 +335,124 @@ impl ObjectStorageProvider for S3Config { #[derive(Debug)] pub struct S3 { - client: AmazonS3, + client: Arc>>, bucket: String, root: StorePath, } impl S3 { + async fn _parallel_download( + &self, + path: &RelativePath, + tenant_id: &Option, + write_path: PathBuf, + ) -> Result<(), ObjectStorageError> { + let partial = partial_path(&write_path)?; + match self + ._parallel_download_inner(path, tenant_id, partial.clone()) + .await + { + Ok(()) => { + if let Err(e) = tokio::fs::rename(&partial, &write_path).await { + let _ = tokio::fs::remove_file(&partial).await; + return Err(e.into()); + } + Ok(()) + } + Err(e) => { + let _ = tokio::fs::remove_file(&partial).await; + Err(e) + } + } + } + + #[tracing::instrument( + name = "s3.parallel_download", + skip(self, partial_path), + fields(path = %path, tenant = ?tenant_id, total_bytes = tracing::field::Empty, chunks = tracing::field::Empty), + err + )] + async fn _parallel_download_inner( + &self, + path: &RelativePath, + tenant_id: &Option, + partial_path: PathBuf, + ) -> Result<(), ObjectStorageError> { + let tenant_str = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); + let date = Utc::now().date_naive().to_string(); + let src = to_object_store_path(path); + + let meta = self.client.head(&src).await?; + increment_object_store_calls_by_date("HEAD", &date, tenant_str); + let total = meta.size; + + if let Some(parent) = partial_path.parent() { + tokio::fs::create_dir_all(parent).await?; + } + let file = tokio::fs::File::create(&partial_path).await?; + file.set_len(total).await?; + let std_file = Arc::new(file.into_std().await); + + let chunk = PARSEABLE.options.hot_tier_download_chunk_size; + let concurrency = PARSEABLE.options.hot_tier_download_concurrency; + let ranges: Vec> = (0..total) + .step_by(chunk as usize) + .map(|s| s..(s + chunk).min(total)) + .collect(); + let chunk_count = ranges.len() as u64; + tracing::Span::current() + .record("total_bytes", total) + .record("chunks", chunk_count); + let client = 
Arc::new(self.client.clone()); + let semaphore = Arc::new(tokio::sync::Semaphore::new(concurrency as usize)); + + futures::stream::iter(ranges) + .map(|r| { + let client = client.clone(); + let src = src.clone(); + let std_file = std_file.clone(); + let semaphore = semaphore.clone(); + async move { + let _permit = semaphore.acquire_owned().await.map_err(|e| { + ObjectStorageError::Custom(format!("semaphore closed: {e}")) + })?; + let bytes = client.get_range(&src, r.clone()).await?; + let offset = r.start; + tokio::task::spawn_blocking(move || -> std::io::Result<()> { + crate::storage::write_all_at(&std_file, &bytes, offset) + }) + .await + .map_err(|e| ObjectStorageError::Custom(format!("join: {e}")))??; + Ok::<_, ObjectStorageError>(()) + } + }) + .buffer_unordered(concurrency as usize) + .try_collect::>() + .await?; + + let std_file_sync = std_file.clone(); + tokio::task::spawn_blocking(move || std_file_sync.sync_all()) + .await + .map_err(|e| ObjectStorageError::Custom(format!("join: {e}")))??; + + increment_object_store_calls_by_date("GET", &date, tenant_str); + increment_files_scanned_in_object_store_calls_by_date("GET", 1, &date, tenant_str); + increment_bytes_scanned_in_object_store_calls_by_date("GET", total, &date, tenant_str); + increment_partial_file_scans_in_object_store_calls_by_date( + "GET", + chunk_count, + &date, + tenant_str, + ); + Ok(()) + } + + #[tracing::instrument( + name = "s3.get_object", + skip(self), + fields(path = %path, tenant = ?tenant_id, bytes = tracing::field::Empty), + err + )] async fn _get_object( &self, path: &RelativePath, @@ -347,6 +469,7 @@ impl S3 { match resp { Ok(resp) => { let body = resp.bytes().await?; + tracing::Span::current().record("bytes", body.len()); increment_files_scanned_in_object_store_calls_by_date( "GET", 1, @@ -365,6 +488,11 @@ impl S3 { } } + #[tracing::instrument( + name = "s3.put_object", + skip(self, resource), + fields(path = %path, tenant = ?tenant_id, bytes = resource.content_length()) + )] async fn _put_object( &self, path: &RelativePath, @@ -392,6 +520,12 @@ impl S3 { } } + #[tracing::instrument( + name = "s3.delete_prefix", + skip(self), + fields(prefix = %key, tenant = ?tenant_id, deleted = tracing::field::Empty, failed = tracing::field::Empty), + err + )] async fn _delete_prefix( &self, key: &str, @@ -426,6 +560,9 @@ impl S3 { } let total_files = files_deleted + failed_deletes; + tracing::Span::current() + .record("deleted", files_deleted) + .record("failed", failed_deletes); increment_files_scanned_in_object_store_calls_by_date( "LIST", total_files, @@ -448,6 +585,12 @@ impl S3 { Ok(()) } + #[tracing::instrument( + name = "s3.list_dates", + skip(self), + fields(stream = %stream, tenant = ?tenant_id, dates = tracing::field::Empty), + err + )] async fn _list_dates( &self, stream: &str, @@ -490,10 +633,16 @@ impl S3 { .filter_map(|path| path.as_ref().strip_prefix(&prefix)) .map(String::from) .collect(); + tracing::Span::current().record("dates", dates.len()); Ok(dates) } + #[tracing::instrument( + name = "s3.upload_file", + skip(self), + fields(key = %key, path = %path.display(), tenant = ?tenant_id) + )] async fn _upload_file( &self, key: &str, @@ -523,6 +672,11 @@ impl S3 { } } + #[tracing::instrument( + name = "s3.upload_multipart", + skip(self), + fields(key = %key, path = %path.display(), tenant = ?tenant_id, total_bytes = tracing::field::Empty, parts = tracing::field::Empty) + )] async fn _upload_multipart( &self, key: &RelativePath, @@ -579,6 +733,9 @@ impl S3 { let has_final_partial_part = 
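// Worked example with illustrative numbers: for total_size = 23 MiB and
// min_multipart_size = 10 MiB, the is_multiple_of check fails, so
// has_final_partial_part = true, num_full_parts = 2 and total_parts = 3
// (two 10 MiB parts plus a 3 MiB tail), which is the value the span's
// "parts" field records below.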
!total_size.is_multiple_of(min_multipart_size);
         let num_full_parts = total_size / min_multipart_size;
         let total_parts = num_full_parts + if has_final_partial_part { 1 } else { 0 };
+        tracing::Span::current()
+            .record("total_bytes", total_size)
+            .record("parts", total_parts);
 
         // Upload each part with metrics
         for part_number in 0..(total_parts) {
@@ -662,6 +819,12 @@ impl ObjectStorage for S3 {
         self._upload_multipart(key, path, tenant_id).await
     }
 
+    #[tracing::instrument(
+        name = "s3.head",
+        skip(self),
+        fields(path = %path, tenant = ?tenant_id),
+        err
+    )]
     async fn head(
         &self,
         path: &RelativePath,
@@ -686,12 +849,21 @@
         Ok(result?)
     }
 
+    async fn parallel_chunked_download(
+        &self,
+        path: &RelativePath,
+        tenant_id: &Option,
+        write_path: PathBuf,
+    ) -> Result<(), ObjectStorageError> {
+        self._parallel_download(path, tenant_id, write_path).await
+    }
+
     async fn get_object(
         &self,
         path: &RelativePath,
         tenant_id: &Option,
     ) -> Result {
-        Ok(self._get_object(path, tenant_id).await?)
+        self._get_object(path, tenant_id).await
     }
 
     async fn get_objects(
@@ -815,6 +987,12 @@
         Ok(())
     }
 
+    #[tracing::instrument(
+        name = "s3.delete_object",
+        skip(self),
+        fields(path = %path, tenant = ?tenant_id),
+        err
+    )]
     async fn delete_object(
         &self,
         path: &RelativePath,
@@ -839,12 +1017,19 @@
         Ok(result?)
     }
 
+    #[tracing::instrument(
+        name = "s3.check",
+        skip(self),
+        fields(tenant = ?tenant_id, ok = tracing::field::Empty),
+        err
+    )]
     async fn check(&self, tenant_id: &Option) -> Result<(), ObjectStorageError> {
         let tenant_str = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
         let result = self
             .client
             .head(&to_object_store_path(&parseable_json_path()))
             .await;
+        tracing::Span::current().record("ok", result.is_ok());
         increment_object_store_calls_by_date(
             "HEAD",
             &Utc::now().date_naive().to_string(),
diff --git a/src/sync.rs b/src/sync.rs
index a9c5d4485..14a88ab38 100644
--- a/src/sync.rs
+++ b/src/sync.rs
@@ -277,34 +277,42 @@ pub fn local_sync() -> (
 /// local and object store sync at the start of the server
 #[tokio::main(flavor = "current_thread")]
 pub async fn sync_start() -> anyhow::Result<()> {
-    // Monitor local sync duration at startup
-    monitor_task_duration(
-        "startup_local_sync",
-        Duration::from_secs(PARSEABLE.options.local_sync_threshold),
-        || async {
-            let mut local_sync_joinset = JoinSet::new();
-            PARSEABLE
-                .streams
-                .flush_and_convert(&mut local_sync_joinset, true, false);
-            while let Some(res) = local_sync_joinset.join_next().await {
-                log_join_result(res, "flush and convert");
-            }
-        },
-    )
+    async {
+        // Monitor local sync duration at startup
+        monitor_task_duration(
+            "startup_local_sync",
+            Duration::from_secs(PARSEABLE.options.local_sync_threshold),
+            || async {
+                let mut local_sync_joinset = JoinSet::new();
+                PARSEABLE
+                    .streams
+                    .flush_and_convert(&mut local_sync_joinset, true, false);
+                while let Some(res) = local_sync_joinset.join_next().await {
+                    log_join_result(res, "flush and convert");
+                }
+            },
+        )
+        .await;
+    }
+    .instrument(info_span!("local_sync_startup"))
     .await;
 
-    // Monitor object store sync duration at startup
-    monitor_task_duration(
-        "startup_object_store_sync",
-        Duration::from_secs(PARSEABLE.options.object_store_sync_threshold),
-        || async {
-            let mut object_store_joinset = JoinSet::new();
-            sync_all_streams(&mut object_store_joinset);
-            while let Some(res) =
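// The refactor in this hunk wraps each monitored future in an async block
// and attaches a span via Instrument::instrument, so every await inside
// (including draining the JoinSet) is recorded under local_sync_startup or
// object_store_sync_startup. Instrumenting the async blocks rather than
// sync_start itself gives each startup phase its own span instead of both
// running unspanned inside the #[tokio::main] entry point.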
object_store_joinset.join_next().await {
-                log_join_result(res, "object store sync");
-            }
-        },
-    )
+    async {
+        // Monitor object store sync duration at startup
+        monitor_task_duration(
+            "startup_object_store_sync",
+            Duration::from_secs(PARSEABLE.options.object_store_sync_threshold),
+            || async {
+                let mut object_store_joinset = JoinSet::new();
+                sync_all_streams(&mut object_store_joinset);
+                while let Some(res) = object_store_joinset.join_next().await {
+                    log_join_result(res, "object store sync");
+                }
+            },
+        )
+        .await;
+    }
+    .instrument(info_span!("object_store_sync_startup"))
     .await;
 
     Ok(())
diff --git a/src/telemetry.rs b/src/telemetry.rs
index 4e4d7dfdb..1a2212bda 100644
--- a/src/telemetry.rs
+++ b/src/telemetry.rs
@@ -49,7 +49,7 @@ const OTEL_EXPORTER_OTLP_PROTOCOL: &str = "OTEL_EXPORTER_OTLP_PROTOCOL";
 ///
 /// Returns `None` when `OTEL_EXPORTER_OTLP_ENDPOINT` or `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` is not set (OTEL disabled).
 /// The caller must call `provider.shutdown()` before process exit.
-pub fn init_tracing() -> Option {
+pub fn init_tracing(service: &'static str) -> Option {
     // Only used to decide whether OTEL is enabled; the SDK reads it again
     // from env to build the exporter (which also appends /v1/traces for HTTP).
     if std::env::var(OTEL_EXPORTER_OTLP_ENDPOINT).is_err()
@@ -101,7 +101,7 @@
     // migration tables to rewrite attribute names across semconv versions —
     // so even if upstream semconv drifts, emitted telemetry remains translatable.
     let resource = Resource::builder_empty()
-        .with_service_name("parseable")
+        .with_service_name(service)
         .with_schema_url(
             std::iter::empty::(),
             "https://opentelemetry.io/schemas/1.56.0",
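A minimal sketch of how the new `service` parameter might be wired up at
startup. Call sites are not part of this diff; the mode name
"parseable-query" and the shutdown handling are assumptions for illustration
only:

    fn main() -> anyhow::Result<()> {
        // Hypothetical caller: pick a per-mode service name so traces from
        // query, ingest, and prism nodes can be told apart in the backend.
        let provider = crate::telemetry::init_tracing("parseable-query");

        // ... run the server ...

        // Flush pending spans before exit, as the doc comment above requires.
        if let Some(provider) = provider {
            let _ = provider.shutdown();
        }
        Ok(())
    }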