44 changes: 44 additions & 0 deletions src/cli.rs
@@ -314,6 +314,50 @@ pub struct Options {
)]
pub hot_tier_storage_path: Option<PathBuf>,

#[arg(
long = "hot-tier-download-chunk-size",
env = "P_HOT_TIER_DOWNLOAD_CHUNK_SIZE",
value_parser = clap::value_parser!(u64).range(5242880..),
default_value = "8388608",
help = "Chunk size in bytes for parallel hot tier downloads (default 8 MiB)"
)]
pub hot_tier_download_chunk_size: u64,

#[arg(
long = "hot-tier-download-concurrency",
env = "P_HOT_TIER_DOWNLOAD_CONCURRENCY",
value_parser = clap::value_parser!(u64).range(1..),
default_value = "16",
help = "Number of concurrent range requests per hot tier download"
)]
pub hot_tier_download_concurrency: u64,
Comment on lines +317 to +333
⚠️ Potential issue | 🟠 Major | ⚡ Quick win

🏁 Script executed:

```bash
#!/bin/bash
# Verify the new option types and the downstream `as usize` casts.
sed -n '317,359p' src/cli.rs
sed -n '394,405p' src/storage/s3.rs
sed -n '233,245p' src/storage/gcs.rs
sed -n '267,279p' src/storage/azure_blob.rs
```

Repository: parseablehq/parseable

Length of output: 3499


Cap these download knobs to usize::MAX for 32-bit builds.

These values are cast to usize in the S3/GCS/Azure ranged-download paths. On 32-bit targets, the cast wraps for values above usize::MAX (for example, 2^32 truncates to 0), which can make step_by(0) panic or create a zero-permit semaphore that deadlocks.
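The clamp the comment asks for can be sketched as follows; the helper name and its placement are illustrative, not the actual code in the S3/GCS/Azure download paths:

```rust
/// Clamp a u64 CLI value before casting so the cast can never wrap on
/// 32-bit targets. The function name is hypothetical; the real fix would
/// sit wherever the `as usize` casts happen today.
fn effective_chunk_size(chunk_size: u64) -> usize {
    // On a 32-bit target, `chunk_size as usize` keeps only the low 32 bits
    // (2^32 would become 0 and later panic in `step_by(0)`); clamping first
    // makes the cast lossless.
    chunk_size.min(usize::MAX as u64) as usize
}
```

Applying the same clamp to the concurrency value guarantees the semaphore is created with at least one permit whenever the CLI already enforces a minimum of 1.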

Suggested fix
     #[arg(
         long = "hot-tier-download-chunk-size",
         env = "P_HOT_TIER_DOWNLOAD_CHUNK_SIZE",
-        value_parser = clap::value_parser!(u64).range(5242880..),
+        value_parser = clap::value_parser!(u64).range(5242880..=(usize::MAX as u64)),
         default_value = "8388608",
         help = "Chunk size in bytes for parallel hot tier downloads (default 8 MiB)"
     )]
     pub hot_tier_download_chunk_size: u64,

     #[arg(
         long = "hot-tier-download-concurrency",
         env = "P_HOT_TIER_DOWNLOAD_CONCURRENCY",
-        value_parser = clap::value_parser!(u64).range(1..),
+        value_parser = clap::value_parser!(u64).range(1..=(usize::MAX as u64)),
         default_value = "16",
         help = "Number of concurrent range requests per hot tier download"
     )]
     pub hot_tier_download_concurrency: u64,
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/cli.rs` around lines 317-333, hot_tier_download_chunk_size and hot_tier_download_concurrency are u64 but are later cast to usize in the ranged-download paths; on 32-bit targets, values larger than usize::MAX wrap during the cast and can produce a zero-step iterator or a zero-permit semaphore. Fix by capping the u64 CLI values to usize::MAX before any cast (either in the S3/GCS/Azure ranged-download code or immediately after parsing), e.g. value.min(usize::MAX as u64), so that step_by and semaphore creation always receive safe, non-zero values.


#[arg(
long = "hot-tier-files-per-stream-concurrency",
env = "P_HOT_TIER_FILES_PER_STREAM_CONCURRENCY",
default_value = "4",
help = "Number of concurrent parquet file downloads per stream during hot tier sync"
)]
pub hot_tier_files_per_stream_concurrency: usize,
Comment on lines +335 to +341

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Reject 0 for hot_tier_files_per_stream_concurrency.

This is the only new hot-tier concurrency knob that still accepts 0 from env/CLI. 0 is not a valid concurrency setting and will leave downstream hot-tier work with an invalid limit.
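The failure mode can also be guarded after parsing; this is an illustrative alternative with a hypothetical function name, not code from the PR:

```rust
/// Hypothetical post-parse guard: reject 0 so downstream hot-tier work
/// (e.g. a tokio::sync::Semaphore built from this value) never receives
/// a zero-permit limit that would block forever.
fn validate_concurrency(n: usize) -> Result<usize, String> {
    if n == 0 {
        Err("concurrency must be at least 1".to_string())
    } else {
        Ok(n)
    }
}
```

Enforcing the range in clap itself, as the suggested fix does, is still preferable: the error then surfaces at startup together with the offending flag name.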

Suggested fix
     #[arg(
         long = "hot-tier-files-per-stream-concurrency",
         env = "P_HOT_TIER_FILES_PER_STREAM_CONCURRENCY",
+        value_parser = clap::value_parser!(usize).range(1..),
         default_value = "4",
         help = "Number of concurrent parquet file downloads per stream during hot tier sync"
     )]
     pub hot_tier_files_per_stream_concurrency: usize,
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/cli.rs` around lines 335-341, the CLI accepts 0 for hot_tier_files_per_stream_concurrency, which is invalid. Update the argument validation so values below 1 are rejected (for example, with a clap value parser or a custom validator enforcing a minimum of 1) and emit a clear error explaining that the concurrency must be at least 1; downstream code reading hot_tier_files_per_stream_concurrency can then assume it is never 0.


#[arg(
long = "hot-tier-latest-minutes",
env = "P_HOT_TIER_LATEST_MINUTES",
value_parser = clap::value_parser!(u64).range(1..),
default_value = "10",
help = "Files whose timestamp is within the last N minutes are 'latest'; rest are 'historic'."
)]
pub hot_tier_latest_minutes: u64,

#[arg(
long = "hot-tier-historic-sync-minutes",
env = "P_HOT_TIER_HISTORIC_SYNC_MINUTES",
value_parser = clap::value_parser!(u32).range(1..),
default_value = "5",
help = "Interval (minutes) at which the historic hot-tier sync runs."
)]
pub hot_tier_historic_sync_minutes: u32,
coderabbitai[bot] marked this conversation as resolved.

//TODO: remove this when smart cache is implemented
#[arg(
long = "index-storage-path",
Expand Down
28 changes: 27 additions & 1 deletion src/handlers/http/logstream.rs
@@ -413,13 +413,21 @@ pub async fn get_stream_info(
Ok((web::Json(stream_info), StatusCode::OK))
}

#[tracing::instrument(
name = "http.put_stream_hot_tier",
skip(req, logstream, hottier),
fields(stream = tracing::field::Empty, tenant = tracing::field::Empty, size = hottier.size)
)]
pub async fn put_stream_hot_tier(
req: HttpRequest,
logstream: Path<String>,
Json(mut hottier): Json<StreamHotTier>,
) -> Result<impl Responder, StreamError> {
let stream_name = logstream.into_inner();
let tenant_id = get_tenant_id_from_request(&req);
tracing::Span::current()
.record("stream", tracing::field::display(&stream_name))
.record("tenant", tracing::field::debug(&tenant_id));
// For query mode, if the stream not found in memory map,
//check if it exists in the storage
//create stream and schema from storage
@@ -469,19 +477,29 @@ pub async fn put_stream_hot_tier(
.metastore
.put_stream_json(&stream_metadata, &stream_name, &tenant_id)
.await?;

hot_tier_manager
.spawn_stream_tasks(stream_name.clone(), tenant_id.clone())
.await;
Ok((
format!("hot tier set for stream {stream_name}"),
StatusCode::OK,
))
}

#[tracing::instrument(
name = "http.get_stream_hot_tier",
skip(req, logstream),
fields(stream = tracing::field::Empty, tenant = tracing::field::Empty)
)]
pub async fn get_stream_hot_tier(
req: HttpRequest,
logstream: Path<String>,
) -> Result<impl Responder, StreamError> {
let stream_name = logstream.into_inner();
let tenant_id = get_tenant_id_from_request(&req);
tracing::Span::current()
.record("stream", tracing::field::display(&stream_name))
.record("tenant", tracing::field::debug(&tenant_id));
// For query mode, if the stream not found in memory map,
//check if it exists in the storage
//create stream and schema from storage
@@ -502,12 +520,20 @@ pub async fn get_stream_hot_tier(
Ok((web::Json(meta), StatusCode::OK))
}

#[tracing::instrument(
name = "http.delete_stream_hot_tier",
skip(req, logstream),
fields(stream = tracing::field::Empty, tenant = tracing::field::Empty)
)]
pub async fn delete_stream_hot_tier(
req: HttpRequest,
logstream: Path<String>,
) -> Result<impl Responder, StreamError> {
let stream_name = logstream.into_inner();
let tenant_id = get_tenant_id_from_request(&req);
tracing::Span::current()
.record("stream", tracing::field::display(&stream_name))
.record("tenant", tracing::field::debug(&tenant_id));
// For query mode, if the stream not found in memory map,
//check if it exists in the storage
//create stream and schema from storage
6 changes: 6 additions & 0 deletions src/handlers/http/modal/mod.rs
@@ -160,6 +160,11 @@ pub trait ParseableServer {
// Shutdown resource monitor
let _ = resource_shutdown_tx.send(());

// Shutdown hottier
if let Some(ht_global) = HotTierManager::global() {
ht_global.abort_all().await;
}

// Initiate graceful shutdown
info!("Graceful shutdown of HTTP server triggered");
srv_handle.stop(true).await;
@@ -626,6 +631,7 @@ pub type PrismMetadata = NodeMetadata;
/// Initialize hot tier metadata files for streams that have hot tier configuration
/// in their stream metadata but don't have local hot tier metadata files yet.
/// This function is called once during query server startup.
#[tracing::instrument(name = "hottier.init_metadata_startup", skip(hot_tier_manager))]
pub async fn initialize_hot_tier_metadata_on_startup(
hot_tier_manager: &HotTierManager,
) -> anyhow::Result<()> {