Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
295 changes: 220 additions & 75 deletions src/interactive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@ use std::io::{self, BufRead, IsTerminal, Write};
#[cfg(unix)]
use std::os::unix::fs::{OpenOptionsExt, PermissionsExt};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};

const ENV_FILE_NAME: &str = ".parseable.env";

/// Guard to avoid parsing `.parseable.env` more than once per process.
static ENV_FILE_LOADED: AtomicBool = AtomicBool::new(false);

/// A required or optional env var that the user may need to provide.
struct EnvPrompt {
env_var: &'static str,
Expand Down Expand Up @@ -62,82 +66,12 @@ pub fn prompt_missing_envs() -> Vec<(String, String)> {
// checks which env vars are already set.
load_env_file();

let prompts = get_env_prompts(&subcommand);
if prompts.is_empty() {
return vec![];
}

// Collect which required envs are still missing after loading the env file
let missing: Vec<&EnvPrompt> = prompts
.iter()
.filter(|p| p.required && std::env::var(p.env_var).is_err())
.collect();

if missing.is_empty() {
return vec![];
}

// Only prompt if stdin is an interactive terminal
if !io::stdin().is_terminal() {
return vec![];
}

println!();
println!(" Missing required environment variable(s) for {subcommand}:");
for m in &missing {
println!(" - {} ({})", m.env_var, m.display_name);
}
println!();
println!(" Starting interactive setup...");
println!();

// Track values collected in this session
let is_interactive = io::stdin().is_terminal();
let mut collected: Vec<(String, String)> = Vec::new();

// Prompt for ALL env vars (required and optional) that are not yet set
for prompt in &prompts {
if std::env::var(prompt.env_var).is_ok() {
continue;
}

let tag = if prompt.required {
"required"
} else {
"optional, press Enter to skip"
};

let value = if prompt.is_secret {
prompt_secret(&format!(
" {} ({}) [{}]: ",
prompt.display_name, prompt.env_var, tag
))
} else {
prompt_line(&format!(
" {} ({}) [{}]: ",
prompt.display_name, prompt.env_var, tag
))
};

let value = value.trim().to_string();

if value.is_empty() {
if prompt.required {
eprintln!(
" Error: {} is required and cannot be empty. Exiting.",
prompt.env_var
);
std::process::exit(1);
}
continue;
}

// SAFETY: This runs single-threaded during startup, before any async
// runtime or additional threads are spawned.
unsafe { std::env::set_var(prompt.env_var, &value) };
collected.push((prompt.env_var.to_string(), value));
}
let prompts = get_env_prompts(&subcommand);
collect_prompts(&prompts, is_interactive, &subcommand, &mut collected);

println!();
collected
}

Expand Down Expand Up @@ -182,7 +116,11 @@ fn env_file_path() -> PathBuf {

/// Loads env vars from `.parseable.env` if it exists.
/// Format: KEY=VALUE per line, # comments and empty lines are skipped.
fn load_env_file() {
/// Safe to call multiple times — the file is only parsed once per process.
pub fn load_env_file() {
if ENV_FILE_LOADED.swap(true, Ordering::Relaxed) {
return;
}
let path = env_file_path();
let file = match std::fs::File::open(&path) {
Ok(f) => f,
Expand Down Expand Up @@ -292,7 +230,7 @@ fn detect_storage_subcommand() -> Option<String> {
}

/// Returns the list of env var prompts for the given storage subcommand,
/// including any conditionally-required groups like OIDC.
/// including any conditionally-required groups like OIDC and mode-specific vars.
fn get_env_prompts(subcommand: &str) -> Vec<EnvPrompt> {
let mut prompts = get_storage_prompts(subcommand);
prompts.extend(get_tls_prompts());
Expand All @@ -302,6 +240,33 @@ fn get_env_prompts(subcommand: &str) -> Vec<EnvPrompt> {
prompts
}

const MODE_ENV: &str = "P_MODE";

/// Detects the server mode from the `P_MODE` env var or the `--mode` CLI arg.
fn detect_mode() -> Option<String> {
if let Ok(mode) = std::env::var(MODE_ENV) {
let mode = mode.to_lowercase();
if mode != "all" {
return Some(mode);
}
return None;
}

let args: Vec<String> = std::env::args().collect();
for (i, arg) in args.iter().enumerate() {
let value = if arg == "--mode" {
args.get(i + 1).map(|v| v.to_lowercase())
} else {
arg.strip_prefix("--mode=").map(|v| v.to_lowercase())
};
if let Some(mode) = value {
return (mode != "all").then_some(mode);
}
}

None
}

/// Returns storage-specific env var prompts for the given subcommand.
fn get_storage_prompts(subcommand: &str) -> Vec<EnvPrompt> {
match subcommand {
Expand Down Expand Up @@ -529,6 +494,186 @@ fn get_kafka_prompts() -> Vec<EnvPrompt> {
prompts
}

/// Checks for missing enterprise-specific environment variables and
/// prompts for them interactively if running in a terminal.
///
/// Must be called **before** `PARSEABLE` is accessed and before license
/// verification, so that `.parseable.env` values are loaded and the user
/// gets a chance to provide missing vars like `P_CLUSTER_SECRET` and
/// the license file paths.
///
/// Returns collected `(env_var, value)` pairs. Call [`save_collected_envs`]
/// after validation succeeds to persist them.
pub fn prompt_enterprise_envs() -> Vec<(String, String)> {
if is_help_or_version_request() {
return vec![];
}

if detect_storage_subcommand().is_none() {
return vec![];
}

// Load previously saved env vars so we pick up P_CLUSTER_SECRET etc.
load_env_file();

let is_interactive = io::stdin().is_terminal();
let mut collected: Vec<(String, String)> = Vec::new();

// Phase 1: base enterprise vars (license, cluster secret, mode)
let base_prompts = get_enterprise_base_prompts();
collect_prompts(
&base_prompts,
is_interactive,
"Parseable Enterprise",
&mut collected,
);

// Phase 2: now P_MODE is in the environment (from env, CLI, .parseable.env,
// or the interactive prompt above), so mode-specific prompts resolve correctly.
let mode_prompts = get_enterprise_mode_prompts();
if !mode_prompts.is_empty() {
let mode = detect_mode().unwrap_or_else(|| "unknown".to_string());
collect_prompts(
&mode_prompts,
is_interactive,
&format!("{mode} mode"),
&mut collected,
);
}

collected
}

/// Collects missing required env vars from the given prompt list.
/// Prints a header and prompts interactively when running in a terminal.
fn collect_prompts(
prompts: &[EnvPrompt],
is_interactive: bool,
context: &str,
collected: &mut Vec<(String, String)>,
) {
let missing: Vec<&EnvPrompt> = prompts
.iter()
.filter(|p| p.required && std::env::var(p.env_var).is_err())
.collect();

if missing.is_empty() {
return;
}

if !is_interactive {
return;
}

println!();
println!(" Missing required environment variable(s) for {context}:");
for m in &missing {
println!(" - {} ({})", m.env_var, m.display_name);
}
println!();
println!(" Starting interactive setup...");
println!();

for prompt in prompts {
if std::env::var(prompt.env_var).is_ok() {
continue;
}

let tag = if prompt.required {
"required"
} else {
"optional, press Enter to skip"
};

let value = if prompt.is_secret {
prompt_secret(&format!(
" {} ({}) [{}]: ",
prompt.display_name, prompt.env_var, tag
))
} else {
prompt_line(&format!(
" {} ({}) [{}]: ",
prompt.display_name, prompt.env_var, tag
))
};

let value = value.trim().to_string();

if value.is_empty() {
if prompt.required {
eprintln!(
" Error: {} is required and cannot be empty. Exiting.",
prompt.env_var
);
std::process::exit(1);
}
continue;
}

// SAFETY: Single-threaded startup, no other threads running.
unsafe { std::env::set_var(prompt.env_var, &value) };
collected.push((prompt.env_var.to_string(), value));
}

println!();
}

/// Returns base enterprise env var prompts (license, cluster secret, mode).
fn get_enterprise_base_prompts() -> Vec<EnvPrompt> {
let mut prompts = vec![
EnvPrompt {
env_var: "P_LICENSE_DATA_FILE_PATH",
display_name: "License Data File Path",
required: true,
is_secret: false,
},
EnvPrompt {
env_var: "P_LICENSE_SIGNATURE_FILE_PATH",
display_name: "License Signature File Path",
required: true,
is_secret: false,
},
EnvPrompt {
env_var: "P_CLUSTER_SECRET",
display_name: "Cluster Secret",
required: true,
is_secret: true,
},
];

// Enterprise rejects "all" mode, so prompt if not explicitly set
if detect_mode().is_none() {
prompts.push(EnvPrompt {
env_var: "P_MODE",
display_name: "Server Mode (query, ingest, index, prism)",
required: true,
is_secret: false,
});
}

prompts
}

/// Returns mode-specific enterprise env var prompts.
/// Called after phase 1, so P_MODE is guaranteed to be in the environment.
fn get_enterprise_mode_prompts() -> Vec<EnvPrompt> {
match detect_mode().as_deref() {
Some("query") => vec![EnvPrompt {
env_var: "P_HOT_TIER_DIR",
display_name: "Hot Tier Directory Path",
required: true,
is_secret: false,
}],
Some("index") => vec![EnvPrompt {
env_var: "P_INDEX_DIR",
display_name: "Index Storage Directory Path",
required: true,
is_secret: false,
}],
_ => vec![],
}
}

/// Prompts the user for a line of input (visible).
fn prompt_line(prompt: &str) -> String {
print!("{prompt}");
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub mod enterprise;
pub mod event;
pub mod handlers;
pub mod hottier;
mod interactive;
pub mod interactive;
mod livetail;
mod metadata;
pub mod metastore;
Expand Down
Loading