From 312224b6334c3664c18228a9a72d5a36d00cf5cb Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Wed, 20 May 2026 12:49:20 +0200 Subject: [PATCH 01/25] refactor(app): scaffold dereference and validate submodules Empty modules with snafu Error stubs. Wires up `mod` declarations so later commits can fill them in without further plumbing changes. Co-Authored-By: Claude Opus 4.7 (1M context) --- rust/operator-binary/src/spark_k8s_controller.rs | 3 +++ .../src/spark_k8s_controller/dereference.rs | 12 ++++++++++++ .../src/spark_k8s_controller/validate.rs | 11 +++++++++++ 3 files changed, 26 insertions(+) create mode 100644 rust/operator-binary/src/spark_k8s_controller/dereference.rs create mode 100644 rust/operator-binary/src/spark_k8s_controller/validate.rs diff --git a/rust/operator-binary/src/spark_k8s_controller.rs b/rust/operator-binary/src/spark_k8s_controller.rs index 7bc059ea..fd25092a 100644 --- a/rust/operator-binary/src/spark_k8s_controller.rs +++ b/rust/operator-binary/src/spark_k8s_controller.rs @@ -66,6 +66,9 @@ use crate::{ product_logging::{self}, }; +pub mod dereference; +pub mod validate; + #[derive(Snafu, Debug, EnumDiscriminants)] #[strum_discriminants(derive(IntoStaticStr))] #[allow(clippy::enum_variant_names)] diff --git a/rust/operator-binary/src/spark_k8s_controller/dereference.rs b/rust/operator-binary/src/spark_k8s_controller/dereference.rs new file mode 100644 index 00000000..f2d9654d --- /dev/null +++ b/rust/operator-binary/src/spark_k8s_controller/dereference.rs @@ -0,0 +1,12 @@ +//! The dereference step in the SparkApplication controller. +//! +//! Fetches all Kubernetes objects referenced by the SparkApplication spec (templates, S3 +//! connection, log directory) and returns them in [`DereferencedSparkApplication`]. +//! Synchronous validation belongs in the sibling [`super::validate`] module. 
+ +use snafu::Snafu; + +#[derive(Snafu, Debug)] +pub enum Error {} + +type Result = std::result::Result; diff --git a/rust/operator-binary/src/spark_k8s_controller/validate.rs b/rust/operator-binary/src/spark_k8s_controller/validate.rs new file mode 100644 index 00000000..611945f2 --- /dev/null +++ b/rust/operator-binary/src/spark_k8s_controller/validate.rs @@ -0,0 +1,11 @@ +//! The validate step in the SparkApplication controller. +//! +//! Synchronously validates the [`super::dereference::DereferencedSparkApplication`] and +//! resolves the product image and product config. Does not touch the Kubernetes API. + +use snafu::Snafu; + +#[derive(Snafu, Debug)] +pub enum Error {} + +type Result = std::result::Result; From c2cbddc2ea307a6beaa974bb1ace7faa06db364a Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Wed, 20 May 2026 12:52:22 +0200 Subject: [PATCH 02/25] refactor(app): add dereference module with template/S3/logdir fetch Defines DereferencedSparkApplication and an async dereference() function that consolidates template merging, S3 connection resolution, the early S3 TLS sanity check, and log directory resolution. Not yet wired into reconcile(). Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/spark_k8s_controller/dereference.rs | 105 +++++++++++++++++- 1 file changed, 103 insertions(+), 2 deletions(-) diff --git a/rust/operator-binary/src/spark_k8s_controller/dereference.rs b/rust/operator-binary/src/spark_k8s_controller/dereference.rs index f2d9654d..62e5dec7 100644 --- a/rust/operator-binary/src/spark_k8s_controller/dereference.rs +++ b/rust/operator-binary/src/spark_k8s_controller/dereference.rs @@ -4,9 +4,110 @@ //! connection, log directory) and returns them in [`DereferencedSparkApplication`]. //! Synchronous validation belongs in the sibling [`super::validate`] module. 
-use snafu::Snafu; +use snafu::{ResultExt, Snafu}; +use stackable_operator::{ + client::Client, + commons::tls_verification::{CaCert, TlsVerification}, + crd::s3, +}; + +use crate::crd::{ + logdir::ResolvedLogDir, + template_spec::{self}, + v1alpha1, +}; #[derive(Snafu, Debug)] -pub enum Error {} +pub enum Error { + #[snafu(display("failed to merge application templates"))] + MergeApplicationTemplates { source: template_spec::Error }, + + #[snafu(display("failed to configure S3 connection"))] + ConfigureS3Connection { + source: stackable_operator::crd::s3::v1alpha1::ConnectionError, + }, + + #[snafu(display("S3 TLS verification with no verification is not supported"))] + S3TlsNoVerificationNotSupported, + + #[snafu(display("failed to resolve log directory"))] + LogDir { source: crate::crd::logdir::Error }, + + #[snafu(display("object has no namespace"))] + ObjectHasNoNamespace, +} type Result = std::result::Result; + +/// Kubernetes objects referenced from a SparkApplication, already fetched. +pub struct DereferencedSparkApplication { + /// SparkApplication after merging any referenced templates. + pub spark_application: v1alpha1::SparkApplication, + /// Resolved template references for status reporting. + pub resolved_template_refs: Vec, + /// Resolved S3 connection, if `spec.s3connection` is set. + pub s3_connection: Option, + /// Resolved log directory, if `spec.log_file_directory` is set. + pub log_dir: Option, +} + +/// Fetches all Kubernetes objects referenced from the given SparkApplication. +pub async fn dereference( + client: &Client, + spark_application: &v1alpha1::SparkApplication, +) -> Result { + // 1. Template merging — must happen first so subsequent lookups see the merged spec. 
+ let merged = template_spec::merge_application_templates(client, spark_application) + .await + .context(MergeApplicationTemplatesSnafu)?; + let merged_app = merged.app.unwrap_or_else(|| spark_application.clone()); + let resolved_template_refs = merged.resolved_template_ref; + + let namespace = merged_app + .metadata + .namespace + .as_deref() + .ok_or(Error::ObjectHasNoNamespace)?; + + // 2. S3 connection. + let s3_connection = match merged_app.spec.s3connection.as_ref() { + Some(s3bd) => Some( + s3bd.clone() + .resolve(client, namespace) + .await + .context(ConfigureS3ConnectionSnafu)?, + ), + None => None, + }; + + // Early "no verification" rejection: fail before the log directory is resolved. + // Same check that was previously inline in `reconcile()`. + if let Some(conn) = s3_connection.as_ref() { + if let Some(tls) = &conn.tls.tls { + match &tls.verification { + TlsVerification::None {} => return S3TlsNoVerificationNotSupportedSnafu.fail(), + TlsVerification::Server(server_verification) => match &server_verification.ca_cert { + CaCert::WebPki {} => {} + CaCert::SecretClass(_) => {} + }, + } + } + } + + // 3. Log directory (also pulls S3Bucket + TLS secret internally). + let log_dir = match merged_app.spec.log_file_directory.as_ref() { + Some(log_file_dir) => Some( + ResolvedLogDir::resolve(log_file_dir, merged_app.metadata.namespace.clone(), client) + .await + .context(LogDirSnafu)?, + ), + None => None, + }; + + Ok(DereferencedSparkApplication { + spark_application: merged_app, + resolved_template_refs, + s3_connection, + log_dir, + }) +} From a7ae7996fc76019d751e6648a7bc4572f489edd1 Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Wed, 20 May 2026 12:58:43 +0200 Subject: [PATCH 03/25] refactor(app): add validate module with image + product-config validation Defines ValidatedSparkApplication and a sync validate() function that resolves the product image (moved from reconcile()) and runs validated_role_config. Not yet wired into reconcile(). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/spark_k8s_controller/validate.rs | 58 ++++++++++++++++++- 1 file changed, 56 insertions(+), 2 deletions(-) diff --git a/rust/operator-binary/src/spark_k8s_controller/validate.rs b/rust/operator-binary/src/spark_k8s_controller/validate.rs index 611945f2..48c61a9a 100644 --- a/rust/operator-binary/src/spark_k8s_controller/validate.rs +++ b/rust/operator-binary/src/spark_k8s_controller/validate.rs @@ -3,9 +3,63 @@ //! Synchronously validates the [`super::dereference::DereferencedSparkApplication`] and //! resolves the product image and product config. Does not touch the Kubernetes API. -use snafu::Snafu; +use product_config::ProductConfigManager; +use snafu::{ResultExt, Snafu}; +use stackable_operator::{ + cli::OperatorEnvironmentOptions, + commons::product_image_selection::{self, ResolvedProductImage}, + product_config_utils::ValidatedRoleConfigByPropertyKind, +}; + +use crate::{ + crd::constants::CONTAINER_IMAGE_BASE_NAME, + spark_k8s_controller::dereference::DereferencedSparkApplication, +}; #[derive(Snafu, Debug)] -pub enum Error {} +pub enum Error { + #[snafu(display("failed to resolve product image"))] + ResolveProductImage { + source: product_image_selection::Error, + }, + + #[snafu(display("invalid product config"))] + InvalidProductConfig { source: crate::crd::Error }, +} type Result = std::result::Result; + +/// Inputs the rest of `reconcile` needs after dereferencing. 
+pub struct ValidatedSparkApplication { + pub dereferenced: DereferencedSparkApplication, + pub resolved_product_image: ResolvedProductImage, + pub product_config: ValidatedRoleConfigByPropertyKind, +} + +pub fn validate( + dereferenced: DereferencedSparkApplication, + operator_environment: &OperatorEnvironmentOptions, + product_config: &ProductConfigManager, +) -> Result { + let resolved_product_image = dereferenced + .spark_application + .spec + .spark_image + .resolve( + CONTAINER_IMAGE_BASE_NAME, + &operator_environment.image_repository, + crate::built_info::PKG_VERSION, + ) + .context(ResolveProductImageSnafu)?; + + let product_config = dereferenced + .spark_application + .validated_role_config(&resolved_product_image, product_config) + .context(InvalidProductConfigSnafu)?; + + Ok(ValidatedSparkApplication { + dereferenced, + resolved_product_image, + product_config, + }) +} From 8b60c770439e68829d7c699344e3e58c396008d7 Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Wed, 20 May 2026 13:05:59 +0200 Subject: [PATCH 04/25] refactor(app): wire reconcile through dereference + validate reconcile() now delegates template merging, S3 resolution, TLS sanity checking, log directory resolution, image resolution, and product-config validation to the new submodules. The function body shrinks by ~70 lines and the controller's Error enum loses six variants now living in the submodules. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/spark_k8s_controller.rs | 143 +++++------------- 1 file changed, 36 insertions(+), 107 deletions(-) diff --git a/rust/operator-binary/src/spark_k8s_controller.rs b/rust/operator-binary/src/spark_k8s_controller.rs index fd25092a..246498bb 100644 --- a/rust/operator-binary/src/spark_k8s_controller.rs +++ b/rust/operator-binary/src/spark_k8s_controller.rs @@ -16,10 +16,7 @@ use stackable_operator::{ volume::VolumeBuilder, }, }, - commons::{ - product_image_selection::{self, ResolvedProductImage}, - tls_verification::{CaCert, TlsVerification}, - }, + commons::product_image_selection::ResolvedProductImage, crd::s3, k8s_openapi::{ DeepMerge, Resource, @@ -39,7 +36,6 @@ use stackable_operator::{ }, kvp::Label, logging::controller::ReconcilerError, - product_config_utils::ValidatedRoleConfigByPropertyKind, product_logging::{ framework::{ LoggingError, capture_shell_output, create_vector_shutdown_file_command, @@ -73,10 +69,11 @@ pub mod validate; #[strum_discriminants(derive(IntoStaticStr))] #[allow(clippy::enum_variant_names)] pub enum Error { - #[snafu(display("failed to merge application templates"))] - MergeApplicationTemplates { - source: crate::crd::template_spec::Error, - }, + #[snafu(display("failed to dereference SparkApplication"))] + DereferenceSparkApplication { source: dereference::Error }, + + #[snafu(display("failed to validate SparkApplication"))] + ValidateSparkApplication { source: validate::Error }, #[snafu(display("missing secret lifetime"))] MissingSecretLifetime, @@ -120,14 +117,6 @@ pub enum Error { source: stackable_operator::crd::s3::v1alpha1::BucketError, }, - #[snafu(display("failed to configure S3 connection"))] - ConfigureS3Connection { - source: stackable_operator::crd::s3::v1alpha1::ConnectionError, - }, - - #[snafu(display("tls non-verification not supported"))] - S3TlsNoVerificationNotSupported, - #[snafu(display("ca-cert verification not supported"))] S3TlsCaVerificationNotSupported, 
@@ -142,9 +131,6 @@ pub enum Error { source: stackable_operator::builder::pod::container::Error, }, - #[snafu(display("failed to resolve the log dir configuration"))] - LogDir { source: crate::crd::logdir::Error }, - #[snafu(display("vector agent is enabled but vector aggregator ConfigMap is missing"))] VectorAggregatorConfigMapMissing, @@ -168,9 +154,6 @@ pub enum Error { source: stackable_operator::product_config_utils::Error, }, - #[snafu(display("invalid product config"))] - InvalidProductConfig { source: crate::crd::Error }, - #[snafu(display("invalid submit config"))] SubmitConfig { source: crate::crd::Error }, @@ -211,11 +194,6 @@ pub enum Error { InvalidSparkApplication { source: error_boundary::InvalidObject, }, - - #[snafu(display("failed to resolve product image"))] - ResolveProductImage { - source: product_image_selection::Error, - }, } type Result = std::result::Result; @@ -250,78 +228,29 @@ pub async fn reconcile( // It is important to do this at the top of the reconciliation function to ensure // all referenced resources and configuration are merged before any of them are created. 
- let merged_template_result = - &crate::crd::template_spec::merge_application_templates(client, spark_application) - .await - .context(MergeApplicationTemplatesSnafu)?; - let spark_application = match &merged_template_result.app { - Some(app) => app, - None => spark_application, - }; + let dereferenced = dereference::dereference(client, spark_application) + .await + .context(DereferenceSparkApplicationSnafu)?; + + let validated = validate::validate( + dereferenced, + &ctx.operator_environment, + &ctx.product_config, + ) + .context(ValidateSparkApplicationSnafu)?; + + let spark_application = &validated.dereferenced.spark_application; + let opt_s3conn = &validated.dereferenced.s3_connection; + let logdir = &validated.dereferenced.log_dir; + let resolved_product_image = &validated.resolved_product_image; + let validated_product_config = &validated.product_config; // This is the final version of the spark app to reconcile. // No more mutating operations after this point (except for status). tracing::debug!("reconciling spark application [{spark_application:?}]"); - let opt_s3conn = match spark_application.spec.s3connection.as_ref() { - Some(s3bd) => Some( - s3bd.clone() - .resolve( - client, - // TODO (@NickLarsenNZ): Explain this unwrap. Either convert to expect, or gracefully handle the error. 
- spark_application.metadata.namespace.as_deref().unwrap(), - ) - .await - .context(ConfigureS3ConnectionSnafu)?, - ), - _ => None, - }; - - // check early for valid verification options - if let Some(conn) = opt_s3conn.as_ref() { - if let Some(tls) = &conn.tls.tls { - match &tls.verification { - TlsVerification::None {} => return S3TlsNoVerificationNotSupportedSnafu.fail(), - TlsVerification::Server(server_verification) => { - match &server_verification.ca_cert { - CaCert::WebPki {} => {} - CaCert::SecretClass(_) => {} - } - } - } - } - } - - let logdir = if let Some(log_file_dir) = &spark_application.spec.log_file_directory { - Some( - ResolvedLogDir::resolve( - log_file_dir, - spark_application.metadata.namespace.clone(), - client, - ) - .await - .context(LogDirSnafu)?, - ) - } else { - None - }; - - let resolved_product_image = spark_application - .spec - .spark_image - .resolve( - CONTAINER_IMAGE_BASE_NAME, - &ctx.operator_environment.image_repository, - crate::built_info::PKG_VERSION, - ) - .context(ResolveProductImageSnafu)?; - - let validated_product_config: ValidatedRoleConfigByPropertyKind = spark_application - .validated_role_config(&resolved_product_image, &ctx.product_config) - .context(InvalidProductConfigSnafu)?; - let (serviceaccount, rolebinding) = - build_spark_role_serviceaccount(spark_application, &resolved_product_image)?; + build_spark_role_serviceaccount(spark_application, resolved_product_image)?; client .apply_patch(SPARK_CONTROLLER_NAME, &serviceaccount, &serviceaccount) .await @@ -331,7 +260,7 @@ pub async fn reconcile( .await .context(ApplyRoleBindingSnafu)?; - let env_vars = spark_application.env(&opt_s3conn, &logdir); + let env_vars = spark_application.env(opt_s3conn, logdir); let driver_config = spark_application .driver_config() @@ -348,9 +277,9 @@ pub async fn reconcile( &driver_config, driver_product_config, &env_vars, - &opt_s3conn, - &logdir, - &resolved_product_image, + opt_s3conn, + logdir, + resolved_product_image, 
&serviceaccount, )?; client @@ -377,9 +306,9 @@ pub async fn reconcile( &executor_config, executor_product_config, &env_vars, - &opt_s3conn, - &logdir, - &resolved_product_image, + opt_s3conn, + logdir, + resolved_product_image, &serviceaccount, )?; client @@ -392,7 +321,7 @@ pub async fn reconcile( .context(ApplyApplicationSnafu)?; let job_commands = spark_application - .build_command(&opt_s3conn, &logdir, &resolved_product_image.image) + .build_command(opt_s3conn, logdir, &resolved_product_image.image) .context(BuildCommandSnafu)?; let submit_config = spark_application @@ -407,7 +336,7 @@ pub async fn reconcile( let submit_job_config_map = submit_job_config_map( spark_application, submit_product_config, - &resolved_product_image, + resolved_product_image, )?; client .apply_patch( @@ -420,12 +349,12 @@ pub async fn reconcile( let job = spark_job( spark_application, - &resolved_product_image, + resolved_product_image, &serviceaccount, &env_vars, &job_commands, - &opt_s3conn, - &logdir, + opt_s3conn, + logdir, &submit_config, )?; client @@ -442,7 +371,7 @@ pub async fn reconcile( spark_application, &v1alpha1::SparkApplicationStatus { phase: "Unknown".to_string(), - resolved_template_ref: merged_template_result.resolved_template_ref.clone(), + resolved_template_ref: validated.dereferenced.resolved_template_refs.clone(), }, ) .await From 746a3ca3d4af51f538b410b57be0905df2e49c46 Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Wed, 20 May 2026 13:20:14 +0200 Subject: [PATCH 05/25] chore(history): rename history_controller.rs to controller.rs Aligns history with connect (which already uses controller.rs) ahead of splitting it into a controller/ submodule folder. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/history/{history_controller.rs => controller.rs} | 0 rust/operator-binary/src/history/mod.rs | 2 +- rust/operator-binary/src/main.rs | 6 +++--- 3 files changed, 4 insertions(+), 4 deletions(-) rename rust/operator-binary/src/history/{history_controller.rs => controller.rs} (100%) diff --git a/rust/operator-binary/src/history/history_controller.rs b/rust/operator-binary/src/history/controller.rs similarity index 100% rename from rust/operator-binary/src/history/history_controller.rs rename to rust/operator-binary/src/history/controller.rs diff --git a/rust/operator-binary/src/history/mod.rs b/rust/operator-binary/src/history/mod.rs index f0f0bf2b..a8b727c6 100644 --- a/rust/operator-binary/src/history/mod.rs +++ b/rust/operator-binary/src/history/mod.rs @@ -5,7 +5,7 @@ use crate::crd::constants::{ }; pub mod config; -pub mod history_controller; +pub mod controller; pub mod operations; pub mod service; diff --git a/rust/operator-binary/src/main.rs b/rust/operator-binary/src/main.rs index 5f6024fb..6ae07183 100644 --- a/rust/operator-binary/src/main.rs +++ b/rust/operator-binary/src/main.rs @@ -7,7 +7,7 @@ use anyhow::anyhow; use clap::Parser; use connect::crd::{CONNECT_FULL_CONTROLLER_NAME, SparkConnectServer}; use futures::{FutureExt, StreamExt, TryFutureExt}; -use history::history_controller; +use history::controller; use product_config::ProductConfigManager; use stackable_operator::{ YamlSchema, @@ -272,8 +272,8 @@ async fn main() -> anyhow::Result<()> { ) .graceful_shutdown_on(sigterm_watcher.handle()) .run( - history_controller::reconcile, - history_controller::error_policy, + controller::reconcile, + controller::error_policy, Arc::new(ctx), ) .instrument(info_span!("history_controller")) From aba4a07b549c6a52a3d85d31c6f95ad2326e04ac Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Wed, 20 May 2026 13:21:40 +0200 Subject: [PATCH 06/25] refactor(history): scaffold dereference and validate submodules 
Co-Authored-By: Claude Opus 4.7 (1M context) --- rust/operator-binary/src/history/controller.rs | 3 +++ .../operator-binary/src/history/controller/dereference.rs | 8 ++++++++ rust/operator-binary/src/history/controller/validate.rs | 8 ++++++++ 3 files changed, 19 insertions(+) create mode 100644 rust/operator-binary/src/history/controller/dereference.rs create mode 100644 rust/operator-binary/src/history/controller/validate.rs diff --git a/rust/operator-binary/src/history/controller.rs b/rust/operator-binary/src/history/controller.rs index a5401fd8..2a755fa6 100644 --- a/rust/operator-binary/src/history/controller.rs +++ b/rust/operator-binary/src/history/controller.rs @@ -78,6 +78,9 @@ use crate::{ product_logging::{self}, }; +pub mod dereference; +pub mod validate; + #[derive(Snafu, Debug, EnumDiscriminants)] #[strum_discriminants(derive(IntoStaticStr))] #[allow(clippy::enum_variant_names)] diff --git a/rust/operator-binary/src/history/controller/dereference.rs b/rust/operator-binary/src/history/controller/dereference.rs new file mode 100644 index 00000000..31085e4e --- /dev/null +++ b/rust/operator-binary/src/history/controller/dereference.rs @@ -0,0 +1,8 @@ +//! The dereference step in the SparkHistoryServer controller. + +use snafu::Snafu; + +#[derive(Snafu, Debug)] +pub enum Error {} + +type Result = std::result::Result; diff --git a/rust/operator-binary/src/history/controller/validate.rs b/rust/operator-binary/src/history/controller/validate.rs new file mode 100644 index 00000000..0dee50b0 --- /dev/null +++ b/rust/operator-binary/src/history/controller/validate.rs @@ -0,0 +1,8 @@ +//! The validate step in the SparkHistoryServer controller. 
+ +use snafu::Snafu; + +#[derive(Snafu, Debug)] +pub enum Error {} + +type Result = std::result::Result; From f343b863114b22a6502e8ee853057d8d4acefdda Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Wed, 20 May 2026 13:23:31 +0200 Subject: [PATCH 07/25] refactor(history): add dereference module with log directory fetch Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/history/controller/dereference.rs | 33 +++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/rust/operator-binary/src/history/controller/dereference.rs b/rust/operator-binary/src/history/controller/dereference.rs index 31085e4e..046f1d55 100644 --- a/rust/operator-binary/src/history/controller/dereference.rs +++ b/rust/operator-binary/src/history/controller/dereference.rs @@ -1,8 +1,37 @@ //! The dereference step in the SparkHistoryServer controller. +//! +//! Fetches the resolved log directory (including the underlying S3 connection/bucket and +//! TLS secret if any) and returns it in [`DereferencedSparkHistoryServer`]. -use snafu::Snafu; +use snafu::{ResultExt, Snafu}; +use stackable_operator::client::Client; + +use crate::crd::{history::v1alpha1, logdir::ResolvedLogDir}; #[derive(Snafu, Debug)] -pub enum Error {} +pub enum Error { + #[snafu(display("failed to resolve log directory"))] + LogDir { source: crate::crd::logdir::Error }, +} type Result = std::result::Result; + +/// Kubernetes objects referenced from a SparkHistoryServer, already fetched. 
+pub struct DereferencedSparkHistoryServer { + pub log_dir: ResolvedLogDir, +} + +pub async fn dereference( + client: &Client, + shs: &v1alpha1::SparkHistoryServer, +) -> Result { + let log_dir = ResolvedLogDir::resolve( + &shs.spec.log_file_directory, + shs.metadata.namespace.clone(), + client, + ) + .await + .context(LogDirSnafu)?; + + Ok(DereferencedSparkHistoryServer { log_dir }) +} From 730c61ce003afaf4c2368722cff00e175d9e8a88 Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Wed, 20 May 2026 13:33:37 +0200 Subject: [PATCH 08/25] refactor(history): add validate module with image + product-config validation Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/history/controller/validate.rs | 59 ++++++++++++++++++- 1 file changed, 57 insertions(+), 2 deletions(-) diff --git a/rust/operator-binary/src/history/controller/validate.rs b/rust/operator-binary/src/history/controller/validate.rs index 0dee50b0..351b6f83 100644 --- a/rust/operator-binary/src/history/controller/validate.rs +++ b/rust/operator-binary/src/history/controller/validate.rs @@ -1,8 +1,63 @@ //! The validate step in the SparkHistoryServer controller. +//! +//! Resolves the product image and runs role/role-group config validation. +//! Does not touch the Kubernetes API. 
-use snafu::Snafu; +use product_config::ProductConfigManager; +use snafu::{ResultExt, Snafu}; +use stackable_operator::{ + cli::OperatorEnvironmentOptions, + commons::product_image_selection::{self, ResolvedProductImage}, + product_config_utils::ValidatedRoleConfigByPropertyKind, +}; + +use crate::{ + crd::{constants::CONTAINER_IMAGE_BASE_NAME, history::v1alpha1}, + history::controller::dereference::DereferencedSparkHistoryServer, +}; #[derive(Snafu, Debug)] -pub enum Error {} +pub enum Error { + #[snafu(display("failed to resolve product image"))] + ResolveProductImage { + source: product_image_selection::Error, + }, + + #[snafu(display("invalid product config"))] + InvalidProductConfig { source: crate::crd::history::Error }, +} type Result = std::result::Result; + +pub struct ValidatedSparkHistoryServer { + pub dereferenced: DereferencedSparkHistoryServer, + pub resolved_product_image: ResolvedProductImage, + pub product_config: ValidatedRoleConfigByPropertyKind, +} + +pub fn validate( + shs: &v1alpha1::SparkHistoryServer, + dereferenced: DereferencedSparkHistoryServer, + operator_environment: &OperatorEnvironmentOptions, + product_config: &ProductConfigManager, +) -> Result { + let resolved_product_image = shs + .spec + .image + .resolve( + CONTAINER_IMAGE_BASE_NAME, + &operator_environment.image_repository, + crate::built_info::PKG_VERSION, + ) + .context(ResolveProductImageSnafu)?; + + let product_config_validated = shs + .validated_role_config(&resolved_product_image, product_config) + .context(InvalidProductConfigSnafu)?; + + Ok(ValidatedSparkHistoryServer { + dereferenced, + resolved_product_image, + product_config: product_config_validated, + }) +} From 8210bb14988f33ac192d06151b799e8b24af2f92 Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Wed, 20 May 2026 13:37:07 +0200 Subject: [PATCH 09/25] refactor(history): wire reconcile through dereference + validate Co-Authored-By: Claude Opus 4.7 (1M context) --- .../operator-binary/src/history/controller.rs | 
79 ++++++++----------- 1 file changed, 33 insertions(+), 46 deletions(-) diff --git a/rust/operator-binary/src/history/controller.rs b/rust/operator-binary/src/history/controller.rs index 2a755fa6..bf3bf163 100644 --- a/rust/operator-binary/src/history/controller.rs +++ b/rust/operator-binary/src/history/controller.rs @@ -21,10 +21,7 @@ use stackable_operator::{ }, }, cluster_resources::{ClusterResourceApplyStrategy, ClusterResources}, - commons::{ - product_image_selection::{self, ResolvedProductImage}, - rbac::build_rbac_resources, - }, + commons::{product_image_selection::ResolvedProductImage, rbac::build_rbac_resources}, crd::listener, k8s_openapi::{ DeepMerge, @@ -57,13 +54,12 @@ use crate::{ Ctx, crd::{ constants::{ - ACCESS_KEY_ID, CONTAINER_IMAGE_BASE_NAME, HISTORY_APP_NAME, HISTORY_CONTROLLER_NAME, - HISTORY_UI_PORT, JVM_SECURITY_PROPERTIES_FILE, LISTENER_VOLUME_DIR, - LISTENER_VOLUME_NAME, MAX_SPARK_LOG_FILES_SIZE, METRICS_PORT, OPERATOR_NAME, - SECRET_ACCESS_KEY, SPARK_DEFAULTS_FILE_NAME, SPARK_ENV_SH_FILE_NAME, - STACKABLE_TRUST_STORE, VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_NAME_LOG, - VOLUME_MOUNT_NAME_LOG_CONFIG, VOLUME_MOUNT_PATH_CONFIG, VOLUME_MOUNT_PATH_LOG, - VOLUME_MOUNT_PATH_LOG_CONFIG, + ACCESS_KEY_ID, HISTORY_APP_NAME, HISTORY_CONTROLLER_NAME, HISTORY_UI_PORT, + JVM_SECURITY_PROPERTIES_FILE, LISTENER_VOLUME_DIR, LISTENER_VOLUME_NAME, + MAX_SPARK_LOG_FILES_SIZE, METRICS_PORT, OPERATOR_NAME, SECRET_ACCESS_KEY, + SPARK_DEFAULTS_FILE_NAME, SPARK_ENV_SH_FILE_NAME, STACKABLE_TRUST_STORE, + VOLUME_MOUNT_NAME_CONFIG, VOLUME_MOUNT_NAME_LOG, VOLUME_MOUNT_NAME_LOG_CONFIG, + VOLUME_MOUNT_PATH_CONFIG, VOLUME_MOUNT_PATH_LOG, VOLUME_MOUNT_PATH_LOG_CONFIG, }, history::{self, HistoryConfig, SparkHistoryServerContainer, v1alpha1}, listener_ext, @@ -144,8 +140,11 @@ pub enum Error { source: stackable_operator::cluster_resources::Error, }, - #[snafu(display("product config validation failed"))] - ProductConfigValidation { source: crate::crd::history::Error }, + 
#[snafu(display("failed to dereference SparkHistoryServer"))] + DereferenceSparkHistoryServer { source: dereference::Error }, + + #[snafu(display("failed to validate SparkHistoryServer"))] + ValidateSparkHistoryServer { source: validate::Error }, #[snafu(display("failed to resolve and merge config for role and role group"))] FailedToResolveConfig { source: crate::crd::history::Error }, @@ -238,11 +237,6 @@ pub enum Error { source: stackable_operator::cluster_resources::Error, }, - #[snafu(display("failed to resolve product image"))] - ResolveProductImage { - source: product_image_selection::Error, - }, - #[snafu(display("failed to resolve product image"))] BuildMetricsService { source: service::Error }, } @@ -267,6 +261,18 @@ pub async fn reconcile( let client = &ctx.client; + let dereferenced = dereference::dereference(client, shs) + .await + .context(DereferenceSparkHistoryServerSnafu)?; + + let validated = validate::validate( + shs, + dereferenced, + &ctx.operator_environment, + &ctx.product_config, + ) + .context(ValidateSparkHistoryServerSnafu)?; + let mut cluster_resources = ClusterResources::new( HISTORY_APP_NAME, OPERATOR_NAME, @@ -277,22 +283,8 @@ pub async fn reconcile( ) .context(CreateClusterResourcesSnafu)?; - let resolved_product_image = shs - .spec - .image - .resolve( - CONTAINER_IMAGE_BASE_NAME, - &ctx.operator_environment.image_repository, - crate::built_info::PKG_VERSION, - ) - .context(ResolveProductImageSnafu)?; - let log_dir = ResolvedLogDir::resolve( - &shs.spec.log_file_directory, - shs.metadata.namespace.clone(), - client, - ) - .await - .context(LogDirSnafu)?; + let resolved_product_image = &validated.resolved_product_image; + let log_dir = &validated.dereferenced.log_dir; // Use a dedicated service account for history server pods. 
let (service_account, role_binding) = build_rbac_resources( @@ -313,11 +305,7 @@ pub async fn reconcile( .context(ApplyRoleBindingSnafu)?; // The role_name is always HISTORY_ROLE_NAME - for (role_name, role_config) in shs - .validated_role_config(&resolved_product_image, &ctx.product_config) - .context(ProductConfigValidationSnafu)? - .iter() - { + for (role_name, role_config) in validated.product_config.iter() { for (rolegroup_name, rolegroup_config) in role_config.iter() { let rgr = RoleGroupRef { cluster: ObjectRef::from_obj(shs), @@ -335,18 +323,17 @@ pub async fn reconcile( &merged_config, &resolved_product_image.app_version_label_value, &rgr, - &log_dir, + log_dir, )?; - let metrics_service = - build_rolegroup_metrics_service(shs, &resolved_product_image, &rgr) - .context(BuildMetricsServiceSnafu)?; + let metrics_service = build_rolegroup_metrics_service(shs, resolved_product_image, &rgr) + .context(BuildMetricsServiceSnafu)?; let sts = build_stateful_set( shs, - &resolved_product_image, + resolved_product_image, &rgr, - &log_dir, + log_dir, &merged_config, &service_account, )?; @@ -367,7 +354,7 @@ pub async fn reconcile( let rg_group_listener = build_group_listener( shs, - &resolved_product_image, + resolved_product_image, role_name, shs.node_listener_class().to_string(), )?; From 5baf8ff93f1a6d83b75e294b749f0be907956752 Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Wed, 20 May 2026 13:39:33 +0200 Subject: [PATCH 10/25] refactor(connect): scaffold dereference and validate submodules Co-Authored-By: Claude Opus 4.7 (1M context) --- rust/operator-binary/src/connect/controller.rs | 3 +++ .../operator-binary/src/connect/controller/dereference.rs | 8 ++++++++ rust/operator-binary/src/connect/controller/validate.rs | 8 ++++++++ 3 files changed, 19 insertions(+) create mode 100644 rust/operator-binary/src/connect/controller/dereference.rs create mode 100644 rust/operator-binary/src/connect/controller/validate.rs diff --git 
a/rust/operator-binary/src/connect/controller.rs b/rust/operator-binary/src/connect/controller.rs index 28f2732e..36a7bcb8 100644 --- a/rust/operator-binary/src/connect/controller.rs +++ b/rust/operator-binary/src/connect/controller.rs @@ -25,6 +25,9 @@ use crate::{ crd::constants::{CONTAINER_IMAGE_BASE_NAME, OPERATOR_NAME}, }; +pub mod dereference; +pub mod validate; + #[derive(Snafu, Debug, EnumDiscriminants)] #[strum_discriminants(derive(IntoStaticStr))] #[allow(clippy::enum_variant_names)] diff --git a/rust/operator-binary/src/connect/controller/dereference.rs b/rust/operator-binary/src/connect/controller/dereference.rs new file mode 100644 index 00000000..adff2908 --- /dev/null +++ b/rust/operator-binary/src/connect/controller/dereference.rs @@ -0,0 +1,8 @@ +//! The dereference step in the SparkConnectServer controller. + +use snafu::Snafu; + +#[derive(Snafu, Debug)] +pub enum Error {} + +type Result = std::result::Result; diff --git a/rust/operator-binary/src/connect/controller/validate.rs b/rust/operator-binary/src/connect/controller/validate.rs new file mode 100644 index 00000000..68c383ea --- /dev/null +++ b/rust/operator-binary/src/connect/controller/validate.rs @@ -0,0 +1,8 @@ +//! The validate step in the SparkConnectServer controller. 
+ +use snafu::Snafu; + +#[derive(Snafu, Debug)] +pub enum Error {} + +type Result = std::result::Result; From 82d495c9c5ab50cdf30c7433c5c3f2ef1481bc3e Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Wed, 20 May 2026 13:41:35 +0200 Subject: [PATCH 11/25] refactor(connect): add dereference module with S3 connection fetch Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/connect/controller/dereference.rs | 29 +++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/rust/operator-binary/src/connect/controller/dereference.rs b/rust/operator-binary/src/connect/controller/dereference.rs index adff2908..32f99414 100644 --- a/rust/operator-binary/src/connect/controller/dereference.rs +++ b/rust/operator-binary/src/connect/controller/dereference.rs @@ -1,8 +1,33 @@ //! The dereference step in the SparkConnectServer controller. +//! +//! Fetches the resolved S3 configuration referenced by the SparkConnectServer spec. -use snafu::Snafu; +use snafu::{ResultExt, Snafu}; +use stackable_operator::{client::Client, kube::ResourceExt}; + +use crate::connect::{crd::v1alpha1, s3}; #[derive(Snafu, Debug)] -pub enum Error {} +pub enum Error { + #[snafu(display("failed to resolve S3 connection for {name}"))] + ResolveS3Connections { source: s3::Error, name: String }, +} type Result = std::result::Result; + +pub struct DereferencedSparkConnectServer { + pub resolved_s3: s3::ResolvedS3, +} + +pub async fn dereference( + client: &Client, + scs: &v1alpha1::SparkConnectServer, +) -> Result { + let resolved_s3 = s3::ResolvedS3::resolve(client, scs) + .await + .with_context(|_| ResolveS3ConnectionsSnafu { + name: scs.name_unchecked(), + })?; + + Ok(DereferencedSparkConnectServer { resolved_s3 }) +} From f8f8a673b6b846546e1b6533c48625cd5e456427 Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Wed, 20 May 2026 13:47:54 +0200 Subject: [PATCH 12/25] refactor(connect): add validate module with image + role config resolution Co-Authored-By: Claude Opus 4.7 (1M 
context) --- .../src/connect/controller/validate.rs | 62 ++++++++++++++++++- 1 file changed, 60 insertions(+), 2 deletions(-) diff --git a/rust/operator-binary/src/connect/controller/validate.rs b/rust/operator-binary/src/connect/controller/validate.rs index 68c383ea..c009316d 100644 --- a/rust/operator-binary/src/connect/controller/validate.rs +++ b/rust/operator-binary/src/connect/controller/validate.rs @@ -1,8 +1,66 @@ //! The validate step in the SparkConnectServer controller. +//! +//! Resolves the product image and the server/executor configs. Does not touch K8s. -use snafu::Snafu; +use snafu::{ResultExt, Snafu}; +use stackable_operator::{ + cli::OperatorEnvironmentOptions, + commons::product_image_selection::{self, ResolvedProductImage}, +}; + +use crate::{ + connect::{ + controller::dereference::DereferencedSparkConnectServer, + crd::{self, v1alpha1}, + }, + crd::constants::CONTAINER_IMAGE_BASE_NAME, +}; #[derive(Snafu, Debug)] -pub enum Error {} +pub enum Error { + #[snafu(display("failed to resolve product image"))] + ResolveProductImage { + source: product_image_selection::Error, + }, + + #[snafu(display("failed to resolve server config"))] + ServerConfig { source: crd::Error }, + + #[snafu(display("failed to resolve executor config"))] + ExecutorConfig { source: crd::Error }, +} type Result = std::result::Result; + +pub struct ValidatedSparkConnectServer { + pub dereferenced: DereferencedSparkConnectServer, + pub resolved_product_image: ResolvedProductImage, + pub server_config: v1alpha1::ServerConfig, + pub executor_config: v1alpha1::ExecutorConfig, +} + +pub fn validate( + scs: &v1alpha1::SparkConnectServer, + dereferenced: DereferencedSparkConnectServer, + operator_environment: &OperatorEnvironmentOptions, +) -> Result { + let resolved_product_image = scs + .spec + .image + .resolve( + CONTAINER_IMAGE_BASE_NAME, + &operator_environment.image_repository, + crate::built_info::PKG_VERSION, + ) + .context(ResolveProductImageSnafu)?; + + let server_config 
= scs.server_config().context(ServerConfigSnafu)?; + let executor_config = scs.executor_config().context(ExecutorConfigSnafu)?; + + Ok(ValidatedSparkConnectServer { + dereferenced, + resolved_product_image, + server_config, + executor_config, + }) +} From 99b4a64b3f6b92a1d1138040cfe677e240728826 Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Wed, 20 May 2026 13:59:16 +0200 Subject: [PATCH 13/25] refactor(connect): wire reconcile through dereference + validate Co-Authored-By: Claude Opus 4.7 (1M context) --- .../operator-binary/src/connect/controller.rs | 86 ++++++++----------- 1 file changed, 36 insertions(+), 50 deletions(-) diff --git a/rust/operator-binary/src/connect/controller.rs b/rust/operator-binary/src/connect/controller.rs index 36a7bcb8..6b578181 100644 --- a/rust/operator-binary/src/connect/controller.rs +++ b/rust/operator-binary/src/connect/controller.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use snafu::{ResultExt, Snafu}; use stackable_operator::{ cluster_resources::{ClusterResourceApplyStrategy, ClusterResources}, - commons::{product_image_selection, rbac::build_rbac_resources}, + commons::rbac::build_rbac_resources, kube::{ Resource, ResourceExt, core::{DeserializeGuard, error_boundary}, @@ -21,8 +21,8 @@ use strum::{EnumDiscriminants, IntoStaticStr}; use super::crd::{CONNECT_APP_NAME, CONNECT_CONTROLLER_NAME, v1alpha1}; use crate::{ Ctx, - connect::{common, crd::SparkConnectServerStatus, executor, s3, server, service}, - crd::constants::{CONTAINER_IMAGE_BASE_NAME, OPERATOR_NAME}, + connect::{common, crd::SparkConnectServerStatus, executor, server, service}, + crd::constants::OPERATOR_NAME, }; pub mod dereference; @@ -127,11 +127,6 @@ pub enum Error { BuildRbacResources { source: stackable_operator::commons::rbac::Error, }, - #[snafu(display("failed to build connect server configuration"))] - ServerConfig { source: crate::connect::crd::Error }, - - #[snafu(display("failed to build connect executor configuration"))] - ExecutorConfig { source: 
crate::connect::crd::Error }, #[snafu(display("failed to build connect executor pod template"))] ExecutorPodTemplate { @@ -141,16 +136,14 @@ pub enum Error { #[snafu(display("failed to serialize executor pod template"))] ExecutorPodTemplateSerde { source: serde_yaml::Error }, - #[snafu(display("failed to resolve product image"))] - ResolveProductImage { - source: product_image_selection::Error, - }, - - #[snafu(display("failed to resolve S3 connections for SparkConnectServer {name:?}"))] - ResolveS3Connections { source: s3::Error, name: String }, - #[snafu(display("failed to build connect server S3 properties"))] S3SparkProperties { source: crate::connect::s3::Error }, + + #[snafu(display("failed to dereference SparkConnectServer"))] + DereferenceSparkConnectServer { source: dereference::Error }, + + #[snafu(display("failed to validate SparkConnectServer"))] + ValidateSparkConnectServer { source: validate::Error }, } type Result = std::result::Result; @@ -173,12 +166,22 @@ pub async fn reconcile( .map_err(error_boundary::InvalidObject::clone) .context(InvalidSparkConnectServerSnafu)?; - let server_config = scs.server_config().context(ServerConfigSnafu)?; - let server_role_config = &scs.spec.server.role_config; - let executor_config = scs.executor_config().context(ExecutorConfigSnafu)?; - let client = &ctx.client; + let dereferenced = dereference::dereference(client, scs) + .await + .context(DereferenceSparkConnectServerSnafu)?; + + let validated = validate::validate(scs, dereferenced, &ctx.operator_environment) + .context(ValidateSparkConnectServerSnafu)?; + + let server_config = &validated.server_config; + let executor_config = &validated.executor_config; + let resolved_product_image = &validated.resolved_product_image; + let resolved_s3 = &validated.dereferenced.resolved_s3; + + let server_role_config = &scs.spec.server.role_config; + let mut cluster_resources = ClusterResources::new( CONNECT_APP_NAME, OPERATOR_NAME, @@ -189,23 +192,6 @@ pub async fn reconcile( ) 
.context(CreateClusterResourcesSnafu)?; - let resolved_product_image = scs - .spec - .image - .resolve( - CONTAINER_IMAGE_BASE_NAME, - &ctx.operator_environment.image_repository, - crate::built_info::PKG_VERSION, - ) - .context(ResolveProductImageSnafu)?; - - // Resolve any S3 connections early to fail fast if there are issues. - let resolved_s3 = s3::ResolvedS3::resolve(client, scs) - .await - .with_context(|_| ResolveS3ConnectionsSnafu { - name: scs.name_unchecked(), - })?; - // Use a dedicated service account for connect server pods. let (service_account, role_binding) = build_rbac_resources( scs, @@ -254,13 +240,13 @@ pub async fn reconcile( .context(S3SparkPropertiesSnafu)?, server::server_properties( scs, - &server_config, + server_config, &applied_headless_service, &service_account, - &resolved_product_image, + resolved_product_image, ) .context(ServerPropertiesSnafu)?, - executor::executor_properties(scs, &executor_config, &resolved_product_image) + executor::executor_properties(scs, executor_config, resolved_product_image) .context(ExecutorPropertiesSnafu)?, ]) .context(SerializePropertiesSnafu)?; @@ -268,7 +254,7 @@ pub async fn reconcile( // ======================================== // Executor config map and pod template let executor_config_map = - executor::executor_config_map(scs, &executor_config, &resolved_product_image).context( + executor::executor_config_map(scs, executor_config, resolved_product_image).context( BuildExecutorConfigMapSnafu { name: scs.name_unchecked(), }, @@ -283,10 +269,10 @@ pub async fn reconcile( let executor_pod_template = serde_yaml::to_string( &executor::executor_pod_template( scs, - &executor_config, - &resolved_product_image, + executor_config, + resolved_product_image, &executor_config_map, - &resolved_s3, + resolved_s3, ) .context(ExecutorPodTemplateSnafu)?, ) @@ -296,8 +282,8 @@ pub async fn reconcile( // Server config map let server_config_map = server::server_config_map( scs, - &server_config, - 
&resolved_product_image, + server_config, + resolved_product_image, &spark_props, &executor_pod_template, ) @@ -313,7 +299,7 @@ pub async fn reconcile( // ======================================== // Server listener - let listener = server::build_listener(scs, server_role_config, &resolved_product_image) + let listener = server::build_listener(scs, server_role_config, resolved_product_image) .context(BuildListenerSnafu)?; let applied_listener = cluster_resources @@ -326,13 +312,13 @@ pub async fn reconcile( let args = server::command_args(&scs.spec.args); let stateful_set = server::build_stateful_set( scs, - &server_config, - &resolved_product_image, + server_config, + resolved_product_image, &service_account, &server_config_map, &applied_listener.name_any(), args, - &resolved_s3, + resolved_s3, ) .context(BuildServerStatefulSetSnafu)?; From 8bfce40e724d2386f79226a110be5a9184d44d02 Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Wed, 20 May 2026 14:02:20 +0200 Subject: [PATCH 14/25] style: apply cargo fmt to wired controllers Co-Authored-By: Claude Opus 4.7 (1M context) --- rust/operator-binary/src/history/controller.rs | 5 +++-- rust/operator-binary/src/spark_k8s_controller.rs | 9 +++------ .../src/spark_k8s_controller/dereference.rs | 10 ++++++---- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/rust/operator-binary/src/history/controller.rs b/rust/operator-binary/src/history/controller.rs index bf3bf163..f7be24ec 100644 --- a/rust/operator-binary/src/history/controller.rs +++ b/rust/operator-binary/src/history/controller.rs @@ -326,8 +326,9 @@ pub async fn reconcile( log_dir, )?; - let metrics_service = build_rolegroup_metrics_service(shs, resolved_product_image, &rgr) - .context(BuildMetricsServiceSnafu)?; + let metrics_service = + build_rolegroup_metrics_service(shs, resolved_product_image, &rgr) + .context(BuildMetricsServiceSnafu)?; let sts = build_stateful_set( shs, diff --git a/rust/operator-binary/src/spark_k8s_controller.rs 
b/rust/operator-binary/src/spark_k8s_controller.rs index 246498bb..4c428b38 100644 --- a/rust/operator-binary/src/spark_k8s_controller.rs +++ b/rust/operator-binary/src/spark_k8s_controller.rs @@ -232,12 +232,9 @@ pub async fn reconcile( .await .context(DereferenceSparkApplicationSnafu)?; - let validated = validate::validate( - dereferenced, - &ctx.operator_environment, - &ctx.product_config, - ) - .context(ValidateSparkApplicationSnafu)?; + let validated = + validate::validate(dereferenced, &ctx.operator_environment, &ctx.product_config) + .context(ValidateSparkApplicationSnafu)?; let spark_application = &validated.dereferenced.spark_application; let opt_s3conn = &validated.dereferenced.s3_connection; diff --git a/rust/operator-binary/src/spark_k8s_controller/dereference.rs b/rust/operator-binary/src/spark_k8s_controller/dereference.rs index 62e5dec7..48ad20dd 100644 --- a/rust/operator-binary/src/spark_k8s_controller/dereference.rs +++ b/rust/operator-binary/src/spark_k8s_controller/dereference.rs @@ -86,10 +86,12 @@ pub async fn dereference( if let Some(tls) = &conn.tls.tls { match &tls.verification { TlsVerification::None {} => return S3TlsNoVerificationNotSupportedSnafu.fail(), - TlsVerification::Server(server_verification) => match &server_verification.ca_cert { - CaCert::WebPki {} => {} - CaCert::SecretClass(_) => {} - }, + TlsVerification::Server(server_verification) => { + match &server_verification.ca_cert { + CaCert::WebPki {} => {} + CaCert::SecretClass(_) => {} + } + } } } } From 32ed5f051e88ea2d0bf1c767f11b0b1fd74c8e59 Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Wed, 20 May 2026 14:07:08 +0200 Subject: [PATCH 15/25] fix: correct error message --- rust/operator-binary/src/history/controller.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/operator-binary/src/history/controller.rs b/rust/operator-binary/src/history/controller.rs index f7be24ec..2fcdfe5a 100644 --- a/rust/operator-binary/src/history/controller.rs +++ 
b/rust/operator-binary/src/history/controller.rs @@ -237,7 +237,7 @@ pub enum Error { source: stackable_operator::cluster_resources::Error, }, - #[snafu(display("failed to resolve product image"))] + #[snafu(display("failed to build metrics service"))] BuildMetricsService { source: service::Error }, } From d731d13fa6d1db9ff96f02768590b6fc10e19c45 Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Wed, 20 May 2026 14:13:08 +0200 Subject: [PATCH 16/25] fix: remove obsolete errors --- rust/operator-binary/src/crd/logdir.rs | 6 ----- .../src/spark_k8s_controller.rs | 25 ------------------- 2 files changed, 31 deletions(-) diff --git a/rust/operator-binary/src/crd/logdir.rs b/rust/operator-binary/src/crd/logdir.rs index 65f2e936..75b3c3c3 100644 --- a/rust/operator-binary/src/crd/logdir.rs +++ b/rust/operator-binary/src/crd/logdir.rs @@ -26,15 +26,9 @@ use crate::crd::{ #[strum_discriminants(derive(IntoStaticStr))] #[allow(clippy::enum_variant_names)] pub enum Error { - #[snafu(display("missing bucket name for history logs"))] - BucketNameMissing, - #[snafu(display("tls non-verification not supported"))] S3TlsNoVerificationNotSupported, - #[snafu(display("ca-cert verification not supported"))] - S3TlsCaVerificationNotSupported, - #[snafu(display("failed to build TLS certificate SecretClass Volume"))] TlsCertSecretClassVolumeBuild { source: SecretOperatorVolumeSourceBuilderError, diff --git a/rust/operator-binary/src/spark_k8s_controller.rs b/rust/operator-binary/src/spark_k8s_controller.rs index 4c428b38..ca95acdb 100644 --- a/rust/operator-binary/src/spark_k8s_controller.rs +++ b/rust/operator-binary/src/spark_k8s_controller.rs @@ -78,9 +78,6 @@ pub enum Error { #[snafu(display("missing secret lifetime"))] MissingSecretLifetime, - #[snafu(display("object has no namespace"))] - ObjectHasNoNamespace, - #[snafu(display("object is missing metadata to build owner reference"))] ObjectMissingMetadataForOwnerRef { source: stackable_operator::builder::meta::Error, @@ -112,20 
+109,9 @@ pub enum Error { #[snafu(display("pod template serialization"))] PodTemplateSerde { source: serde_yaml::Error }, - #[snafu(display("failed to configure S3 bucket"))] - ConfigureS3Bucket { - source: stackable_operator::crd::s3::v1alpha1::BucketError, - }, - - #[snafu(display("ca-cert verification not supported"))] - S3TlsCaVerificationNotSupported, - #[snafu(display("failed to resolve and merge config"))] FailedToResolveConfig { source: crate::crd::Error }, - #[snafu(display("failed to recognise the container name"))] - UnrecognisedContainerName, - #[snafu(display("illegal container name"))] IllegalContainerName { source: stackable_operator::builder::pod::container::Error, @@ -149,11 +135,6 @@ pub enum Error { role: SparkApplicationRole, }, - #[snafu(display("failed to generate product config"))] - GenerateProductConfig { - source: stackable_operator::product_config_utils::Error, - }, - #[snafu(display("invalid submit config"))] SubmitConfig { source: crate::crd::Error }, @@ -167,12 +148,6 @@ pub enum Error { source: stackable_operator::builder::meta::Error, }, - #[snafu(display("failed to get required Labels"))] - GetRequiredLabels { - source: - stackable_operator::kvp::KeyValuePairError, - }, - #[snafu(display("failed to create Volumes for SparkApplication"))] CreateVolumes { source: crate::crd::Error }, From 7cc6cd27465dcd74af298a9ba169263fa7436d37 Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Wed, 20 May 2026 15:15:35 +0200 Subject: [PATCH 17/25] fix: regenerate crate hashes --- Cargo.nix | 18 +++++++++--------- crate-hashes.json | 18 +++++++++--------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/Cargo.nix b/Cargo.nix index b3bbcbf3..b7cf9ecb 100644 --- a/Cargo.nix +++ b/Cargo.nix @@ -4823,7 +4823,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "7a5f0c3fbcd091340214a23f0607fcd4b4fcc152"; - sha256 = "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by"; + sha256 = 
"0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk"; }; libName = "k8s_version"; authors = [ @@ -9471,7 +9471,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "7a5f0c3fbcd091340214a23f0607fcd4b4fcc152"; - sha256 = "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by"; + sha256 = "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk"; }; libName = "stackable_certs"; authors = [ @@ -9574,7 +9574,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "7a5f0c3fbcd091340214a23f0607fcd4b4fcc152"; - sha256 = "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by"; + sha256 = "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk"; }; libName = "stackable_operator"; authors = [ @@ -9754,7 +9754,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "7a5f0c3fbcd091340214a23f0607fcd4b4fcc152"; - sha256 = "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by"; + sha256 = "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk"; }; procMacro = true; libName = "stackable_operator_derive"; @@ -9789,7 +9789,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "7a5f0c3fbcd091340214a23f0607fcd4b4fcc152"; - sha256 = "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by"; + sha256 = "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk"; }; libName = "stackable_shared"; authors = [ @@ -9980,7 +9980,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "7a5f0c3fbcd091340214a23f0607fcd4b4fcc152"; - sha256 = "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by"; + sha256 = "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk"; }; libName = "stackable_telemetry"; authors = [ @@ -10090,7 +10090,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "7a5f0c3fbcd091340214a23f0607fcd4b4fcc152"; - sha256 = 
"0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by"; + sha256 = "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk"; }; libName = "stackable_versioned"; authors = [ @@ -10140,7 +10140,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "7a5f0c3fbcd091340214a23f0607fcd4b4fcc152"; - sha256 = "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by"; + sha256 = "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk"; }; procMacro = true; libName = "stackable_versioned_macros"; @@ -10208,7 +10208,7 @@ rec { src = pkgs.fetchgit { url = "https://github.com/stackabletech/operator-rs.git"; rev = "7a5f0c3fbcd091340214a23f0607fcd4b4fcc152"; - sha256 = "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by"; + sha256 = "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk"; }; libName = "stackable_webhook"; authors = [ diff --git a/crate-hashes.json b/crate-hashes.json index a6396ca0..86f2b840 100644 --- a/crate-hashes.json +++ b/crate-hashes.json @@ -1,12 +1,12 @@ { - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#k8s-version@0.1.3": "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-certs@0.4.0": "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-operator-derive@0.3.1": "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-operator@0.111.1": "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-shared@0.1.0": "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-telemetry@0.6.3": 
"0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-versioned-macros@0.10.0": "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-versioned@0.10.0": "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by", - "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-webhook@0.9.1": "0d58yvxvy8hbai12bjhcyvh4zw182j5dsfyqja4k2xc1vzjy29by", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#k8s-version@0.1.3": "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-certs@0.4.0": "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-operator-derive@0.3.1": "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-operator@0.111.1": "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-shared@0.1.0": "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-telemetry@0.6.3": "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-versioned-macros@0.10.0": "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk", + "git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-versioned@0.10.0": "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk", + 
"git+https://github.com/stackabletech/operator-rs.git?tag=stackable-operator-0.111.1#stackable-webhook@0.9.1": "0lj969rjbxairjglrnaq0xhabvdrq5nd6wl1i0y9pr50nhh7zvgk", "git+https://github.com/stackabletech/product-config.git?tag=0.8.0#product-config@0.8.0": "1dz70kapm2wdqcr7ndyjji0lhsl98bsq95gnb2lw487wf6yr7987" } \ No newline at end of file From 56c0802dae010fbbd7f3e2989aae4f23f223316c Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Wed, 20 May 2026 16:11:44 +0200 Subject: [PATCH 18/25] test(spark-connect): trim 10-assert to bare CR Available wait Co-Authored-By: Claude Sonnet 4.6 --- .../kuttl/spark-connect/10-assert.yaml | 25 +++---------------- 1 file changed, 3 insertions(+), 22 deletions(-) diff --git a/tests/templates/kuttl/spark-connect/10-assert.yaml b/tests/templates/kuttl/spark-connect/10-assert.yaml index 87f18f8a..41ae37b6 100644 --- a/tests/templates/kuttl/spark-connect/10-assert.yaml +++ b/tests/templates/kuttl/spark-connect/10-assert.yaml @@ -1,25 +1,6 @@ --- apiVersion: kuttl.dev/v1beta1 kind: TestAssert -timeout: 900 ---- -apiVersion: apps/v1 -kind: StatefulSet -metadata: - name: spark-connect-server -status: - readyReplicas: 1 ---- -apiVersion: v1 -kind: Service -metadata: - name: spark-connect-server -spec: - type: NodePort ---- -apiVersion: v1 -kind: Service -metadata: - name: spark-connect-server-headless -spec: - type: ClusterIP +timeout: 300 +commands: + - script: kubectl -n $NAMESPACE wait --for=condition=Available=true sparkconnectservers.spark.stackable.tech/spark-connect --timeout=301s From 4d78418f247938bdf7f8986910c57ba42e40a68f Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Wed, 20 May 2026 16:27:20 +0200 Subject: [PATCH 19/25] test(smoke): add SparkHistoryServer resource snapshot assert Co-Authored-By: Claude Sonnet 4.6 --- tests/templates/kuttl/smoke/42-assert.yaml | 170 +++++++++++++++++++++ 1 file changed, 170 insertions(+) create mode 100644 tests/templates/kuttl/smoke/42-assert.yaml diff --git 
a/tests/templates/kuttl/smoke/42-assert.yaml b/tests/templates/kuttl/smoke/42-assert.yaml new file mode 100644 index 00000000..707c699b --- /dev/null +++ b/tests/templates/kuttl/smoke/42-assert.yaml @@ -0,0 +1,170 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 60 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: spark-history-node-default + labels: + app.kubernetes.io/component: node + app.kubernetes.io/instance: spark-history + app.kubernetes.io/managed-by: spark.stackable.tech_history + app.kubernetes.io/name: spark-history + app.kubernetes.io/role-group: default + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkHistoryServer + name: spark-history +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/component: node + app.kubernetes.io/instance: spark-history + app.kubernetes.io/name: spark-history + app.kubernetes.io/role-group: default +status: + readyReplicas: 1 + replicas: 1 +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: spark-history-node-default + labels: + app.kubernetes.io/component: node + app.kubernetes.io/instance: spark-history + app.kubernetes.io/managed-by: spark.stackable.tech_history + app.kubernetes.io/name: spark-history + app.kubernetes.io/role-group: default + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkHistoryServer + name: spark-history +--- +apiVersion: v1 +kind: Service +metadata: + name: spark-history-node-default-metrics + annotations: + prometheus.io/path: /metrics + prometheus.io/port: "18081" + prometheus.io/scheme: http + prometheus.io/scrape: "true" + labels: + app.kubernetes.io/component: node + app.kubernetes.io/instance: spark-history + app.kubernetes.io/managed-by: spark.stackable.tech_history + app.kubernetes.io/name: spark-history + app.kubernetes.io/role-group: default + prometheus.io/scrape: "true" + 
stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkHistoryServer + name: spark-history +spec: + clusterIP: None + ports: + - name: metrics + port: 18081 + protocol: TCP + targetPort: 18081 + publishNotReadyAddresses: true + selector: + app.kubernetes.io/component: node + app.kubernetes.io/instance: spark-history + app.kubernetes.io/name: spark-history + app.kubernetes.io/role-group: default + type: ClusterIP +--- +apiVersion: listeners.stackable.tech/v1alpha1 +kind: Listener +metadata: + name: spark-history-node + labels: + app.kubernetes.io/component: node + app.kubernetes.io/instance: spark-history + app.kubernetes.io/managed-by: spark.stackable.tech_history + app.kubernetes.io/name: spark-history + app.kubernetes.io/role-group: none + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkHistoryServer + name: spark-history +spec: + className: cluster-internal + ports: + - name: http + port: 18080 + protocol: TCP +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: spark-history-serviceaccount + labels: + app.kubernetes.io/instance: spark-history + app.kubernetes.io/managed-by: spark.stackable.tech_history + app.kubernetes.io/name: spark-history + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkHistoryServer + name: spark-history +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: spark-history-rolebinding + labels: + app.kubernetes.io/instance: spark-history + app.kubernetes.io/managed-by: spark.stackable.tech_history + app.kubernetes.io/name: spark-history + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkHistoryServer + name: spark-history +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: spark-history-clusterrole +subjects: + - kind: ServiceAccount 
+ name: spark-history-serviceaccount +--- +apiVersion: policy/v1 +kind: PodDisruptionBudget +metadata: + name: spark-history-node + labels: + app.kubernetes.io/component: node + app.kubernetes.io/instance: spark-history + app.kubernetes.io/managed-by: spark.stackable.tech_history + app.kubernetes.io/name: spark-history + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkHistoryServer + name: spark-history +spec: + maxUnavailable: 1 + selector: + matchLabels: + app.kubernetes.io/component: node + app.kubernetes.io/instance: spark-history + app.kubernetes.io/name: spark-history +status: + currentHealthy: 1 + disruptionsAllowed: 1 + expectedPods: 1 From fa03bdea3188e28035996f5034ce326e7180cbd9 Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Wed, 20 May 2026 17:01:02 +0200 Subject: [PATCH 20/25] test(smoke): add SparkHistoryServer ConfigMap data snapshot Co-Authored-By: Claude Sonnet 4.6 --- tests/templates/kuttl/smoke/43-assert.yaml.j2 | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 tests/templates/kuttl/smoke/43-assert.yaml.j2 diff --git a/tests/templates/kuttl/smoke/43-assert.yaml.j2 b/tests/templates/kuttl/smoke/43-assert.yaml.j2 new file mode 100644 index 00000000..8bdacc29 --- /dev/null +++ b/tests/templates/kuttl/smoke/43-assert.yaml.j2 @@ -0,0 +1,40 @@ +--- +# Snapshot the full `.data` of each operator-managed ConfigMap. +# Any code change that alters rendered config values will fail these diffs. +# +# Runs as its own step (after 40/41/42) so kuttl does not re-evaluate the heavy +# heredoc on every 1-second readiness retry of the install step. By this point +# the cluster is in steady state, so each script runs once. +# +# The heredoc is quoted (`<<'YAMLEOF'`) so shell substitution is disabled and +# any property-style escapes survive verbatim. Both sides are normalized to +# canonical JSON via `yq -o=json` before comparison. 
+apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 60 +commands: + - script: | + expected=$(cat <<'YAMLEOF' | yq -o=json + security.properties: "" + spark-defaults.conf: |- +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + spark.hadoop.fs.s3a.endpoint https://eventlog-minio:9000/ +{% else %} + spark.hadoop.fs.s3a.endpoint http://eventlog-minio:9000/ +{% endif %} + spark.hadoop.fs.s3a.endpoint.region us-east-1 + spark.hadoop.fs.s3a.path.style.access true + spark.history.fs.cleaner.enabled true + spark.history.fs.logDirectory s3a://spark-logs/eventlogs/ + spark-env.sh: "" + YAMLEOF + ) + actual=$(kubectl -n $NAMESPACE get cm spark-history-node-default -o yaml | yq -o=json '.data') + if [ "$expected" != "$actual" ]; then + echo "ERROR: ConfigMap spark-history-node-default data drifted from snapshot." + echo "=== expected ===" + printf '%s\n' "$expected" + echo "=== actual ===" + printf '%s\n' "$actual" + exit 1 + fi From d884f616cd9fc4dd0fa1c899e25b21a80eb6b620 Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Wed, 20 May 2026 17:19:29 +0200 Subject: [PATCH 21/25] test(smoke): add SparkApplication resource snapshot assert Drift-detect the 3 ConfigMaps, ServiceAccount, and RoleBinding that persist after SparkApplication/spark-pi-s3-1 reaches phase: Succeeded. 
Co-Authored-By: Claude Sonnet 4.6 --- tests/templates/kuttl/smoke/52-assert.yaml | 96 ++++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 tests/templates/kuttl/smoke/52-assert.yaml diff --git a/tests/templates/kuttl/smoke/52-assert.yaml b/tests/templates/kuttl/smoke/52-assert.yaml new file mode 100644 index 00000000..8404522b --- /dev/null +++ b/tests/templates/kuttl/smoke/52-assert.yaml @@ -0,0 +1,96 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 60 +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: spark-pi-s3-1-driver-pod-template + labels: + app.kubernetes.io/component: pod-templates + app.kubernetes.io/instance: spark-pi-s3-1 + app.kubernetes.io/managed-by: spark.stackable.tech_sparkapplication + app.kubernetes.io/name: spark-k8s + app.kubernetes.io/role-group: sparkapplication + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkApplication + name: spark-pi-s3-1 +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: spark-pi-s3-1-executor-pod-template + labels: + app.kubernetes.io/component: pod-templates + app.kubernetes.io/instance: spark-pi-s3-1 + app.kubernetes.io/managed-by: spark.stackable.tech_sparkapplication + app.kubernetes.io/name: spark-k8s + app.kubernetes.io/role-group: sparkapplication + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkApplication + name: spark-pi-s3-1 +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: spark-pi-s3-1-submit-job + labels: + app.kubernetes.io/component: spark-submit + app.kubernetes.io/instance: spark-pi-s3-1 + app.kubernetes.io/managed-by: spark.stackable.tech_sparkapplication + app.kubernetes.io/name: spark-k8s + app.kubernetes.io/role-group: sparkapplication + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: 
SparkApplication + name: spark-pi-s3-1 +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: spark-pi-s3-1 + labels: + app.kubernetes.io/component: service-account + app.kubernetes.io/instance: spark-pi-s3-1 + app.kubernetes.io/managed-by: spark.stackable.tech_sparkapplication + app.kubernetes.io/name: spark-k8s + app.kubernetes.io/role-group: sparkapplication + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkApplication + name: spark-pi-s3-1 +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + name: spark-pi-s3-1 + labels: + app.kubernetes.io/component: role-binding + app.kubernetes.io/instance: spark-pi-s3-1 + app.kubernetes.io/managed-by: spark.stackable.tech_sparkapplication + app.kubernetes.io/name: spark-k8s + app.kubernetes.io/role-group: sparkapplication + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkApplication + name: spark-pi-s3-1 +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: spark-k8s-clusterrole +subjects: + - kind: ServiceAccount + name: spark-pi-s3-1 From c1af068f6c19791b38d39e59e3b61313cbce3da2 Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Wed, 20 May 2026 18:42:14 +0200 Subject: [PATCH 22/25] test(smoke): add SparkApplication ConfigMap data snapshot Co-Authored-By: Claude Sonnet 4.6 --- tests/templates/kuttl/smoke/53-assert.yaml.j2 | 405 ++++++++++++++++++ 1 file changed, 405 insertions(+) create mode 100644 tests/templates/kuttl/smoke/53-assert.yaml.j2 diff --git a/tests/templates/kuttl/smoke/53-assert.yaml.j2 b/tests/templates/kuttl/smoke/53-assert.yaml.j2 new file mode 100644 index 00000000..55a96203 --- /dev/null +++ b/tests/templates/kuttl/smoke/53-assert.yaml.j2 @@ -0,0 +1,405 @@ +--- +# Snapshot the full `.data` of each SparkApplication-owned ConfigMap. 
+# Any code change that alters rendered config values will fail these diffs. +# +# Runs as its own step (after 50/51/52) so kuttl does not re-evaluate the heavy +# heredoc on every 1-second readiness retry of the install step. By this point +# the cluster is in steady state, so each script runs once. +# +# The heredoc is quoted (`<<'YAMLEOF'`) so shell substitution is disabled and +# any property-style escapes survive verbatim. Both sides are normalized to +# canonical JSON via `yq -o=json` before comparison. +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 60 +commands: + - script: | + expected=$(cat <<'YAMLEOF' | yq -o=json + log4j2.properties: |- + appenders = FILE, CONSOLE + + appender.CONSOLE.type = Console + appender.CONSOLE.name = CONSOLE + appender.CONSOLE.target = SYSTEM_ERR + appender.CONSOLE.layout.type = PatternLayout + appender.CONSOLE.layout.pattern = %d{ISO8601} %p [%t] %c - %m%n + appender.CONSOLE.filter.threshold.type = ThresholdFilter + appender.CONSOLE.filter.threshold.level = INFO + + appender.FILE.type = RollingFile + appender.FILE.name = FILE + appender.FILE.fileName = /stackable/log/spark/spark.log4j2.xml + appender.FILE.filePattern = /stackable/log/spark/spark.log4j2.xml.%i + appender.FILE.layout.type = XMLLayout + appender.FILE.policies.type = Policies + appender.FILE.policies.size.type = SizeBasedTriggeringPolicy + appender.FILE.policies.size.size = 5MB + appender.FILE.strategy.type = DefaultRolloverStrategy + appender.FILE.strategy.max = 1 + appender.FILE.filter.threshold.type = ThresholdFilter + appender.FILE.filter.threshold.level = INFO + + + rootLogger.level=INFO + rootLogger.appenderRefs = CONSOLE, FILE + rootLogger.appenderRef.CONSOLE.ref = CONSOLE + rootLogger.appenderRef.FILE.ref = FILE + security.properties: | + networkaddress.cache.negative.ttl=0 + networkaddress.cache.ttl=30 + spark-env.sh: "" + template.yaml: | + metadata: + labels: + app.kubernetes.io/component: spark + app.kubernetes.io/instance: spark-pi-s3-1 + 
app.kubernetes.io/managed-by: spark.stackable.tech_sparkapplication + app.kubernetes.io/name: spark-k8s + app.kubernetes.io/role-group: sparkapplication + app.kubernetes.io/version: 4.1.1-stackable0.0.0-dev + prometheus.io/scrape: 'true' + stackable.tech/vendor: Stackable + name: spark + spec: + affinity: {} + containers: + - env: + - name: CONTAINERDEBUG_LOG_DIRECTORY + value: /stackable/log/containerdebug +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + - name: STACKABLE_TLS_STORE_PASSWORD + value: changeit +{% endif %} + - name: _STACKABLE_PRE_HOOK + value: containerdebug --output=/stackable/log/containerdebug-state.json --loop & + image: oci.stackable.tech/sdp/spark-k8s:4.1.1-stackable0.0.0-dev + imagePullPolicy: IfNotPresent + name: spark + resources: + limits: + cpu: '1' + memory: 1Gi + requests: + cpu: 250m + memory: 1Gi + volumeMounts: + - mountPath: /stackable/secrets/s3-credentials-class + name: s3-credentials-class + - mountPath: /stackable/secrets/history-credentials-class + name: history-credentials-class + - mountPath: /stackable/log_config + name: log-config + - mountPath: /stackable/log + name: log +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + - mountPath: /stackable/truststore + name: stackable-truststore + - mountPath: /stackable/mount_server_tls/minio-tls-eventlog + name: minio-tls-eventlog +{% endif %} + enableServiceLinks: false +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + initContainers: + - args: + - |- + cert-tools generate-pkcs12-truststore --pem /etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem --out /stackable/truststore/truststore.p12 --out-password changeit + cert-tools generate-pkcs12-truststore --pkcs12 /stackable/truststore/truststore.p12:changeit --pkcs12 /stackable/mount_server_tls/minio-tls-eventlog/truststore.p12 --out /stackable/truststore/truststore.p12 --out-password changeit + command: + - /bin/bash + - -x + - -euo + - pipefail + - -c + image: 
oci.stackable.tech/sdp/spark-k8s:4.1.1-stackable0.0.0-dev + name: tls + resources: + limits: + cpu: 1000m + memory: 1024Mi + requests: + cpu: 250m + memory: 1024Mi + volumeMounts: + - mountPath: /stackable/mount_server_tls/minio-tls-eventlog + name: minio-tls-eventlog + - mountPath: /stackable/truststore + name: stackable-truststore +{% endif %} + securityContext: + fsGroup: 1000 + serviceAccountName: spark-pi-s3-1 + volumes: + - ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/class: history-credentials-class + secrets.stackable.tech/provision-parts: public-private + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: '1' + storageClassName: secrets.stackable.tech + name: history-credentials-class + - emptyDir: + sizeLimit: 39Mi + name: log + - configMap: + name: spark-pi-s3-1-driver-pod-template + name: log-config +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + - ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/backend.autotls.cert.lifetime: 1d + secrets.stackable.tech/class: minio-tls-eventlog + secrets.stackable.tech/format: tls-pkcs12 + secrets.stackable.tech/provision-parts: public + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: '1' + storageClassName: secrets.stackable.tech + name: minio-tls-eventlog +{% endif %} + - ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/class: s3-credentials-class + secrets.stackable.tech/provision-parts: public-private + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: '1' + storageClassName: secrets.stackable.tech + name: s3-credentials-class +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + - emptyDir: + sizeLimit: 5Mi + name: stackable-truststore +{% endif %} + - configMap: + name: spark-pi-s3-1-driver-pod-template + name: config + YAMLEOF + ) + actual=$(kubectl -n $NAMESPACE get cm spark-pi-s3-1-driver-pod-template -o 
yaml | yq -o=json '.data') + if [ "$expected" != "$actual" ]; then + echo "ERROR: ConfigMap spark-pi-s3-1-driver-pod-template data drifted from snapshot." + echo "=== expected ===" + printf '%s\n' "$expected" + echo "=== actual ===" + printf '%s\n' "$actual" + exit 1 + fi + - script: | + expected=$(cat <<'YAMLEOF' | yq -o=json + log4j2.properties: |- + appenders = FILE, CONSOLE + + appender.CONSOLE.type = Console + appender.CONSOLE.name = CONSOLE + appender.CONSOLE.target = SYSTEM_ERR + appender.CONSOLE.layout.type = PatternLayout + appender.CONSOLE.layout.pattern = %d{ISO8601} %p [%t] %c - %m%n + appender.CONSOLE.filter.threshold.type = ThresholdFilter + appender.CONSOLE.filter.threshold.level = INFO + + appender.FILE.type = RollingFile + appender.FILE.name = FILE + appender.FILE.fileName = /stackable/log/spark/spark.log4j2.xml + appender.FILE.filePattern = /stackable/log/spark/spark.log4j2.xml.%i + appender.FILE.layout.type = XMLLayout + appender.FILE.policies.type = Policies + appender.FILE.policies.size.type = SizeBasedTriggeringPolicy + appender.FILE.policies.size.size = 5MB + appender.FILE.strategy.type = DefaultRolloverStrategy + appender.FILE.strategy.max = 1 + appender.FILE.filter.threshold.type = ThresholdFilter + appender.FILE.filter.threshold.level = INFO + + + rootLogger.level=INFO + rootLogger.appenderRefs = CONSOLE, FILE + rootLogger.appenderRef.CONSOLE.ref = CONSOLE + rootLogger.appenderRef.FILE.ref = FILE + security.properties: | + networkaddress.cache.negative.ttl=0 + networkaddress.cache.ttl=30 + spark-env.sh: "" + template.yaml: | + metadata: + labels: + app.kubernetes.io/component: spark + app.kubernetes.io/instance: spark-pi-s3-1 + app.kubernetes.io/managed-by: spark.stackable.tech_sparkapplication + app.kubernetes.io/name: spark-k8s + app.kubernetes.io/role-group: sparkapplication + app.kubernetes.io/version: 4.1.1-stackable0.0.0-dev + stackable.tech/vendor: Stackable + name: spark + spec: + affinity: {} + containers: + - env: + - name: 
CONTAINERDEBUG_LOG_DIRECTORY + value: /stackable/log/containerdebug +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + - name: STACKABLE_TLS_STORE_PASSWORD + value: changeit +{% endif %} + - name: _STACKABLE_PRE_HOOK + value: containerdebug --output=/stackable/log/containerdebug-state.json --loop & + image: oci.stackable.tech/sdp/spark-k8s:4.1.1-stackable0.0.0-dev + imagePullPolicy: IfNotPresent + name: spark + resources: + limits: + cpu: '1' + memory: 1Gi + requests: + cpu: 250m + memory: 1Gi + volumeMounts: + - mountPath: /stackable/secrets/s3-credentials-class + name: s3-credentials-class + - mountPath: /stackable/secrets/history-credentials-class + name: history-credentials-class + - mountPath: /stackable/log_config + name: log-config + - mountPath: /stackable/log + name: log +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + - mountPath: /stackable/truststore + name: stackable-truststore + - mountPath: /stackable/mount_server_tls/minio-tls-eventlog + name: minio-tls-eventlog +{% endif %} + enableServiceLinks: false +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + initContainers: + - args: + - |- + cert-tools generate-pkcs12-truststore --pem /etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem --out /stackable/truststore/truststore.p12 --out-password changeit + cert-tools generate-pkcs12-truststore --pkcs12 /stackable/truststore/truststore.p12:changeit --pkcs12 /stackable/mount_server_tls/minio-tls-eventlog/truststore.p12 --out /stackable/truststore/truststore.p12 --out-password changeit + command: + - /bin/bash + - -x + - -euo + - pipefail + - -c + image: oci.stackable.tech/sdp/spark-k8s:4.1.1-stackable0.0.0-dev + name: tls + resources: + limits: + cpu: 1000m + memory: 1024Mi + requests: + cpu: 250m + memory: 1024Mi + volumeMounts: + - mountPath: /stackable/mount_server_tls/minio-tls-eventlog + name: minio-tls-eventlog + - mountPath: /stackable/truststore + name: stackable-truststore +{% endif %} + securityContext: + fsGroup: 1000 + 
serviceAccountName: spark-pi-s3-1 + volumes: + - ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/class: history-credentials-class + secrets.stackable.tech/provision-parts: public-private + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: '1' + storageClassName: secrets.stackable.tech + name: history-credentials-class + - emptyDir: + sizeLimit: 39Mi + name: log + - configMap: + name: spark-pi-s3-1-executor-pod-template + name: log-config +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + - ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/backend.autotls.cert.lifetime: 1d + secrets.stackable.tech/class: minio-tls-eventlog + secrets.stackable.tech/format: tls-pkcs12 + secrets.stackable.tech/provision-parts: public + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: '1' + storageClassName: secrets.stackable.tech + name: minio-tls-eventlog +{% endif %} + - ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/class: s3-credentials-class + secrets.stackable.tech/provision-parts: public-private + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: '1' + storageClassName: secrets.stackable.tech + name: s3-credentials-class +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + - emptyDir: + sizeLimit: 5Mi + name: stackable-truststore +{% endif %} + - configMap: + name: spark-pi-s3-1-executor-pod-template + name: config + YAMLEOF + ) + actual=$(kubectl -n $NAMESPACE get cm spark-pi-s3-1-executor-pod-template -o yaml | yq -o=json '.data') + if [ "$expected" != "$actual" ]; then + echo "ERROR: ConfigMap spark-pi-s3-1-executor-pod-template data drifted from snapshot." 
+ echo "=== expected ===" + printf '%s\n' "$expected" + echo "=== actual ===" + printf '%s\n' "$actual" + exit 1 + fi + - script: | + expected=$(cat <<'YAMLEOF' | yq -o=json + security.properties: | + networkaddress.cache.negative.ttl=0 + networkaddress.cache.ttl=30 + spark-env.sh: "" + YAMLEOF + ) + actual=$(kubectl -n $NAMESPACE get cm spark-pi-s3-1-submit-job -o yaml | yq -o=json '.data') + if [ "$expected" != "$actual" ]; then + echo "ERROR: ConfigMap spark-pi-s3-1-submit-job data drifted from snapshot." + echo "=== expected ===" + printf '%s\n' "$expected" + echo "=== actual ===" + printf '%s\n' "$actual" + exit 1 + fi From ecae1c44095723235477e661f0721813a7c3df47 Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Wed, 20 May 2026 19:28:19 +0200 Subject: [PATCH 23/25] test(spark-connect): add SparkConnectServer resource snapshot assert Co-Authored-By: Claude Opus 4.7 (1M context) --- .../kuttl/spark-connect/12-assert.yaml | 197 ++++++++++++++++++ 1 file changed, 197 insertions(+) create mode 100644 tests/templates/kuttl/spark-connect/12-assert.yaml diff --git a/tests/templates/kuttl/spark-connect/12-assert.yaml b/tests/templates/kuttl/spark-connect/12-assert.yaml new file mode 100644 index 00000000..d25e9b1b --- /dev/null +++ b/tests/templates/kuttl/spark-connect/12-assert.yaml @@ -0,0 +1,197 @@ +--- +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 60 +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: spark-connect-server + labels: + app.kubernetes.io/component: server + app.kubernetes.io/instance: spark-connect + app.kubernetes.io/managed-by: spark.stackable.tech_connect + app.kubernetes.io/name: spark-connect + app.kubernetes.io/role-group: default + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkConnectServer + name: spark-connect +spec: + replicas: 1 + selector: + matchLabels: + app.kubernetes.io/component: server + app.kubernetes.io/instance: 
spark-connect + app.kubernetes.io/name: spark-connect + app.kubernetes.io/role-group: default +status: + readyReplicas: 1 + replicas: 1 +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: spark-connect-server + labels: + app.kubernetes.io/component: server + app.kubernetes.io/instance: spark-connect + app.kubernetes.io/managed-by: spark.stackable.tech_connect + app.kubernetes.io/name: spark-connect + app.kubernetes.io/role-group: default + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkConnectServer + name: spark-connect +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: spark-connect-executor + labels: + app.kubernetes.io/component: executor + app.kubernetes.io/instance: spark-connect + app.kubernetes.io/managed-by: spark.stackable.tech_connect + app.kubernetes.io/name: spark-connect + app.kubernetes.io/role-group: default + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkConnectServer + name: spark-connect +--- +apiVersion: v1 +kind: Service +metadata: + name: spark-connect-server-headless + labels: + app.kubernetes.io/component: server + app.kubernetes.io/instance: spark-connect + app.kubernetes.io/managed-by: spark.stackable.tech_connect + app.kubernetes.io/name: spark-connect + app.kubernetes.io/role-group: default + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkConnectServer + name: spark-connect +spec: + clusterIP: None + ports: + - name: grpc + port: 15002 + protocol: TCP + targetPort: 15002 + - name: http + port: 4040 + protocol: TCP + targetPort: 4040 + publishNotReadyAddresses: true + selector: + app.kubernetes.io/component: server + app.kubernetes.io/instance: spark-connect + app.kubernetes.io/name: spark-connect + type: ClusterIP +--- +apiVersion: v1 +kind: Service +metadata: + name: 
spark-connect-server-metrics + annotations: + prometheus.io/path: /metrics/prometheus + prometheus.io/port: "4040" + prometheus.io/scheme: http + prometheus.io/scrape: "true" + labels: + app.kubernetes.io/component: server + app.kubernetes.io/instance: spark-connect + app.kubernetes.io/managed-by: spark.stackable.tech_connect + app.kubernetes.io/name: spark-connect + app.kubernetes.io/role-group: default + prometheus.io/scrape: "true" + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkConnectServer + name: spark-connect +spec: + clusterIP: None + ports: + - name: metrics + port: 4040 + protocol: TCP + targetPort: 4040 + publishNotReadyAddresses: true + selector: + app.kubernetes.io/component: server + app.kubernetes.io/instance: spark-connect + app.kubernetes.io/name: spark-connect + type: ClusterIP +--- +apiVersion: listeners.stackable.tech/v1alpha1 +kind: Listener +metadata: + name: spark-connect-server + labels: + app.kubernetes.io/component: server + app.kubernetes.io/instance: spark-connect + app.kubernetes.io/managed-by: spark.stackable.tech_connect + app.kubernetes.io/name: spark-connect + app.kubernetes.io/role-group: default + stackable.tech/vendor: Stackable + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkConnectServer + name: spark-connect +spec: + className: external-unstable + ports: + - name: grpc + port: 15002 + protocol: TCP + - name: http + port: 4040 + protocol: TCP +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: spark-connect-serviceaccount + labels: + app.kubernetes.io/instance: spark-connect + app.kubernetes.io/managed-by: spark.stackable.tech_connect + app.kubernetes.io/name: spark-connect + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkConnectServer + name: spark-connect +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + 
name: spark-connect-rolebinding + labels: + app.kubernetes.io/instance: spark-connect + app.kubernetes.io/managed-by: spark.stackable.tech_connect + app.kubernetes.io/name: spark-connect + ownerReferences: + - apiVersion: spark.stackable.tech/v1alpha1 + controller: true + kind: SparkConnectServer + name: spark-connect +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: spark-connect-clusterrole +subjects: + - kind: ServiceAccount + name: spark-connect-serviceaccount From f335abbeb06921c421bdcac5f4b3adcd06103357 Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Wed, 20 May 2026 19:42:55 +0200 Subject: [PATCH 24/25] test(spark-connect): add SparkConnectServer ConfigMap data snapshot Snapshot the full `.data` of spark-connect-server and spark-connect-executor ConfigMaps. Uses `.yaml.j2` because spark-defaults.conf and template.yaml differ between s3-use-tls dimensions (https vs http, extraJavaOptions, initContainers, minio-tls-ca volume). The spark.kubernetes.namespace line is substituted at runtime via sed/__NAMESPACE__ to survive randomised kuttl namespaces. Co-Authored-By: Claude Sonnet 4.6 --- .../kuttl/spark-connect/13-assert.yaml.j2 | 213 ++++++++++++++++++ 1 file changed, 213 insertions(+) create mode 100644 tests/templates/kuttl/spark-connect/13-assert.yaml.j2 diff --git a/tests/templates/kuttl/spark-connect/13-assert.yaml.j2 b/tests/templates/kuttl/spark-connect/13-assert.yaml.j2 new file mode 100644 index 00000000..cd7dae3e --- /dev/null +++ b/tests/templates/kuttl/spark-connect/13-assert.yaml.j2 @@ -0,0 +1,213 @@ +--- +# Snapshot the full `.data` of each operator-managed ConfigMap. +# Any code change that alters rendered config values will fail these diffs. +# +# Runs as its own step (after 10/11/12) so kuttl does not re-evaluate the heavy +# heredoc on every 1-second readiness retry of the install step. 
+# +# The heredoc is quoted (`<<'YAMLEOF'`) so shell substitution is disabled and +# property-style escapes (`\:`, `\=`) survive verbatim. Only `__NAMESPACE__` is +# substituted afterwards via `sed`, because kuttl tests run in a randomized +# namespace per invocation. Both sides are normalized to canonical JSON via +# `yq -o=json` before comparison. +apiVersion: kuttl.dev/v1beta1 +kind: TestAssert +timeout: 60 +commands: + - script: | + expected=$(cat <<'YAMLEOF' | sed "s|__NAMESPACE__|$NAMESPACE|g" | yq -o=json + metrics.properties: | + *.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet + *.sink.prometheusServlet.path=/metrics/prometheus + security.properties: | + networkaddress.cache.negative.ttl=0 + networkaddress.cache.ttl=30 + spark-defaults.conf: | + spark.driver.defaultJavaOptions=-Djava.security.properties\=/stackable/spark/conf/security.properties\ -Dlog4j.configurationFile\=/stackable/log_config/log4j2.properties\ -Dmy.custom.jvm.arg\=customValue + spark.driver.extraClassPath=/stackable/spark/extra-jars/*\:/stackable/spark/connect/spark-connect-4.1.1.jar +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + spark.driver.extraJavaOptions=-Djavax.net.ssl.trustStore\=/stackable/truststore/truststore.p12\ -Djavax.net.ssl.trustStorePassword\=changeit\ -Djavax.net.ssl.trustStoreType\=pkcs12 +{% endif %} + spark.driver.host=spark-connect-server-headless + spark.executor.defaultJavaOptions=-Djava.security.properties\=/stackable/spark/conf/security.properties\ -Dlog4j.configurationFile\=/stackable/log_config/log4j2.properties +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore\=/stackable/truststore/truststore.p12\ -Djavax.net.ssl.trustStorePassword\=changeit\ -Djavax.net.ssl.trustStoreType\=pkcs12 +{% endif %} + spark.executor.instances=1 + spark.executor.memory=1024M + spark.executor.memoryOverhead=1m + 
spark.hadoop.fs.s3a.access.key=${file\:UTF-8\:/stackable/secrets/minio-credentials-class/accessKey} + spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider + spark.hadoop.fs.s3a.bucket.ingest-bucket.access.key=${file\:UTF-8\:/stackable/secrets/minio-credentials-class/accessKey} + spark.hadoop.fs.s3a.bucket.ingest-bucket.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + spark.hadoop.fs.s3a.bucket.ingest-bucket.endpoint=https\://minio\:9000/ +{% else %} + spark.hadoop.fs.s3a.bucket.ingest-bucket.endpoint=http\://minio\:9000/ +{% endif %} + spark.hadoop.fs.s3a.bucket.ingest-bucket.endpoint.region=us-east-1 + spark.hadoop.fs.s3a.bucket.ingest-bucket.path.style.access=true + spark.hadoop.fs.s3a.bucket.ingest-bucket.secret.key=${file\:UTF-8\:/stackable/secrets/minio-credentials-class/secretKey} +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + spark.hadoop.fs.s3a.endpoint=https\://minio\:9000/ +{% else %} + spark.hadoop.fs.s3a.endpoint=http\://minio\:9000/ +{% endif %} + spark.hadoop.fs.s3a.endpoint.region=us-east-1 + spark.hadoop.fs.s3a.path.style.access=true + spark.hadoop.fs.s3a.secret.key=${file\:UTF-8\:/stackable/secrets/minio-credentials-class/secretKey} + spark.jars.ivy=/tmp/ivy2 + spark.kubernetes.authenticate.driver.serviceAccountName=spark-connect-serviceaccount + spark.kubernetes.driver.container.image=oci.stackable.tech/sdp/spark-k8s\:4.1.1-stackable0.0.0-dev + spark.kubernetes.driver.pod.name=${env\:HOSTNAME} + spark.kubernetes.executor.container.image=oci.stackable.tech/sdp/spark-k8s\:4.1.1-stackable0.0.0-dev + spark.kubernetes.executor.limit.cores=1 + spark.kubernetes.executor.podTemplateContainerName=spark + spark.kubernetes.executor.podTemplateFile=/stackable/spark/conf/template.yaml + spark.kubernetes.executor.request.cores=1 + spark.kubernetes.namespace=__NAMESPACE__ + 
spark.metrics.conf=/stackable/spark/conf/metrics.properties + spark.ui.prometheus.enabled=true + template.yaml: | + metadata: + labels: + app.kubernetes.io/component: executor + app.kubernetes.io/instance: spark-connect + app.kubernetes.io/managed-by: spark.stackable.tech_connect + app.kubernetes.io/name: spark-connect + app.kubernetes.io/role-group: default + app.kubernetes.io/version: 4.1.1-stackable0.0.0-dev + stackable.tech/vendor: Stackable + spec: + affinity: + podAntiAffinity: + preferredDuringSchedulingIgnoredDuringExecution: + - podAffinityTerm: + labelSelector: + matchLabels: + app.kubernetes.io/component: executor + app.kubernetes.io/instance: spark-connect + app.kubernetes.io/name: spark-connect + topologyKey: kubernetes.io/hostname + weight: 70 + containers: + - env: + - name: CONTAINERDEBUG_LOG_DIRECTORY + value: /stackable/log/containerdebug + name: spark + volumeMounts: + - mountPath: /stackable/spark/conf + name: config + - mountPath: /stackable/log + name: log + - mountPath: /stackable/secrets/minio-credentials-class + name: minio-credentials-class-s3-credentials +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + - mountPath: /stackable/secrets/minio-tls-ca + name: minio-tls-ca-ca-cert +{% endif %} + - mountPath: /stackable/truststore + name: stackable-truststore + - mountPath: /stackable/log_config + name: log-config + enableServiceLinks: false +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + initContainers: + - command: + - /bin/bash + - -x + - -euo + - pipefail + - -c + - cert-tools generate-pkcs12-truststore --pem /etc/pki/ca-trust/extracted/pem/tls-ca-bundle.pem --out /stackable/truststore/truststore.p12 --out-password changeit && cert-tools generate-pkcs12-truststore --out /stackable/truststore/truststore.p12 --out-password changeit --pkcs12 /stackable/truststore/truststore.p12:changeit --pem /stackable/secrets/minio-tls-ca/ca.crt + image: oci.stackable.tech/sdp/spark-k8s:4.1.1-stackable0.0.0-dev + name: 
tls-truststore-init + resources: + limits: + cpu: 10m + memory: 128Mi + requests: + cpu: 10m + memory: 128Mi + volumeMounts: + - mountPath: /stackable/secrets/minio-credentials-class + name: minio-credentials-class-s3-credentials + - mountPath: /stackable/secrets/minio-tls-ca + name: minio-tls-ca-ca-cert + - mountPath: /stackable/truststore + name: stackable-truststore +{% endif %} + securityContext: + fsGroup: 1000 + volumes: + - emptyDir: + sizeLimit: 30Mi + name: log + - configMap: + name: spark-connect-executor + name: config + - ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/class: minio-credentials-class + secrets.stackable.tech/provision-parts: public-private + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: '1' + storageClassName: secrets.stackable.tech + name: minio-credentials-class-s3-credentials +{% if test_scenario['values']['s3-use-tls'] == 'true' %} + - ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/class: minio-tls-ca + secrets.stackable.tech/provision-parts: public + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: '1' + storageClassName: secrets.stackable.tech + name: minio-tls-ca-ca-cert +{% endif %} + - emptyDir: {} + name: stackable-truststore + - configMap: + name: spark-connect-log-config + name: log-config + YAMLEOF + ) + actual=$(kubectl -n $NAMESPACE get cm spark-connect-server -o yaml | yq -o=json '.data') + if [ "$expected" != "$actual" ]; then + echo "ERROR: ConfigMap spark-connect-server data drifted from snapshot." 
+ echo "=== expected ===" + printf '%s\n' "$expected" + echo "=== actual ===" + printf '%s\n' "$actual" + exit 1 + fi + - script: | + expected=$(cat <<'YAMLEOF' | yq -o=json + metrics.properties: | + *.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet + *.sink.prometheusServlet.path=/metrics/prometheus + security.properties: | + networkaddress.cache.negative.ttl=0 + networkaddress.cache.ttl=30 + YAMLEOF + ) + actual=$(kubectl -n $NAMESPACE get cm spark-connect-executor -o yaml | yq -o=json '.data') + if [ "$expected" != "$actual" ]; then + echo "ERROR: ConfigMap spark-connect-executor data drifted from snapshot." + echo "=== expected ===" + printf '%s\n' "$expected" + echo "=== actual ===" + printf '%s\n' "$actual" + exit 1 + fi From 3b68faee5c8ee32fd5f22d1c7b37dd169b16c2b6 Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Thu, 21 May 2026 10:47:49 +0200 Subject: [PATCH 25/25] docs: adapt changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c176b375..8a83b3d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ All notable changes to this project will be documented in this file. - `SparkApplication`: `spark-env.sh` and `security.properties` - `SparkHistoryServer`: `spark-defaults.conf`, `spark-env.sh` and `security.properties` - `SparkConnectServer`: `spark-defaults.conf`, `metrics.properties` and `security.properties` +- Internal operator refactoring: introduce `dereference()` and `validate()` steps in the reconciler for spark application, spark connect and spark history server ([#687]). Previously, arbitrary file names were silently accepted and ignored ([#679]). - Bump `stackable-operator` to 0.110.1 ([#679]). @@ -27,6 +28,7 @@ All notable changes to this project will be documented in this file. 
[#679]: https://github.com/stackabletech/spark-k8s-operator/pull/679 [#680]: https://github.com/stackabletech/spark-k8s-operator/pull/680 [#684]: https://github.com/stackabletech/spark-k8s-operator/pull/684 +[#687]: https://github.com/stackabletech/spark-k8s-operator/pull/687 ## [26.3.0] - 2026-03-16